From 9e999cdd813f1689a0028a2256f86edd1fad12c4 Mon Sep 17 00:00:00 2001
From: Pierre Krieger <pierre.krieger1708@gmail.com>
Date: Tue, 12 Feb 2019 15:36:15 +0100
Subject: [PATCH] Make network-libp2p's Service generic over the message
 (#1708)

* Make network-libp2p's Service generic over the message

* Apply suggestions from code review

Co-Authored-By: tomaka <pierre.krieger1708@gmail.com>

* Fix warning
---
 .../core/network-libp2p/src/behaviour.rs      |  47 ++++---
 .../src/custom_proto/behaviour.rs             |  38 +++---
 .../src/custom_proto/handler.rs               |  64 +++++-----
 .../network-libp2p/src/custom_proto/mod.rs    |   2 +-
 .../src/custom_proto/upgrade.rs               | 117 +++++++++++++-----
 substrate/core/network-libp2p/src/lib.rs      |   2 +-
 .../core/network-libp2p/src/service_task.rs   |  45 +++----
 substrate/core/network-libp2p/tests/test.rs   |  20 +--
 substrate/core/network/src/message.rs         |  14 +++
 substrate/core/network/src/on_demand.rs       |  11 +-
 substrate/core/network/src/protocol.rs        |  13 +-
 substrate/core/network/src/service.rs         |  78 +++++-------
 substrate/core/network/src/test/mod.rs        |  35 ++----
 13 files changed, 262 insertions(+), 224 deletions(-)

diff --git a/substrate/core/network-libp2p/src/behaviour.rs b/substrate/core/network-libp2p/src/behaviour.rs
index 5819aedc91d..904ece5e125 100644
--- a/substrate/core/network-libp2p/src/behaviour.rs
+++ b/substrate/core/network-libp2p/src/behaviour.rs
@@ -16,7 +16,6 @@
 
 use crate::custom_proto::{CustomProtos, CustomProtosOut, RegisteredProtocols};
 use crate::{NetworkConfiguration, ProtocolId};
-use bytes::Bytes;
 use futures::prelude::*;
 use libp2p::NetworkBehaviour;
 use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, PublicKey};
@@ -33,12 +32,12 @@ use void;
 
 /// General behaviour of the network.
 #[derive(NetworkBehaviour)]
