From a7f578d63afb25a43be3ccabace9f2a78e7dff83 Mon Sep 17 00:00:00 2001
From: Pierre Krieger <pierre.krieger1708@gmail.com>
Date: Wed, 22 Apr 2020 10:58:26 +0200
Subject: [PATCH] Add a protocol that answers finality proofs (#5718)

* Add a protocol that answers finality proofs

* Fix documentation

* Use Toggle
---
 substrate/client/network/build.rs             |   1 +
 substrate/client/network/src/behaviour.rs     |   6 +-
 substrate/client/network/src/protocol.rs      |   5 +
 .../network/src/protocol/finality_requests.rs | 266 ++++++++++++++++++
 .../src/protocol/schema/finality.v1.proto     |  19 ++
 substrate/client/network/src/service.rs       |  13 +-
 6 files changed, 307 insertions(+), 3 deletions(-)
 create mode 100644 substrate/client/network/src/protocol/finality_requests.rs
 create mode 100644 substrate/client/network/src/protocol/schema/finality.v1.proto

diff --git a/substrate/client/network/build.rs b/substrate/client/network/build.rs
index 0fd1f128660..991b1cba5d6 100644
--- a/substrate/client/network/build.rs
+++ b/substrate/client/network/build.rs
@@ -1,5 +1,6 @@
 const PROTOS: &[&str] = &[
 	"src/protocol/schema/api.v1.proto",
+	"src/protocol/schema/finality.v1.proto",
 	"src/protocol/schema/light.v1.proto"
 ];
 
diff --git a/substrate/client/network/src/behaviour.rs b/substrate/client/network/src/behaviour.rs
index 880b381e669..14b2245be0a 100644
--- a/substrate/client/network/src/behaviour.rs
+++ b/substrate/client/network/src/behaviour.rs
@@ -25,7 +25,7 @@ use codec::Encode as _;
 use libp2p::NetworkBehaviour;
 use libp2p::core::{Multiaddr, PeerId, PublicKey};
 use libp2p::kad::record;
-use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters};
+use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters, toggle::Toggle};
 use log::debug;
 use sp_consensus::{BlockOrigin, import_queue::{IncomingBlock, Origin}};
 use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId, Justification};
