From 2f9e2577c18df6ca0cf6033760ba55ef0d68a54f Mon Sep 17 00:00:00 2001
From: Pierre Krieger <pierre.krieger1708@gmail.com>
Date: Wed, 2 Sep 2020 16:30:41 +0200
Subject: [PATCH] Ensure that handshake is sent back even in case of
 back-pressure (#6979)

* Ensure that handshake is sent back even in case of back-pressure

* Update client/network/src/protocol/generic_proto/handler/group.rs

Co-authored-by: Max Inden <mail@max-inden.de>

* Also process OpenRequest and Closed

* Fix bad merge

* God I'm so lost with all these merges

* Immediately return Closed

Co-authored-by: Max Inden <mail@max-inden.de>
---
 .../protocol/generic_proto/handler/group.rs   | 66 +++++++++++--------
 .../generic_proto/handler/notif_in.rs         | 27 ++++++++
 .../generic_proto/upgrade/notifications.rs    | 46 ++++++++++++-
 3 files changed, 110 insertions(+), 29 deletions(-)

diff --git a/substrate/client/network/src/protocol/generic_proto/handler/group.rs b/substrate/client/network/src/protocol/generic_proto/handler/group.rs
index 43627f3d604..6804dd3c789 100644
--- a/substrate/client/network/src/protocol/generic_proto/handler/group.rs
+++ b/substrate/client/network/src/protocol/generic_proto/handler/group.rs
@@ -674,36 +674,48 @@ impl ProtocolsHandler for NotifsHandler {
 						return Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::Legacy(err))),
 				}
 			}
+		}
+
+		for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() {
+			loop {
+				let poll = if self.pending_legacy_handshake.is_none() {
+					handler.poll(cx)
+				} else {
+					handler.poll_process(cx)
+				};
 
-			for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() {
-				while let Poll::Ready(ev) = handler.poll(cx) {
-					match ev {
-						ProtocolsHandlerEvent::OutboundSubstreamRequest { .. } =>
-							error!("Incoming substream handler tried to open a substream"),
-						ProtocolsHandlerEvent::Close(err) => void::unreachable(err),
-						ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) =>
-							match self.enabled {
-								EnabledState::Initial => self.pending_in.push(handler_num),
-								EnabledState::Enabled => {
-									// We create `handshake_message` on a separate line to be sure
-									// that the lock is released as soon as possible.
-									let handshake_message = handshake_message.read().clone();
-									handler.inject_event(NotifsInHandlerIn::Accept(handshake_message))
-								},
-								EnabledState::Disabled =>
-									handler.inject_event(NotifsInHandlerIn::Refuse),
+				let ev = match poll {
+					Poll::Ready(e) => e,
+					Poll::Pending => break,
+				};
+
+				match ev {
+					ProtocolsHandlerEvent::OutboundSubstreamRequest { .. } =>
+						error!("Incoming substream handler tried to open a substream"),
+					ProtocolsHandlerEvent::Close(err) => void::unreachable(err),
+					ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) =>
+						match self.enabled {
+							EnabledState::Initial => self.pending_in.push(handler_num),
+							EnabledState::Enabled => {
+								// We create `handshake_message` on a separate line to be sure
+								// that the lock is released as soon as possible.
+								let handshake_message = handshake_message.read().clone();
+								handler.inject_event(NotifsInHandlerIn::Accept(handshake_message))
 							},
-						ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {},
-						ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => {
-							if self.notifications_sink_rx.is_some() {
-								let msg = NotifsHandlerOut::Notification {
-									message,
-									protocol_name: handler.protocol_name().clone(),
-								};
-								return Poll::Ready(ProtocolsHandlerEvent::Custom(msg));
-							}
+							EnabledState::Disabled =>
+								handler.inject_event(NotifsInHandlerIn::Refuse),
 						},
-					}
+					ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {},
+					ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => {
+						debug_assert!(self.pending_legacy_handshake.is_none());
+						if self.notifications_sink_rx.is_some() {
+							let msg = NotifsHandlerOut::Notification {
+								message,
+								protocol_name: handler.protocol_name().clone(),
+							};
+							return Poll::Ready(ProtocolsHandlerEvent::Custom(msg));
+						}
+					},
 				}
 			}
 		}
diff --git a/substrate/client/network/src/protocol/generic_proto/handler/notif_in.rs b/substrate/client/network/src/protocol/generic_proto/handler/notif_in.rs
index 9eb8ec74716..5a50cce2681 100644
--- a/substrate/client/network/src/protocol/generic_proto/handler/notif_in.rs
+++ b/substrate/client/network/src/protocol/generic_proto/handler/notif_in.rs
@@ -139,6 +139,33 @@ impl NotifsInHandler {
 	pub fn protocol_name(&self) -> &Cow<'static, str> {
 		self.in_protocol.protocol_name()
 	}
+
+	/// Equivalent to the `poll` method of `ProtocolsHandler`, except that it is guaranteed to
+	/// never generate [`NotifsInHandlerOut::Notif`].
+	///
+	/// Use this method in situations where it is not desirable to receive events but still
+	/// necessary to drive any potential incoming handshake or request.
+	pub fn poll_process(
+		&mut self,
+		cx: &mut Context
+	) -> Poll<
+		ProtocolsHandlerEvent<DeniedUpgrade, (), NotifsInHandlerOut, void::Void>
+	> {
+		if let Some(event) = self.events_queue.pop_front() {
+			return Poll::Ready(event)
+		}
+
+		match self.substream.as_mut().map(|s| NotificationsInSubstream::poll_process(Pin::new(s), cx)) {
+			None | Some(Poll::Pending) => {},
+			Some(Poll::Ready(Ok(v))) => match v {},
+			Some(Poll::Ready(Err(_))) => {
+				self.substream = None;
+				return Poll::Ready(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed));
+			},
+		}
+
+		Poll::Pending
+	}
 }
 
 impl ProtocolsHandler for NotifsInHandler {
diff --git a/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs b/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs
index 51fbc8d9c60..64b4b980da0 100644
--- a/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs
+++ b/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs
@@ -39,7 +39,7 @@ use futures::prelude::*;
 use futures_codec::Framed;
 use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade};
 use log::error;
