From 462eaa3f411307e89bc182a034cad9ba5e76e338 Mon Sep 17 00:00:00 2001
From: Pierre Krieger <pierre.krieger1708@gmail.com>
Date: Mon, 30 Mar 2020 10:00:34 +0200
Subject: [PATCH] =?UTF-8?q?Make=20transactions=20and=20block=20announces?=
 =?UTF-8?q?=20use=20notifications=20substre=E2=80=A6=20(#5360)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Make transactions and block announces use notifications

* Add documentation
---
 substrate/client/network/src/lib.rs           |   8 +
 substrate/client/network/src/protocol.rs      | 151 ++++++++++++++----
 .../src/protocol/generic_proto/behaviour.rs   |  54 +++----
 .../protocol/generic_proto/handler/group.rs   |  95 +++++------
 .../src/protocol/generic_proto/tests.rs       |   4 +-
 5 files changed, 197 insertions(+), 115 deletions(-)

diff --git a/substrate/client/network/src/lib.rs b/substrate/client/network/src/lib.rs
index a5107a02559..23233ee9048 100644
--- a/substrate/client/network/src/lib.rs
+++ b/substrate/client/network/src/lib.rs
@@ -129,6 +129,14 @@
 //! light-client-related requests for information about the state. Each request is the encoding of
 //! a `light::Request` and each response is the encoding of a `light::Response`, as defined in the
 //! `light.v1.proto` file in this source tree.
+//! - **`/<protocol-id>/transactions/1`** is a notifications protocol (see below) where
+//! transactions are pushed to other nodes. The handshake is empty on both sides. The message
+//! format is a SCALE-encoded list of transactions, where each transaction is an opaque list of
+//! bytes.
+//! - **`/<protocol-id>/block-announces/1`** is a notifications protocol (see below) where
+//! block announces are pushed to other nodes. The handshake is empty on both sides. The message
+//! format is a SCALE-encoded tuple containing a block header followed with an opaque list of
+//! bytes containing some data associated with this block announcement, e.g. a candidate message.
 //! - Notifications protocols that are registered using the `register_notifications_protocol`
 //! method. For example: `/paritytech/grandpa/1`. See below for more information.
 //!
diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs
index 7cf568065e8..e21a2df9c80 100644
--- a/substrate/client/network/src/protocol.rs
+++ b/substrate/client/network/src/protocol.rs
@@ -39,7 +39,7 @@ use sp_runtime::traits::{
 };
 use sp_arithmetic::traits::SaturatedConversion;
 use message::{BlockAnnounce, BlockAttributes, Direction, FromBlock, Message, RequestId};
-use message::generic::Message as GenericMessage;
+use message::generic::{Message as GenericMessage, ConsensusMessage};
 use light_dispatch::{LightDispatch, LightDispatchNetwork, RequestData};
 use prometheus_endpoint::{Registry, Gauge, GaugeVec, PrometheusError, Opts, register, U64};
 use sync::{ChainSync, SyncState};
@@ -221,8 +221,12 @@ pub struct Protocol<B: BlockT, H: ExHashT> {
 	behaviour: GenericProto,
 	/// For each legacy gossiping engine ID, the corresponding new protocol name.
 	protocol_name_by_engine: HashMap<ConsensusEngineId, Cow<'static, [u8]>>,
-	/// For each protocol name, the legacy gossiping engine ID.
-	protocol_engine_by_name: HashMap<Cow<'static, [u8]>, ConsensusEngineId>,
+	/// For each protocol name, the legacy equivalent.
+	legacy_equiv_by_name: HashMap<Cow<'static, [u8]>, Fallback>,
+	/// Name of the protocol used for transactions.
+	transactions_protocol: Cow<'static, [u8]>,
+	/// Name of the protocol used for block announces.
+	block_announces_protocol: Cow<'static, [u8]>,
 	/// Prometheus metrics.
 	metrics: Option<Metrics>,
 	/// The `PeerId`'s of all boot nodes.
@@ -424,6 +428,17 @@ impl Default for ProtocolConfig {
 	}
 }
 
+/// Fallback mechanism to use to send a notification if no substream is open.
+#[derive(Debug, Clone, PartialEq, Eq)]
+enum Fallback {
+	/// Use a `Message::Consensus` with the given engine ID.
+	Consensus(ConsensusEngineId),
+	/// The message is the bytes encoding of a `Transactions<E>` (which is itself defined as a `Vec<E>`).
+	Transactions,
+	/// The message is the bytes encoding of a `BlockAnnounce<H>`.
+	BlockAnnounce,
+}
+
 impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 	/// Create a new instance.
 	pub fn new(
@@ -460,7 +475,27 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 
 		let (peerset, peerset_handle) = sc_peerset::Peerset::from_config(peerset_config);
 		let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::<Vec<u8>>();
-		let behaviour = GenericProto::new(protocol_id, versions, peerset);
+		let mut behaviour = GenericProto::new(protocol_id.clone(), versions, peerset);
+
+		let mut legacy_equiv_by_name = HashMap::new();
+
+		let transactions_protocol: Cow<'static, [u8]> = Cow::from({
+			let mut proto = b"/".to_vec();
+			proto.extend(protocol_id.as_bytes());
+			proto.extend(b"/transactions/1");
+			proto
+		});
+		behaviour.register_notif_protocol(transactions_protocol.clone(), Vec::new());
+		legacy_equiv_by_name.insert(transactions_protocol.clone(), Fallback::Transactions);
+
+		let block_announces_protocol: Cow<'static, [u8]> = Cow::from({
+			let mut proto = b"/".to_vec();
+			proto.extend(protocol_id.as_bytes());
+			proto.extend(b"/block-announces/1");
+			proto
+		});
+		behaviour.register_notif_protocol(block_announces_protocol.clone(), Vec::new());
+		legacy_equiv_by_name.insert(block_announces_protocol.clone(), Fallback::BlockAnnounce);
 
 		let protocol = Protocol {
 			tick_timeout: Box::pin(interval(TICK_TIMEOUT)),
@@ -481,7 +516,9 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 			peerset_handle: peerset_handle.clone(),
 			behaviour,
 			protocol_name_by_engine: HashMap::new(),
-			protocol_engine_by_name: HashMap::new(),
+			legacy_equiv_by_name,
+			transactions_protocol,
+			block_announces_protocol,
 			metrics: if let Some(r) = metrics_registry {
 				Some(Metrics::register(r)?)
 			} else {
@@ -731,12 +768,18 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 		);
 	}
 
