Skip to content
Snippets Groups Projects
  • Gavin Wood's avatar
    A few missing emojies (#5490) · b86c7e87
    Gavin Wood authored
    * A few missing emojies
    
    * P2p message emojis
    
    * Add a bit of colour
    
    * format
    
    * Introduce a couple of spaces
    
    * Some spaces
    b86c7e87
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
node.rs 9.19 KiB
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.

// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Substrate.  If not, see <http://www.gnu.org/licenses/>.

//! Contains the `Node` struct, which handles communications with a single telemetry endpoint.

use bytes::BytesMut;
use futures::prelude::*;
use futures_timer::Delay;
use libp2p::Multiaddr;
use libp2p::core::transport::Transport;
use log::{trace, debug, warn, error};
use rand::Rng as _;
use std::{collections::VecDeque, fmt, mem, pin::Pin, task::Context, task::Poll, time::Duration};

/// Maximum number of pending telemetry messages.
const MAX_PENDING: usize = 10;

/// Handler for a single telemetry node.
pub struct Node<TTrans: Transport> {
	/// Address of the node.
	addr: Multiaddr,
	/// State of the connection.
	socket: NodeSocket<TTrans>,
	/// Transport used to establish new connections.
	transport: TTrans,
}

enum NodeSocket<TTrans: Transport> {
	/// We're connected to the node. This is the normal state.
	Connected(NodeSocketConnected<TTrans>),
	/// We are currently dialing the node.
	Dialing(TTrans::Dial),
	/// A new connection should be started as soon as possible.
	ReconnectNow,
	/// Waiting before attempting to dial again.
	WaitingReconnect(Delay),
	/// Temporary transition state.
	Poisoned,
}

struct NodeSocketConnected<TTrans: Transport> {
	/// Where to send data.
	sink: TTrans::Output,
	/// Queue of packets to send.
	pending: VecDeque<BytesMut>,
	/// If true, we need to flush the sink.
	need_flush: bool,
	/// A timeout for the socket to write data.
	timeout: Option<Delay>,
}

/// Event that can happen with this node.
#[derive(Debug)]
pub enum NodeEvent<TSinkErr> {
	/// We are now connected to this node.
	Connected,
	/// We are now disconnected from this node.
	Disconnected(ConnectionError<TSinkErr>),
}

/// Reason for disconnecting from a node.
#[derive(Debug)]
pub enum ConnectionError<TSinkErr> {
	/// The connection timed-out.
	Timeout,
	/// Reading from the socket returned and end-of-file, indicating that the socket has been
	/// closed.
	Closed,
	/// The sink errored.
	Sink(TSinkErr),
}

impl<TTrans: Transport> Node<TTrans> {
	/// Builds a new node handler.
	pub fn new(transport: TTrans, addr: Multiaddr) -> Self {
		Node {
			addr,
			socket: NodeSocket::ReconnectNow,
			transport,
		}
	}

	/// Returns the address that was passed to `new`.
	pub fn addr(&self) -> &Multiaddr {
		&self.addr
	}
}

impl<TTrans: Transport, TSinkErr> Node<TTrans>
where TTrans: Clone + Unpin, TTrans::Dial: Unpin,
	TTrans::Output: Sink<BytesMut, Error = TSinkErr>
		+ Stream<Item=Result<BytesMut, TSinkErr>>
		+ Unpin,
	TSinkErr: fmt::Debug
{
	/// Sends a WebSocket frame to the node. Returns an error if we are not connected to the node.
	///
	/// After calling this method, you should call `poll` in order for it to be properly processed.
	pub fn send_message(&mut self, payload: impl Into<BytesMut>) -> Result<(), ()> {
		if let NodeSocket::Connected(NodeSocketConnected { pending, .. }) = &mut self.socket {
			if pending.len() <= MAX_PENDING {
				trace!(target: "telemetry", "Adding log entry to queue for {:?}", self.addr);
				pending.push_back(payload.into());
				Ok(())
			} else {
				warn!(target: "telemetry", "⚠️ Rejected log entry because queue is full for {:?}",
					self.addr);
				Err(())
			}
		} else {
			Err(())
		}
	}

	/// Polls the node for updates. Must be performed regularly.
	pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<NodeEvent<TSinkErr>> {
		let mut socket = mem::replace(&mut self.socket, NodeSocket::Poisoned);
		self.socket = loop {
			match socket {
				NodeSocket::Connected(mut conn) => {
					match NodeSocketConnected::poll(Pin::new(&mut conn), cx, &self.addr) {
						Poll::Ready(Ok(v)) => match v {},
						Poll::Pending => {
							break NodeSocket::Connected(conn)
						},
						Poll::Ready(Err(err)) => {
							warn!(target: "telemetry", "⚠️ Disconnected from {}: {:?}", self.addr, err);
							let timeout = gen_rand_reconnect_delay();
							self.socket = NodeSocket::WaitingReconnect(timeout);
							return Poll::Ready(NodeEvent::Disconnected(err))
						}
					}
				}
				NodeSocket::Dialing(mut s) => match Future::poll(Pin::new(&mut s), cx) {
					Poll::Ready(Ok(sink)) => {
						debug!(target: "telemetry", "✅ Connected to {}", self.addr);
						let conn = NodeSocketConnected {
							sink,
							pending: VecDeque::new(),
							need_flush: false,
							timeout: None,
						};
						self.socket = NodeSocket::Connected(conn);
						return Poll::Ready(NodeEvent::Connected)
					},
					Poll::Pending => break NodeSocket::Dialing(s),
					Poll::Ready(Err(err)) => {
						warn!(target: "telemetry", "❌ Error while dialing {}: {:?}", self.addr, err);
						let timeout = gen_rand_reconnect_delay();
						socket = NodeSocket::WaitingReconnect(timeout);
					}
				}
				NodeSocket::ReconnectNow => match self.transport.clone().dial(self.addr.clone()) {
					Ok(d) => {
						debug!(target: "telemetry", "Started dialing {}", self.addr);
						socket = NodeSocket::Dialing(d);
					}
					Err(err) => {
						warn!(target: "telemetry", "❌ Error while dialing {}: {:?}", self.addr, err);
						let timeout = gen_rand_reconnect_delay();
						socket = NodeSocket::WaitingReconnect(timeout);
					}
				}
				NodeSocket::WaitingReconnect(mut s) =>
					if let Poll::Ready(_) = Future::poll(Pin::new(&mut s), cx) {
						socket = NodeSocket::ReconnectNow;
					} else {
						break NodeSocket::WaitingReconnect(s)
					}
				NodeSocket::Poisoned => {
					error!(target: "telemetry", "‼️ Poisoned connection with {}", self.addr);
					break NodeSocket::Poisoned
				}
			}
		};

		Poll::Pending
	}
}

/// Generates a `Delay` object with a random timeout.
///
/// If there are general connection issues, not all endpoints should be synchronized in their
/// re-connection time.
fn gen_rand_reconnect_delay() -> Delay {
	let random_delay = rand::thread_rng().gen_range(5, 10);
	Delay::new(Duration::from_secs(random_delay))
}

impl<TTrans: Transport, TSinkErr> NodeSocketConnected<TTrans>
where TTrans::Output: Sink<BytesMut, Error = TSinkErr>
	+ Stream<Item=Result<BytesMut, TSinkErr>>
	+ Unpin
{
	/// Processes the queue of messages for the connected socket.
	///
	/// The address is passed for logging purposes only.
	fn poll(
		mut self: Pin<&mut Self>,
		cx: &mut Context,
		my_addr: &Multiaddr,
	) -> Poll<Result<futures::never::Never, ConnectionError<TSinkErr>>> {

		while let Some(item) = self.pending.pop_front() {
			if let Poll::Ready(result) = Sink::poll_ready(Pin::new(&mut self.sink), cx) {
				if let Err(err) = result {
					return Poll::Ready(Err(ConnectionError::Sink(err)))
				}

				let item_len = item.len();
				if let Err(err) = Sink::start_send(Pin::new(&mut self.sink), item) {
					return Poll::Ready(Err(ConnectionError::Sink(err)))
				}
				trace!(
					target: "telemetry", "Successfully sent {:?} bytes message to {}",
					item_len, my_addr
				);
				self.need_flush = true;

			} else {
				self.pending.push_front(item);
				if self.timeout.is_none() {
					self.timeout = Some(Delay::new(Duration::from_secs(10)));
				}
				break;
			}
		}

		if self.need_flush {
			match Sink::poll_flush(Pin::new(&mut self.sink), cx) {
				Poll::Pending => {
					if self.timeout.is_none() {
						self.timeout = Some(Delay::new(Duration::from_secs(10)));
					}
				},
				Poll::Ready(Err(err)) => {
					self.timeout = None;
					return Poll::Ready(Err(ConnectionError::Sink(err)))
				},
				Poll::Ready(Ok(())) => {
					self.timeout = None;
					self.need_flush = false;
				},
			}
		}

		if let Some(timeout) = self.timeout.as_mut() {
			match Future::poll(Pin::new(timeout), cx) {
				Poll::Pending => {},
				Poll::Ready(()) => {
					self.timeout = None;
					return Poll::Ready(Err(ConnectionError::Timeout))
				}
			}
		}

		match Stream::poll_next(Pin::new(&mut self.sink), cx) {
			Poll::Ready(Some(Ok(_))) => {
				// We poll the telemetry `Stream` because the underlying implementation relies on
				// this in order to answer PINGs.
				// We don't do anything with incoming messages, however.
			},
			Poll::Ready(Some(Err(err))) => {
				return Poll::Ready(Err(ConnectionError::Sink(err)))
			},
			Poll::Ready(None) => {
				return Poll::Ready(Err(ConnectionError::Closed))
			},
			Poll::Pending => {},
		}

		Poll::Pending
	}
}

impl<TTrans: Transport> fmt::Debug for Node<TTrans> {
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		let state = match self.socket {
			NodeSocket::Connected(_) => "Connected",
			NodeSocket::Dialing(_) => "Dialing",
			NodeSocket::ReconnectNow => "Pending reconnect",
			NodeSocket::WaitingReconnect(_) => "Pending reconnect",
			NodeSocket::Poisoned => "Poisoned",
		};

		f.debug_struct("Node")
			.field("addr", &self.addr)
			.field("state", &state)
			.finish()
	}
}