From cd86643f33881b4d7270e21dbd5f32839c3bafce Mon Sep 17 00:00:00 2001
From: Pierre Krieger <pierre.krieger1708@gmail.com>
Date: Wed, 23 Jan 2019 14:30:20 +0100
Subject: [PATCH] Report when peer is clogged (#1528)

---
 .../core/network-libp2p/src/behaviour.rs      | 17 ++++++--
 .../src/custom_proto/behaviour.rs             | 17 ++++++++
 .../src/custom_proto/handler.rs               | 18 +++++++-
 .../src/custom_proto/upgrade.rs               | 41 ++++++++++++++-----
 .../core/network-libp2p/src/service_task.rs   | 15 +++++++
 substrate/core/network/src/protocol.rs        | 12 ++++++
 substrate/core/network/src/service.rs         |  3 ++
 7 files changed, 107 insertions(+), 16 deletions(-)

diff --git a/substrate/core/network-libp2p/src/behaviour.rs b/substrate/core/network-libp2p/src/behaviour.rs
index 74d62040100..52b03c83357 100644
--- a/substrate/core/network-libp2p/src/behaviour.rs
+++ b/substrate/core/network-libp2p/src/behaviour.rs
@@ -160,6 +160,14 @@ pub enum BehaviourOut {
 		data: Bytes,
 	},
 
+	/// A substream with a remote is clogged. We should avoid sending more data to it if possible.
+	Clogged {
+		/// Id of the peer the message came from.
+		peer_id: PeerId,
+		/// Protocol which generated the message.
+		protocol_id: ProtocolId,
+	},
+
 	/// We have obtained debug information from a peer.
 	Identified {
 		/// Id of the peer that has been identified.
@@ -174,13 +182,16 @@ impl From<CustomProtosOut> for BehaviourOut {
 		match other {
 			CustomProtosOut::CustomProtocolOpen { protocol_id, version, peer_id, endpoint } => {
 				BehaviourOut::CustomProtocolOpen { protocol_id, version, peer_id, endpoint }
-			},
+			}
 			CustomProtosOut::CustomProtocolClosed { protocol_id, peer_id, result } => {
 				BehaviourOut::CustomProtocolClosed { protocol_id, peer_id, result }
-			},
+			}
 			CustomProtosOut::CustomMessage { protocol_id, peer_id, data } => {
 				BehaviourOut::CustomMessage { protocol_id, peer_id, data }
-			},
+			}
+			CustomProtosOut::Clogged { protocol_id, peer_id } => {
+				BehaviourOut::Clogged { protocol_id, peer_id }
+			}
 		}
 	}
 }
diff --git a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs b/substrate/core/network-libp2p/src/custom_proto/behaviour.rs
index 679f44853aa..b76e5adcd6f 100644
--- a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs
+++ b/substrate/core/network-libp2p/src/custom_proto/behaviour.rs
@@ -107,6 +107,15 @@ pub enum CustomProtosOut {
 		/// Data that has been received.
 		data: Bytes,
 	},
+
+	/// The substream used by the protocol is pretty large. We should print avoid sending more
+	/// data on it if possible.
+	Clogged {
+		/// Id of the peer which is clogged.
+		peer_id: PeerId,
+		/// Protocol which has a problem.
+		protocol_id: ProtocolId,
+	},
 }
 
 impl<TSubstream> CustomProtos<TSubstream> {
@@ -436,6 +445,14 @@ where
 
 				self.events.push(NetworkBehaviourAction::GenerateEvent(event));
 			}
+			CustomProtosHandlerOut::Clogged { protocol_id } => {
+				warn!(target: "sub-libp2p", "Queue of packets to send to {:?} (protocol: {:?}) is \
+					pretty large", source, protocol_id);
+				self.events.push(NetworkBehaviourAction::GenerateEvent(CustomProtosOut::Clogged {
+					peer_id: source,
+					protocol_id,
+				}));
+			}
 		}
 	}
 
diff --git a/substrate/core/network-libp2p/src/custom_proto/handler.rs b/substrate/core/network-libp2p/src/custom_proto/handler.rs
index 45a9a6f2737..dac707ca162 100644
--- a/substrate/core/network-libp2p/src/custom_proto/handler.rs
+++ b/substrate/core/network-libp2p/src/custom_proto/handler.rs
@@ -15,7 +15,7 @@
 // along with Substrate.  If not, see <http://www.gnu.org/licenses/>.
 
 use crate::ProtocolId;