-#[behaviour(out_event = "BehaviourOut", poll_method = "poll")]
-pub struct Behaviour<TSubstream> {
+#[behaviour(out_event = "BehaviourOut<TMessage>", poll_method = "poll")]
+pub struct Behaviour<TMessage, TSubstream> {
 	/// Periodically ping nodes, and close the connection if it's unresponsive.
 	ping: Ping<TSubstream>,
 	/// Custom protocols (dot, bbq, sub, etc.).
-	custom_protocols: CustomProtos<TSubstream>,
+	custom_protocols: CustomProtos<TMessage, TSubstream>,
 	/// Discovers nodes of the network. Defined below.
 	discovery: DiscoveryBehaviour<TSubstream>,
 	/// Periodically identifies the remote and responds to incoming requests.
@@ -46,13 +45,13 @@ pub struct Behaviour<TSubstream> {
 
 	/// Queue of events to produce for the outside.
 	#[behaviour(ignore)]
-	events: Vec<BehaviourOut>,
+	events: Vec<BehaviourOut<TMessage>>,
 }
 
-impl<TSubstream> Behaviour<TSubstream> {
+impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
 	/// Builds a new `Behaviour`.
 	// TODO: redundancy between config and local_public_key (https://github.com/libp2p/rust-libp2p/issues/745)
-	pub fn new(config: &NetworkConfiguration, local_public_key: PublicKey, protocols: RegisteredProtocols) -> Self {
+	pub fn new(config: &NetworkConfiguration, local_public_key: PublicKey, protocols: RegisteredProtocols<TMessage>) -> Self {
 		let identify = {
 			let proto_version = "/substrate/1.0".to_string();
 			let user_agent = format!("{} ({})", config.client_version, config.node_name);
@@ -78,7 +77,7 @@ impl<TSubstream> Behaviour<TSubstream> {
 	/// Also note that even we have a valid open substream, it may in fact be already closed
 	/// without us knowing, in which case the packet will not be received.
 	#[inline]
-	pub fn send_custom_message(&mut self, target: &PeerId, protocol_id: ProtocolId, data: impl Into<Bytes>) {
+	pub fn send_custom_message(&mut self, target: &PeerId, protocol_id: ProtocolId, data: TMessage) {
 		self.custom_protocols.send_packet(target, protocol_id, data)
 	}
 
@@ -147,7 +146,7 @@ impl<TSubstream> Behaviour<TSubstream> {
 
 /// Event that can be emitted by the behaviour.
 #[derive(Debug)]
-pub enum BehaviourOut {
+pub enum BehaviourOut<TMessage> {
 	/// Opened a custom protocol with the remote.
 	CustomProtocolOpen {
 		/// Identifier of the protocol.
@@ -176,8 +175,8 @@ pub enum BehaviourOut {
 		peer_id: PeerId,
 		/// Protocol which generated the message.
 		protocol_id: ProtocolId,
-		/// Data that has been received.
-		data: Bytes,
+		/// Message that has been received.
+		message: TMessage,
 	},
 
 	/// A substream with a remote is clogged. We should avoid sending more data to it if possible.
@@ -187,7 +186,7 @@ pub enum BehaviourOut {
 		/// Protocol which generated the message.
 		protocol_id: ProtocolId,
 		/// Copy of the messages that are within the buffer, for further diagnostic.
-		messages: Vec<Bytes>,
+		messages: Vec<TMessage>,
 	},
 
 	/// We have obtained debug information from a peer.
@@ -199,8 +198,8 @@ pub enum BehaviourOut {
 	},
 }
 
-impl From<CustomProtosOut> for BehaviourOut {
-	fn from(other: CustomProtosOut) -> BehaviourOut {
+impl<TMessage> From<CustomProtosOut<TMessage>> for BehaviourOut<TMessage> {
+	fn from(other: CustomProtosOut<TMessage>) -> BehaviourOut<TMessage> {
 		match other {
 			CustomProtosOut::CustomProtocolOpen { protocol_id, version, peer_id, endpoint } => {
 				BehaviourOut::CustomProtocolOpen { protocol_id, version, peer_id, endpoint }
@@ -208,8 +207,8 @@ impl From<CustomProtosOut> for BehaviourOut {
 			CustomProtosOut::CustomProtocolClosed { protocol_id, peer_id, result } => {
 				BehaviourOut::CustomProtocolClosed { protocol_id, peer_id, result }
 			}
-			CustomProtosOut::CustomMessage { protocol_id, peer_id, data } => {
-				BehaviourOut::CustomMessage { protocol_id, peer_id, data }
+			CustomProtosOut::CustomMessage { protocol_id, peer_id, message } => {
+				BehaviourOut::CustomMessage { protocol_id, peer_id, message }
 			}
 			CustomProtosOut::Clogged { protocol_id, peer_id, messages } => {
 				BehaviourOut::Clogged { protocol_id, peer_id, messages }
@@ -218,19 +217,19 @@ impl From<CustomProtosOut> for BehaviourOut {
 	}
 }
 
-impl<TSubstream> NetworkBehaviourEventProcess<void::Void> for Behaviour<TSubstream> {
+impl<TMessage, TSubstream> NetworkBehaviourEventProcess<void::Void> for Behaviour<TMessage, TSubstream> {
 	fn inject_event(&mut self, event: void::Void) {
 		void::unreachable(event)
 	}
 }
 
-impl<TSubstream> NetworkBehaviourEventProcess<CustomProtosOut> for Behaviour<TSubstream> {
-	fn inject_event(&mut self, event: CustomProtosOut) {
+impl<TMessage, TSubstream> NetworkBehaviourEventProcess<CustomProtosOut<TMessage>> for Behaviour<TMessage, TSubstream> {
+	fn inject_event(&mut self, event: CustomProtosOut<TMessage>) {
 		self.events.push(event.into());
 	}
 }
 
-impl<TSubstream> NetworkBehaviourEventProcess<IdentifyEvent> for Behaviour<TSubstream> {
+impl<TMessage, TSubstream> NetworkBehaviourEventProcess<IdentifyEvent> for Behaviour<TMessage, TSubstream> {
 	fn inject_event(&mut self, event: IdentifyEvent) {
 		match event {
 			IdentifyEvent::Identified { peer_id, mut info, .. } => {
@@ -260,7 +259,7 @@ impl<TSubstream> NetworkBehaviourEventProcess<IdentifyEvent> for Behaviour<TSubs
 	}
 }
 
-impl<TSubstream> NetworkBehaviourEventProcess<KademliaOut> for Behaviour<TSubstream> {
+impl<TMessage, TSubstream> NetworkBehaviourEventProcess<KademliaOut> for Behaviour<TMessage, TSubstream> {
 	fn inject_event(&mut self, out: KademliaOut) {
 		match out {
 			KademliaOut::Discovered { peer_id, addresses, ty } => {
@@ -282,7 +281,7 @@ impl<TSubstream> NetworkBehaviourEventProcess<KademliaOut> for Behaviour<TSubstr
 	}
 }
 
-impl<TSubstream> NetworkBehaviourEventProcess<PingEvent> for Behaviour<TSubstream> {
+impl<TMessage, TSubstream> NetworkBehaviourEventProcess<PingEvent> for Behaviour<TMessage, TSubstream> {
 	fn inject_event(&mut self, event: PingEvent) {
 		match event {
 			PingEvent::PingSuccess { peer, time } => {
@@ -292,8 +291,8 @@ impl<TSubstream> NetworkBehaviourEventProcess<PingEvent> for Behaviour<TSubstrea
 	}
 }
 
-impl<TSubstream> Behaviour<TSubstream> {
-	fn poll<TEv>(&mut self) -> Async<NetworkBehaviourAction<TEv, BehaviourOut>> {
+impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
+	fn poll<TEv>(&mut self) -> Async<NetworkBehaviourAction<TEv, BehaviourOut<TMessage>>> {
 		if !self.events.is_empty() {
 			return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)))
 		}
diff --git a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs b/substrate/core/network-libp2p/src/custom_proto/behaviour.rs
index 20fb8d0e9e6..ca8fc55d378 100644
--- a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs
+++ b/substrate/core/network-libp2p/src/custom_proto/behaviour.rs
@@ -16,10 +16,9 @@
 
 use crate::custom_proto::handler::{CustomProtosHandler, CustomProtosHandlerOut, CustomProtosHandlerIn};
 use crate::custom_proto::topology::NetTopology;
-use crate::custom_proto::upgrade::RegisteredProtocols;
+use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocols};
 use crate::{NetworkConfiguration, NonReservedPeerMode, ProtocolId};
 use crate::parse_str_addr;
-use bytes::Bytes;
 use fnv::{FnvHashMap, FnvHashSet};
 use futures::prelude::*;
 use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
@@ -36,9 +35,9 @@ const NODES_FILE: &str = "nodes.json";
 const PEER_DISABLE_DURATION: Duration = Duration::from_secs(5 * 60);
 
 /// Network behaviour that handles opening substreams for custom protocols with other nodes.
-pub struct CustomProtos<TSubstream> {
+pub struct CustomProtos<TMessage, TSubstream> {
 	/// List of protocols to open with peers. Never modified.
-	registered_protocols: RegisteredProtocols,
+	registered_protocols: RegisteredProtocols<TMessage>,
 
 	/// Topology of the network.
 	topology: NetTopology,
@@ -77,7 +76,7 @@ pub struct CustomProtos<TSubstream> {
 	next_connect_to_nodes: Delay,
 
 	/// Events to produce from `poll()`.
-	events: SmallVec<[NetworkBehaviourAction<CustomProtosHandlerIn, CustomProtosOut>; 4]>,
+	events: SmallVec<[NetworkBehaviourAction<CustomProtosHandlerIn<TMessage>, CustomProtosOut<TMessage>>; 4]>,
 
 	/// Marker to pin the generics.
 	marker: PhantomData<TSubstream>,
@@ -85,7 +84,7 @@ pub struct CustomProtos<TSubstream> {
 
 /// Event that can be emitted by the `CustomProtos`.
 #[derive(Debug)]
-pub enum CustomProtosOut {
+pub enum CustomProtosOut<TMessage> {
 	/// Opened a custom protocol with the remote.
 	CustomProtocolOpen {
 		/// Identifier of the protocol.
@@ -114,25 +113,25 @@ pub enum CustomProtosOut {
 		peer_id: PeerId,
 		/// Protocol which generated the message.
 		protocol_id: ProtocolId,
-		/// Data that has been received.
-		data: Bytes,
+		/// Message that has been received.
+		message: TMessage,
 	},
 
 	/// The substream used by the protocol is pretty large. We should print avoid sending more
-	/// data on it if possible.
+	/// messages on it if possible.
 	Clogged {
 		/// Id of the peer which is clogged.
 		peer_id: PeerId,
 		/// Protocol which has a problem.
 		protocol_id: ProtocolId,
 		/// Copy of the messages that are within the buffer, for further diagnostic.
-		messages: Vec<Bytes>,
+		messages: Vec<TMessage>,
 	},
 }
 
-impl<TSubstream> CustomProtos<TSubstream> {
+impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
 	/// Creates a `CustomProtos`.
-	pub fn new(config: &NetworkConfiguration, local_peer_id: &PeerId, registered_protocols: RegisteredProtocols) -> Self {
+	pub fn new(config: &NetworkConfiguration, local_peer_id: &PeerId, registered_protocols: RegisteredProtocols<TMessage>) -> Self {
 		// Initialize the topology of the network.
 		let mut topology = if let Some(ref path) = config.net_config_path {
 			let path = Path::new(path).join(NODES_FILE);
@@ -265,12 +264,12 @@ impl<TSubstream> CustomProtos<TSubstream> {
 	///
 	/// Also note that even we have a valid open substream, it may in fact be already closed
 	/// without us knowing, in which case the packet will not be received.
-	pub fn send_packet(&mut self, target: &PeerId, protocol_id: ProtocolId, data: impl Into<Bytes>) {
+	pub fn send_packet(&mut self, target: &PeerId, protocol_id: ProtocolId, message: TMessage) {
 		self.events.push(NetworkBehaviourAction::SendEvent {
 			peer_id: target.clone(),
 			event: CustomProtosHandlerIn::SendCustomMessage {
 				protocol: protocol_id,
-				data: data.into(),
+				message,
 			}
 		});
 	}
@@ -369,12 +368,13 @@ impl<TSubstream> CustomProtos<TSubstream> {
 	}
 }
 
-impl<TSubstream> NetworkBehaviour for CustomProtos<TSubstream>
+impl<TMessage, TSubstream> NetworkBehaviour for CustomProtos<TMessage, TSubstream>
 where
 	TSubstream: AsyncRead + AsyncWrite,
+	TMessage: CustomMessage,
 {
-	type ProtocolsHandler = CustomProtosHandler<TSubstream>;
-	type OutEvent = CustomProtosOut;
+	type ProtocolsHandler = CustomProtosHandler<TMessage, TSubstream>;
+	type OutEvent = CustomProtosOut<TMessage>;
 
 	fn new_handler(&mut self) -> Self::ProtocolsHandler {
 		CustomProtosHandler::new(self.registered_protocols.clone())
@@ -550,14 +550,14 @@ where
 					self.events.push(NetworkBehaviourAction::GenerateEvent(event));
 				}
 			}
-			CustomProtosHandlerOut::CustomMessage { protocol_id, data } => {
+			CustomProtosHandlerOut::CustomMessage { protocol_id, message } => {
 				debug_assert!(self.open_protocols.iter().any(|(s, p)|
 					s == &source && p == &protocol_id
 				));
 				let event = CustomProtosOut::CustomMessage {
 					peer_id: source,
 					protocol_id,
-					data,
+					message,
 				};
 
 				self.events.push(NetworkBehaviourAction::GenerateEvent(event));
diff --git a/substrate/core/network-libp2p/src/custom_proto/handler.rs b/substrate/core/network-libp2p/src/custom_proto/handler.rs
index b44407ff7f7..8feedc52e4d 100644
--- a/substrate/core/network-libp2p/src/custom_proto/handler.rs
+++ b/substrate/core/network-libp2p/src/custom_proto/handler.rs
@@ -15,8 +15,8 @@
 // along with Substrate.  If not, see <http://www.gnu.org/licenses/>.
 
 use crate::ProtocolId;
-use crate::custom_proto::upgrade::{RegisteredProtocol, RegisteredProtocols, RegisteredProtocolSubstream, RegisteredProtocolEvent};
-use bytes::Bytes;
+use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol, RegisteredProtocols};
+use crate::custom_proto::upgrade::{RegisteredProtocolSubstream, RegisteredProtocolEvent};
 use futures::prelude::*;
 use libp2p::core::{
 	ProtocolsHandler, ProtocolsHandlerEvent,
@@ -36,29 +36,29 @@ use void::Void;
 /// `Enable` message.
 /// The handler can then be enabled and disabled at any time with the `Enable` and `Disable`
 /// messages.
-pub struct CustomProtosHandler<TSubstream> {
+pub struct CustomProtosHandler<TMessage, TSubstream> {
 	/// List of all the protocols we support.
-	protocols: RegisteredProtocols,
+	protocols: RegisteredProtocols<TMessage>,
 
 	/// See the documentation of `State`.
-	state: State<TSubstream>,
+	state: State<TMessage, TSubstream>,
 
 	/// Value to be returned by `connection_keep_alive()`.
 	keep_alive: KeepAlive,
 
 	/// The active substreams. There should always ever be only one substream per protocol.
-	substreams: SmallVec<[RegisteredProtocolSubstream<TSubstream>; 6]>,
+	substreams: SmallVec<[RegisteredProtocolSubstream<TMessage, TSubstream>; 6]>,
 
 	/// Queue of events to send to the outside.
-	events_queue: SmallVec<[ProtocolsHandlerEvent<RegisteredProtocol, ProtocolId, CustomProtosHandlerOut>; 16]>,
+	events_queue: SmallVec<[ProtocolsHandlerEvent<RegisteredProtocol<TMessage>, ProtocolId, CustomProtosHandlerOut<TMessage>>; 16]>,
 }
 
 /// State of the handler.
-enum State<TSubstream> {
+enum State<TMessage, TSubstream> {
 	/// Waiting for the behaviour to tell the handler whether it is enabled or disabled.
 	/// Contains a list of substreams opened by the remote and that we will integrate to
 	/// `substreams` only if we get enabled.
-	Init(SmallVec<[RegisteredProtocolSubstream<TSubstream>; 6]>),
+	Init(SmallVec<[RegisteredProtocolSubstream<TMessage, TSubstream>; 6]>),
 
 	/// Normal functionning.
 	Normal,
@@ -74,7 +74,7 @@ enum State<TSubstream> {
 
 /// Event that can be received by a `CustomProtosHandler`.
 #[derive(Debug)]
-pub enum CustomProtosHandlerIn {
+pub enum CustomProtosHandlerIn<TMessage> {
 	/// The node should start using custom protocols and actively open substreams.
 	EnableActive,
 
@@ -88,14 +88,14 @@ pub enum CustomProtosHandlerIn {
 	SendCustomMessage {
 		/// The protocol to use.
 		protocol: ProtocolId,
-		/// The data to send.
-		data: Bytes,
+		/// The message to send.
+		message: TMessage,
 	},
 }
 
 /// Event that can be emitted by a `CustomProtosHandler`.
 #[derive(Debug)]
-pub enum CustomProtosHandlerOut {
+pub enum CustomProtosHandlerOut<TMessage> {
 	/// Opened a custom protocol with the remote.
 	CustomProtocolOpen {
 		/// Identifier of the protocol.
@@ -116,8 +116,8 @@ pub enum CustomProtosHandlerOut {
 	CustomMessage {
 		/// Protocol which generated the message.
 		protocol_id: ProtocolId,
-		/// Data that has been received.
-		data: Bytes,
+		/// Message that has been received.
+		message: TMessage,
 	},
 
 	/// A substream to the remote is clogged. The send buffer is very large, and we should print
@@ -126,7 +126,7 @@ pub enum CustomProtosHandlerOut {
 		/// Protocol which is clogged.
 		protocol_id: ProtocolId,
 		/// Copy of the messages that are within the buffer, for further diagnostic.
-		messages: Vec<Bytes>,
+		messages: Vec<TMessage>,
 	},
 
 	/// An error has happened on the protocol level with this node.
@@ -138,12 +138,12 @@ pub enum CustomProtosHandlerOut {
 	},
 }
 
-impl<TSubstream> CustomProtosHandler<TSubstream>
+impl<TMessage, TSubstream> CustomProtosHandler<TMessage, TSubstream>
 where
 	TSubstream: AsyncRead + AsyncWrite,
 {
 	/// Builds a new `CustomProtosHandler`.
-	pub fn new(protocols: RegisteredProtocols) -> Self {
+	pub fn new(protocols: RegisteredProtocols<TMessage>) -> Self {
 		CustomProtosHandler {
 			protocols,
 			// We keep the connection alive for at least 5 seconds, waiting for what happens.
@@ -157,7 +157,7 @@ where
 	/// Called by `inject_fully_negotiated_inbound` and `inject_fully_negotiated_outbound`.
 	fn inject_fully_negotiated(
 		&mut self,
-		proto: RegisteredProtocolSubstream<TSubstream>,
+		proto: RegisteredProtocolSubstream<TMessage, TSubstream>,
 	) {
 		if self.substreams.iter().any(|p| p.protocol_id() == proto.protocol_id()) {
 			// Skipping protocol that's already open.
@@ -189,16 +189,14 @@ where
 	}
 }
 
-impl<TSubstream> ProtocolsHandler for CustomProtosHandler<TSubstream>
-where
-	TSubstream: AsyncRead + AsyncWrite,
-{
-	type InEvent = CustomProtosHandlerIn;
-	type OutEvent = CustomProtosHandlerOut;
+impl<TMessage, TSubstream> ProtocolsHandler for CustomProtosHandler<TMessage, TSubstream>
+where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
+	type InEvent = CustomProtosHandlerIn<TMessage>;
+	type OutEvent = CustomProtosHandlerOut<TMessage>;
 	type Substream = TSubstream;
 	type Error = Void;
-	type InboundProtocol = RegisteredProtocols;
-	type OutboundProtocol = RegisteredProtocol;
+	type InboundProtocol = RegisteredProtocols<TMessage>;
+	type OutboundProtocol = RegisteredProtocol<TMessage>;
 	type OutboundOpenInfo = ProtocolId;
 
 	#[inline]
@@ -222,7 +220,7 @@ where
 		self.inject_fully_negotiated(proto);
 	}
 
-	fn inject_event(&mut self, message: CustomProtosHandlerIn) {
+	fn inject_event(&mut self, message: CustomProtosHandlerIn<TMessage>) {
 		match message {
 			CustomProtosHandlerIn::Disable => {
 				match self.state {
@@ -271,7 +269,7 @@ where
 					}
 				}
 			},
-			CustomProtosHandlerIn::SendCustomMessage { protocol, data } => {
+			CustomProtosHandlerIn::SendCustomMessage { protocol, message } => {
 				debug_assert!(self.protocols.has_protocol(protocol),
 					"invalid protocol id requested in the API of the libp2p networking");
 				let proto = match self.substreams.iter_mut().find(|p| p.protocol_id() == protocol) {
@@ -285,7 +283,7 @@ where
 					},
 				};
 
-				proto.send_message(data);
+				proto.send_message(message);
 			},
 		}
 	}
@@ -342,10 +340,10 @@ where
 		for n in (0..self.substreams.len()).rev() {
 			let mut substream = self.substreams.swap_remove(n);
 			match substream.poll() {
-				Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(data)))) => {
+				Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => {
 					let event = CustomProtosHandlerOut::CustomMessage {
 						protocol_id: substream.protocol_id(),
-						data
+						message
 					};
 					self.substreams.push(substream);
 					return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(event)))
@@ -389,7 +387,7 @@ where
 	}
 }
 
-impl<TSubstream> fmt::Debug for CustomProtosHandler<TSubstream>
+impl<TMessage, TSubstream> fmt::Debug for CustomProtosHandler<TMessage, TSubstream>
 where
 	TSubstream: AsyncRead + AsyncWrite,
 {
diff --git a/substrate/core/network-libp2p/src/custom_proto/mod.rs b/substrate/core/network-libp2p/src/custom_proto/mod.rs
index d140571132b..8a0b55d5cc4 100644
--- a/substrate/core/network-libp2p/src/custom_proto/mod.rs
+++ b/substrate/core/network-libp2p/src/custom_proto/mod.rs
@@ -15,7 +15,7 @@
 // along with Substrate.  If not, see <http://www.gnu.org/licenses/>.
 
 pub use self::behaviour::{CustomProtos, CustomProtosOut};
-pub use self::upgrade::{RegisteredProtocol, RegisteredProtocols};
+pub use self::upgrade::{CustomMessage, RegisteredProtocol, RegisteredProtocols};
 
 mod behaviour;
 mod handler;
diff --git a/substrate/core/network-libp2p/src/custom_proto/upgrade.rs b/substrate/core/network-libp2p/src/custom_proto/upgrade.rs
index a2dfcf724a2..3f31dc0a5a7 100644
--- a/substrate/core/network-libp2p/src/custom_proto/upgrade.rs
+++ b/substrate/core/network-libp2p/src/custom_proto/upgrade.rs
@@ -18,7 +18,8 @@ use crate::ProtocolId;
 use bytes::Bytes;
 use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName};
 use libp2p::tokio_codec::Framed;
-use std::{collections::VecDeque, io, vec::IntoIter as VecIntoIter};
+use log::warn;
+use std::{collections::VecDeque, io, marker::PhantomData, vec::IntoIter as VecIntoIter};
 use futures::{prelude::*, future, stream};
 use tokio_io::{AsyncRead, AsyncWrite};
 use unsigned_varint::codec::UviBytes;
@@ -27,8 +28,7 @@ use unsigned_varint::codec::UviBytes;
 ///
 /// Note that "a single protocol" here refers to `par` for example. However
 /// each protocol can have multiple different versions for networking purposes.
-#[derive(Clone)]
-pub struct RegisteredProtocol {
+pub struct RegisteredProtocol<TMessage> {
 	/// Id of the protocol for API purposes.
 	id: ProtocolId,
 	/// Base name of the protocol as advertised on the network.
@@ -37,9 +37,11 @@ pub struct RegisteredProtocol {
 	/// List of protocol versions that we support.
 	/// Ordered in descending order so that the best comes first.
 	supported_versions: Vec<u8>,
+	/// Marker to pin the generic.
+	marker: PhantomData<TMessage>,
 }
 
-impl RegisteredProtocol {
+impl<TMessage> RegisteredProtocol<TMessage> {
 	/// Creates a new `RegisteredProtocol`. The `custom_data` parameter will be
 	/// passed inside the `RegisteredProtocolOutput`.
 	pub fn new(protocol: ProtocolId, versions: &[u8])
@@ -56,6 +58,7 @@ impl RegisteredProtocol {
 				tmp.sort_unstable_by(|a, b| b.cmp(&a));
 				tmp
 			},
+			marker: PhantomData,
 		}
 	}
 
@@ -66,16 +69,27 @@ impl RegisteredProtocol {
 	}
 }
 
+impl<TMessage> Clone for RegisteredProtocol<TMessage> {
+	fn clone(&self) -> Self {
+		RegisteredProtocol {
+			id: self.id,
+			base_name: self.base_name.clone(),
+			supported_versions: self.supported_versions.clone(),
+			marker: PhantomData,
+		}
+	}
+}
+
 /// Output of a `RegisteredProtocol` upgrade.
-pub struct RegisteredProtocolSubstream<TSubstream> {
+pub struct RegisteredProtocolSubstream<TMessage, TSubstream> {
 	/// If true, we are in the process of closing the sink.
 	is_closing: bool,
 	/// Buffer of packets to send.
-	send_queue: VecDeque<Bytes>,
+	send_queue: VecDeque<Vec<u8>>,
 	/// If true, we should call `poll_complete` on the inner sink.
 	requires_poll_complete: bool,
 	/// The underlying substream.
-	inner: stream::Fuse<Framed<TSubstream, UviBytes<Bytes>>>,
+	inner: stream::Fuse<Framed<TSubstream, UviBytes<Vec<u8>>>>,
 	/// Id of the protocol.
 	protocol_id: ProtocolId,
 	/// Version of the protocol that was negotiated.
@@ -83,9 +97,11 @@ pub struct RegisteredProtocolSubstream<TSubstream> {
 	/// If true, we have sent a "remote is clogged" event recently and shouldn't send another one
 	/// unless the buffer empties then fills itself again.
 	clogged_fuse: bool,
+	/// Marker to pin the generic.
+	marker: PhantomData<TMessage>,
 }
 
-impl<TSubstream> RegisteredProtocolSubstream<TSubstream> {
+impl<TMessage, TSubstream> RegisteredProtocolSubstream<TMessage, TSubstream> {
 	/// Returns the protocol id.
 	#[inline]
 	pub fn protocol_id(&self) -> ProtocolId {
@@ -110,33 +126,53 @@ impl<TSubstream> RegisteredProtocolSubstream<TSubstream> {
 	}
 
 	/// Sends a message to the substream.
-	pub fn send_message(&mut self, data: Bytes) {
+	pub fn send_message(&mut self, data: TMessage)
+	where TMessage: CustomMessage {
 		if self.is_closing {
 			return
 		}
 
-		self.send_queue.push_back(data);
+		self.send_queue.push_back(data.into_bytes());
+	}
+}
+
+/// Implemented on messages that can be sent or received on the network.
+pub trait CustomMessage {
+	/// Turns a message into raw bytes.
+	fn into_bytes(self) -> Vec<u8>;
+	/// Tries to part `bytes` into a message.
+	fn from_bytes(bytes: &[u8]) -> Result<Self, ()>
+		where Self: Sized;
+}
+
+/// This trait implementation exists mostly for testing convenience.
+impl CustomMessage for Vec<u8> {
+	fn into_bytes(self) -> Vec<u8> {
+		self
+	}
+
+	fn from_bytes(bytes: &[u8]) -> Result<Self, ()> {
+		Ok(bytes.to_vec())
 	}
 }
 
 /// Event produced by the `RegisteredProtocolSubstream`.
 #[derive(Debug, Clone)]
-pub enum RegisteredProtocolEvent {
+pub enum RegisteredProtocolEvent<TMessage> {
 	/// Received a message from the remote.
-	Message(Bytes),
+	Message(TMessage),
 
 	/// Diagnostic event indicating that the connection is clogged and we should avoid sending too
 	/// many messages to it.
 	Clogged {
 		/// Copy of the messages that are within the buffer, for further diagnostic.
-		messages: Vec<Bytes>,
+		messages: Vec<TMessage>,
 	},
 }
 
-impl<TSubstream> Stream for RegisteredProtocolSubstream<TSubstream>
-where TSubstream: AsyncRead + AsyncWrite,
-{
-	type Item = RegisteredProtocolEvent;
+impl<TMessage, TSubstream> Stream for RegisteredProtocolSubstream<TMessage, TSubstream>
+where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
+	type Item = RegisteredProtocolEvent<TMessage>;
 	type Error = io::Error;
 
 	fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
@@ -164,7 +200,10 @@ where TSubstream: AsyncRead + AsyncWrite,
 				//	thus never read any message from the network.
 				self.clogged_fuse = true;
 				return Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged {
-					messages: self.send_queue.iter().cloned().collect(),
+					messages: self.send_queue.iter()
+						.map(|m| CustomMessage::from_bytes(&m))
+						.filter_map(Result::ok)
+						.collect(),
 				})))
 			}
 		} else {
@@ -181,8 +220,14 @@ where TSubstream: AsyncRead + AsyncWrite,
 		// Receiving incoming packets.
 		// Note that `inner` is wrapped in a `Fuse`, therefore we can poll it forever.
 		match self.inner.poll()? {
-			Async::Ready(Some(data)) =>
-				Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(data.freeze())))),
+			Async::Ready(Some(data)) => {
+				let message = <TMessage as CustomMessage>::from_bytes(&data)
+					.map_err(|()| {
+						warn!(target: "sub-libp2p", "Couldn't decode packet sent by the remote: {:?}", data);
+						io::ErrorKind::InvalidData
+					})?;
+				Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message))))
+			},
 			Async::Ready(None) =>
 				if !self.requires_poll_complete && self.send_queue.is_empty() {
 					Ok(Async::Ready(None))
@@ -194,7 +239,7 @@ where TSubstream: AsyncRead + AsyncWrite,
 	}
 }
 
-impl UpgradeInfo for RegisteredProtocol {
+impl<TMessage> UpgradeInfo for RegisteredProtocol<TMessage> {
 	type Info = RegisteredProtocolName;
 	type InfoIter = VecIntoIter<Self::Info>;
 
@@ -228,10 +273,10 @@ impl ProtocolName for RegisteredProtocolName {
 	}
 }
 
-impl<TSubstream> InboundUpgrade<TSubstream> for RegisteredProtocol
+impl<TMessage, TSubstream> InboundUpgrade<TSubstream> for RegisteredProtocol<TMessage>
 where TSubstream: AsyncRead + AsyncWrite,
 {
-	type Output = RegisteredProtocolSubstream<TSubstream>;
+	type Output = RegisteredProtocolSubstream<TMessage, TSubstream>;
 	type Future = future::FutureResult<Self::Output, io::Error>;
 	type Error = io::Error;
 
@@ -250,11 +295,12 @@ where TSubstream: AsyncRead + AsyncWrite,
 			protocol_id: self.id,
 			protocol_version: info.version,
 			clogged_fuse: false,
+			marker: PhantomData,
 		})
 	}
 }
 
-impl<TSubstream> OutboundUpgrade<TSubstream> for RegisteredProtocol
+impl<TMessage, TSubstream> OutboundUpgrade<TSubstream> for RegisteredProtocol<TMessage>
 where TSubstream: AsyncRead + AsyncWrite,
 {
 	type Output = <Self as InboundUpgrade<TSubstream>>::Output;
@@ -272,10 +318,9 @@ where TSubstream: AsyncRead + AsyncWrite,
 }
 
 // Connection upgrade for all the protocols contained in it.
-#[derive(Clone)]
-pub struct RegisteredProtocols(pub Vec<RegisteredProtocol>);
+pub struct RegisteredProtocols<TMessage>(pub Vec<RegisteredProtocol<TMessage>>);
 
-impl RegisteredProtocols {
+impl<TMessage> RegisteredProtocols<TMessage> {
 	/// Returns the number of protocols.
 	#[inline]
 	pub fn len(&self) -> usize {
@@ -288,13 +333,13 @@ impl RegisteredProtocols {
 	}
 }
 
-impl Default for RegisteredProtocols {
+impl<TMessage> Default for RegisteredProtocols<TMessage> {
 	fn default() -> Self {
 		RegisteredProtocols(Vec::new())
 	}
 }
 
-impl UpgradeInfo for RegisteredProtocols {
+impl<TMessage> UpgradeInfo for RegisteredProtocols<TMessage> {
 	type Info = RegisteredProtocolsName;
 	type InfoIter = VecIntoIter<Self::Info>;
 
@@ -314,6 +359,12 @@ impl UpgradeInfo for RegisteredProtocols {
 	}
 }
 
+impl<TMessage> Clone for RegisteredProtocols<TMessage> {
+	fn clone(&self) -> Self {
+		RegisteredProtocols(self.0.clone())
+	}
+}
+
 /// Implementation of `ProtocolName` for several custom protocols.
 #[derive(Debug, Clone)]
 pub struct RegisteredProtocolsName {
@@ -329,11 +380,11 @@ impl ProtocolName for RegisteredProtocolsName {
 	}
 }
 
-impl<TSubstream> InboundUpgrade<TSubstream> for RegisteredProtocols
+impl<TMessage, TSubstream> InboundUpgrade<TSubstream> for RegisteredProtocols<TMessage>
 where TSubstream: AsyncRead + AsyncWrite,
 {
-	type Output = <RegisteredProtocol as InboundUpgrade<TSubstream>>::Output;
-	type Future = <RegisteredProtocol as InboundUpgrade<TSubstream>>::Future;
+	type Output = <RegisteredProtocol<TMessage> as InboundUpgrade<TSubstream>>::Output;
+	type Future = <RegisteredProtocol<TMessage> as InboundUpgrade<TSubstream>>::Future;
 	type Error = io::Error;
 
 	#[inline]
@@ -349,7 +400,7 @@ where TSubstream: AsyncRead + AsyncWrite,
 	}
 }
 
-impl<TSubstream> OutboundUpgrade<TSubstream> for RegisteredProtocols
+impl<TMessage, TSubstream> OutboundUpgrade<TSubstream> for RegisteredProtocols<TMessage>
 where TSubstream: AsyncRead + AsyncWrite,
 {
 	type Output = <Self as InboundUpgrade<TSubstream>>::Output;
diff --git a/substrate/core/network-libp2p/src/lib.rs b/substrate/core/network-libp2p/src/lib.rs
index 448d39e9c6b..994fc3cbeb5 100644
--- a/substrate/core/network-libp2p/src/lib.rs
+++ b/substrate/core/network-libp2p/src/lib.rs
@@ -24,7 +24,7 @@ mod service_task;
 mod traits;
 mod transport;
 
-pub use crate::custom_proto::RegisteredProtocol;
+pub use crate::custom_proto::{CustomMessage, RegisteredProtocol};
 pub use crate::error::{Error, ErrorKind, DisconnectReason};
 pub use crate::secret::obtain_private_key;
 pub use crate::service_task::{start_service, Service, ServiceEvent};
diff --git a/substrate/core/network-libp2p/src/service_task.rs b/substrate/core/network-libp2p/src/service_task.rs
index 68959942d07..f6d5769b478 100644
--- a/substrate/core/network-libp2p/src/service_task.rs
+++ b/substrate/core/network-libp2p/src/service_task.rs
@@ -18,9 +18,8 @@ use crate::{
 	behaviour::Behaviour, behaviour::BehaviourOut, secret::obtain_private_key_from_config,
 	transport
 };
-use crate::custom_proto::{RegisteredProtocol, RegisteredProtocols};
+use crate::custom_proto::{CustomMessage, RegisteredProtocol, RegisteredProtocols};
 use crate::{Error, NetworkConfiguration, NodeIndex, ProtocolId, parse_str_addr};
-use bytes::Bytes;
 use fnv::FnvHashMap;
 use futures::{prelude::*, Stream};
 use libp2p::{multiaddr::Protocol, Multiaddr, PeerId, multiaddr};
@@ -39,11 +38,12 @@ use tokio_timer::Interval;
 /// Starts the substrate libp2p service.
 ///
 /// Returns a stream that must be polled regularly in order for the networking to function.
-pub fn start_service<TProtos>(
+pub fn start_service<TProtos, TMessage>(
 	config: NetworkConfiguration,
 	registered_custom: TProtos,
-) -> Result<Service, Error>
-where TProtos: IntoIterator<Item = RegisteredProtocol> {
+) -> Result<Service<TMessage>, Error>
+where TProtos: IntoIterator<Item = RegisteredProtocol<TMessage>>,
+	TMessage: CustomMessage + Send + 'static {
 
 	if let Some(ref path) = config.net_config_path {
 		fs::create_dir_all(Path::new(path))?;
@@ -131,7 +131,7 @@ where TProtos: IntoIterator<Item = RegisteredProtocol> {
 
 /// Event produced by the service.
 #[derive(Debug)]
-pub enum ServiceEvent {
+pub enum ServiceEvent<TMessage> {
 	/// A custom protocol substream has been opened with a node.
 	OpenedCustomProtocol {
 		/// Index of the node.
@@ -172,8 +172,8 @@ pub enum ServiceEvent {
 		node_index: NodeIndex,
 		/// Protocol which generated the message.
 		protocol_id: ProtocolId,
-		/// Data that has been received.
-		data: Bytes,
+		/// Message that has been received.
+		message: TMessage,
 	},
 
 	/// The substream with a node is clogged. We should avoid sending data to it if possible.
@@ -183,14 +183,14 @@ pub enum ServiceEvent {
 		/// Protocol which generated the message.
 		protocol_id: ProtocolId,
 		/// Copy of the messages that are within the buffer, for further diagnostic.
-		messages: Vec<Bytes>,
+		messages: Vec<TMessage>,
 	},
 }
 
 /// Network service. Must be polled regularly in order for the networking to work.
-pub struct Service {
+pub struct Service<TMessage> where TMessage: CustomMessage {
 	/// Stream of events of the swarm.
-	swarm: Swarm<Boxed<(PeerId, StreamMuxerBox), IoError>, Behaviour<Substream<StreamMuxerBox>>>,
+	swarm: Swarm<Boxed<(PeerId, StreamMuxerBox), IoError>, Behaviour<TMessage, Substream<StreamMuxerBox>>>,
 
 	/// Bandwidth logging system. Can be queried to know the average bandwidth consumed.
 	bandwidth: Arc<transport::BandwidthSinks>,
@@ -209,7 +209,7 @@ pub struct Service {
 	cleanup: Interval,
 
 	/// Events to produce on the Stream.
-	injected_events: Vec<ServiceEvent>,
+	injected_events: Vec<ServiceEvent<TMessage>>,
 }
 
 /// Information about a node we're connected to.
@@ -223,7 +223,8 @@ struct NodeInfo {
 	client_version: Option<String>,
 }
 
-impl Service {
+impl<TMessage> Service<TMessage>
+where TMessage: CustomMessage + Send + 'static {
 	/// Returns an iterator that produces the list of addresses we're listening on.
 	#[inline]
 	pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
@@ -305,10 +306,10 @@ impl Service {
 		&mut self,
 		node_index: NodeIndex,
 		protocol: ProtocolId,
-		data: Vec<u8>
+		message: TMessage
 	) {
 		if let Some(peer_id) = self.nodes_info.get(&node_index).map(|info| &info.peer_id) {
-			self.swarm.send_custom_message(peer_id, protocol, data);
+			self.swarm.send_custom_message(peer_id, protocol, message);
 		} else {
 			warn!(target: "sub-libp2p", "Tried to send message to unknown node: {:}", node_index);
 		}
@@ -375,7 +376,7 @@ impl Service {
 	}
 
 	/// Polls for what happened on the network.
-	fn poll_swarm(&mut self) -> Poll<Option<ServiceEvent>, IoError> {
+	fn poll_swarm(&mut self) -> Poll<Option<ServiceEvent<TMessage>>, IoError> {
 		loop {
 			match self.swarm.poll() {
 				Ok(Async::Ready(Some(BehaviourOut::CustomProtocolOpen { protocol_id, peer_id, version, endpoint }))) => {
@@ -397,12 +398,12 @@ impl Service {
 						debug_info: self.peer_debug_info(node_index),
 					})))
 				}
-				Ok(Async::Ready(Some(BehaviourOut::CustomMessage { protocol_id, peer_id, data }))) => {
+				Ok(Async::Ready(Some(BehaviourOut::CustomMessage { protocol_id, peer_id, message }))) => {
 					let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour");
 					break Ok(Async::Ready(Some(ServiceEvent::CustomMessage {
 						node_index,
 						protocol_id,
-						data,
+						message,
 					})))
 				}
 				Ok(Async::Ready(Some(BehaviourOut::Clogged { protocol_id, peer_id, messages }))) => {
@@ -431,7 +432,7 @@ impl Service {
 	}
 
 	/// Polls the stream that fires when we need to cleanup and flush the topology.
-	fn poll_cleanup(&mut self) -> Poll<Option<ServiceEvent>, IoError> {
+	fn poll_cleanup(&mut self) -> Poll<Option<ServiceEvent<TMessage>>, IoError> {
 		loop {
 			match self.cleanup.poll() {
 				Ok(Async::NotReady) => return Ok(Async::NotReady),
@@ -457,7 +458,7 @@ impl Service {
 	}
 }
 
-impl Drop for Service {
+impl<TMessage> Drop for Service<TMessage> where TMessage: CustomMessage {
 	fn drop(&mut self) {
 		if let Err(err) = self.swarm.flush_topology() {
 			warn!(target: "sub-libp2p", "Failed to flush topology: {:?}", err);
@@ -465,8 +466,8 @@ impl Drop for Service {
 	}
 }
 
-impl Stream for Service {
-	type Item = ServiceEvent;
+impl<TMessage> Stream for Service<TMessage> where TMessage: CustomMessage + Send + 'static {
+	type Item = ServiceEvent<TMessage>;
 	type Error = IoError;
 
 	fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
diff --git a/substrate/core/network-libp2p/tests/test.rs b/substrate/core/network-libp2p/tests/test.rs
index a1876da1545..6dc82fcafe0 100644
--- a/substrate/core/network-libp2p/tests/test.rs
+++ b/substrate/core/network-libp2p/tests/test.rs
@@ -16,12 +16,14 @@
 
 use futures::{future, stream, prelude::*, try_ready};
 use std::{io, iter};
-use substrate_network_libp2p::{ServiceEvent, multiaddr};
+use substrate_network_libp2p::{CustomMessage, ServiceEvent, multiaddr};
 
 /// Builds two services. The second one and further have the first one as its bootstrap node.
 /// This is to be used only for testing, and a panic will happen if something goes wrong.
-fn build_nodes(num: usize) -> Vec<substrate_network_libp2p::Service> {
-	let mut result: Vec<substrate_network_libp2p::Service> = Vec::with_capacity(num);
+fn build_nodes<TMsg>(num: usize) -> Vec<substrate_network_libp2p::Service<TMsg>>
+	where TMsg: CustomMessage + Send + 'static
+{
+	let mut result: Vec<substrate_network_libp2p::Service<_>> = Vec::with_capacity(num);
 
 	for _ in 0 .. num {
 		let mut boot_nodes = Vec::new();
@@ -47,7 +49,7 @@ fn build_nodes(num: usize) -> Vec<substrate_network_libp2p::Service> {
 #[test]
 fn basic_two_nodes_connectivity() {
 	let (mut service1, mut service2) = {
-		let mut l = build_nodes(2).into_iter();
+		let mut l = build_nodes::<Vec<u8>>(2).into_iter();
 		let a = l.next().unwrap();
 		let b = l.next().unwrap();
 		(a, b)
@@ -86,7 +88,7 @@ fn two_nodes_transfer_lots_of_packets() {
 	const NUM_PACKETS: u32 = 20000;
 
 	let (mut service1, mut service2) = {
-		let mut l = build_nodes(2).into_iter();
+		let mut l = build_nodes::<Vec<u8>>(2).into_iter();
 		let a = l.next().unwrap();
 		let b = l.next().unwrap();
 		(a, b)
@@ -110,9 +112,9 @@ fn two_nodes_transfer_lots_of_packets() {
 		loop {
 			match try_ready!(service2.poll()) {
 				Some(ServiceEvent::OpenedCustomProtocol { .. }) => {},
-				Some(ServiceEvent::CustomMessage { data, .. }) => {
-					assert_eq!(data.len(), 1);
-					assert_eq!(u32::from(data[0]), packet_counter % 256);
+				Some(ServiceEvent::CustomMessage { message, .. }) => {
+					assert_eq!(message.len(), 1);
+					assert_eq!(u32::from(message[0]), packet_counter % 256);
 					packet_counter += 1;
 					if packet_counter == NUM_PACKETS {
 						return Ok(Async::Ready(()))
@@ -135,7 +137,7 @@ fn many_nodes_connectivity() {
 	// increased in the `NetworkConfiguration`.
 	const NUM_NODES: usize = 25;
 
-	let mut futures = build_nodes(NUM_NODES)
+	let mut futures = build_nodes::<Vec<u8>>(NUM_NODES)
 		.into_iter()
 		.map(move |mut node| {
 			let mut num_connecs = 0;
diff --git a/substrate/core/network/src/message.rs b/substrate/core/network/src/message.rs
index e98ca9cdff1..0aff9166b64 100644
--- a/substrate/core/network/src/message.rs
+++ b/substrate/core/network/src/message.rs
@@ -125,6 +125,8 @@ pub struct RemoteReadResponse {
 
 /// Generic types.
 pub mod generic {
+	use parity_codec::{Encode, Decode};
+	use network_libp2p::CustomMessage;
 	use runtime_primitives::Justification;
 	use parity_codec_derive::{Encode, Decode};
 	use crate::config::Roles;
@@ -197,6 +199,18 @@ pub mod generic {
 		ChainSpecific(Vec<u8>),
 	}
 
+	impl<Header, Hash, Number, Extrinsic> CustomMessage for Message<Header, Hash, Number, Extrinsic>
+		where Self: Decode + Encode
+	{
+		fn into_bytes(self) -> Vec<u8> {
+			self.encode()
+		}
+
+		fn from_bytes(bytes: &[u8]) -> Result<Self, ()> {
+			Decode::decode(&mut &bytes[..]).ok_or(())
+		}
+	}
+
 	/// Status sent on connection.
 	#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
 	pub struct Status<Hash, Number> {
diff --git a/substrate/core/network/src/on_demand.rs b/substrate/core/network/src/on_demand.rs
index 60b3933d17e..62868693598 100644
--- a/substrate/core/network/src/on_demand.rs
+++ b/substrate/core/network/src/on_demand.rs
@@ -16,7 +16,6 @@
 
 //! On-demand requests service.
 
-use parity_codec::Encode;
 use std::collections::{HashMap, VecDeque};
 use std::sync::Arc;
 use std::time::{Instant, Duration};
@@ -79,7 +78,7 @@ pub trait OnDemandService<Block: BlockT>: Send + Sync {
 pub struct OnDemand<B: BlockT> {
 	core: Mutex<OnDemandCore<B>>,
 	checker: Arc<FetchChecker<B>>,
-	network_sender: Mutex<Option<NetworkChan>>,
+	network_sender: Mutex<Option<NetworkChan<B>>>,
 }
 
 /// On-demand remote call response.
@@ -150,11 +149,11 @@ impl<B: BlockT> OnDemand<B> where
 	}
 
 	/// Sets weak reference to network service.
-	pub fn set_network_sender(&self, network_sender: NetworkChan) {
+	pub fn set_network_sender(&self, network_sender: NetworkChan<B>) {
 		self.network_sender.lock().replace(network_sender);
 	}
 
-	fn send(&self, msg: NetworkMsg) {
+	fn send(&self, msg: NetworkMsg<B>) {
 		let _ = self.network_sender
 			.lock()
 			.as_ref()
@@ -459,7 +458,7 @@ impl<B> OnDemandCore<B> where
 			let mut request = self.pending_requests.pop_front().expect("checked in loop condition; qed");
 			request.timestamp = Instant::now();
 			trace!(target: "sync", "Dispatching remote request {} to peer {}", request.id, peer);
-			on_demand.send(NetworkMsg::Outgoing(peer, request.message().encode()));
+			on_demand.send(NetworkMsg::Outgoing(peer, request.message()));
 			self.active_peers.insert(peer, request);
 		}
 
@@ -604,7 +603,7 @@ pub mod tests {
 		}
 	}
 
-	fn assert_disconnected_peer(network_port: NetworkPort, expected_severity: Severity) {
+	fn assert_disconnected_peer(network_port: NetworkPort<Block>, expected_severity: Severity) {
 		let mut disconnect_count = 0;
 		while let Ok(msg) = network_port.receiver().try_recv() {
 			match msg {
diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs
index b6072d1e66f..1035f35d256 100644
--- a/substrate/core/network/src/protocol.rs
+++ b/substrate/core/network/src/protocol.rs
@@ -14,7 +14,6 @@
 // You should have received a copy of the GNU General Public License
 // along with Substrate.  If not, see <http://www.gnu.org/licenses/>.
 
-use parity_codec::Encode;
 use crossbeam_channel::{self as channel, Receiver, Sender, select};
 use network_libp2p::{NodeIndex, Severity};
 use primitives::storage::StorageKey;
@@ -56,7 +55,7 @@ const LIGHT_MAXIMAL_BLOCKS_DIFFERENCE: u64 = 8192;
 
 // Lock must always be taken in order declared here.
 pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
-	network_chan: NetworkChan,
+	network_chan: NetworkChan<B>,
 	port: Receiver<ProtocolMsg<B, S>>,
 	config: ProtocolConfig,
 	on_demand: Option<Arc<OnDemandService<B>>>,
@@ -133,14 +132,14 @@ pub trait Context<B: BlockT> {
 
 /// Protocol context.
 pub(crate) struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> {
-	network_chan: &'a NetworkChan,
+	network_chan: &'a NetworkChan<B>,
 	context_data: &'a mut ContextData<B, H>,
 }
 
 impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> {
 	pub(crate) fn new(
 		context_data: &'a mut ContextData<B, H>,
-		network_chan: &'a NetworkChan,
+		network_chan: &'a NetworkChan<B>,
 	) -> Self {
 		ProtocolContext {
 			network_chan,
@@ -276,7 +275,7 @@ pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>,> {
 impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
 	/// Create a new instance.
 	pub fn new<I: 'static + ImportQueue<B>>(
-		network_chan: NetworkChan,
+		network_chan: NetworkChan<B>,
 		config: ProtocolConfig,
 		chain: Arc<Client<B>>,
 		import_queue: Arc<I>,
@@ -1106,7 +1105,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
 
 fn send_message<B: BlockT, H: ExHashT>(
 	peers: &mut HashMap<NodeIndex, Peer<B, H>>,
-	network_chan: &NetworkChan,
+	network_chan: &NetworkChan<B>,
 	who: NodeIndex,
 	mut message: Message<B>,
 ) {
@@ -1124,7 +1123,7 @@ fn send_message<B: BlockT, H: ExHashT>(
 		}
 		_ => (),
 	}
-	network_chan.send(NetworkMsg::Outgoing(who, message.encode()));
+	network_chan.send(NetworkMsg::Outgoing(who, message));
 }
 
 /// Construct a simple protocol that is composed of several sub protocols.
diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs
index 72d5bf83c64..ed26dd68d4d 100644
--- a/substrate/core/network/src/service.rs
+++ b/substrate/core/network/src/service.rs
@@ -25,8 +25,8 @@ use network_libp2p::{start_service, parse_str_addr, Service as NetworkService, S
 use network_libp2p::{Protocol as Libp2pProtocol, RegisteredProtocol};
 use consensus::import_queue::{ImportQueue, Link};
 use crate::consensus_gossip::ConsensusGossip;
+use crate::message::Message;
 use crate::protocol::{self, Context, Protocol, ProtocolMsg, ProtocolStatus, PeerInfo};
-use parity_codec::Decode;
 use crate::config::Params;
 use crossbeam_channel::{self as channel, Receiver, Sender, TryRecvError};
 use crate::error::Error;
@@ -72,7 +72,7 @@ pub struct NetworkLink<B: BlockT, S: NetworkSpecialization<B>> {
 	/// The protocol sender
 	pub(crate) protocol_sender: Sender<ProtocolMsg<B, S>>,
 	/// The network sender
-	pub(crate) network_sender: NetworkChan,
+	pub(crate) network_sender: NetworkChan<B>,
 }
 
 impl<B: BlockT, S: NetworkSpecialization<B>> Link<B> for NetworkLink<B, S> {
@@ -108,7 +108,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>> Link<B> for NetworkLink<B, S> {
 /// Substrate network service. Handles network IO and manages connectivity.
 pub struct Service<B: BlockT + 'static, S: NetworkSpecialization<B>> {
 	/// Network service
-	network: Arc<Mutex<NetworkService>>,
+	network: Arc<Mutex<NetworkService<Message<B>>>>,
 	/// Protocol sender
 	protocol_sender: Sender<ProtocolMsg<B, S>>,
 	/// Sender for messages to the background service task, and handle for the background thread.
@@ -123,7 +123,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
 		params: Params<B, S, H>,
 		protocol_id: ProtocolId,
 		import_queue: Arc<I>,
-	) -> Result<(Arc<Service<B, S>>, NetworkChan), Error> {
+	) -> Result<(Arc<Service<B, S>>, NetworkChan<B>), Error> {
 		let (network_chan, network_port) = network_channel(protocol_id);
 		let protocol_sender = Protocol::new(
 			network_chan.clone(),
@@ -139,7 +139,6 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
 		let (thread, network) = start_thread(
 			protocol_sender.clone(),
 			network_port,
-			network_chan.clone(),
 			params.network_config,
 			registered,
 		)?;
@@ -332,7 +331,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> ManageNetwork for Service
 
 
 /// Create a NetworkPort/Chan pair.
-pub fn network_channel(protocol_id: ProtocolId) -> (NetworkChan, NetworkPort) {
+pub fn network_channel<B: BlockT + 'static>(protocol_id: ProtocolId) -> (NetworkChan<B>, NetworkPort<B>) {
 	let (network_sender, network_receiver) = channel::unbounded();
 	let task_notify = Arc::new(AtomicTask::new());
 	let network_port = NetworkPort::new(network_receiver, protocol_id, task_notify.clone());
@@ -343,14 +342,14 @@ pub fn network_channel(protocol_id: ProtocolId) -> (NetworkChan, NetworkPort) {
 
 /// A sender of NetworkMsg that notifies a task when a message has been sent.
 #[derive(Clone)]
-pub struct NetworkChan {
-	sender: Sender<NetworkMsg>,
+pub struct NetworkChan<B: BlockT + 'static> {
+	sender: Sender<NetworkMsg<B>>,
 	task_notify: Arc<AtomicTask>,
 }
 
-impl NetworkChan {
+impl<B: BlockT + 'static> NetworkChan<B> {
 	/// Create a new network chan.
-	pub fn new(sender: Sender<NetworkMsg>, task_notify: Arc<AtomicTask>) -> Self {
+	pub fn new(sender: Sender<NetworkMsg<B>>, task_notify: Arc<AtomicTask>) -> Self {
 		 NetworkChan {
 			sender,
 			task_notify,
@@ -358,13 +357,13 @@ impl NetworkChan {
 	}
 
 	/// Send a messaging, to be handled on a stream. Notify the task handling the stream.
-	pub fn send(&self, msg: NetworkMsg) {
+	pub fn send(&self, msg: NetworkMsg<B>) {
 		let _ = self.sender.send(msg);
 		self.task_notify.notify();
 	}
 }
 
-impl Drop for NetworkChan {
+impl<B: BlockT + 'static> Drop for NetworkChan<B> {
 	/// Notifying the task when a sender is dropped(when all are dropped, the stream is finished).
 	fn drop(&mut self) {
 		self.task_notify.notify();
@@ -373,15 +372,15 @@ impl Drop for NetworkChan {
 
 
 /// A receiver of NetworkMsg that makes the protocol-id available with each message.
-pub struct NetworkPort {
-	receiver: Receiver<NetworkMsg>,
+pub struct NetworkPort<B: BlockT + 'static> {
+	receiver: Receiver<NetworkMsg<B>>,
 	protocol_id: ProtocolId,
 	task_notify: Arc<AtomicTask>,
 }
 
-impl NetworkPort {
+impl<B: BlockT + 'static> NetworkPort<B> {
 	/// Create a new network port for a given protocol-id.
-	pub fn new(receiver: Receiver<NetworkMsg>, protocol_id: ProtocolId, task_notify: Arc<AtomicTask>) -> Self {
+	pub fn new(receiver: Receiver<NetworkMsg<B>>, protocol_id: ProtocolId, task_notify: Arc<AtomicTask>) -> Self {
 		Self {
 			receiver,
 			protocol_id,
@@ -391,7 +390,7 @@ impl NetworkPort {
 
 	/// Receive a message, if any is currently-enqueued.
 	/// Register the current tokio task for notification when a new message is available.
-	pub fn take_one_message(&self) -> Result<Option<(ProtocolId, NetworkMsg)>, ()> {
+	pub fn take_one_message(&self) -> Result<Option<(ProtocolId, NetworkMsg<B>)>, ()> {
 		self.task_notify.register();
 		match self.receiver.try_recv() {
 			Ok(msg) => Ok(Some((self.protocol_id.clone(), msg))),
@@ -402,18 +401,18 @@ impl NetworkPort {
 
 	/// Get a reference to the underlying crossbeam receiver.
 	#[cfg(any(test, feature = "test-helpers"))]
-	pub fn receiver(&self) -> &Receiver<NetworkMsg> {
+	pub fn receiver(&self) -> &Receiver<NetworkMsg<B>> {
 		&self.receiver
 	}
 }
 
 /// Messages to be handled by NetworkService.
 #[derive(Debug)]
-pub enum NetworkMsg {
+pub enum NetworkMsg<B: BlockT + 'static> {
 	/// Ask network to convert a list of nodes, to a list of peers.
 	PeerIds(Vec<NodeIndex>, Sender<Vec<(NodeIndex, Option<PeerId>)>>),
 	/// Send an outgoing custom message.
-	Outgoing(NodeIndex, Vec<u8>),
+	Outgoing(NodeIndex, Message<B>),
 	/// Report a peer.
 	ReportPeer(NodeIndex, Severity),
 	/// Get a peer id.
@@ -423,11 +422,10 @@ pub enum NetworkMsg {
 /// Starts the background thread that handles the networking.
 fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>>(
 	protocol_sender: Sender<ProtocolMsg<B, S>>,
-	network_port: NetworkPort,
-	network_sender: NetworkChan,
+	network_port: NetworkPort<B>,
 	config: NetworkConfiguration,
-	registered: RegisteredProtocol,
-) -> Result<((oneshot::Sender<()>, thread::JoinHandle<()>), Arc<Mutex<NetworkService>>), Error> {
+	registered: RegisteredProtocol<Message<B>>,
+) -> Result<((oneshot::Sender<()>, thread::JoinHandle<()>), Arc<Mutex<NetworkService<Message<B>>>>), Error> {
 	let protocol_id = registered.id();
 
 	// Start the main service.
@@ -447,7 +445,7 @@ fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>>(
 	let service_clone = service.clone();
 	let mut runtime = Runtime::new()?;
 	let thread = thread::Builder::new().name("network".to_string()).spawn(move || {
-		let fut = run_thread(protocol_sender, service_clone, network_sender, network_port, protocol_id)
+		let fut = run_thread(protocol_sender, service_clone, network_port, protocol_id)
 			.select(close_rx.then(|_| Ok(())))
 			.map(|(val, _)| val)
 			.map_err(|(err,_ )| err);
@@ -466,9 +464,8 @@ fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>>(
 /// Runs the background thread that handles the networking.
 fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>>(
 	protocol_sender: Sender<ProtocolMsg<B, S>>,
-	network_service: Arc<Mutex<NetworkService>>,
-	network_sender: NetworkChan,
-	network_port: NetworkPort,
+	network_service: Arc<Mutex<NetworkService<Message<B>>>>,
+	network_port: NetworkPort<B>,
 	protocol_id: ProtocolId,
 ) -> impl Future<Item = (), Error = io::Error> {
 
@@ -538,28 +535,15 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>>(
 			NetworkServiceEvent::ClosedCustomProtocol { node_index, debug_info, .. } => {
 				let _ = protocol_sender.send(ProtocolMsg::PeerDisconnected(node_index, debug_info));
 			}
-			NetworkServiceEvent::CustomMessage { node_index, data, .. } => {
-				if let Some(m) = Decode::decode(&mut (&data as &[u8])) {
-					let _ = protocol_sender.send(ProtocolMsg::CustomMessage(node_index, m));
-					return Ok(())
-				}
-				let _ = network_sender.send(
-					NetworkMsg::ReportPeer(
-						node_index,
-						Severity::Bad("Peer sent us a packet with invalid format".to_string())
-					)
-				);
+			NetworkServiceEvent::CustomMessage { node_index, message, .. } => {
+				let _ = protocol_sender.send(ProtocolMsg::CustomMessage(node_index, message));
+				return Ok(())
 			}
 			NetworkServiceEvent::Clogged { node_index, messages, .. } => {
 				debug!(target: "sync", "{} clogging messages:", messages.len());
-				for msg_bytes in messages.iter().take(5) {
-					if let Some(msg) = Decode::decode(&mut (&msg_bytes as &[u8])) {
-						debug!(target: "sync", "{:?}", msg);
-						let _ = protocol_sender.send(ProtocolMsg::PeerClogged(node_index, Some(msg)));
-					} else {
-						debug!(target: "sync", "{:?}", msg_bytes);
-						let _ = protocol_sender.send(ProtocolMsg::PeerClogged(node_index, None));
-					}
+				for msg in messages.into_iter().take(5) {
+					debug!(target: "sync", "{:?}", msg);
+					let _ = protocol_sender.send(ProtocolMsg::PeerClogged(node_index, Some(msg)));
 				}
 			}
 		};
diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs
index 12c4a389039..0c35dcfee34 100644
--- a/substrate/core/network/src/test/mod.rs
+++ b/substrate/core/network/src/test/mod.rs
@@ -28,7 +28,6 @@ use std::time::Duration;
 use log::trace;
 use client;
 use client::block_builder::BlockBuilder;
-use parity_codec::{Decode, Encode};
 use crate::config::ProtocolConfig;
 use consensus::import_queue::{import_many_blocks, ImportQueue, ImportQueueStatus, IncomingBlock};
 use consensus::import_queue::{Link, SharedBlockImport, SharedJustificationImport, Verifier};
@@ -39,7 +38,9 @@ use crossbeam_channel::{self as channel, Sender, select};
 use futures::Future;
 use futures::sync::{mpsc, oneshot};
 use keyring::Keyring;
-use network_libp2p::{NodeIndex, ProtocolId, Severity};
+use crate::message::Message;
+use network_libp2p::{NodeIndex, ProtocolId};
+use parity_codec::Encode;
 use parking_lot::Mutex;
 use primitives::{H256, Ed25519AuthorityId};
 use crate::protocol::{Context, Protocol, ProtocolMsg, ProtocolStatus};
@@ -236,9 +237,9 @@ pub type PeersClient = client::Client<test_client::Backend, test_client::Executo
 pub struct Peer<V: 'static + Verifier<Block>, D> {
 	client: Arc<PeersClient>,
 	pub protocol_sender: Sender<ProtocolMsg<Block, DummySpecialization>>,
-	network_port: Mutex<NetworkPort>,
+	network_port: Mutex<NetworkPort<Block>>,
 	import_queue: Arc<SyncImportQueue<Block, V>>,
-	network_sender: NetworkChan,
+	network_sender: NetworkChan<Block>,
 	pub data: D,
 }
 
@@ -247,8 +248,8 @@ impl<V: 'static + Verifier<Block>, D> Peer<V, D> {
 		client: Arc<PeersClient>,
 		import_queue: Arc<SyncImportQueue<Block, V>>,
 		protocol_sender: Sender<ProtocolMsg<Block, DummySpecialization>>,
-		network_sender: NetworkChan,
-		network_port: NetworkPort,
+		network_sender: NetworkChan<Block>,
+		network_port: NetworkPort<Block>,
 		data: D,
 	) -> Self {
 		let network_port = Mutex::new(network_port);
@@ -304,24 +305,14 @@ impl<V: 'static + Verifier<Block>, D> Peer<V, D> {
 	}
 
 	/// Receive a message from another peer. Return a set of peers to disconnect.
-	fn receive_message(&self, from: NodeIndex, msg: Vec<u8>) {
-		match Decode::decode(&mut (&msg as &[u8])) {
-			Some(m) => {
-				let _ = self
-					.protocol_sender
-					.send(ProtocolMsg::CustomMessage(from, m));
-			}
-			None => {
-				let _ = self.network_sender.send(NetworkMsg::ReportPeer(
-					from,
-					Severity::Bad("Peer sent us a packet with invalid format".to_string()),
-				));
-			}
-		}
+	fn receive_message(&self, from: NodeIndex, msg: Message<Block>) {
+		let _ = self
+			.protocol_sender
+			.send(ProtocolMsg::CustomMessage(from, msg));
 	}
 
 	/// Produce the next pending message to send to another peer.
-	fn pending_message(&self) -> Option<NetworkMsg> {
+	fn pending_message(&self) -> Option<NetworkMsg<Block>> {
 		select! {
 			recv(self.network_port.lock().receiver()) -> msg => return msg.ok(),
 			// If there are no messages ready, give protocol a change to send one.
@@ -330,7 +321,7 @@ impl<V: 'static + Verifier<Block>, D> Peer<V, D> {
 	}
 
 	/// Produce the next pending message to send to another peer, without waiting.
-	fn pending_message_fast(&self) -> Option<NetworkMsg> {
+	fn pending_message_fast(&self) -> Option<NetworkMsg<Block>> {
 		self.network_port.lock().receiver().try_recv().ok()
 	}
 
-- 
GitLab