-	fn send_message(&mut self, who: &PeerId, message: Message<B>) {
+	fn send_message(
+		&mut self,
+		who: &PeerId,
+		message: Option<(Cow<'static, [u8]>, Vec<u8>)>,
+		legacy: Message<B>,
+	) {
 		send_message::<B>(
 			&mut self.behaviour,
 			&mut self.context_data.stats,
 			who,
 			message,
+			legacy,
 		);
 	}
 
@@ -793,11 +836,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 		}
 	}
 
-	fn on_block_request(
-		&mut self,
-		peer: PeerId,
-		request: message::BlockRequest<B>
-	) {
+	fn on_block_request(&mut self, peer: PeerId, request: message::BlockRequest<B>) {
 		trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?} for {:?}",
 			request.id,
 			peer,
@@ -874,7 +913,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 			blocks: blocks,
 		};
 		trace!(target: "sync", "Sending BlockResponse with {} blocks", response.blocks.len());
-		self.send_message(&peer, GenericMessage::BlockResponse(response))
+		self.send_message(&peer, None, GenericMessage::BlockResponse(response))
 	}
 
 	/// Adjusts the reputation of a node.
@@ -1132,10 +1171,15 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 		&mut self,
 		target: PeerId,
 		engine_id: ConsensusEngineId,