-use crate::custom_proto::upgrade::{RegisteredProtocol, RegisteredProtocols, RegisteredProtocolSubstream};
+use crate::custom_proto::upgrade::{RegisteredProtocol, RegisteredProtocols, RegisteredProtocolSubstream, RegisteredProtocolEvent};
 use bytes::Bytes;
 use futures::prelude::*;
 use libp2p::core::{
@@ -111,6 +111,13 @@ pub enum CustomProtosHandlerOut {
 		/// Data that has been received.
 		data: Bytes,
 	},
+
+	/// A substream to the remote is clogged. The send buffer is very large, and we should print
+	/// a diagnostic message and/or avoid sending more data.
+	Clogged {
+		/// Protocol which is clogged.
+		protocol_id: ProtocolId,
+	},
 }
 
 impl<TSubstream> CustomProtosHandler<TSubstream>
@@ -283,7 +290,7 @@ where
 		for n in (0..self.substreams.len()).rev() {
 			let mut substream = self.substreams.swap_remove(n);
 			match substream.poll() {
-				Ok(Async::Ready(Some(data))) => {
+				Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(data)))) => {
 					let event = CustomProtosHandlerOut::CustomMessage {
 						protocol_id: substream.protocol_id(),
 						data
@@ -291,6 +298,13 @@ where
 					self.substreams.push(substream);
 					return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(event)))
 				},
+				Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged))) => {
+					let event = CustomProtosHandlerOut::Clogged {
+						protocol_id: substream.protocol_id()
+					};
+					self.substreams.push(substream);
+					return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(event)))
+				},
 				Ok(Async::NotReady) =>
 					self.substreams.push(substream),
 				Ok(Async::Ready(None)) => {
diff --git a/substrate/core/network-libp2p/src/custom_proto/upgrade.rs b/substrate/core/network-libp2p/src/custom_proto/upgrade.rs
index 70eb517023b..69de76cc357 100644
--- a/substrate/core/network-libp2p/src/custom_proto/upgrade.rs
+++ b/substrate/core/network-libp2p/src/custom_proto/upgrade.rs
@@ -18,7 +18,6 @@ use crate::ProtocolId;
 use bytes::Bytes;
 use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName};
 use libp2p::tokio_codec::Framed;
-use log::debug;
 use std::{collections::VecDeque, io, vec::IntoIter as VecIntoIter};
 use futures::{prelude::*, future, stream};
 use tokio_io::{AsyncRead, AsyncWrite};
@@ -81,6 +80,9 @@ pub struct RegisteredProtocolSubstream<TSubstream> {
 	protocol_id: ProtocolId,
 	/// Version of the protocol that was negotiated.
 	protocol_version: u8,
+	/// If true, we have sent a "remote is clogged" event recently and shouldn't send another one
+	/// unless the buffer empties then fills itself again.
+	clogged_fuse: bool,
 }
 
 impl<TSubstream> RegisteredProtocolSubstream<TSubstream> {
@@ -114,21 +116,23 @@ impl<TSubstream> RegisteredProtocolSubstream<TSubstream> {
 		}
 
 		self.send_queue.push_back(data);
-
-		// If the length of the queue goes over a certain arbitrary threshold, we print a warning.
-		if self.send_queue.len() >= 2048 {
-			// TODO: this used to be a warning, but is now a `debug` in order to avoid too much
-			//	noise in the logs; see https://github.com/paritytech/substrate/issues/1414
-			debug!(target: "sub-libp2p", "Queue of packets to send over substream is pretty \
-				large: {}", self.send_queue.len());
-		}
 	}
 }
 
+/// Event produced by the `RegisteredProtocolSubstream`.
+#[derive(Debug, Clone)]
+pub enum RegisteredProtocolEvent {
+	/// Received a message from the remote.
+	Message(Bytes),
+	/// Diagnostic event indicating that the connection is clogged and we should avoid sending too
+	/// many messages to it.
+	Clogged,
+}
+
 impl<TSubstream> Stream for RegisteredProtocolSubstream<TSubstream>
 where TSubstream: AsyncRead + AsyncWrite,
 {
-	type Item = Bytes;
+	type Item = RegisteredProtocolEvent;
 	type Error = io::Error;
 
 	fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
@@ -148,6 +152,19 @@ where TSubstream: AsyncRead + AsyncWrite,
 			}
 		}
 
