From 7c0592a9b67c580a561be25a654cfe500c0fd599 Mon Sep 17 00:00:00 2001
From: Pierre Krieger <pierre.krieger1708@gmail.com>
Date: Fri, 25 Oct 2019 15:17:55 +0200
Subject: [PATCH] Telemetry timeout (#3916)

* telemetry worker: add connection timeout

* restructure

* only add timeout when writing data

* don't overwrite an existing delay

* set timeout only around writing data

* address comments

* dedicated error enum

* remove whitespace

* move timeout to inside struct

* fix timeout

* remove prints

* move polling

* address comment

* Implement

* More work
---
 substrate/core/telemetry/src/worker/node.rs | 107 ++++++++++++++------
 1 file changed, 77 insertions(+), 30 deletions(-)

diff --git a/substrate/core/telemetry/src/worker/node.rs b/substrate/core/telemetry/src/worker/node.rs
index 11b1f2a81e6..0f606e40638 100644
--- a/substrate/core/telemetry/src/worker/node.rs
+++ b/substrate/core/telemetry/src/worker/node.rs
@@ -58,6 +58,8 @@ struct NodeSocketConnected<TTrans: Transport> {
 	pending: VecDeque<BytesMut>,
 	/// If true, we need to flush the sink.
 	need_flush: bool,
+	/// A timeout for the socket to write data.
+	timeout: Option<Delay>,
 }
 
 /// Event that can happen with this node.
@@ -66,7 +68,16 @@ pub enum NodeEvent<TSinkErr> {
 	/// We are now connected to this node.
 	Connected,
 	/// We are now disconnected from this node.
-	Disconnected(TSinkErr),
+	Disconnected(ConnectionError<TSinkErr>),
+}
+
+/// Reason for disconnecting from a node.
+#[derive(Debug)]
+pub enum ConnectionError<TSinkErr> {
+	/// The connection timed-out.
+	Timeout,
+	/// The sink errored.
+	Sink(TSinkErr),
 }
 
 impl<TTrans: Transport> Node<TTrans> {
@@ -116,10 +127,12 @@ where TTrans: Clone + Unpin, TTrans::Dial: Unpin,
 		let mut socket = mem::replace(&mut self.socket, NodeSocket::Poisoned);
 		self.socket = loop {
 			match socket {
-				NodeSocket::Connected(mut conn) =>
+				NodeSocket::Connected(mut conn) => {
 					match NodeSocketConnected::poll(Pin::new(&mut conn), cx, &self.addr) {
-						Poll::Ready(Ok(v)) => match v {}
-						Poll::Pending => break NodeSocket::Connected(conn),
+						Poll::Ready(Ok(v)) => match v {},
+						Poll::Pending => {
+							break NodeSocket::Connected(conn)
+						},
 						Poll::Ready(Err(err)) => {
 							warn!(target: "telemetry", "Disconnected from {}: {:?}", self.addr, err);
 							let timeout = gen_rand_reconnect_delay();
@@ -127,10 +140,16 @@ where TTrans: Clone + Unpin, TTrans::Dial: Unpin,
 							return Poll::Ready(NodeEvent::Disconnected(err))
 						}
 					}
+				}
 				NodeSocket::Dialing(mut s) => match Future::poll(Pin::new(&mut s), cx) {
 					Poll::Ready(Ok(sink)) => {
 						debug!(target: "telemetry", "Connected to {}", self.addr);
-						let conn = NodeSocketConnected { sink, pending: VecDeque::new(), need_flush: false };
+						let conn = NodeSocketConnected {
+							sink,
+							pending: VecDeque::new(),
+							need_flush: false,
+							timeout: None,
+						};
 						self.socket = NodeSocket::Connected(conn);
 						return Poll::Ready(NodeEvent::Connected)
 					},
@@ -189,18 +208,15 @@ where TTrans::Output: Sink<BytesMut, Error = TSinkErr>
 	fn poll(
 		mut self: Pin<&mut Self>,
 		cx: &mut Context,
-		my_addr: &Multiaddr
-	) -> Poll<Result<futures::never::Never, TSinkErr>> {
-		loop {
-			if let Some(item) = self.pending.pop_front() {
-				if let Poll::Pending = Sink::poll_ready(Pin::new(&mut self.sink), cx) {
-					self.pending.push_front(item);
-					return Poll::Pending
-				}
+		my_addr: &Multiaddr,
+	) -> Poll<Result<futures::never::Never, ConnectionError<TSinkErr>>> {
 
+		while let Some(item) = self.pending.pop_front() {
+			if let Poll::Ready(_) = Sink::poll_ready(Pin::new(&mut self.sink), cx) {
 				let item_len = item.len();
 				if let Err(err) = Sink::start_send(Pin::new(&mut self.sink), item) {
-					return Poll::Ready(Err(err))
+					self.timeout = None;
+					return Poll::Ready(Err(ConnectionError::Sink(err)))
 				}
 				trace!(
 					target: "telemetry", "Successfully sent {:?} bytes message to {}",
@@ -208,28 +224,59 @@ where TTrans::Output: Sink<BytesMut, Error = TSinkErr>
 				);
 				self.need_flush = true;
 
-			} else if self.need_flush {
-				match Sink::poll_flush(Pin::new(&mut self.sink), cx) {
-					Poll::Pending => return Poll::Pending,
-					Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
-					Poll::Ready(Ok(())) => self.need_flush = false,
+			} else {
+				self.pending.push_front(item);
+				if self.timeout.is_none() {
+					self.timeout = Some(Delay::new(Duration::from_secs(10)));
 				}
+				break;
+			}
+		}
 
-			} else {
-				match Stream::poll_next(Pin::new(&mut self.sink), cx) {
-					Poll::Ready(Some(Ok(_))) => {
-						// We poll the telemetry `Stream` because the underlying implementation relies on
-						// this in order to answer PINGs.
-						// We don't do anything with incoming messages, however.
-					},
-					Poll::Ready(Some(Err(err))) => {
-						return Poll::Ready(Err(err))
-					},
-					Poll::Pending | Poll::Ready(None) => break,
+		if self.need_flush {
+			match Sink::poll_flush(Pin::new(&mut self.sink), cx) {
+				Poll::Pending => {
+					if self.timeout.is_none() {
+						self.timeout = Some(Delay::new(Duration::from_secs(10)));
+					}
+				},
+				Poll::Ready(Err(err)) => {
+					self.timeout = None;
+					return Poll::Ready(Err(ConnectionError::Sink(err)))
+				},
+				Poll::Ready(Ok(())) => {
+					self.timeout = None;
+					self.need_flush = false;
+				},
+			}
+		}
+
+		if let Some(timeout) = self.timeout.as_mut() {
+			match Future::poll(Pin::new(timeout), cx) {
+				Poll::Pending => {},
+				Poll::Ready(Err(err)) => {
+					self.timeout = None;
+					warn!(target: "telemetry", "Connection timeout error for {} {:?}", my_addr, err);
+				}
+				Poll::Ready(Ok(_)) => {
+					self.timeout = None;
+					return Poll::Ready(Err(ConnectionError::Timeout))
 				}
 			}
 		}
 
+		match Stream::poll_next(Pin::new(&mut self.sink), cx) {
+			Poll::Ready(Some(Ok(_))) => {
+				// We poll the telemetry `Stream` because the underlying implementation relies on
+				// this in order to answer PINGs.
+				// We don't do anything with incoming messages, however.
+			},
+			Poll::Ready(Some(Err(err))) => {
+				return Poll::Ready(Err(ConnectionError::Sink(err)))
+			},
+			Poll::Pending | Poll::Ready(None) => {},
+		}
+
 		Poll::Pending
 	}
 }
-- 
GitLab