@@ -45,6 +45,8 @@ pub struct Behaviour<B: BlockT, H: ExHashT> {
 	discovery: DiscoveryBehaviour,
 	/// Block request handling.
 	block_requests: protocol::BlockRequests<B>,
+	/// Finality proof request handling.
+	finality_proof_requests: Toggle<protocol::FinalityProofRequests<B>>,
 	/// Light client request handling.
 	light_client_handler: protocol::LightClientHandler<B>,
 
@@ -75,6 +77,7 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
 		user_agent: String,
 		local_public_key: PublicKey,
 		block_requests: protocol::BlockRequests<B>,
+		finality_proof_requests: Option<protocol::FinalityProofRequests<B>>,
 		light_client_handler: protocol::LightClientHandler<B>,
 		disco_config: DiscoveryConfig,
 	) -> Self {
@@ -83,6 +86,7 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
 			debug_info: debug_info::DebugInfoBehaviour::new(user_agent, local_public_key.clone()),
 			discovery: disco_config.finish(),
 			block_requests,
+			finality_proof_requests: From::from(finality_proof_requests),
 			light_client_handler,
 			events: Vec::new(),
 			role,
diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs
index c89aa4cf502..dca74c8d607 100644
--- a/substrate/client/network/src/protocol.rs
+++ b/substrate/client/network/src/protocol.rs
@@ -62,6 +62,9 @@ use wasm_timer::Instant;
 pub mod api {
 	pub mod v1 {
 		include!(concat!(env!("OUT_DIR"), "/api.v1.rs"));
+		pub mod finality {
+			include!(concat!(env!("OUT_DIR"), "/api.v1.finality.rs"));
+		}
 		pub mod light {
 			include!(concat!(env!("OUT_DIR"), "/api.v1.light.rs"));
 		}
@@ -72,12 +75,14 @@ mod generic_proto;
 mod util;
 
 pub mod block_requests;
+pub mod finality_requests;
 pub mod message;
 pub mod event;
 pub mod light_client_handler;
 pub mod sync;
 
 pub use block_requests::BlockRequests;
+pub use finality_requests::FinalityProofRequests;
 pub use light_client_handler::LightClientHandler;
 pub use generic_proto::LegacyConnectionKillError;
 
diff --git a/substrate/client/network/src/protocol/finality_requests.rs b/substrate/client/network/src/protocol/finality_requests.rs
new file mode 100644
index 00000000000..b12b79f41bc
--- /dev/null
+++ b/substrate/client/network/src/protocol/finality_requests.rs
@@ -0,0 +1,266 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Substrate.
+//
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with Substrate.  If not, see <http://www.gnu.org/licenses/>.
+
+//! `NetworkBehaviour` implementation which handles incoming finality proof requests.
+//!
+//! Every request is coming in on a separate connection substream which gets
+//! closed after we have sent the response back. Incoming requests are encoded
+//! as protocol buffers (cf. `finality.v1.proto`).
+
+#![allow(unused)]
+
+use bytes::Bytes;
+use codec::{Encode, Decode};
+use crate::{
+	chain::FinalityProofProvider,
+	config::ProtocolId,
+	protocol::{api, message::BlockAttributes}
+};
+use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered};
+use libp2p::{
+	core::{
+		ConnectedPoint,
+		Multiaddr,
+		PeerId,
+		connection::ConnectionId,
+		upgrade::{InboundUpgrade, ReadOneError, UpgradeInfo, Negotiated},
+		upgrade::{DeniedUpgrade, read_one, write_one}
+	},
+	swarm::{
+		NegotiatedSubstream,
+		NetworkBehaviour,
+		NetworkBehaviourAction,
+		OneShotHandler,
+		OneShotHandlerConfig,
+		PollParameters,
+		SubstreamProtocol
+	}
+};
+use prost::Message;
+use sp_runtime::{generic::BlockId, traits::{Block, Header, One, Zero}};
+use std::{
+	cmp::min,
+	io,
+	iter,
+	sync::Arc,
+	time::Duration,
+	task::{Context, Poll}
+};
+use void::{Void, unreachable};
+
+// Type alias for convenience.
+pub type Error = Box<dyn std::error::Error + 'static>;
+
+/// Configuration options for `FinalityProofRequests`.
+#[derive(Debug, Clone)]
+pub struct Config {
+	max_request_len: usize,
+	inactivity_timeout: Duration,
+	protocol: Bytes,
+}
+
+impl Config {
+	/// Create a fresh configuration with the following options:
+	///
+	/// - max. request size = 1 MiB
+	/// - inactivity timeout = 15s
+	pub fn new(id: &ProtocolId) -> Self {
+		let mut c = Config {
+			max_request_len: 1024 * 1024,
+			inactivity_timeout: Duration::from_secs(15),
+			protocol: Bytes::new(),
+		};
+		c.set_protocol(id);
+		c
+	}
+
+	/// Limit the max. length of incoming finality proof request bytes.
+	pub fn set_max_request_len(&mut self, v: usize) -> &mut Self {
+		self.max_request_len = v;
+		self
+	}
+
+	/// Limit the max. duration the substream may remain inactive before closing it.
+	pub fn set_inactivity_timeout(&mut self, v: Duration) -> &mut Self {
+		self.inactivity_timeout = v;
+		self
+	}
+
+	/// Set protocol to use for upgrade negotiation.
+	pub fn set_protocol(&mut self, id: &ProtocolId) -> &mut Self {
+		let mut v = Vec::new();
+		v.extend_from_slice(b"/");
+		v.extend_from_slice(id.as_bytes());
+		v.extend_from_slice(b"/finality-proof/1");
+		self.protocol = v.into();
+		self
+	}
+}
+
+/// The finality proof request handling behaviour.
+pub struct FinalityProofRequests<B: Block> {
+	/// This behaviour's configuration.
+	config: Config,
+	/// How to construct finality proofs.
+	finality_proof_provider: Arc<dyn FinalityProofProvider<B>>,
+	/// Futures sending back the finality proof request responses.
+	outgoing: FuturesUnordered<BoxFuture<'static, ()>>,
+}
+
+impl<B> FinalityProofRequests<B>
+where
+	B: Block,
+{
+	/// Initializes the behaviour.
+	pub fn new(cfg: Config, finality_proof_provider: Arc<dyn FinalityProofProvider<B>>) -> Self {
+		FinalityProofRequests {
+			config: cfg,
+			finality_proof_provider,
+			outgoing: FuturesUnordered::new(),
+		}
+	}
+
+	/// Callback, invoked when a new finality request has been received from remote.
+	fn on_finality_request(&mut self, peer: &PeerId, request: &api::v1::finality::FinalityProofRequest)
+		-> Result<api::v1::finality::FinalityProofResponse, Error>
+	{
+		let block_hash = Decode::decode(&mut request.block_hash.as_ref())?;
+
+		log::trace!(target: "sync", "Finality proof request from {} for {}", peer, block_hash);
+
+		let finality_proof = self.finality_proof_provider
+			.prove_finality(block_hash, &request.request)?
+			.unwrap_or(Vec::new());
+		// Note that an empty Vec is sent if no proof is available.
+
+		Ok(api::v1::finality::FinalityProofResponse { proof: finality_proof })
+	}
+}
+
+impl<B> NetworkBehaviour for FinalityProofRequests<B>
+where
+	B: Block
+{
+	type ProtocolsHandler = OneShotHandler<Protocol, DeniedUpgrade, Request<NegotiatedSubstream>>;
+	type OutEvent = Void;
+
+	fn new_handler(&mut self) -> Self::ProtocolsHandler {
+		let p = Protocol {
+			max_request_len: self.config.max_request_len,
+			protocol: self.config.protocol.clone(),
+		};
+		let mut cfg = OneShotHandlerConfig::default();
+		cfg.inactive_timeout = self.config.inactivity_timeout;
+		OneShotHandler::new(SubstreamProtocol::new(p), cfg)
+	}
+
+	fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
+		Vec::new()
+	}
+
+	fn inject_connected(&mut self, _peer: &PeerId) {
+	}
+
+	fn inject_disconnected(&mut self, _peer: &PeerId) {
+	}
+
+	fn inject_event(
+		&mut self,
+		peer: PeerId,
+		connection: ConnectionId,
+		Request(request, mut stream): Request<NegotiatedSubstream>
+	) {
+		match self.on_finality_request(&peer, &request) {
+			Ok(res) => {
+				log::trace!("enqueueing finality response for peer {}", peer);
+				let mut data = Vec::with_capacity(res.encoded_len());
+				if let Err(e) = res.encode(&mut data) {
+					log::debug!("error encoding finality response for peer {}: {}", peer, e)
+				} else {
+					let future = async move {
+						if let Err(e) = write_one(&mut stream, data).await {
+							log::debug!("error writing finality response: {}", e)
+						}
+					};
+					self.outgoing.push(future.boxed())
+				}
+			}
+			Err(e) => log::debug!("error handling finality request from peer {}: {}", peer, e)
+		}
+	}
+
+	fn poll(&mut self, cx: &mut Context, _: &mut impl PollParameters) -> Poll<NetworkBehaviourAction<DeniedUpgrade, Void>> {
+		while let Poll::Ready(Some(_)) = self.outgoing.poll_next_unpin(cx) {}
+		Poll::Pending
+	}
+}
+
+/// The incoming finality proof request.
+///
+/// Holds the protobuf value and the connection substream which made the
+/// request and over which to send the response.
+#[derive(Debug)]
+pub struct Request<T>(api::v1::finality::FinalityProofRequest, T);
+
+impl<T> From<Void> for Request<T> {
+	fn from(v: Void) -> Self {
+		unreachable(v)
+	}
+}
+
+/// Substream upgrade protocol.
+///
+/// We attempt to parse an incoming protobuf encoded request (cf. `Request`)
+/// which will be handled by the `FinalityProofRequests` behaviour, i.e. the request
+/// will become visible via `inject_node_event` which then dispatches to the
+/// relevant callback to process the message and prepare a response.
+#[derive(Debug, Clone)]
+pub struct Protocol {
+	/// The max. request length in bytes.
+	max_request_len: usize,
+	/// The protocol to use during upgrade negotiation.
+	protocol: Bytes,
+}
+
+impl UpgradeInfo for Protocol {
+	type Info = Bytes;
+	type InfoIter = iter::Once<Self::Info>;
+
+	fn protocol_info(&self) -> Self::InfoIter {
+		iter::once(self.protocol.clone())
+	}
+}
+
+impl<T> InboundUpgrade<T> for Protocol
+where
+	T: AsyncRead + AsyncWrite + Unpin + Send + 'static
+{
+	type Output = Request<T>;
+	type Error = ReadOneError;
+	type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
+
+	fn upgrade_inbound(self, mut s: T, _: Self::Info) -> Self::Future {
+		async move {
+			let len = self.max_request_len;
+			let vec = read_one(&mut s, len).await?;
+			match api::v1::finality::FinalityProofRequest::decode(&vec[..]) {
+				Ok(r) => Ok(Request(r, s)),
+				Err(e) => Err(ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e)))
+			}
+		}.boxed()
+	}
+}
+
diff --git a/substrate/client/network/src/protocol/schema/finality.v1.proto b/substrate/client/network/src/protocol/schema/finality.v1.proto
new file mode 100644
index 00000000000..843bc4eca09
--- /dev/null
+++ b/substrate/client/network/src/protocol/schema/finality.v1.proto
@@ -0,0 +1,19 @@
+// Schema definition for finality proof request/responses.
+
+syntax = "proto3";
+
+package api.v1.finality;
+
+// Request a finality proof from a peer.
+message FinalityProofRequest {
+	// SCALE-encoded hash of the block to request.
+	bytes block_hash = 1;
+	// Opaque chain-specific additional request data.
+	bytes request = 2;
+}
+
+// Response to a finality proof request.
+message FinalityProofResponse {
+	// Opaque chain-specific finality proof. Empty if no such proof exists.
+	bytes proof = 1; // optional
+}
diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs
index d29cb94ee8a..e360a8defec 100644
--- a/substrate/client/network/src/service.rs
+++ b/substrate/client/network/src/service.rs
@@ -225,6 +225,12 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
 				let config = protocol::block_requests::Config::new(&params.protocol_id);
 				protocol::BlockRequests::new(config, params.chain.clone())
 			};