+		// Indicating that the remote is clogged if that's the case.
+		if self.send_queue.len() >= 2048 {
+			if !self.clogged_fuse {
+				// Note: this fuse is important not just for preventing us from flooding the logs;
+				// 	if you remove the fuse, then we will always return early from this function and
+				//	thus never read any message from the network.
+				self.clogged_fuse = true;
+				return Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged)))
+			}
+		} else {
+			self.clogged_fuse = false;
+		}
+
 		// Flushing if necessary.
 		if self.requires_poll_complete {
 			if let Async::Ready(()) = self.inner.poll_complete()? {
@@ -158,7 +175,8 @@ where TSubstream: AsyncRead + AsyncWrite,
 		// Receiving incoming packets.
 		// Note that `inner` is wrapped in a `Fuse`, therefore we can poll it forever.
 		match self.inner.poll()? {
-			Async::Ready(Some(data)) => Ok(Async::Ready(Some(data.freeze()))),
+			Async::Ready(Some(data)) =>
+				Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(data.freeze())))),
 			Async::Ready(None) =>
 				if !self.requires_poll_complete && self.send_queue.is_empty() {
 					Ok(Async::Ready(None))
@@ -225,6 +243,7 @@ where TSubstream: AsyncRead + AsyncWrite,
 			inner: framed.fuse(),
 			protocol_id: self.id,
 			protocol_version: info.version,
+			clogged_fuse: false,
 		})
 	}
 }
diff --git a/substrate/core/network-libp2p/src/service_task.rs b/substrate/core/network-libp2p/src/service_task.rs
index 64f2ead99be..1ccba87a539 100644
--- a/substrate/core/network-libp2p/src/service_task.rs
+++ b/substrate/core/network-libp2p/src/service_task.rs
@@ -175,6 +175,14 @@ pub enum ServiceEvent {
 		/// Data that has been received.
 		data: Bytes,
 	},
+
+	/// The substream with a node is clogged. We should avoid sending data to it if possible.
+	Clogged {
+		/// Index of the node.
+		node_index: NodeIndex,
+		/// Protocol which generated the message.
+		protocol_id: ProtocolId,
+	},
 }
 
 /// Network service. Must be polled regularly in order for the networking to work.
@@ -370,6 +378,13 @@ impl Service {
 						data,
 					})))
 				}
+				Ok(Async::Ready(Some(BehaviourOut::Clogged { protocol_id, peer_id }))) => {
+					let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour");
+					break Ok(Async::Ready(Some(ServiceEvent::Clogged {
+						node_index,
+						protocol_id,
+					})))
+				}
 				Ok(Async::Ready(Some(BehaviourOut::Identified { peer_id, info }))) => {
 					// Contrary to the other events, this one can happen even on nodes which don't
 					// have any open custom protocol slot. Therefore it is not necessarily in the
diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs
index 7184bd6d4e0..ca7f6889b12 100644
--- a/substrate/core/network/src/protocol.rs
+++ b/substrate/core/network/src/protocol.rs
@@ -351,6 +351,18 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
 		}
 	}
 
+	/// Called as a back-pressure mechanism if the networking detects that the peer cannot process
+	/// our messaging rate fast enough.
+	pub fn on_clogged_peer(&self, io: &mut SyncIo, who: NodeIndex) {
+		// We don't do anything but print some diagnostics for now.
+		if let Some(peer) = self.context_data.peers.read().get(&who) {
+			debug!(target: "sync", "Clogged peer {} (protocol_version: {:?}; roles: {:?}; \
+				known_extrinsics: {:?}; known_blocks: {:?}; best_hash: {:?}; best_number: {:?})",
+				who, peer.protocol_version, peer.roles, peer.known_extrinsics, peer.known_blocks,
+				peer.best_hash, peer.best_number);
+		}
+	}
+
 	fn on_block_request(&self, io: &mut SyncIo, peer: NodeIndex, request: message::BlockRequest<B>) {
 		trace!(target: "sync", "BlockRequest {} from {} with fields {:?}: from {:?} to {:?} max {:?}",
 			request.id,
diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs
index 4c531da1c97..c05eff95860 100644
--- a/substrate/core/network/src/service.rs
+++ b/substrate/core/network/src/service.rs
@@ -401,6 +401,9 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
 			NetworkServiceEvent::CustomMessage { node_index, data, .. } => {
 				protocol.handle_packet(&mut net_sync, node_index, &data);
 			}
+			NetworkServiceEvent::Clogged { node_index, .. } => {
+				protocol.on_clogged_peer(&mut net_sync, node_index);
+			}
 		};
 
 		Ok(())
-- 
GitLab