-use std::{borrow::Cow, io, iter, mem, pin::Pin, task::{Context, Poll}};
+use std::{borrow::Cow, convert::Infallible, io, iter, mem, pin::Pin, task::{Context, Poll}};
 use unsigned_varint::codec::UviBytes;
 
 /// Maximum allowed size of the two handshake messages, in bytes.
@@ -162,7 +162,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
 }
 
 impl<TSubstream> NotificationsInSubstream<TSubstream>
-where TSubstream: AsyncRead + AsyncWrite,
+where TSubstream: AsyncRead + AsyncWrite + Unpin,
 {
 	/// Sends the handshake in order to inform the remote that we accept the substream.
 	pub fn send_handshake(&mut self, message: impl Into<Vec<u8>>) {
@@ -173,6 +173,48 @@ where TSubstream: AsyncRead + AsyncWrite,
 
 		self.handshake = NotificationsInSubstreamHandshake::PendingSend(message.into());
 	}
+
+	/// Equivalent to `Stream::poll_next`, except that it only drives the handshake and is
+	/// guaranteed to not generate any notification.
+	pub fn poll_process(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<Infallible, io::Error>> {
+		let mut this = self.project();
+
+		loop {
+			match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) {
+				NotificationsInSubstreamHandshake::PendingSend(msg) =>
+					match Sink::poll_ready(this.socket.as_mut(), cx) {
+						Poll::Ready(_) => {
+							*this.handshake = NotificationsInSubstreamHandshake::Flush;
+							match Sink::start_send(this.socket.as_mut(), io::Cursor::new(msg)) {
+								Ok(()) => {},
+								Err(err) => return Poll::Ready(Err(err)),
+							}
+						},
+						Poll::Pending => {
+							*this.handshake = NotificationsInSubstreamHandshake::PendingSend(msg);
+							return Poll::Pending
+						}
+					},
+				NotificationsInSubstreamHandshake::Flush =>
+					match Sink::poll_flush(this.socket.as_mut(), cx)? {
+						Poll::Ready(()) =>
+							*this.handshake = NotificationsInSubstreamHandshake::Sent,
+						Poll::Pending => {
+							*this.handshake = NotificationsInSubstreamHandshake::Flush;
+							return Poll::Pending
+						}
+					},
+
+				st @ NotificationsInSubstreamHandshake::NotSent |
+				st @ NotificationsInSubstreamHandshake::Sent |
+				st @ NotificationsInSubstreamHandshake::ClosingInResponseToRemote |
+				st @ NotificationsInSubstreamHandshake::BothSidesClosed => {
+					*this.handshake = st;
+					return Poll::Pending;
+				}
+			}
+		}
+	}
 }
 
 impl<TSubstream> Stream for NotificationsInSubstream<TSubstream>
-- 
GitLab