+			let finality_proof_requests = if let Some(pb) = &params.finality_proof_provider {
+				let config = protocol::finality_requests::Config::new(&params.protocol_id);
+				Some(protocol::FinalityProofRequests::new(config, pb.clone()))
+			} else {
+				None
+			};
 			let light_client_handler = {
 				let config = protocol::light_client_handler::Config::new(&params.protocol_id);
 				protocol::LightClientHandler::new(
@@ -261,6 +267,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
 				user_agent,
 				local_public,
 				block_requests,
+				finality_proof_requests,
 				light_client_handler,
 				discovery_config
 			);
@@ -1113,10 +1120,12 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
 							ConnectionError::IO(_) =>
 								metrics.connections_closed_total.with_label_values(&[dir, "transport-error"]).inc(),
 							ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
-								EitherError::A(EitherError::B(EitherError::A(PingFailure::Timeout))))))) =>
+								EitherError::A(EitherError::A(EitherError::B(
+								EitherError::A(PingFailure::Timeout)))))))) =>
 								metrics.connections_closed_total.with_label_values(&[dir, "ping-timeout"]).inc(),
 							ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
-								EitherError::A(EitherError::A(EitherError::B(LegacyConnectionKillError))))))) =>
+								EitherError::A(EitherError::A(EitherError::A(
+								EitherError::B(LegacyConnectionKillError)))))))) =>
 								metrics.connections_closed_total.with_label_values(&[dir, "force-closed"]).inc(),
 							ConnectionError::Handler(NodeHandlerWrapperError::Handler(_)) =>
 								metrics.connections_closed_total.with_label_values(&[dir, "protocol-error"]).inc(),
-- 
GitLab