-		message: impl Into<Vec<u8>>
+		message: impl Into<Vec<u8>>,
 	) {
 		if let Some(protocol_name) = self.protocol_name_by_engine.get(&engine_id) {
-			self.behaviour.write_notification(&target, engine_id, protocol_name.clone(), message);
+			let message = message.into();
+			let fallback = GenericMessage::<(), (), (), ()>::Consensus(ConsensusMessage {
+				engine_id,
+				data: message.clone(),
+			}).encode();
+			self.behaviour.write_notification(&target, protocol_name.clone(), message, fallback);
 		} else {
 			error!(
 				target: "sub-libp2p",
@@ -1158,8 +1202,8 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 		if self.protocol_name_by_engine.insert(engine_id, protocol_name.clone()).is_some() {
 			error!(target: "sub-libp2p", "Notifications protocol already registered: {:?}", protocol_name);
 		} else {
-			self.behaviour.register_notif_protocol(protocol_name.clone(), engine_id, Vec::new());
-			self.protocol_engine_by_name.insert(protocol_name, engine_id);
+			self.behaviour.register_notif_protocol(protocol_name.clone(), Vec::new());
+			self.legacy_equiv_by_name.insert(protocol_name, Fallback::Consensus(engine_id));
 		}
 
 		// Registering a protocol while we already have open connections isn't great, but for now
@@ -1229,7 +1273,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 	fn do_propagate_extrinsics(
 		&mut self,
 		extrinsics: &[(H, B::Extrinsic)],
-	) -> HashMap<H,  Vec<String>> {
+	) -> HashMap<H, Vec<String>> {
 		let mut propagated_to = HashMap::new();
 		for (who, peer) in self.context_data.peers.iter_mut() {
 			// never send extrinsics to the light node
@@ -1251,10 +1295,12 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 						.push(who.to_base58());
 				}
 				trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
+				let encoded = to_send.encode();
 				send_message::<B> (
 					&mut self.behaviour,
 					&mut self.context_data.stats,
 					&who,
+					Some((self.transactions_protocol.clone(), encoded)),
 					GenericMessage::Transactions(to_send)
 				)
 			}
@@ -1309,7 +1355,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 			trace!(target: "sync", "Announcing block {:?} to {}", hash, who);
 			let inserted = peer.known_blocks.insert(hash);
 			if inserted || force {
-				let message: Message<B> = GenericMessage::BlockAnnounce(message::BlockAnnounce {
+				let message = message::BlockAnnounce {
 					header: header.clone(),
 					state: if peer.info.protocol_version >= 4  {
 						if is_best {
@@ -1325,13 +1371,16 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 					} else {
 						None
 					},
-				});
+				};
+
+				let encoded = message.encode();
 
 				send_message::<B> (
 					&mut self.behaviour,
 					&mut self.context_data.stats,
 					&who,
-					message,
+					Some((self.block_announces_protocol.clone(), encoded)),
+					Message::<B>::BlockAnnounce(message),
 				)
 			}
 		}
@@ -1350,10 +1399,14 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 			chain_status: Vec::new(), // TODO: find a way to make this backwards-compatible
 		};
 
-		self.send_message(&who, GenericMessage::Status(status))
+		self.send_message(&who, None, GenericMessage::Status(status))
 	}
 
-	fn on_block_announce(&mut self, who: PeerId, announce: BlockAnnounce<B::Header>) -> CustomMessageOutcome<B> {
+	fn on_block_announce(
+		&mut self,
+		who: PeerId,
+		announce: BlockAnnounce<B::Header>,
+	) -> CustomMessageOutcome<B> {
 		let hash = announce.header.hash();
 		if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) {
 			peer.known_blocks.insert(hash.clone());
@@ -1468,6 +1521,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 
 		self.send_message(
 			&who,
+			None,
 			GenericMessage::RemoteCallResponse(message::RemoteCallResponse {
 				id: request.id,
 				proof,
@@ -1598,6 +1652,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 		};
 		self.send_message(
 			&who,
+			None,
 			GenericMessage::RemoteReadResponse(message::RemoteReadResponse {
 				id: request.id,
 				proof,
@@ -1662,6 +1717,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 		};
 		self.send_message(
 			&who,
+			None,
 			GenericMessage::RemoteReadResponse(message::RemoteReadResponse {
 				id: request.id,
 				proof,
@@ -1702,6 +1758,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 		};
 		self.send_message(
 			&who,
+			None,
 			GenericMessage::RemoteHeaderResponse(message::RemoteHeaderResponse {
 				id: request.id,
 				header,
@@ -1772,6 +1829,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 		};
 		self.send_message(
 			&who,
+			None,
 			GenericMessage::RemoteChangesResponse(message::RemoteChangesResponse {
 				id: request.id,
 				max: proof.max_block,
@@ -1822,6 +1880,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 		};
 		self.send_message(
 			&who,
+			None,
 			GenericMessage::FinalityProofResponse(message::FinalityProofResponse {
 				id: 0,
 				block: request.block,
@@ -1951,20 +2010,25 @@ fn send_request<B: BlockT, H: ExHashT>(
 			peer.block_request = Some((Instant::now(), r.clone()));
 		}
 	}
-	send_message::<B>(behaviour, stats, who, message)
+	send_message::<B>(behaviour, stats, who, None, message)
 }
 
 fn send_message<B: BlockT>(
 	behaviour: &mut GenericProto,
 	stats: &mut HashMap<&'static str, PacketStats>,
 	who: &PeerId,
-	message: Message<B>,
+	message: Option<(Cow<'static, [u8]>, Vec<u8>)>,
+	legacy_message: Message<B>,
 ) {
-	let encoded = message.encode();
-	let mut stats = stats.entry(message.id()).or_default();
+	let encoded = legacy_message.encode();
+	let mut stats = stats.entry(legacy_message.id()).or_default();
 	stats.bytes_out += encoded.len() as u64;
 	stats.count_out += 1;
-	behaviour.send_packet(who, encoded);
+	if let Some((proto, msg)) = message {
+		behaviour.write_notification(who, proto, msg, encoded);
+	} else {
+		behaviour.send_packet(who, encoded);
+	}
 }
 
 impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
@@ -2061,8 +2125,39 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
 			GenericProtoOut::CustomProtocolClosed { peer_id, .. } => {
 				self.on_peer_disconnected(peer_id.clone())
 			},
-			GenericProtoOut::CustomMessage { peer_id, message } =>
+			GenericProtoOut::LegacyMessage { peer_id, message } =>
 				self.on_custom_message(peer_id, message),
+			GenericProtoOut::Notification { peer_id, protocol_name, message } =>
+				match self.legacy_equiv_by_name.get(&protocol_name) {
+					Some(Fallback::Consensus(engine_id)) => {
+						CustomMessageOutcome::NotificationsReceived {
+							remote: peer_id,
+							messages: vec![(*engine_id, message.freeze())],
+						}
+					}
+					Some(Fallback::Transactions) => {
+						if let Ok(m) = message::Transactions::decode(&mut message.as_ref()) {
+							self.on_extrinsics(peer_id, m);
+						} else {
+							warn!(target: "sub-libp2p", "Failed to decode transactions list");
+						}
+						CustomMessageOutcome::None
+					}
+					Some(Fallback::BlockAnnounce) => {
+						if let Ok(announce) = message::BlockAnnounce::decode(&mut message.as_ref()) {
+							let outcome = self.on_block_announce(peer_id.clone(), announce);
+							self.update_peer_info(&peer_id);
+							outcome
+						} else {
+							warn!(target: "sub-libp2p", "Failed to decode block announce");
+							CustomMessageOutcome::None
+						}
+					}
+					None => {
+						error!(target: "sub-libp2p", "Received notification from unknown protocol {:?}", protocol_name);
+						CustomMessageOutcome::None
+					}
+				}
 			GenericProtoOut::Clogged { peer_id, messages } => {
 				debug!(target: "sync", "{} clogging messages:", messages.len());
 				for msg in messages.into_iter().take(5) {
diff --git a/substrate/client/network/src/protocol/generic_proto/behaviour.rs b/substrate/client/network/src/protocol/generic_proto/behaviour.rs
index 727415baaf5..63625f1c9ff 100644
--- a/substrate/client/network/src/protocol/generic_proto/behaviour.rs
+++ b/substrate/client/network/src/protocol/generic_proto/behaviour.rs
@@ -15,12 +15,10 @@
 // along with Substrate.  If not, see <http://www.gnu.org/licenses/>.
 
 use crate::{DiscoveryNetBehaviour, config::ProtocolId};
-use crate::protocol::message::generic::{Message as GenericMessage, ConsensusMessage};
 use crate::protocol::generic_proto::handler::{NotifsHandlerProto, NotifsHandlerOut, NotifsHandlerIn};
 use crate::protocol::generic_proto::upgrade::RegisteredProtocol;
 
 use bytes::BytesMut;
-use codec::Encode as _;
 use fnv::FnvHashMap;
 use futures::prelude::*;
 use libp2p::core::{ConnectedPoint, Multiaddr, PeerId};
@@ -28,10 +26,9 @@ use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
 use log::{debug, error, trace, warn};
 use rand::distributions::{Distribution as _, Uniform};
 use smallvec::SmallVec;
-use sp_runtime::ConsensusEngineId;
-use std::{borrow::Cow, collections::hash_map::Entry, cmp};
-use std::{error, mem, pin::Pin, str, time::Duration};
 use std::task::{Context, Poll};
+use std::{borrow::Cow, cmp, collections::hash_map::Entry};
+use std::{error, mem, pin::Pin, str, time::Duration};
 use wasm_timer::Instant;
 
 /// Network behaviour that handles opening substreams for custom protocols with other nodes.
@@ -84,7 +81,7 @@ pub struct GenericProto {
 	legacy_protocol: RegisteredProtocol,
 
 	/// Notification protocols. Entries are only ever added and not removed.
-	notif_protocols: Vec<(Cow<'static, [u8]>, ConsensusEngineId, Vec<u8>)>,
+	notif_protocols: Vec<(Cow<'static, [u8]>, Vec<u8>)>,
 
 	/// Receiver for instructions about who to connect to or disconnect from.
 	peerset: sc_peerset::Peerset,
@@ -238,12 +235,22 @@ pub enum GenericProtoOut {
 		reason: Cow<'static, str>,
 	},
 
+	/// Receives a message on the legacy substream.
+	LegacyMessage {
+		/// Id of the peer the message came from.
+		peer_id: PeerId,
+		/// Message that has been received.
+		message: BytesMut,
+	},
+
 	/// Receives a message on a custom protocol substream.
 	///
 	/// Also concerns received notifications for the notifications API.
-	CustomMessage {
+	Notification {
 		/// Id of the peer the message came from.
 		peer_id: PeerId,
+		/// Engine corresponding to the message.
+		protocol_name: Cow<'static, [u8]>,
 		/// Message that has been received.
 		message: BytesMut,
 	},
@@ -285,10 +292,9 @@ impl GenericProto {
 	pub fn register_notif_protocol(
 		&mut self,
 		protocol_name: impl Into<Cow<'static, [u8]>>,
-		engine_id: ConsensusEngineId,
 		handshake_msg: impl Into<Vec<u8>>
 	) {
-		self.notif_protocols.push((protocol_name.into(), engine_id, handshake_msg.into()));
+		self.notif_protocols.push((protocol_name.into(), handshake_msg.into()));
 	}
 
 	/// Returns the number of discovered nodes that we keep in memory.
@@ -406,14 +412,15 @@ impl GenericProto {
 	/// Also note that even if we have a valid open substream, it may in fact be already closed
 	/// without us knowing, in which case the packet will not be received.
 	///
-	/// > **Note**: Ideally the `engine_id` parameter wouldn't be necessary. See the documentation
-	/// >			of [`NotifsHandlerIn`] for more information.
+	/// The `fallback` parameter is used for backwards-compatibility reason if the remote doesn't
+	/// support our protocol. One needs to pass the equivalent of what would have been passed
+	/// with `send_packet`.
 	pub fn write_notification(
 		&mut self,
 		target: &PeerId,
-		engine_id: ConsensusEngineId,
 		protocol_name: Cow<'static, [u8]>,
 		message: impl Into<Vec<u8>>,
+		encoded_fallback_message: Vec<u8>,
 	) {
 		if !self.is_open(target) {
 			return;
@@ -421,7 +428,7 @@ impl GenericProto {
 
 		trace!(
 			target: "sub-libp2p",
-			"External API => Notification for {:?} with protocol {:?}",
+			"External API => Notification({:?}, {:?})",
 			target,
 			str::from_utf8(&protocol_name)
 		);
@@ -431,7 +438,7 @@ impl GenericProto {
 			peer_id: target.clone(),
 			event: NotifsHandlerIn::SendNotification {
 				message: message.into(),
-				engine_id,
+				encoded_fallback_message,
 				protocol_name,
 			},
 		});
@@ -999,7 +1006,7 @@ impl NetworkBehaviour for GenericProto {
 				debug_assert!(self.is_open(&source));
 				trace!(target: "sub-libp2p", "Handler({:?}) => Message", source);
 				trace!(target: "sub-libp2p", "External API <= Message({:?})", source);
-				let event = GenericProtoOut::CustomMessage {
+				let event = GenericProtoOut::LegacyMessage {
 					peer_id: source,
 					message,
 				};
@@ -1007,7 +1014,7 @@ impl NetworkBehaviour for GenericProto {
 				self.events.push(NetworkBehaviourAction::GenerateEvent(event));
 			}
 
-			NotifsHandlerOut::Notification { protocol_name, engine_id, message } => {
+			NotifsHandlerOut::Notification { protocol_name, message } => {
 				debug_assert!(self.is_open(&source));
 				trace!(
 					target: "sub-libp2p",
@@ -1015,18 +1022,11 @@ impl NetworkBehaviour for GenericProto {
 					source,
 					str::from_utf8(&protocol_name)
 				);
-				trace!(target: "sub-libp2p", "External API <= Message({:?})", source);
-				let event = GenericProtoOut::CustomMessage {
+				trace!(target: "sub-libp2p", "External API <= Message({:?}, {:?})", protocol_name, source);
+				let event = GenericProtoOut::Notification {
 					peer_id: source,
-					message: {
-						let message = GenericMessage::<(), (), (), ()>::Consensus(ConsensusMessage {
-							engine_id,
-							data: message.to_vec(),
-						});
-
-						// Note that we clone `message` here.
-						From::from(&message.encode()[..])
-					},
+					protocol_name,
+					message,
 				};
 
 				self.events.push(NetworkBehaviourAction::GenerateEvent(event));
diff --git a/substrate/client/network/src/protocol/generic_proto/handler/group.rs b/substrate/client/network/src/protocol/generic_proto/handler/group.rs
index d6d9919d3e1..b4321234b09 100644
--- a/substrate/client/network/src/protocol/generic_proto/handler/group.rs
+++ b/substrate/client/network/src/protocol/generic_proto/handler/group.rs
@@ -51,10 +51,8 @@ use crate::protocol::generic_proto::{
 	handler::notif_out::{NotifsOutHandlerProto, NotifsOutHandler, NotifsOutHandlerIn, NotifsOutHandlerOut},
 	upgrade::{NotificationsIn, NotificationsOut, NotificationsHandshakeError, RegisteredProtocol, UpgradeCollec},
 };
-use crate::protocol::message::generic::{Message as GenericMessage, ConsensusMessage};
 
 use bytes::BytesMut;
-use codec::Encode as _;
 use libp2p::core::{either::{EitherError, EitherOutput}, ConnectedPoint, PeerId};
 use libp2p::core::upgrade::{EitherUpgrade, UpgradeError, SelectUpgrade, InboundUpgrade, OutboundUpgrade};
 use libp2p::swarm::{
@@ -66,8 +64,7 @@ use libp2p::swarm::{
 	NegotiatedSubstream,
 };
 use log::error;
-use sp_runtime::ConsensusEngineId;
-use std::{borrow::Cow, error, io, task::{Context, Poll}};
+use std::{borrow::Cow, error, io, str, task::{Context, Poll}};
 
 /// Implements the `IntoProtocolsHandler` trait of libp2p.
 ///
@@ -78,10 +75,10 @@ use std::{borrow::Cow, error, io, task::{Context, Poll}};
 /// See the documentation at the module level for more information.
 pub struct NotifsHandlerProto {
 	/// Prototypes for handlers for inbound substreams.
-	in_handlers: Vec<(NotifsInHandlerProto, ConsensusEngineId)>,
+	in_handlers: Vec<NotifsInHandlerProto>,
 
 	/// Prototypes for handlers for outbound substreams.
-	out_handlers: Vec<(NotifsOutHandlerProto, ConsensusEngineId)>,
+	out_handlers: Vec<NotifsOutHandlerProto>,
 
 	/// Prototype for handler for backwards-compatibility.
 	legacy: LegacyProtoHandlerProto,
@@ -92,10 +89,10 @@ pub struct NotifsHandlerProto {
 /// See the documentation at the module level for more information.
 pub struct NotifsHandler {
 	/// Handlers for inbound substreams.
-	in_handlers: Vec<(NotifsInHandler, ConsensusEngineId)>,
+	in_handlers: Vec<NotifsInHandler>,
 
 	/// Handlers for outbound substreams.
-	out_handlers: Vec<(NotifsOutHandler, ConsensusEngineId)>,
+	out_handlers: Vec<NotifsOutHandler>,
 
 	/// Handler for backwards-compatibility.
 	legacy: LegacyProtoHandler,
@@ -121,7 +118,7 @@ impl IntoProtocolsHandler for NotifsHandlerProto {
 
 	fn inbound_protocol(&self) -> SelectUpgrade<UpgradeCollec<NotificationsIn>, RegisteredProtocol> {
 		let in_handlers = self.in_handlers.iter()
-			.map(|(h, _)| h.inbound_protocol())
+			.map(|h| h.inbound_protocol())
 			.collect::<UpgradeCollec<_>>();
 
 		SelectUpgrade::new(in_handlers, self.legacy.inbound_protocol())
@@ -131,11 +128,11 @@ impl IntoProtocolsHandler for NotifsHandlerProto {
 		NotifsHandler {
 			in_handlers: self.in_handlers
 				.into_iter()
-				.map(|(p, e)| (p.into_handler(remote_peer_id, connected_point), e))
+				.map(|p| p.into_handler(remote_peer_id, connected_point))
 				.collect(),
 			out_handlers: self.out_handlers
 				.into_iter()
-				.map(|(p, e)| (p.into_handler(remote_peer_id, connected_point), e))
+				.map(|p| p.into_handler(remote_peer_id, connected_point))
 				.collect(),
 			legacy: self.legacy.into_handler(remote_peer_id, connected_point),
 			enabled: EnabledState::Initial,
@@ -155,7 +152,8 @@ pub enum NotifsHandlerIn {
 
 	/// Sends a message through the custom protocol substream.
 	///
-	/// > **Note**: This must **not** be an encoded `ConsensusMessage` message.
+	/// > **Note**: This must **not** be a `ConsensusMessage`, `Transactions`, or
+	/// > 			`BlockAnnounce` message.
 	SendLegacy {
 		/// The message to send.
 		message: Vec<u8>,
@@ -166,17 +164,13 @@ pub enum NotifsHandlerIn {
 		/// Name of the protocol for the message.
 		///
 		/// Must match one of the registered protocols. For backwards-compatibility reasons, if
-		/// the remote doesn't support this protocol, we use the legacy substream to send a
-		/// `ConsensusMessage` message.
+		/// the remote doesn't support this protocol, we use the legacy substream.
 		protocol_name: Cow<'static, [u8]>,
 
-		/// The engine ID to use, in case we need to send this message over the legacy substream.
+		/// Message to send on the legacy substream if the protocol isn't available.
 		///
-		/// > **Note**: Ideally this field wouldn't be necessary, and we would deduce the engine
-		/// >			ID from the existing handlers. However, it is possible (especially in test
-		/// >			situations) that we open connections before all the notification protocols
-		/// >			have been registered, in which case we always rely on the legacy substream.
-		engine_id: ConsensusEngineId,
+		/// This corresponds to what you would have sent with `SendLegacy`.
+		encoded_fallback_message: Vec<u8>,
 
 		/// The message to send.
 		message: Vec<u8>,
@@ -206,17 +200,10 @@ pub enum NotifsHandlerOut {
 
 	/// Received a message on a custom protocol substream.
 	Notification {
-		/// Engine corresponding to the message.
+		/// Name of the protocol of the message.
 		protocol_name: Cow<'static, [u8]>,
 
-		/// For legacy reasons, the name to use if we had received the message from the legacy
-		/// substream.
-		engine_id: ConsensusEngineId,
-
 		/// Message that has been received.
-		///
-		/// If `protocol_name` is `None`, this decodes to a `Message`. If `protocol_name` is `Some`,
-		/// this is directly a gossiping message.
 		message: BytesMut,
 	},
 
@@ -238,12 +225,12 @@ pub enum NotifsHandlerOut {
 
 impl NotifsHandlerProto {
 	/// Builds a new handler.
-	pub fn new(legacy: RegisteredProtocol, list: impl Into<Vec<(Cow<'static, [u8]>, ConsensusEngineId, Vec<u8>)>>) -> Self {
+	pub fn new(legacy: RegisteredProtocol, list: impl Into<Vec<(Cow<'static, [u8]>, Vec<u8>)>>) -> Self {
 		let list = list.into();
 
 		NotifsHandlerProto {
-			in_handlers: list.clone().into_iter().map(|(p, e, _)| (NotifsInHandlerProto::new(p), e)).collect(),
-			out_handlers: list.clone().into_iter().map(|(p, e, _)| (NotifsOutHandlerProto::new(p), e)).collect(),
+			in_handlers: list.clone().into_iter().map(|(p, _)| NotifsInHandlerProto::new(p)).collect(),
+			out_handlers: list.clone().into_iter().map(|(p, _)| NotifsOutHandlerProto::new(p)).collect(),
 			legacy: LegacyProtoHandlerProto::new(legacy),
 		}
 	}
@@ -266,7 +253,7 @@ impl ProtocolsHandler for NotifsHandler {
 
 	fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
 		let in_handlers = self.in_handlers.iter()
-			.map(|h| h.0.listen_protocol().into_upgrade().1)
+			.map(|h| h.listen_protocol().into_upgrade().1)
 			.collect::<UpgradeCollec<_>>();
 
 		let proto = SelectUpgrade::new(in_handlers, self.legacy.listen_protocol().into_upgrade().1);
@@ -279,7 +266,7 @@ impl ProtocolsHandler for NotifsHandler {
 	) {
 		match out {
 			EitherOutput::First((out, num)) =>
-				self.in_handlers[num].0.inject_fully_negotiated_inbound(out),
+				self.in_handlers[num].inject_fully_negotiated_inbound(out),
 			EitherOutput::Second(out) =>
 				self.legacy.inject_fully_negotiated_inbound(out),
 		}
@@ -292,7 +279,7 @@ impl ProtocolsHandler for NotifsHandler {
 	) {
 		match (out, num) {
 			(EitherOutput::First(out), Some(num)) =>
-				self.out_handlers[num].0.inject_fully_negotiated_outbound(out, ()),
+				self.out_handlers[num].inject_fully_negotiated_outbound(out, ()),
 			(EitherOutput::Second(out), None) =>
 				self.legacy.inject_fully_negotiated_outbound(out, ()),
 			_ => error!("inject_fully_negotiated_outbound called with wrong parameters"),
@@ -304,13 +291,13 @@ impl ProtocolsHandler for NotifsHandler {
 			NotifsHandlerIn::Enable => {
 				self.enabled = EnabledState::Enabled;
 				self.legacy.inject_event(LegacyProtoHandlerIn::Enable);
-				for (handler, _) in &mut self.out_handlers {
+				for handler in &mut self.out_handlers {
 					handler.inject_event(NotifsOutHandlerIn::Enable {
 						initial_message: vec![]
 					});
 				}
 				for num in self.pending_in.drain(..) {
-					self.in_handlers[num].0.inject_event(NotifsInHandlerIn::Accept(vec![]));
+					self.in_handlers[num].inject_event(NotifsInHandlerIn::Accept(vec![]));
 				}
 			},
 			NotifsHandlerIn::Disable => {
@@ -318,38 +305,31 @@ impl ProtocolsHandler for NotifsHandler {
 				// The notifications protocols start in the disabled state. If we were in the
 				// "Initial" state, then we shouldn't disable the notifications protocols again.
 				if self.enabled != EnabledState::Initial {
-					for (handler, _) in &mut self.out_handlers {
+					for handler in &mut self.out_handlers {
 						handler.inject_event(NotifsOutHandlerIn::Disable);
 					}
 				}
 				self.enabled = EnabledState::Disabled;
 				for num in self.pending_in.drain(..) {
-					self.in_handlers[num].0.inject_event(NotifsInHandlerIn::Refuse);
+					self.in_handlers[num].inject_event(NotifsInHandlerIn::Refuse);
 				}
 			},
 			NotifsHandlerIn::SendLegacy { message } =>
 				self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { message }),
-			NotifsHandlerIn::SendNotification { message, engine_id, protocol_name } => {
-				for (handler, ngn_id) in &mut self.out_handlers {
+			NotifsHandlerIn::SendNotification { message, encoded_fallback_message, protocol_name } => {
+				for handler in &mut self.out_handlers {
 					if handler.protocol_name() != &protocol_name[..] {
-						break;
+						continue;
 					}
 
 					if handler.is_open() {
 						handler.inject_event(NotifsOutHandlerIn::Send(message));
 						return;
-					} else {
-						debug_assert_eq!(engine_id, *ngn_id);
 					}
 				}
 
-				let message = GenericMessage::<(), (), (), ()>::Consensus(ConsensusMessage {
-					engine_id,
-					data: message,
-				});
-
 				self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage {
-					message: message.encode()
+					message: encoded_fallback_message,
 				});
 			},
 		}
@@ -362,21 +342,21 @@ impl ProtocolsHandler for NotifsHandler {
 	) {
 		match (err, num) {
 			(ProtocolsHandlerUpgrErr::Timeout, Some(num)) =>
-				self.out_handlers[num].0.inject_dial_upgrade_error(
+				self.out_handlers[num].inject_dial_upgrade_error(
 					(),
 					ProtocolsHandlerUpgrErr::Timeout
 				),
 			(ProtocolsHandlerUpgrErr::Timeout, None) =>
 				self.legacy.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timeout),
 			(ProtocolsHandlerUpgrErr::Timer, Some(num)) =>
-				self.out_handlers[num].0.inject_dial_upgrade_error(
+				self.out_handlers[num].inject_dial_upgrade_error(
 					(),
 					ProtocolsHandlerUpgrErr::Timer
 				),
 			(ProtocolsHandlerUpgrErr::Timer, None) =>
 				self.legacy.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timer),
 			(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), Some(num)) =>
-				self.out_handlers[num].0.inject_dial_upgrade_error(
+				self.out_handlers[num].inject_dial_upgrade_error(
 					(),
 					ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err))
 				),
@@ -386,7 +366,7 @@ impl ProtocolsHandler for NotifsHandler {
 					ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err))
 				),
 			(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(err))), Some(num)) =>
-				self.out_handlers[num].0.inject_dial_upgrade_error(
+				self.out_handlers[num].inject_dial_upgrade_error(
 					(),
 					ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err))
 				),
@@ -407,7 +387,7 @@ impl ProtocolsHandler for NotifsHandler {
 			return KeepAlive::Yes;
 		}
 
-		for (handler, _) in &self.in_handlers {
+		for handler in &self.in_handlers {
 			let val = handler.connection_keep_alive();
 			if val.is_yes() {
 				return KeepAlive::Yes;
@@ -415,7 +395,7 @@ impl ProtocolsHandler for NotifsHandler {
 			if ret < val { ret = val; }
 		}
 
-		for (handler, _) in &self.out_handlers {
+		for handler in &self.out_handlers {
 			let val = handler.connection_keep_alive();
 			if val.is_yes() {
 				return KeepAlive::Yes;
@@ -432,7 +412,7 @@ impl ProtocolsHandler for NotifsHandler {
 	) -> Poll<
 		ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>
 	> {
-		for (handler_num, (handler, engine_id)) in self.in_handlers.iter_mut().enumerate() {
+		for (handler_num, handler) in self.in_handlers.iter_mut().enumerate() {
 			while let Poll::Ready(ev) = handler.poll(cx) {
 				match ev {
 					ProtocolsHandlerEvent::OutboundSubstreamRequest { .. } =>
@@ -453,7 +433,6 @@ impl ProtocolsHandler for NotifsHandler {
 						if self.legacy.is_open() {
 							let msg = NotifsHandlerOut::Notification {
 								message,
-								engine_id: *engine_id,
 								protocol_name: handler.protocol_name().to_owned().into(),
 							};
 							return Poll::Ready(ProtocolsHandlerEvent::Custom(msg));
@@ -463,7 +442,7 @@ impl ProtocolsHandler for NotifsHandler {
 			}
 		}
 
-		for (handler_num, (handler, _)) in self.out_handlers.iter_mut().enumerate() {
+		for (handler_num, handler) in self.out_handlers.iter_mut().enumerate() {
 			while let Poll::Ready(ev) = handler.poll(cx) {
 				match ev {
 					ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } =>
diff --git a/substrate/client/network/src/protocol/generic_proto/tests.rs b/substrate/client/network/src/protocol/generic_proto/tests.rs
index b63a725c074..00b840d5811 100644
--- a/substrate/client/network/src/protocol/generic_proto/tests.rs
+++ b/substrate/client/network/src/protocol/generic_proto/tests.rs
@@ -245,7 +245,7 @@ fn two_nodes_transfer_lots_of_packets() {
 		loop {
 			match ready!(service2.poll_next_unpin(cx)) {
 				Some(GenericProtoOut::CustomProtocolOpen { .. }) => {},
-				Some(GenericProtoOut::CustomMessage { message, .. }) => {
+				Some(GenericProtoOut::LegacyMessage { message, .. }) => {
 					match Message::<Block>::decode(&mut &message[..]).unwrap() {
 						Message::<Block>::BlockResponse(BlockResponse { id: _, blocks }) => {
 							assert!(blocks.is_empty());
@@ -315,7 +315,7 @@ fn basic_two_nodes_requests_in_parallel() {
 		loop {
 			match ready!(service2.poll_next_unpin(cx)) {
 				Some(GenericProtoOut::CustomProtocolOpen { .. }) => {},
-				Some(GenericProtoOut::CustomMessage { message, .. }) => {
+				Some(GenericProtoOut::LegacyMessage { message, .. }) => {
 					let pos = to_receive.iter().position(|m| m.encode() == message).unwrap();
 					to_receive.remove(pos);
 					if to_receive.is_empty() {
-- 
GitLab