From 8e04515912016a12a80c80451ba3f9801524f47e Mon Sep 17 00:00:00 2001
From: Pierre Krieger <pierre.krieger1708@gmail.com>
Date: Tue, 19 Jan 2021 12:00:37 +0100
Subject: [PATCH] Add explicit limits to notifications sizes and adjust yamux
 buffer size (#7925)

* Add explicit limits to notifications sizes and adjust yamux buffer size

* Docfix

* Tests

* Document these 10 bytes
---
 substrate/client/finality-grandpa/src/lib.rs  |  2 +
 substrate/client/network/src/config.rs        |  2 +
 substrate/client/network/src/gossip/tests.rs  |  2 +
 substrate/client/network/src/protocol.rs      | 13 +++--
 .../src/protocol/generic_proto/behaviour.rs   |  6 +-
 .../src/protocol/generic_proto/handler.rs     | 31 ++++++----
 .../src/protocol/generic_proto/tests.rs       |  2 +-
 .../generic_proto/upgrade/notifications.rs    | 46 ++++++++++-----
 substrate/client/network/src/service.rs       | 58 ++++++++++++++-----
 substrate/client/network/src/service/tests.rs |  4 ++
 substrate/client/network/src/transport.rs     |  9 ++-
 substrate/client/network/test/src/lib.rs      |  1 +
 12 files changed, 125 insertions(+), 51 deletions(-)

diff --git a/substrate/client/finality-grandpa/src/lib.rs b/substrate/client/finality-grandpa/src/lib.rs
index 6215e2b9f99..040748448de 100644
--- a/substrate/client/finality-grandpa/src/lib.rs
+++ b/substrate/client/finality-grandpa/src/lib.rs
@@ -672,6 +672,8 @@ pub struct GrandpaParams<Block: BlockT, C, N, SC, VR> {
 pub fn grandpa_peers_set_config() -> sc_network::config::NonDefaultSetConfig {
 	sc_network::config::NonDefaultSetConfig {
 		notifications_protocol: communication::GRANDPA_PROTOCOL_NAME.into(),
+		// Notifications reach ~256kiB in size at the time of writing on Kusama and Polkadot.
+		max_notification_size: 1024 * 1024,
 		set_config: sc_network::config::SetConfig {
 			in_peers: 25,
 			out_peers: 25,
diff --git a/substrate/client/network/src/config.rs b/substrate/client/network/src/config.rs
index 8c100875bef..c0e2c66482b 100644
--- a/substrate/client/network/src/config.rs
+++ b/substrate/client/network/src/config.rs
@@ -528,6 +528,8 @@ pub struct NonDefaultSetConfig {
 	/// > **Note**: This field isn't present for the default set, as this is handled internally
 	/// >           by the networking code.
 	pub notifications_protocol: Cow<'static, str>,
+	/// Maximum allowed size of single notifications.
+	pub max_notification_size: u64,
 	/// Base configuration.
 	pub set_config: SetConfig,
 }
diff --git a/substrate/client/network/src/gossip/tests.rs b/substrate/client/network/src/gossip/tests.rs
index d2bf4eeca61..e0941357e84 100644
--- a/substrate/client/network/src/gossip/tests.rs
+++ b/substrate/client/network/src/gossip/tests.rs
@@ -144,6 +144,7 @@ fn build_nodes_one_proto()
 		extra_sets: vec![
 			config::NonDefaultSetConfig {
 				notifications_protocol: PROTOCOL_NAME,
+				max_notification_size: 1024 * 1024,
 				set_config: Default::default()
 			}
 		],
@@ -157,6 +158,7 @@ fn build_nodes_one_proto()
 		extra_sets: vec![
 			config::NonDefaultSetConfig {
 				notifications_protocol: PROTOCOL_NAME,
+				max_notification_size: 1024 * 1024,
 				set_config: config::SetConfig {
 					reserved_nodes: vec![config::MultiaddrWithPeerId {
 						multiaddr: listen_addr,
diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs
index 0a9efbb3ba0..31ba770e932 100644
--- a/substrate/client/network/src/protocol.rs
+++ b/substrate/client/network/src/protocol.rs
@@ -475,16 +475,19 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
 				best_hash,
 				genesis_hash,
 			).encode();
+
 			GenericProto::new(
 				protocol_id.clone(),
 				versions,
 				build_status_message::<B>(&config, best_number, best_hash, genesis_hash),
 				peerset,
-				iter::once((block_announces_protocol, block_announces_handshake))
-					.chain(iter::once((transactions_protocol, vec![])))
-					.chain(network_config.extra_sets.iter()
-						.map(|s| (s.notifications_protocol.clone(), handshake_message.clone()))
-					),
+				iter::once((block_announces_protocol, block_announces_handshake, 1024 * 1024))
+					.chain(iter::once((transactions_protocol, vec![], 1024 * 1024)))
+					.chain(network_config.extra_sets.iter().map(|s| (
+						s.notifications_protocol.clone(),
+						handshake_message.clone(),
+						s.max_notification_size
+					))),
 			)
 		};
 
diff --git a/substrate/client/network/src/protocol/generic_proto/behaviour.rs b/substrate/client/network/src/protocol/generic_proto/behaviour.rs
index 0547f96a311..000d334d184 100644
--- a/substrate/client/network/src/protocol/generic_proto/behaviour.rs
+++ b/substrate/client/network/src/protocol/generic_proto/behaviour.rs
@@ -103,7 +103,7 @@ pub struct GenericProto {
 	/// Notification protocols. Entries are only ever added and not removed.
 	/// Contains, for each protocol, the protocol name and the message to send as part of the
 	/// initial handshake.
-	notif_protocols: Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>)>,
+	notif_protocols: Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>, u64)>,
 
 	/// Receiver for instructions about who to connect to or disconnect from.
 	peerset: sc_peerset::Peerset,
@@ -374,10 +374,10 @@ impl GenericProto {
 		versions: &[u8],
 		handshake_message: Vec<u8>,
 		peerset: sc_peerset::Peerset,
-		notif_protocols: impl Iterator<Item = (Cow<'static, str>, Vec<u8>)>,
+		notif_protocols: impl Iterator<Item = (Cow<'static, str>, Vec<u8>, u64)>,
 	) -> Self {
 		let notif_protocols = notif_protocols
-			.map(|(n, hs)| (n, Arc::new(RwLock::new(hs))))
+			.map(|(n, hs, sz)| (n, Arc::new(RwLock::new(hs)), sz))
 			.collect::<Vec<_>>();
 
 		assert!(!notif_protocols.is_empty());
diff --git a/substrate/client/network/src/protocol/generic_proto/handler.rs b/substrate/client/network/src/protocol/generic_proto/handler.rs
index 6d7e8b145a6..6fdcef1d7a2 100644
--- a/substrate/client/network/src/protocol/generic_proto/handler.rs
+++ b/substrate/client/network/src/protocol/generic_proto/handler.rs
@@ -113,7 +113,7 @@ const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
 pub struct NotifsHandlerProto {
 	/// Name of protocols, prototypes for upgrades for inbound substreams, and the message we
 	/// send or respond with in the handshake.
-	protocols: Vec<(Cow<'static, str>, NotificationsIn, Arc<RwLock<Vec<u8>>>)>,
+	protocols: Vec<(Cow<'static, str>, NotificationsIn, Arc<RwLock<Vec<u8>>>, u64)>,
 
 	/// Configuration for the legacy protocol upgrade.
 	legacy_protocol: RegisteredProtocol,
@@ -161,6 +161,9 @@ struct Protocol {
 	/// Handshake to send when opening a substream or receiving an open request.
 	handshake: Arc<RwLock<Vec<u8>>>,
 
+	/// Maximum allowed size of individual notifications.
+	max_notification_size: u64,
+
 	/// Current state of the substreams for this protocol.
 	state: State,
 }
@@ -226,7 +229,7 @@ impl IntoProtocolsHandler for NotifsHandlerProto {
 
 	fn inbound_protocol(&self) -> SelectUpgrade<UpgradeCollec<NotificationsIn>, RegisteredProtocol> {
 		let protocols = self.protocols.iter()
-			.map(|(_, p, _)| p.clone())
+			.map(|(_, p, _, _)| p.clone())
 			.collect::<UpgradeCollec<_>>();
 
 		SelectUpgrade::new(protocols, self.legacy_protocol.clone())
@@ -234,14 +237,15 @@ impl IntoProtocolsHandler for NotifsHandlerProto {
 
 	fn into_handler(self, peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler {
 		NotifsHandler {
-			protocols: self.protocols.into_iter().map(|(name, in_upgrade, handshake)| {
+			protocols: self.protocols.into_iter().map(|(name, in_upgrade, handshake, max_size)| {
 				Protocol {
 					name,
 					in_upgrade,
 					handshake,
 					state: State::Closed {
 						pending_opening: false,
-					}
+					},
+					max_notification_size: max_size,
 				}
 			}).collect(),
 			peer_id: peer_id.clone(),
@@ -467,18 +471,19 @@ pub enum NotifsHandlerError {
 impl NotifsHandlerProto {
 	/// Builds a new handler.
 	///
-	/// `list` is a list of notification protocols names, and the message to send as part of the
-	/// handshake. At the moment, the message is always the same whether we open a substream
-	/// ourselves or respond to handshake from the remote.
+	/// `list` is a list of notification protocols names, the message to send as part of the
+	/// handshake, and the maximum allowed size of a notification. At the moment, the message
+	/// is always the same whether we open a substream ourselves or respond to handshake from
+	/// the remote.
 	pub fn new(
 		legacy_protocol: RegisteredProtocol,
-		list: impl Into<Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>)>>,
+		list: impl Into<Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>, u64)>>,
 	) -> Self {
 		let protocols =	list
 			.into()
 			.into_iter()
-			.map(|(proto_name, msg)| {
-				(proto_name.clone(), NotificationsIn::new(proto_name), msg)
+			.map(|(proto_name, msg, max_notif_size)| {
+				(proto_name.clone(), NotificationsIn::new(proto_name, max_notif_size), msg, max_notif_size)
 			})
 			.collect();
 
@@ -624,7 +629,8 @@ impl ProtocolsHandler for NotifsHandler {
 						if !*pending_opening {
 							let proto = NotificationsOut::new(
 								protocol_info.name.clone(),
-								protocol_info.handshake.read().clone()
+								protocol_info.handshake.read().clone(),
+								protocol_info.max_notification_size
 							);
 
 							self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
@@ -643,7 +649,8 @@ impl ProtocolsHandler for NotifsHandler {
 						if !*pending_opening {
 							let proto = NotificationsOut::new(
 								protocol_info.name.clone(),
-								handshake_message.clone()
+								handshake_message.clone(),
+								protocol_info.max_notification_size,
 							);
 
 							self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
diff --git a/substrate/client/network/src/protocol/generic_proto/tests.rs b/substrate/client/network/src/protocol/generic_proto/tests.rs
index 7f8de599ed7..967c0e9f8df 100644
--- a/substrate/client/network/src/protocol/generic_proto/tests.rs
+++ b/substrate/client/network/src/protocol/generic_proto/tests.rs
@@ -82,7 +82,7 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
 		let behaviour = CustomProtoWithAddr {
 			inner: GenericProto::new(
 				"test", &[1], vec![], peerset,
-				iter::once(("/foo".into(), Vec::new()))
+				iter::once(("/foo".into(), Vec::new(), 1024 * 1024))
 			),
 			addrs: addrs
 				.iter()
diff --git a/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs b/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs
index 29561bafd7a..eba96441bcf 100644
--- a/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs
+++ b/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs
@@ -41,7 +41,7 @@ use futures::prelude::*;
 use asynchronous_codec::Framed;
 use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade};
 use log::error;
-use std::{borrow::Cow, convert::Infallible, io, iter, mem, pin::Pin, task::{Context, Poll}};
+use std::{borrow::Cow, convert::{Infallible, TryFrom as _}, io, iter, mem, pin::Pin, task::{Context, Poll}};
 use unsigned_varint::codec::UviBytes;
 
 /// Maximum allowed size of the two handshake messages, in bytes.
@@ -53,6 +53,8 @@ const MAX_HANDSHAKE_SIZE: usize = 1024;
 pub struct NotificationsIn {
 	/// Protocol name to use when negotiating the substream.
 	protocol_name: Cow<'static, str>,
+	/// Maximum allowed size for a single notification.
+	max_notification_size: u64,
 }
 
 /// Upgrade that opens a substream, waits for the remote to accept by sending back a status
@@ -63,6 +65,8 @@ pub struct NotificationsOut {
 	protocol_name: Cow<'static, str>,
 	/// Message to send when we start the handshake.
 	initial_message: Vec<u8>,
+	/// Maximum allowed size for a single notification.
+	max_notification_size: u64,
 }
 
 /// A substream for incoming notification messages.
@@ -102,9 +106,10 @@ pub struct NotificationsOutSubstream<TSubstream> {
 
 impl NotificationsIn {
 	/// Builds a new potential upgrade.
-	pub fn new(protocol_name: impl Into<Cow<'static, str>>) -> Self {
+	pub fn new(protocol_name: impl Into<Cow<'static, str>>, max_notification_size: u64) -> Self {
 		NotificationsIn {
 			protocol_name: protocol_name.into(),
+			max_notification_size,
 		}
 	}
 }
@@ -148,8 +153,11 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
 				socket.read_exact(&mut initial_message).await?;
 			}
 
+			let mut codec = UviBytes::default();
+			codec.set_max_len(usize::try_from(self.max_notification_size).unwrap_or(usize::max_value()));
+
 			let substream = NotificationsInSubstream {
-				socket: Framed::new(socket, UviBytes::default()),
+				socket: Framed::new(socket, codec),
 				handshake: NotificationsInSubstreamHandshake::NotSent,
 			};
 
@@ -287,7 +295,11 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
 
 impl NotificationsOut {
 	/// Builds a new potential upgrade.
-	pub fn new(protocol_name: impl Into<Cow<'static, str>>, initial_message: impl Into<Vec<u8>>) -> Self {
+	pub fn new(
+		protocol_name: impl Into<Cow<'static, str>>,
+		initial_message: impl Into<Vec<u8>>,
+		max_notification_size: u64,
+	) -> Self {
 		let initial_message = initial_message.into();
 		if initial_message.len() > MAX_HANDSHAKE_SIZE {
 			error!(target: "sub-libp2p", "Outbound networking handshake is above allowed protocol limit");
@@ -296,6 +308,7 @@ impl NotificationsOut {
 		NotificationsOut {
 			protocol_name: protocol_name.into(),
 			initial_message,
+			max_notification_size,
 		}
 	}
 }
@@ -342,8 +355,11 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
 				socket.read_exact(&mut handshake).await?;
 			}
 
+			let mut codec = UviBytes::default();
+			codec.set_max_len(usize::try_from(self.max_notification_size).unwrap_or(usize::max_value()));
+
 			Ok((handshake, NotificationsOutSubstream {
-				socket: Framed::new(socket, UviBytes::default()),
+				socket: Framed::new(socket, codec),
 			}))
 		})
 	}
@@ -436,7 +452,7 @@ mod tests {
 			let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
 			let (handshake, mut substream) = upgrade::apply_outbound(
 				socket,
-				NotificationsOut::new(PROTO_NAME, &b"initial message"[..]),
+				NotificationsOut::new(PROTO_NAME, &b"initial message"[..], 1024 * 1024),
 				upgrade::Version::V1
 			).await.unwrap();
 
@@ -451,7 +467,7 @@ mod tests {
 			let (socket, _) = listener.accept().await.unwrap();
 			let (initial_message, mut substream) = upgrade::apply_inbound(
 				socket,
-				NotificationsIn::new(PROTO_NAME)
+				NotificationsIn::new(PROTO_NAME, 1024 * 1024)
 			).await.unwrap();
 
 			assert_eq!(initial_message, b"initial message");
@@ -475,7 +491,7 @@ mod tests {
 			let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
 			let (handshake, mut substream) = upgrade::apply_outbound(
 				socket,
-				NotificationsOut::new(PROTO_NAME, vec![]),
+				NotificationsOut::new(PROTO_NAME, vec![], 1024 * 1024),
 				upgrade::Version::V1
 			).await.unwrap();
 
@@ -490,7 +506,7 @@ mod tests {
 			let (socket, _) = listener.accept().await.unwrap();
 			let (initial_message, mut substream) = upgrade::apply_inbound(
 				socket,
-				NotificationsIn::new(PROTO_NAME)
+				NotificationsIn::new(PROTO_NAME, 1024 * 1024)
 			).await.unwrap();
 
 			assert!(initial_message.is_empty());
@@ -512,7 +528,7 @@ mod tests {
 			let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
 			let outcome = upgrade::apply_outbound(
 				socket,
-				NotificationsOut::new(PROTO_NAME, &b"hello"[..]),
+				NotificationsOut::new(PROTO_NAME, &b"hello"[..], 1024 * 1024),
 				upgrade::Version::V1
 			).await;
 
@@ -529,7 +545,7 @@ mod tests {
 			let (socket, _) = listener.accept().await.unwrap();
 			let (initial_msg, substream) = upgrade::apply_inbound(
 				socket,
-				NotificationsIn::new(PROTO_NAME)
+				NotificationsIn::new(PROTO_NAME, 1024 * 1024)
 			).await.unwrap();
 
 			assert_eq!(initial_msg, b"hello");
@@ -551,7 +567,7 @@ mod tests {
 			let ret = upgrade::apply_outbound(
 				socket,
 				// We check that an initial message that is too large gets refused.
-				NotificationsOut::new(PROTO_NAME, (0..32768).map(|_| 0).collect::<Vec<_>>()),
+				NotificationsOut::new(PROTO_NAME, (0..32768).map(|_| 0).collect::<Vec<_>>(), 1024 * 1024),
 				upgrade::Version::V1
 			).await;
 			assert!(ret.is_err());
@@ -564,7 +580,7 @@ mod tests {
 			let (socket, _) = listener.accept().await.unwrap();
 			let ret = upgrade::apply_inbound(
 				socket,
-				NotificationsIn::new(PROTO_NAME)
+				NotificationsIn::new(PROTO_NAME, 1024 * 1024)
 			).await;
 			assert!(ret.is_err());
 		});
@@ -581,7 +597,7 @@ mod tests {
 			let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
 			let ret = upgrade::apply_outbound(
 				socket,
-				NotificationsOut::new(PROTO_NAME, &b"initial message"[..]),
+				NotificationsOut::new(PROTO_NAME, &b"initial message"[..], 1024 * 1024),
 				upgrade::Version::V1
 			).await;
 			assert!(ret.is_err());
@@ -594,7 +610,7 @@ mod tests {
 			let (socket, _) = listener.accept().await.unwrap();
 			let (initial_message, mut substream) = upgrade::apply_inbound(
 				socket,
-				NotificationsIn::new(PROTO_NAME)
+				NotificationsIn::new(PROTO_NAME, 1024 * 1024)
 			).await.unwrap();
 			assert_eq!(initial_message, b"initial message");
 
diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs
index fec444846a3..3d05d578bf6 100644
--- a/substrate/client/network/src/service.rs
+++ b/substrate/client/network/src/service.rs
@@ -83,7 +83,9 @@ use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnbound
 use std::{
 	borrow::Cow,
 	collections::{HashMap, HashSet},
+	convert::TryFrom as _,
 	fs,
+	iter,
 	marker::PhantomData,
 	num:: NonZeroUsize,
 	pin::Pin,
@@ -283,6 +285,48 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
 				config
 			};
 
+			let (transport, bandwidth) = {
+				let (config_mem, config_wasm) = match params.network_config.transport {
+					TransportConfig::MemoryOnly => (true, None),
+					TransportConfig::Normal { wasm_external_transport, .. } =>
+						(false, wasm_external_transport)
+				};
+
+				// The yamux buffer size limit is configured to be equal to the maximum frame size
+				// of all protocols. 10 bytes are added to each limit for the length prefix that
+				// is not included in the upper layer protocols limit but is still present in the
+				// yamux buffer. These 10 bytes correspond to the maximum size required to encode
+				// a variable-length-encoding 64bits number. In other words, we make the
+				// assumption that no notification larger than 2^64 will ever be sent.
+				let yamux_maximum_buffer_size = {
+					let requests_max = params.network_config
+						.request_response_protocols.iter()
+						.map(|cfg| usize::try_from(cfg.max_request_size).unwrap_or(usize::max_value()));
+					let responses_max = params.network_config
+						.request_response_protocols.iter()
+						.map(|cfg| usize::try_from(cfg.max_response_size).unwrap_or(usize::max_value()));
+					let notifs_max = params.network_config
+						.extra_sets.iter()
+						.map(|cfg| usize::try_from(cfg.max_notification_size).unwrap_or(usize::max_value()));
+
+					// A "default" max is added to cover all the other protocols: ping, identify,
+					// kademlia.
+					let default_max = 1024 * 1024;
+					iter::once(default_max)
+						.chain(requests_max).chain(responses_max).chain(notifs_max)
+						.max().expect("iterator known to always yield at least one element; qed")
+						.saturating_add(10)
+				};
+
+				transport::build_transport(
+					local_identity,
+					config_mem,
+					config_wasm,
+					params.network_config.yamux_window_size,
+					yamux_maximum_buffer_size
+				)
+			};
+
 			let behaviour = {
 				let result = Behaviour::new(
 					protocol,
@@ -305,20 +349,6 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
 				}
 			};
 
-			let (transport, bandwidth) = {
-				let (config_mem, config_wasm) = match params.network_config.transport {
-					TransportConfig::MemoryOnly => (true, None),
-					TransportConfig::Normal { wasm_external_transport, .. } =>
-						(false, wasm_external_transport)
-				};
-
-				transport::build_transport(
-					local_identity,
-					config_mem,
-					config_wasm,
-					params.network_config.yamux_window_size
-				)
-			};
 			let mut builder = SwarmBuilder::new(transport, behaviour, local_peer_id.clone())
 				.connection_limits(ConnectionLimits::default()
 					.with_max_established_per_peer(Some(crate::MAX_CONNECTIONS_PER_PEER as u32))
diff --git a/substrate/client/network/src/service/tests.rs b/substrate/client/network/src/service/tests.rs
index e31158a9926..8f16040aee3 100644
--- a/substrate/client/network/src/service/tests.rs
+++ b/substrate/client/network/src/service/tests.rs
@@ -144,6 +144,7 @@ fn build_nodes_one_proto()
 		extra_sets: vec![
 			config::NonDefaultSetConfig {
 				notifications_protocol: PROTOCOL_NAME,
+				max_notification_size: 1024 * 1024,
 				set_config: Default::default()
 			}
 		],
@@ -156,6 +157,7 @@ fn build_nodes_one_proto()
 		extra_sets: vec![
 			config::NonDefaultSetConfig {
 				notifications_protocol: PROTOCOL_NAME,
+				max_notification_size: 1024 * 1024,
 				set_config: config::SetConfig {
 					reserved_nodes: vec![config::MultiaddrWithPeerId {
 						multiaddr: listen_addr,
@@ -311,6 +313,7 @@ fn lots_of_incoming_peers_works() {
 		extra_sets: vec![
 			config::NonDefaultSetConfig {
 				notifications_protocol: PROTOCOL_NAME,
+				max_notification_size: 1024 * 1024,
 				set_config: config::SetConfig {
 					in_peers: u32::max_value(),
 					.. Default::default()
@@ -335,6 +338,7 @@ fn lots_of_incoming_peers_works() {
 			extra_sets: vec![
 				config::NonDefaultSetConfig {
 					notifications_protocol: PROTOCOL_NAME,
+					max_notification_size: 1024 * 1024,
 					set_config: config::SetConfig {
 						reserved_nodes: vec![config::MultiaddrWithPeerId {
 							multiaddr: listen_addr.clone(),
diff --git a/substrate/client/network/src/transport.rs b/substrate/client/network/src/transport.rs
index da0e5aa059b..483cf47037f 100644
--- a/substrate/client/network/src/transport.rs
+++ b/substrate/client/network/src/transport.rs
@@ -35,9 +35,14 @@ pub use self::bandwidth::BandwidthSinks;
 /// If `memory_only` is true, then only communication within the same process are allowed. Only
 /// addresses with the format `/memory/...` are allowed.
 ///
-///`yamux_window_size` is the maximum size of the Yamux receive windows. `None` to leave the
+/// `yamux_window_size` is the maximum size of the Yamux receive windows. `None` to leave the
 /// default (256kiB).
 ///
+/// `yamux_maximum_buffer_size` is the maximum allowed size of the Yamux buffer. This should be
+/// set either to the maximum of all the maximum allowed sizes of messages frames of all
+/// high-level protocols combined, or to some generously high value if you are sure that a maximum
+/// size is enforced on all high-level protocols.
+///
 /// Returns a `BandwidthSinks` object that allows querying the average bandwidth produced by all
 /// the connections spawned with this transport.
 pub fn build_transport(
@@ -45,6 +50,7 @@ pub fn build_transport(
 	memory_only: bool,
 	wasm_external_transport: Option<wasm_ext::ExtTransport>,
 	yamux_window_size: Option<u32>,
+	yamux_maximum_buffer_size: usize,
 ) -> (Boxed<(PeerId, StreamMuxerBox)>, Arc<BandwidthSinks>) {
 	// Build the base layer of the transport.
 	let transport = if let Some(t) = wasm_external_transport {
@@ -101,6 +107,7 @@ pub fn build_transport(
 		// Enable proper flow-control: window updates are only sent when
 		// buffered data has been consumed.
 		yamux_config.set_window_update_mode(libp2p::yamux::WindowUpdateMode::on_read());
+		yamux_config.set_max_buffer_size(yamux_maximum_buffer_size);
 
 		if let Some(yamux_window_size) = yamux_window_size {
 			yamux_config.set_receive_window_size(yamux_window_size);
diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs
index 86cc7a54738..ec5ab5e88d6 100644
--- a/substrate/client/network/test/src/lib.rs
+++ b/substrate/client/network/test/src/lib.rs
@@ -685,6 +685,7 @@ pub trait TestNetFactory: Sized {
 		network_config.extra_sets = config.notifications_protocols.into_iter().map(|p| {
 			NonDefaultSetConfig {
 				notifications_protocol: p,
+				max_notification_size: 1024 * 1024,
 				set_config: Default::default()
 			}
 		}).collect();
-- 
GitLab