diff --git a/substrate/client/network/src/behaviour.rs b/substrate/client/network/src/behaviour.rs index 7b1a35354c913d1b42f30ee7e78b11d757f0cc91..a34f6e0960c475ad3d0cf75843e7e52886f937af 100644 --- a/substrate/client/network/src/behaviour.rs +++ b/substrate/client/network/src/behaviour.rs @@ -17,14 +17,15 @@ // along with this program. If not, see <https://www.gnu.org/licenses/>. use crate::{ - config::{ProtocolId, Role}, light_client_handler, peer_info, request_responses, + config::{ProtocolId, Role}, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut}, protocol::{message::Roles, CustomMessageOutcome, NotificationsSink, Protocol}, + peer_info, request_responses, light_client_requests, ObservedRole, DhtEvent, ExHashT, }; use bytes::Bytes; -use futures::channel::oneshot; +use futures::{channel::oneshot, stream::StreamExt}; use libp2p::NetworkBehaviour; use libp2p::core::{Multiaddr, PeerId, PublicKey}; use libp2p::identify::IdentifyInfo; @@ -59,8 +60,6 @@ pub struct Behaviour<B: BlockT, H: ExHashT> { discovery: DiscoveryBehaviour, /// Generic request-reponse protocols. request_responses: request_responses::RequestResponsesBehaviour, - /// Light client request handling. - light_client_handler: light_client_handler::LightClientHandler<B>, /// Queue of events to produce for the outside. #[behaviour(ignore)] @@ -70,6 +69,10 @@ pub struct Behaviour<B: BlockT, H: ExHashT> { #[behaviour(ignore)] role: Role, + /// Light client request handling. + #[behaviour(ignore)] + light_client_request_sender: light_client_requests::sender::LightClientRequestSender<B>, + /// Protocol name used to send out block requests via /// [`request_responses::RequestResponsesBehaviour`]. #[behaviour(ignore)] @@ -174,10 +177,10 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> { role: Role, user_agent: String, local_public_key: PublicKey, - light_client_handler: light_client_handler::LightClientHandler<B>, + light_client_request_sender: light_client_requests::sender::LightClientRequestSender<B>, disco_config: DiscoveryConfig, - // Block request protocol config. block_request_protocol_config: request_responses::ProtocolConfig, + light_client_request_protocol_config: request_responses::ProtocolConfig, // All remaining request protocol configs. mut request_response_protocols: Vec<request_responses::ProtocolConfig>, ) -> Result<Self, request_responses::RegisterError> { @@ -185,13 +188,15 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> { let block_request_protocol_name = block_request_protocol_config.name.to_string(); request_response_protocols.push(block_request_protocol_config); + request_response_protocols.push(light_client_request_protocol_config); + Ok(Behaviour { substrate, peer_info: peer_info::PeerInfoBehaviour::new(user_agent, local_public_key), discovery: disco_config.finish(), request_responses: request_responses::RequestResponsesBehaviour::new(request_response_protocols.into_iter())?, - light_client_handler, + light_client_request_sender, events: VecDeque::new(), role, @@ -268,8 +273,11 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> { } /// Issue a light client request. - pub fn light_client_request(&mut self, r: light_client_handler::Request<B>) -> Result<(), light_client_handler::Error> { - self.light_client_handler.request(r) + pub fn light_client_request( + &mut self, + r: light_client_requests::sender::Request<B>, + ) -> Result<(), light_client_requests::sender::SendRequestError> { + self.light_client_request_sender.request(r) } } @@ -289,13 +297,6 @@ fn reported_roles_to_observed_role(local_role: &Role, remote: &PeerId, roles: Ro } } -impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<void::Void> for -Behaviour<B, H> { - fn inject_event(&mut self, event: void::Void) { - void::unreachable(event) - } -} - impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<CustomMessageOutcome<B>> for Behaviour<B, H> { fn inject_event(&mut self, event: CustomMessageOutcome<B>) { @@ -343,12 +344,16 @@ Behaviour<B, H> { self.events.push_back(BehaviourOut::NotificationsReceived { remote, messages }); }, CustomMessageOutcome::PeerNewBest(peer_id, number) => { - self.light_client_handler.update_best_block(&peer_id, number); + self.light_client_request_sender.update_best_block(&peer_id, number); + } + CustomMessageOutcome::SyncConnected(peer_id) => { + self.light_client_request_sender.inject_connected(peer_id); + self.events.push_back(BehaviourOut::SyncConnected(peer_id)) + } + CustomMessageOutcome::SyncDisconnected(peer_id) => { + self.light_client_request_sender.inject_disconnected(peer_id); + self.events.push_back(BehaviourOut::SyncDisconnected(peer_id)) } - CustomMessageOutcome::SyncConnected(peer_id) => - self.events.push_back(BehaviourOut::SyncConnected(peer_id)), - CustomMessageOutcome::SyncDisconnected(peer_id) => - self.events.push_back(BehaviourOut::SyncDisconnected(peer_id)), CustomMessageOutcome::None => {} } } @@ -443,7 +448,20 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<DiscoveryOut> } impl<B: BlockT, H: ExHashT> Behaviour<B, H> { - fn poll<TEv>(&mut self, _: &mut Context, _: &mut impl PollParameters) -> Poll<NetworkBehaviourAction<TEv, BehaviourOut<B>>> { + fn poll<TEv>( + &mut self, + cx: &mut Context, + _: &mut impl PollParameters, + ) -> Poll<NetworkBehaviourAction<TEv, BehaviourOut<B>>> { + use light_client_requests::sender::OutEvent; + while let Poll::Ready(Some(event)) = self.light_client_request_sender.poll_next_unpin(cx) { + match event { + OutEvent::SendRequest { target, request, pending_response, protocol_name } => { + self.request_responses.send_request(&target, &protocol_name, request, pending_response) + } + } + } + if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) } diff --git a/substrate/client/network/src/block_request_handler.rs b/substrate/client/network/src/block_request_handler.rs index 1a6c09eff1303eccc6a03cc84c8abc568125d4e1..92f21f44f9d1c18ff634e1c0de5cd40b746a98be 100644 --- a/substrate/client/network/src/block_request_handler.rs +++ b/substrate/client/network/src/block_request_handler.rs @@ -39,7 +39,7 @@ const MAX_BLOCKS_IN_RESPONSE: usize = 128; const MAX_BODY_BYTES: usize = 8 * 1024 * 1024; /// Generates a [`ProtocolConfig`] for the block request protocol, refusing incoming requests. -pub fn generate_protocol_config(protocol_id: ProtocolId) -> ProtocolConfig { +pub fn generate_protocol_config(protocol_id: &ProtocolId) -> ProtocolConfig { ProtocolConfig { name: generate_protocol_name(protocol_id).into(), max_request_size: 1024 * 1024, @@ -50,7 +50,10 @@ pub fn generate_protocol_config(protocol_id: ProtocolId) -> ProtocolConfig { } /// Generate the block protocol name from chain specific protocol identifier. -fn generate_protocol_name(protocol_id: ProtocolId) -> String { +// +// Visibility `pub(crate)` to allow `crate::light_client_requests::sender` to generate block request +// protocol name and send block requests. +pub(crate) fn generate_protocol_name(protocol_id: &ProtocolId) -> String { let mut s = String::new(); s.push_str("/"); s.push_str(protocol_id.as_ref()); @@ -66,7 +69,7 @@ pub struct BlockRequestHandler<B> { impl <B: BlockT> BlockRequestHandler<B> { /// Create a new [`BlockRequestHandler`]. - pub fn new(protocol_id: ProtocolId, client: Arc<dyn Client<B>>) -> (Self, ProtocolConfig) { + pub fn new(protocol_id: &ProtocolId, client: Arc<dyn Client<B>>) -> (Self, ProtocolConfig) { // Rate of arrival multiplied with the waiting time in the queue equals the queue length. // // An average Polkadot sentry node serves less than 5 requests per second. The 95th percentile @@ -82,6 +85,22 @@ impl <B: BlockT> BlockRequestHandler<B> { (Self { client, request_receiver }, protocol_config) } + /// Run [`BlockRequestHandler`]. + pub async fn run(mut self) { + while let Some(request) = self.request_receiver.next().await { + let IncomingRequest { peer, payload, pending_response } = request; + + match self.handle_request(payload, pending_response) { + Ok(()) => debug!(target: LOG_TARGET, "Handled block request from {}.", peer), + Err(e) => debug!( + target: LOG_TARGET, + "Failed to handle block request from {}: {}", + peer, e, + ), + } + } + } + fn handle_request( &self, payload: Vec<u8>, @@ -186,22 +205,6 @@ impl <B: BlockT> BlockRequestHandler<B> { reputation_changes: Vec::new(), }).map_err(|_| HandleRequestError::SendResponse) } - - /// Run [`BlockRequestHandler`]. - pub async fn run(mut self) { - while let Some(request) = self.request_receiver.next().await { - let IncomingRequest { peer, payload, pending_response } = request; - - match self.handle_request(payload, pending_response) { - Ok(()) => debug!(target: LOG_TARGET, "Handled block request from {}.", peer), - Err(e) => debug!( - target: LOG_TARGET, - "Failed to handle block request from {}: {}", - peer, e, - ), - } - } - } } #[derive(derive_more::Display, derive_more::From)] diff --git a/substrate/client/network/src/config.rs b/substrate/client/network/src/config.rs index 29d238c368a780dad94fa7cdd5b7a95104c9d123..3eb53dabf045919fb19d1207a73a1a1fd77d824b 100644 --- a/substrate/client/network/src/config.rs +++ b/substrate/client/network/src/config.rs @@ -111,6 +111,14 @@ pub struct Params<B: BlockT, H: ExHashT> { /// [`block_request_handler::BlockRequestHandler::new`] allowing both outgoing and incoming /// requests. pub block_request_protocol_config: RequestResponseConfig, + + /// Request response configuration for the light client request protocol. + /// + /// Can be constructed either via [`light_client_requests::generate_protocol_config`] allowing + /// outgoing but not incoming requests, or constructed via + /// [`light_client_requests::handler::LightClientRequestHandler::new`] allowing both outgoing + /// and incoming requests. + pub light_client_request_protocol_config: RequestResponseConfig, } /// Role of the local node. diff --git a/substrate/client/network/src/gossip/tests.rs b/substrate/client/network/src/gossip/tests.rs index e0941357e8441889be6d78b0895a2cbfa7298d57..c0b8c5e730a117b5212b7d1f3079bd2d54eee740 100644 --- a/substrate/client/network/src/gossip/tests.rs +++ b/substrate/client/network/src/gossip/tests.rs @@ -17,6 +17,7 @@ // along with this program. If not, see <https://www.gnu.org/licenses/>. use crate::block_request_handler::BlockRequestHandler; +use crate::light_client_requests::handler::LightClientRequestHandler; use crate::gossip::QueuedSender; use crate::{config, Event, NetworkService, NetworkWorker}; @@ -96,7 +97,16 @@ fn build_test_full_node(network_config: config::NetworkConfiguration) let block_request_protocol_config = { let (handler, protocol_config) = BlockRequestHandler::new( - protocol_id.clone(), + &protocol_id, + client.clone(), + ); + async_std::task::spawn(handler.run().boxed()); + protocol_config + }; + + let light_client_request_protocol_config = { + let (handler, protocol_config) = LightClientRequestHandler::new( + &protocol_id, client.clone(), ); async_std::task::spawn(handler.run().boxed()); @@ -117,6 +127,7 @@ fn build_test_full_node(network_config: config::NetworkConfiguration) ), metrics_registry: None, block_request_protocol_config, + light_client_request_protocol_config, }) .unwrap(); diff --git a/substrate/client/network/src/lib.rs b/substrate/client/network/src/lib.rs index ab7625ff9fe8ae90843428f8276941ebd24cfd0d..007928ad425f7d46beb8d6fd22d8fcbf0bd5afee 100644 --- a/substrate/client/network/src/lib.rs +++ b/substrate/client/network/src/lib.rs @@ -249,7 +249,6 @@ mod behaviour; mod chain; mod peer_info; mod discovery; -mod light_client_handler; mod on_demand_layer; mod protocol; mod request_responses; @@ -259,6 +258,7 @@ mod transport; mod utils; pub mod block_request_handler; +pub mod light_client_requests; pub mod config; pub mod error; pub mod gossip; diff --git a/substrate/client/network/src/light_client_handler.rs b/substrate/client/network/src/light_client_handler.rs deleted file mode 100644 index 1062236e25eb36d874bc769877415d797e0eff5a..0000000000000000000000000000000000000000 --- a/substrate/client/network/src/light_client_handler.rs +++ /dev/null @@ -1,2061 +0,0 @@ -// This file is part of Substrate. - -// Copyright (C) 2020-2021 Parity Technologies (UK) Ltd. -// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see <https://www.gnu.org/licenses/>. - -//! [`NetworkBehaviour`] implementation which handles light client requests. -//! -//! Every request is coming in on a separate connection substream which gets -//! closed after we have sent the response back. Requests and responses are -//! encoded as protocol buffers (cf. `api.v1.proto`). -//! -//! For every outgoing request we likewise open a separate substream. - -#![allow(unused)] - -use bytes::Bytes; -use codec::{self, Encode, Decode}; -use crate::{ - chain::Client, - config::ProtocolId, - protocol::message::{BlockAttributes, Direction, FromBlock}, - schema, -}; -use futures::{channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered}; -use libp2p::{ - core::{ - ConnectedPoint, - Multiaddr, - PeerId, - connection::ConnectionId, - upgrade::{InboundUpgrade, ReadOneError, UpgradeInfo, Negotiated}, - upgrade::{OutboundUpgrade, read_one, write_one} - }, - swarm::{ - AddressRecord, - NegotiatedSubstream, - NetworkBehaviour, - NetworkBehaviourAction, - NotifyHandler, - OneShotHandler, - OneShotHandlerConfig, - PollParameters, - SubstreamProtocol, - } -}; -use nohash_hasher::IntMap; -use prost::Message; -use sc_client_api::{ - StorageProof, - light::{ - self, RemoteReadRequest, RemoteBodyRequest, ChangesProof, - RemoteCallRequest, RemoteChangesRequest, RemoteHeaderRequest, - } -}; -use sc_peerset::ReputationChange; -use sp_core::{ - storage::{ChildInfo, ChildType,StorageKey, PrefixedStorageKey}, - hexdisplay::HexDisplay, -}; -use smallvec::SmallVec; -use sp_blockchain::{Error as ClientError}; -use sp_runtime::{ - traits::{Block, Header, NumberFor, Zero}, - generic::BlockId, -}; -use std::{ - collections::{BTreeMap, VecDeque, HashMap}, - iter, - io, - sync::Arc, - time::Duration, - task::{Context, Poll} -}; -use void::Void; -use wasm_timer::Instant; - -/// Reputation change for a peer when a request timed out. -pub(crate) const TIMEOUT_REPUTATION_CHANGE: i32 = -(1 << 8); - -/// Configuration options for `LightClientHandler` behaviour. -#[derive(Debug, Clone)] -pub struct Config { - max_request_size: usize, - max_response_size: usize, - max_pending_requests: usize, - inactivity_timeout: Duration, - request_timeout: Duration, - light_protocol: Bytes, - block_protocol: Bytes, -} - -impl Config { - /// Create a fresh configuration with the following options: - /// - /// - max. request size = 1 MiB - /// - max. response size = 16 MiB - /// - max. pending requests = 128 - /// - inactivity timeout = 15s - /// - request timeout = 15s - pub fn new(id: &ProtocolId) -> Self { - let mut c = Config { - max_request_size: 1 * 1024 * 1024, - max_response_size: 16 * 1024 * 1024, - max_pending_requests: 128, - inactivity_timeout: Duration::from_secs(15), - request_timeout: Duration::from_secs(15), - light_protocol: Bytes::new(), - block_protocol: Bytes::new(), - }; - c.set_protocol(id); - c - } - - /// Limit the max. length in bytes of a request. - pub fn set_max_request_size(&mut self, v: usize) -> &mut Self { - self.max_request_size = v; - self - } - - /// Limit the max. length in bytes of a response. - pub fn set_max_response_size(&mut self, v: usize) -> &mut Self { - self.max_response_size = v; - self - } - - /// Limit the max. number of pending requests. - pub fn set_max_pending_requests(&mut self, v: usize) -> &mut Self { - self.max_pending_requests = v; - self - } - - /// Limit the max. duration the connection may remain inactive before closing it. - pub fn set_inactivity_timeout(&mut self, v: Duration) -> &mut Self { - self.inactivity_timeout = v; - self - } - - /// Limit the max. request duration. - pub fn set_request_timeout(&mut self, v: Duration) -> &mut Self { - self.request_timeout = v; - self - } - - /// Set protocol to use for upgrade negotiation. - pub fn set_protocol(&mut self, id: &ProtocolId) -> &mut Self { - let mut vl = Vec::new(); - vl.extend_from_slice(b"/"); - vl.extend_from_slice(id.as_ref().as_bytes()); - vl.extend_from_slice(b"/light/2"); - self.light_protocol = vl.into(); - - let mut vb = Vec::new(); - vb.extend_from_slice(b"/"); - vb.extend_from_slice(id.as_ref().as_bytes()); - vb.extend_from_slice(b"/sync/2"); - self.block_protocol = vb.into(); - - self - } -} - -/// Possible errors while handling light clients. -#[derive(Debug, thiserror::Error)] -pub enum Error { - /// There are currently too many pending request. - #[error("too many pending requests")] - TooManyRequests, - /// The response type does not correspond to the issued request. - #[error("unexpected response")] - UnexpectedResponse, - /// A bad request has been received. - #[error("bad request: {0}")] - BadRequest(&'static str), - /// The chain client errored. - #[error("client error: {0}")] - Client(#[from] ClientError), - /// Encoding or decoding of some data failed. - #[error("codec error: {0}")] - Codec(#[from] codec::Error), -} - -/// The possible light client requests we support. -/// -/// The associated `oneshot::Sender` will be used to convey the result of -/// their request back to them (cf. `Reply`). -// -// This is modeled after light_dispatch.rs's `RequestData` which is not -// used because we currently only support a subset of those. -#[derive(Debug)] -pub enum Request<B: Block> { - Body { - request: RemoteBodyRequest<B::Header>, - sender: oneshot::Sender<Result<Vec<B::Extrinsic>, ClientError>> - }, - Header { - request: light::RemoteHeaderRequest<B::Header>, - sender: oneshot::Sender<Result<B::Header, ClientError>> - }, - Read { - request: light::RemoteReadRequest<B::Header>, - sender: oneshot::Sender<Result<HashMap<Vec<u8>, Option<Vec<u8>>>, ClientError>> - }, - ReadChild { - request: light::RemoteReadChildRequest<B::Header>, - sender: oneshot::Sender<Result<HashMap<Vec<u8>, Option<Vec<u8>>>, ClientError>> - }, - Call { - request: light::RemoteCallRequest<B::Header>, - sender: oneshot::Sender<Result<Vec<u8>, ClientError>> - }, - Changes { - request: light::RemoteChangesRequest<B::Header>, - sender: oneshot::Sender<Result<Vec<(NumberFor<B>, u32)>, ClientError>> - } -} - -/// The data to send back to the light client over the oneshot channel. -// -// It is unified here in order to be able to return it as a function -// result instead of delivering it to the client as a side effect of -// response processing. -#[derive(Debug)] -enum Reply<B: Block> { - VecU8(Vec<u8>), - VecNumberU32(Vec<(<B::Header as Header>::Number, u32)>), - MapVecU8OptVecU8(HashMap<Vec<u8>, Option<Vec<u8>>>), - Header(B::Header), - Extrinsics(Vec<B::Extrinsic>), -} - -/// Augments a light client request with metadata. -#[derive(Debug)] -struct RequestWrapper<B: Block, P> { - /// Time when this value was created. - timestamp: Instant, - /// Remaining retries. - retries: usize, - /// The actual request. - request: Request<B>, - /// The peer to send the request to, e.g. `PeerId`. - peer: P, - /// The connection to use for sending the request. - connection: Option<ConnectionId>, -} - -/// Information we have about some peer. -#[derive(Debug)] -struct PeerInfo<B: Block> { - connections: SmallVec<[(ConnectionId, Multiaddr); crate::MAX_CONNECTIONS_PER_PEER]>, - best_block: Option<NumberFor<B>>, - status: PeerStatus, -} - -impl<B: Block> Default for PeerInfo<B> { - fn default() -> Self { - PeerInfo { - connections: SmallVec::new(), - best_block: None, - status: PeerStatus::Idle, - } - } -} - -type RequestId = u64; - -/// A peer is either idle or busy processing a request from us. -#[derive(Debug, Clone, PartialEq, Eq)] -enum PeerStatus { - /// The peer is available. - Idle, - /// We wait for the peer to return us a response for the given request ID. - BusyWith(RequestId), -} - -/// The light client handler behaviour. -pub struct LightClientHandler<B: Block> { - /// This behaviour's configuration. - config: Config, - /// Blockchain client. - chain: Arc<dyn Client<B>>, - /// Verifies that received responses are correct. - checker: Arc<dyn light::FetchChecker<B>>, - /// Peer information (addresses, their best block, etc.) - peers: HashMap<PeerId, PeerInfo<B>>, - /// Futures sending back response to remote clients. - responses: FuturesUnordered<BoxFuture<'static, ()>>, - /// Pending (local) requests. - pending_requests: VecDeque<RequestWrapper<B, ()>>, - /// Requests on their way to remote peers. - outstanding: IntMap<RequestId, RequestWrapper<B, PeerId>>, - /// (Local) Request ID counter - next_request_id: RequestId, - /// Handle to use for reporting misbehaviour of peers. - peerset: sc_peerset::PeersetHandle, -} - -impl<B> LightClientHandler<B> -where - B: Block, -{ - /// Construct a new light client handler. - pub fn new( - cfg: Config, - chain: Arc<dyn Client<B>>, - checker: Arc<dyn light::FetchChecker<B>>, - peerset: sc_peerset::PeersetHandle, - ) -> Self { - LightClientHandler { - config: cfg, - chain, - checker, - peers: HashMap::new(), - responses: FuturesUnordered::new(), - pending_requests: VecDeque::new(), - outstanding: IntMap::default(), - next_request_id: 1, - peerset, - } - } - - /// We rely on external information about peers best blocks as we lack the - /// means to determine it ourselves. - pub fn update_best_block(&mut self, peer: &PeerId, num: NumberFor<B>) { - if let Some(info) = self.peers.get_mut(peer) { - log::trace!("new best block for {:?}: {:?}", peer, num); - info.best_block = Some(num) - } - } - - /// Issue a new light client request. - pub fn request(&mut self, req: Request<B>) -> Result<(), Error> { - if self.pending_requests.len() >= self.config.max_pending_requests { - return Err(Error::TooManyRequests) - } - let rw = RequestWrapper { - timestamp: Instant::now(), - retries: retries(&req), - request: req, - peer: (), // we do not know the peer yet - connection: None, - }; - self.pending_requests.push_back(rw); - Ok(()) - } - - fn next_request_id(&mut self) -> RequestId { - let id = self.next_request_id; - self.next_request_id += 1; - id - } - - /// Remove the given peer. - /// - /// If we have a request to this peer in flight, we move it back to - /// the pending requests queue. - fn remove_peer(&mut self, peer: &PeerId) { - if let Some(id) = self.outstanding.iter().find(|(_, rw)| &rw.peer == peer).map(|(k, _)| *k) { - let rw = self.outstanding.remove(&id).expect("key belongs to entry in this map"); - let rw = RequestWrapper { - timestamp: rw.timestamp, - retries: rw.retries, - request: rw.request, - peer: (), // need to find another peer - connection: None, - }; - self.pending_requests.push_back(rw); - } - self.peers.remove(peer); - } - - /// Prepares a request by selecting a suitable peer and connection to send it to. - /// - /// If there is currently no suitable peer for the request, the given request - /// is returned as `Err`. - fn prepare_request(&self, req: RequestWrapper<B, ()>) - -> Result<(PeerId, RequestWrapper<B, PeerId>), RequestWrapper<B, ()>> - { - let number = required_block(&req.request); - - let mut peer = None; - for (peer_id, peer_info) in self.peers.iter() { - if peer_info.status == PeerStatus::Idle { - match peer_info.best_block { - Some(n) => if n >= number { - peer = Some((peer_id, peer_info)); - break - }, - None => peer = Some((peer_id, peer_info)) - } - } - } - - if let Some((peer_id, peer_info)) = peer { - let connection = peer_info.connections.iter().next().map(|(id, _)| *id); - let rw = RequestWrapper { - timestamp: req.timestamp, - retries: req.retries, - request: req.request, - peer: peer_id.clone(), - connection, - }; - Ok((peer_id.clone(), rw)) - } else { - Err(req) - } - } - - /// Process a local request's response from remote. - /// - /// If successful, this will give us the actual, checked data we should be - /// sending back to the client, otherwise an error. - fn on_response - ( &mut self - , peer: &PeerId - , request: &Request<B> - , response: Response - ) -> Result<Reply<B>, Error> - { - log::trace!("response from {}", peer); - match response { - Response::Light(r) => self.on_response_light(peer, request, r), - Response::Block(r) => self.on_response_block(peer, request, r), - } - } - - fn on_response_light - ( &mut self - , peer: &PeerId - , request: &Request<B> - , response: schema::v1::light::Response - ) -> Result<Reply<B>, Error> - { - use schema::v1::light::response::Response; - match response.response { - Some(Response::RemoteCallResponse(response)) => - if let Request::Call { request , .. } = request { - let proof = Decode::decode(&mut response.proof.as_ref())?; - let reply = self.checker.check_execution_proof(request, proof)?; - Ok(Reply::VecU8(reply)) - } else { - Err(Error::UnexpectedResponse) - } - Some(Response::RemoteReadResponse(response)) => - match request { - Request::Read { request, .. } => { - let proof = Decode::decode(&mut response.proof.as_ref())?; - let reply = self.checker.check_read_proof(&request, proof)?; - Ok(Reply::MapVecU8OptVecU8(reply)) - } - Request::ReadChild { request, .. } => { - let proof = Decode::decode(&mut response.proof.as_ref())?; - let reply = self.checker.check_read_child_proof(&request, proof)?; - Ok(Reply::MapVecU8OptVecU8(reply)) - } - _ => Err(Error::UnexpectedResponse) - } - Some(Response::RemoteChangesResponse(response)) => - if let Request::Changes { request, .. } = request { - let max_block = Decode::decode(&mut response.max.as_ref())?; - let roots_proof = Decode::decode(&mut response.roots_proof.as_ref())?; - let roots = { - let mut r = BTreeMap::new(); - for pair in response.roots { - let k = Decode::decode(&mut pair.fst.as_ref())?; - let v = Decode::decode(&mut pair.snd.as_ref())?; - r.insert(k, v); - } - r - }; - let reply = self.checker.check_changes_proof(&request, light::ChangesProof { - max_block, - proof: response.proof, - roots, - roots_proof, - })?; - Ok(Reply::VecNumberU32(reply)) - } else { - Err(Error::UnexpectedResponse) - } - Some(Response::RemoteHeaderResponse(response)) => - if let Request::Header { request, .. } = request { - let header = - if response.header.is_empty() { - None - } else { - Some(Decode::decode(&mut response.header.as_ref())?) - }; - let proof = Decode::decode(&mut response.proof.as_ref())?; - let reply = self.checker.check_header_proof(&request, header, proof)?; - Ok(Reply::Header(reply)) - } else { - Err(Error::UnexpectedResponse) - } - None => Err(Error::UnexpectedResponse) - } - } - - fn on_response_block - ( &mut self - , peer: &PeerId - , request: &Request<B> - , response: schema::v1::BlockResponse - ) -> Result<Reply<B>, Error> - { - let request = if let Request::Body { request , .. } = &request { - request - } else { - return Err(Error::UnexpectedResponse); - }; - - let body: Vec<_> = match response.blocks.into_iter().next() { - Some(b) => b.body, - None => return Err(Error::UnexpectedResponse), - }; - - let body = body.into_iter() - .map(|mut extrinsic| B::Extrinsic::decode(&mut &extrinsic[..])) - .collect::<Result<_, _>>()?; - - let body = self.checker.check_body_proof(&request, body)?; - Ok(Reply::Extrinsics(body)) - } - - fn on_remote_call_request - ( &mut self - , peer: &PeerId - , request: &schema::v1::light::RemoteCallRequest - ) -> Result<schema::v1::light::Response, Error> - { - log::trace!("remote call request from {} ({} at {:?})", - peer, - request.method, - request.block, - ); - - let block = Decode::decode(&mut request.block.as_ref())?; - - let proof = match self.chain.execution_proof(&BlockId::Hash(block), &request.method, &request.data) { - Ok((_, proof)) => proof, - Err(e) => { - log::trace!("remote call request from {} ({} at {:?}) failed with: {}", - peer, - request.method, - request.block, - e, - ); - StorageProof::empty() - } - }; - - let response = { - let r = schema::v1::light::RemoteCallResponse { proof: proof.encode() }; - schema::v1::light::response::Response::RemoteCallResponse(r) - }; - - Ok(schema::v1::light::Response { response: Some(response) }) - } - - fn on_remote_read_request - ( &mut self - , peer: &PeerId - , request: &schema::v1::light::RemoteReadRequest - ) -> Result<schema::v1::light::Response, Error> - { - if request.keys.is_empty() { - log::debug!("invalid remote read request sent by {}", peer); - return Err(Error::BadRequest("remote read request without keys")) - } - - log::trace!("remote read request from {} ({} at {:?})", - peer, - fmt_keys(request.keys.first(), request.keys.last()), - request.block); - - let block = Decode::decode(&mut request.block.as_ref())?; - - let proof = match self.chain.read_proof(&BlockId::Hash(block), &mut request.keys.iter().map(AsRef::as_ref)) { - Ok(proof) => proof, - Err(error) => { - log::trace!("remote read request from {} ({} at {:?}) failed with: {}", - peer, - fmt_keys(request.keys.first(), request.keys.last()), - request.block, - error); - StorageProof::empty() - } - }; - - let response = { - let r = schema::v1::light::RemoteReadResponse { proof: proof.encode() }; - schema::v1::light::response::Response::RemoteReadResponse(r) - }; - - Ok(schema::v1::light::Response { response: Some(response) }) - } - - fn on_remote_read_child_request - ( &mut self - , peer: &PeerId - , request: &schema::v1::light::RemoteReadChildRequest - ) -> Result<schema::v1::light::Response, Error> - { - if request.keys.is_empty() { - log::debug!("invalid remote child read request sent by {}", peer); - return Err(Error::BadRequest("remove read child request without keys")) - } - - log::trace!("remote read child request from {} ({} {} at {:?})", - peer, - HexDisplay::from(&request.storage_key), - fmt_keys(request.keys.first(), request.keys.last()), - request.block); - - let block = Decode::decode(&mut request.block.as_ref())?; - - let prefixed_key = PrefixedStorageKey::new_ref(&request.storage_key); - let child_info = match ChildType::from_prefixed_key(prefixed_key) { - Some((ChildType::ParentKeyId, storage_key)) => Ok(ChildInfo::new_default(storage_key)), - None => Err(sp_blockchain::Error::InvalidChildStorageKey), - }; - let proof = match child_info.and_then(|child_info| self.chain.read_child_proof( - &BlockId::Hash(block), - &child_info, - &mut request.keys.iter().map(AsRef::as_ref) - )) { - Ok(proof) => proof, - Err(error) => { - log::trace!("remote read child request from {} ({} {} at {:?}) failed with: {}", - peer, - HexDisplay::from(&request.storage_key), - fmt_keys(request.keys.first(), request.keys.last()), - request.block, - error); - StorageProof::empty() - } - }; - - let response = { - let r = schema::v1::light::RemoteReadResponse { proof: proof.encode() }; - schema::v1::light::response::Response::RemoteReadResponse(r) - }; - - Ok(schema::v1::light::Response { response: Some(response) }) - } - - fn on_remote_header_request - ( &mut self - , peer: &PeerId - , request: &schema::v1::light::RemoteHeaderRequest - ) -> Result<schema::v1::light::Response, Error> - { - log::trace!("remote header proof request from {} ({:?})", peer, request.block); - - let block = Decode::decode(&mut request.block.as_ref())?; - let (header, proof) = match self.chain.header_proof(&BlockId::Number(block)) { - Ok((header, proof)) => (header.encode(), proof), - Err(error) => { - log::trace!("remote header proof request from {} ({:?}) failed with: {}", - peer, - request.block, - error); - (Default::default(), StorageProof::empty()) - } - }; - - let response = { - let r = schema::v1::light::RemoteHeaderResponse { header, proof: proof.encode() }; - schema::v1::light::response::Response::RemoteHeaderResponse(r) - }; - - Ok(schema::v1::light::Response { response: Some(response) }) - } - - fn on_remote_changes_request - ( &mut self - , peer: &PeerId - , request: &schema::v1::light::RemoteChangesRequest - ) -> Result<schema::v1::light::Response, Error> - { - log::trace!("remote changes proof request from {} for key {} ({:?}..{:?})", - peer, - if !request.storage_key.is_empty() { - format!("{} : {}", HexDisplay::from(&request.storage_key), HexDisplay::from(&request.key)) - } else { - HexDisplay::from(&request.key).to_string() - }, - request.first, - request.last); - - let first = Decode::decode(&mut request.first.as_ref())?; - let last = Decode::decode(&mut request.last.as_ref())?; - let min = Decode::decode(&mut request.min.as_ref())?; - let max = Decode::decode(&mut request.max.as_ref())?; - let key = StorageKey(request.key.clone()); - let storage_key = if request.storage_key.is_empty() { - None - } else { - Some(PrefixedStorageKey::new_ref(&request.storage_key)) - }; - - let proof = match self.chain.key_changes_proof(first, last, min, max, storage_key, &key) { - Ok(proof) => proof, - Err(error) => { - log::trace!("remote changes proof request from {} for key {} ({:?}..{:?}) failed with: {}", - peer, - format!("{} : {}", HexDisplay::from(&request.storage_key), HexDisplay::from(&key.0)), - request.first, - request.last, - error); - - light::ChangesProof::<B::Header> { - max_block: Zero::zero(), - proof: Vec::new(), - roots: BTreeMap::new(), - roots_proof: StorageProof::empty(), - } - } - }; - - let response = { - let r = schema::v1::light::RemoteChangesResponse { - max: proof.max_block.encode(), - proof: proof.proof, - roots: proof.roots.into_iter() - .map(|(k, v)| schema::v1::light::Pair { fst: k.encode(), snd: v.encode() }) - .collect(), - roots_proof: proof.roots_proof.encode(), - }; - schema::v1::light::response::Response::RemoteChangesResponse(r) - }; - - Ok(schema::v1::light::Response { response: Some(response) }) - } -} - -impl<B> NetworkBehaviour for LightClientHandler<B> -where - B: Block -{ - type ProtocolsHandler = OneShotHandler<InboundProtocol, OutboundProtocol, Event<NegotiatedSubstream>>; - type OutEvent = Void; - - fn new_handler(&mut self) -> Self::ProtocolsHandler { - let p = InboundProtocol { - max_request_size: self.config.max_request_size, - protocol: self.config.light_protocol.clone(), - }; - let mut cfg = OneShotHandlerConfig::default(); - cfg.keep_alive_timeout = self.config.inactivity_timeout; - OneShotHandler::new(SubstreamProtocol::new(p, ()), cfg) - } - - fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> { - self.peers.get(peer) - .map(|info| info.connections.iter().map(|(_, a)| a.clone()).collect()) - .unwrap_or_default() - } - - fn inject_connected(&mut self, peer: &PeerId) { - } - - fn inject_connection_established(&mut self, peer: &PeerId, conn: &ConnectionId, info: &ConnectedPoint) { - let peer_address = match info { - ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr.clone(), - ConnectedPoint::Dialer { address } => address.clone() - }; - - log::trace!("peer {} connected with address {}", peer, peer_address); - - let entry = self.peers.entry(peer.clone()).or_default(); - entry.connections.push((*conn, peer_address)); - } - - fn inject_disconnected(&mut self, peer: &PeerId) { - log::trace!("peer {} disconnected", peer); - self.remove_peer(peer) - } - - fn inject_connection_closed(&mut self, peer: &PeerId, conn: &ConnectionId, info: &ConnectedPoint) { - let peer_address = match info { - ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr, - ConnectedPoint::Dialer { address } => address - }; - - log::trace!("connection to peer {} closed: {}", peer, peer_address); - - if let Some(info) = self.peers.get_mut(peer) { - info.connections.retain(|(c, _)| c != conn) - } - - // Add any outstanding requests on the closed connection back to the - // pending requests. - if let Some(id) = self.outstanding.iter() - .find(|(_, rw)| &rw.peer == peer && rw.connection == Some(*conn)) // (*) - .map(|(id, _)| *id) - { - let rw = self.outstanding.remove(&id).expect("by (*)"); - let rw = RequestWrapper { - timestamp: rw.timestamp, - retries: rw.retries, - request: rw.request, - peer: (), // need to find another peer - connection: None, - }; - self.pending_requests.push_back(rw); - } - } - - fn inject_event(&mut self, peer: PeerId, conn: ConnectionId, event: Event<NegotiatedSubstream>) { - match event { - // An incoming request from remote has been received. - Event::Request(request, mut stream) => { - log::trace!("incoming request from {}", peer); - let result = match &request.request { - Some(schema::v1::light::request::Request::RemoteCallRequest(r)) => - self.on_remote_call_request(&peer, r), - Some(schema::v1::light::request::Request::RemoteReadRequest(r)) => - self.on_remote_read_request(&peer, r), - Some(schema::v1::light::request::Request::RemoteHeaderRequest(r)) => - self.on_remote_header_request(&peer, r), - Some(schema::v1::light::request::Request::RemoteReadChildRequest(r)) => - self.on_remote_read_child_request(&peer, r), - Some(schema::v1::light::request::Request::RemoteChangesRequest(r)) => - self.on_remote_changes_request(&peer, r), - None => { - log::debug!("ignoring request without request data from peer {}", peer); - return - } - }; - match result { - Ok(response) => { - log::trace!("enqueueing response for peer {}", peer); - let mut data = Vec::new(); - if let Err(e) = response.encode(&mut data) { - log::debug!("error encoding response for peer {}: {}", peer, e) - } else { - let future = async move { - if let Err(e) = write_one(&mut stream, data).await { - log::debug!("error writing response: {}", e) - } - }; - self.responses.push(future.boxed()) - } - } - Err(Error::BadRequest(_)) => { - self.remove_peer(&peer); - self.peerset.report_peer(peer, ReputationChange::new(-(1 << 12), "bad request")) - } - Err(e) => log::debug!("error handling request from peer {}: {}", peer, e) - } - } - // A response to one of our own requests has been received. - Event::Response(id, response) => { - if let Some(request) = self.outstanding.remove(&id) { - // We first just check if the response originates from the expected peer - // and connection. - if request.peer != peer { - log::debug!("Expected response from {} instead of {}.", request.peer, peer); - self.outstanding.insert(id, request); - self.remove_peer(&peer); - self.peerset.report_peer(peer, ReputationChange::new_fatal("response from unexpected peer")); - return - } - - if let Some(info) = self.peers.get_mut(&peer) { - if info.status != PeerStatus::BusyWith(id) { - // If we get here, something is wrong with our internal handling of peer - // status information. At any time, a single peer processes at most one - // request from us and its status should contain the request ID we are - // expecting a response for. If a peer would send us a response with a - // random ID, we should not have an entry for it with this peer ID in - // our `outstanding` map, so a malicious peer should not be able to get - // us here. It is our own fault and must be fixed! - panic!("unexpected peer status {:?} for {}", info.status, peer); - } - - info.status = PeerStatus::Idle; // Make peer available again. - - match self.on_response(&peer, &request.request, response) { - Ok(reply) => send_reply(Ok(reply), request.request), - Err(Error::UnexpectedResponse) => { - log::debug!("unexpected response {} from peer {}", id, peer); - self.remove_peer(&peer); - self.peerset.report_peer(peer, ReputationChange::new_fatal("unexpected response from peer")); - let rw = RequestWrapper { - timestamp: request.timestamp, - retries: request.retries, - request: request.request, - peer: (), - connection: None, - }; - self.pending_requests.push_back(rw); - } - Err(other) => { - log::debug!("error handling response {} from peer {}: {}", id, peer, other); - self.remove_peer(&peer); - self.peerset.report_peer(peer, ReputationChange::new_fatal("invalid response from peer")); - if request.retries > 0 { - let rw = RequestWrapper { - timestamp: request.timestamp, - retries: request.retries - 1, - request: request.request, - peer: (), - connection: None, - }; - self.pending_requests.push_back(rw) - } else { - send_reply(Err(ClientError::RemoteFetchFailed), request.request) - } - } - } - } else { - // If we get here, something is wrong with our internal handling of peers. - // We apparently have an entry in our `outstanding` map and the peer is the one we - // expected. So, if we can not find an entry for it in our peer information table, - // then these two collections are out of sync which must not happen and is a clear - // programmer error that must be fixed! - panic!("missing peer information for {}; response {}", peer, id); - } - } else { - log::debug!("unexpected response {} from peer {}", id, peer); - self.remove_peer(&peer); - self.peerset.report_peer(peer, ReputationChange::new_fatal("response from unexpected peer")); - } - } - } - } - - fn poll(&mut self, cx: &mut Context, _: &mut impl PollParameters) -> Poll<NetworkBehaviourAction<OutboundProtocol, Void>> { - // Process response sending futures. - while let Poll::Ready(Some(_)) = self.responses.poll_next_unpin(cx) {} - - // If we have a pending request to send, try to find an available peer and send it. - let now = Instant::now(); - while let Some(mut request) = self.pending_requests.pop_front() { - if now > request.timestamp + self.config.request_timeout { - if request.retries == 0 { - send_reply(Err(ClientError::RemoteFetchFailed), request.request); - continue - } - request.timestamp = Instant::now(); - request.retries -= 1 - } - - - match self.prepare_request(request) { - Err(request) => { - self.pending_requests.push_front(request); - log::debug!("no peer available to send request to"); - break - } - Ok((peer, request)) => { - let request_bytes = match serialize_request(&request.request) { - Ok(bytes) => bytes, - Err(error) => { - log::debug!("failed to serialize request: {}", error); - send_reply(Err(ClientError::RemoteFetchFailed), request.request); - continue - } - }; - - let (expected, protocol) = match request.request { - Request::Body { .. } => - (ExpectedResponseTy::Block, self.config.block_protocol.clone()), - _ => - (ExpectedResponseTy::Light, self.config.light_protocol.clone()), - }; - - let peer_id = peer.clone(); - let handler = request.connection.map_or(NotifyHandler::Any, NotifyHandler::One); - - let request_id = self.next_request_id(); - if let Some(p) = self.peers.get_mut(&peer) { - p.status = PeerStatus::BusyWith(request_id); - } - self.outstanding.insert(request_id, request); - - let event = OutboundProtocol { - request_id, - request: request_bytes, - expected, - max_response_size: self.config.max_response_size, - protocol, - }; - - log::trace!("sending request {} to peer {}", request_id, peer_id); - - return Poll::Ready(NetworkBehaviourAction::NotifyHandler { - peer_id, - handler, - event, - }) - } - } - } - - // Look for ongoing requests that have timed out. - let mut expired = Vec::new(); - for (id, rw) in &self.outstanding { - if now > rw.timestamp + self.config.request_timeout { - log::debug!("request {} timed out", id); - expired.push(*id) - } - } - for id in expired { - if let Some(rw) = self.outstanding.remove(&id) { - self.remove_peer(&rw.peer); - self.peerset.report_peer(rw.peer.clone(), - ReputationChange::new(TIMEOUT_REPUTATION_CHANGE, "light request timeout")); - if rw.retries == 0 { - send_reply(Err(ClientError::RemoteFetchFailed), rw.request); - continue - } - let rw = RequestWrapper { - timestamp: Instant::now(), - retries: rw.retries - 1, - request: rw.request, - peer: (), - connection: None, - }; - self.pending_requests.push_back(rw) - } - } - - Poll::Pending - } -} - -fn required_block<B: Block>(request: &Request<B>) -> NumberFor<B> { - match request { - Request::Body { request, .. } => *request.header.number(), - Request::Header { request, .. } => request.block, - Request::Read { request, .. } => *request.header.number(), - Request::ReadChild { request, .. } => *request.header.number(), - Request::Call { request, .. } => *request.header.number(), - Request::Changes { request, .. } => request.max_block.0, - } -} - -fn retries<B: Block>(request: &Request<B>) -> usize { - let rc = match request { - Request::Body { request, .. } => request.retry_count, - Request::Header { request, .. } => request.retry_count, - Request::Read { request, .. } => request.retry_count, - Request::ReadChild { request, .. } => request.retry_count, - Request::Call { request, .. } => request.retry_count, - Request::Changes { request, .. } => request.retry_count, - }; - rc.unwrap_or(0) -} - -fn serialize_request<B: Block>(request: &Request<B>) -> Result<Vec<u8>, prost::EncodeError> { - let request = match request { - Request::Body { request, .. } => { - let rq = schema::v1::BlockRequest { - fields: BlockAttributes::BODY.to_be_u32(), - from_block: Some(schema::v1::block_request::FromBlock::Hash( - request.header.hash().encode(), - )), - to_block: Default::default(), - direction: schema::v1::Direction::Ascending as i32, - max_blocks: 1, - }; - - let mut buf = Vec::with_capacity(rq.encoded_len()); - rq.encode(&mut buf)?; - return Ok(buf); - } - Request::Header { request, .. } => { - let r = schema::v1::light::RemoteHeaderRequest { block: request.block.encode() }; - schema::v1::light::request::Request::RemoteHeaderRequest(r) - } - Request::Read { request, .. } => { - let r = schema::v1::light::RemoteReadRequest { - block: request.block.encode(), - keys: request.keys.clone(), - }; - schema::v1::light::request::Request::RemoteReadRequest(r) - } - Request::ReadChild { request, .. } => { - let r = schema::v1::light::RemoteReadChildRequest { - block: request.block.encode(), - storage_key: request.storage_key.clone().into_inner(), - keys: request.keys.clone(), - }; - schema::v1::light::request::Request::RemoteReadChildRequest(r) - } - Request::Call { request, .. } => { - let r = schema::v1::light::RemoteCallRequest { - block: request.block.encode(), - method: request.method.clone(), - data: request.call_data.clone(), - }; - schema::v1::light::request::Request::RemoteCallRequest(r) - } - Request::Changes { request, .. } => { - let r = schema::v1::light::RemoteChangesRequest { - first: request.first_block.1.encode(), - last: request.last_block.1.encode(), - min: request.tries_roots.1.encode(), - max: request.max_block.1.encode(), - storage_key: request.storage_key.clone().map(|s| s.into_inner()) - .unwrap_or_default(), - key: request.key.clone(), - }; - schema::v1::light::request::Request::RemoteChangesRequest(r) - } - }; - - let rq = schema::v1::light::Request { request: Some(request) }; - let mut buf = Vec::with_capacity(rq.encoded_len()); - rq.encode(&mut buf)?; - Ok(buf) -} - -fn send_reply<B: Block>(result: Result<Reply<B>, ClientError>, request: Request<B>) { - fn send<T>(item: T, sender: oneshot::Sender<T>) { - let _ = sender.send(item); // It is okay if the other end already hung up. - } - match request { - Request::Body { request, sender } => match result { - Err(e) => send(Err(e), sender), - Ok(Reply::Extrinsics(x)) => send(Ok(x), sender), - reply => log::error!("invalid reply for body request: {:?}, {:?}", reply, request), - } - Request::Header { request, sender } => match result { - Err(e) => send(Err(e), sender), - Ok(Reply::Header(x)) => send(Ok(x), sender), - reply => log::error!("invalid reply for header request: {:?}, {:?}", reply, request), - } - Request::Read { request, sender } => match result { - Err(e) => send(Err(e), sender), - Ok(Reply::MapVecU8OptVecU8(x)) => send(Ok(x), sender), - reply => log::error!("invalid reply for read request: {:?}, {:?}", reply, request), - } - Request::ReadChild { request, sender } => match result { - Err(e) => send(Err(e), sender), - Ok(Reply::MapVecU8OptVecU8(x)) => send(Ok(x), sender), - reply => log::error!("invalid reply for read child request: {:?}, {:?}", reply, request), - } - Request::Call { request, sender } => match result { - Err(e) => send(Err(e), sender), - Ok(Reply::VecU8(x)) => send(Ok(x), sender), - reply => log::error!("invalid reply for call request: {:?}, {:?}", reply, request), - } - Request::Changes { request, sender } => match result { - Err(e) => send(Err(e), sender), - Ok(Reply::VecNumberU32(x)) => send(Ok(x), sender), - reply => log::error!("invalid reply for changes request: {:?}, {:?}", reply, request), - } - } -} - -/// Output type of inbound and outbound substream upgrades. -#[derive(Debug)] -pub enum Event<T> { - /// Incoming request from remote and substream to use for the response. - Request(schema::v1::light::Request, T), - /// Incoming response from remote. - Response(RequestId, Response), -} - -/// Incoming response from remote. -#[derive(Debug, Clone)] -pub enum Response { - /// Incoming light response from remote. - Light(schema::v1::light::Response), - /// Incoming block response from remote. - Block(schema::v1::BlockResponse), -} - -/// Substream upgrade protocol. -/// -/// Reads incoming requests from remote. -#[derive(Debug, Clone)] -pub struct InboundProtocol { - /// The max. request length in bytes. - max_request_size: usize, - /// The protocol to use for upgrade negotiation. - protocol: Bytes, -} - -impl UpgradeInfo for InboundProtocol { - type Info = Bytes; - type InfoIter = iter::Once<Self::Info>; - - fn protocol_info(&self) -> Self::InfoIter { - iter::once(self.protocol.clone()) - } -} - -impl<T> InboundUpgrade<T> for InboundProtocol -where - T: AsyncRead + AsyncWrite + Unpin + Send + 'static -{ - type Output = Event<T>; - type Error = ReadOneError; - type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>; - - fn upgrade_inbound(self, mut s: T, _: Self::Info) -> Self::Future { - let future = async move { - let vec = read_one(&mut s, self.max_request_size).await?; - match schema::v1::light::Request::decode(&vec[..]) { - Ok(r) => Ok(Event::Request(r, s)), - Err(e) => Err(ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e))) - } - }; - future.boxed() - } -} - -/// Substream upgrade protocol. -/// -/// Sends a request to remote and awaits the response. -#[derive(Debug, Clone)] -pub struct OutboundProtocol { - /// The serialized protobuf request. - request: Vec<u8>, - /// Local identifier for the request. Used to associate it with a response. - request_id: RequestId, - /// Kind of response expected for this request. - expected: ExpectedResponseTy, - /// The max. response length in bytes. - max_response_size: usize, - /// The protocol to use for upgrade negotiation. - protocol: Bytes, -} - -/// Type of response expected from the remote for this request. -#[derive(Debug, Clone)] -enum ExpectedResponseTy { - Light, - Block, -} - -impl UpgradeInfo for OutboundProtocol { - type Info = Bytes; - type InfoIter = iter::Once<Self::Info>; - - fn protocol_info(&self) -> Self::InfoIter { - iter::once(self.protocol.clone()) - } -} - -impl<T> OutboundUpgrade<T> for OutboundProtocol -where - T: AsyncRead + AsyncWrite + Unpin + Send + 'static -{ - type Output = Event<T>; - type Error = ReadOneError; - type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>; - - fn upgrade_outbound(self, mut s: T, _: Self::Info) -> Self::Future { - let future = async move { - write_one(&mut s, &self.request).await?; - let vec = read_one(&mut s, self.max_response_size).await?; - - match self.expected { - ExpectedResponseTy::Light => { - schema::v1::light::Response::decode(&vec[..]) - .map(|r| Event::Response(self.request_id, Response::Light(r))) - .map_err(|e| { - ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e)) - }) - }, - ExpectedResponseTy::Block => { - schema::v1::BlockResponse::decode(&vec[..]) - .map(|r| Event::Response(self.request_id, Response::Block(r))) - .map_err(|e| { - ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e)) - }) - } - } - }; - future.boxed() - } -} - -fn fmt_keys(first: Option<&Vec<u8>>, last: Option<&Vec<u8>>) -> String { - if let (Some(first), Some(last)) = (first, last) { - if first == last { - HexDisplay::from(first).to_string() - } else { - format!("{}..{}", HexDisplay::from(first), HexDisplay::from(last)) - } - } else { - String::from("n/a") - } -} - -#[cfg(test)] -mod tests { - use super::*; - use async_std::task; - use assert_matches::assert_matches; - use codec::Encode; - use crate::{ - chain::Client, - config::ProtocolId, - schema, - }; - use futures::{channel::oneshot, prelude::*}; - use libp2p::{ - PeerId, - Multiaddr, - core::{ - ConnectedPoint, - connection::ConnectionId, - identity, - muxing::{StreamMuxerBox, SubstreamRef}, - transport::{Transport, Boxed, memory::MemoryTransport}, - upgrade - }, - noise::{self, Keypair, X25519, NoiseConfig}, - swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}, - yamux - }; - use sc_client_api::{StorageProof, RemoteReadChildRequest, FetchChecker}; - use sp_blockchain::{Error as ClientError}; - use sp_core::storage::ChildInfo; - use std::{ - collections::{HashMap, HashSet}, - io, - iter::{self, FromIterator}, - pin::Pin, - sync::Arc, - task::{Context, Poll} - }; - use sp_runtime::{generic::Header, traits::{BlakeTwo256, Block as BlockT, NumberFor}}; - use super::{Event, LightClientHandler, Request, Response, OutboundProtocol, PeerStatus}; - use void::Void; - - type Block = sp_runtime::generic::Block<Header<u64, BlakeTwo256>, substrate_test_runtime::Extrinsic>; - type Handler = LightClientHandler<Block>; - type Swarm = libp2p::swarm::Swarm<Handler>; - - fn empty_proof() -> Vec<u8> { - StorageProof::empty().encode() - } - - fn make_swarm(ok: bool, ps: sc_peerset::PeersetHandle, cf: super::Config) -> Swarm { - let client = Arc::new(substrate_test_runtime_client::new()); - let checker = Arc::new(DummyFetchChecker { ok, _mark: std::marker::PhantomData }); - let id_key = identity::Keypair::generate_ed25519(); - let dh_key = Keypair::<X25519>::new().into_authentic(&id_key).unwrap(); - let local_peer = id_key.public().into_peer_id(); - let transport = MemoryTransport::default() - .upgrade(upgrade::Version::V1) - .authenticate(NoiseConfig::xx(dh_key).into_authenticated()) - .multiplex(yamux::YamuxConfig::default()) - .boxed(); - Swarm::new(transport, LightClientHandler::new(cf, client, checker, ps), local_peer) - } - - struct DummyFetchChecker<B> { - ok: bool, - _mark: std::marker::PhantomData<B> - } - - impl<B: BlockT> light::FetchChecker<B> for DummyFetchChecker<B> { - fn check_header_proof( - &self, - _request: &RemoteHeaderRequest<B::Header>, - header: Option<B::Header>, - _remote_proof: StorageProof, - ) -> Result<B::Header, ClientError> { - match self.ok { - true if header.is_some() => Ok(header.unwrap()), - _ => Err(ClientError::Backend("Test error".into())), - } - } - - fn check_read_proof( - &self, - request: &RemoteReadRequest<B::Header>, - _: StorageProof, - ) -> Result<HashMap<Vec<u8>, Option<Vec<u8>>>, ClientError> { - match self.ok { - true => Ok(request.keys - .iter() - .cloned() - .map(|k| (k, Some(vec![42]))) - .collect() - ), - false => Err(ClientError::Backend("Test error".into())), - } - } - - fn check_read_child_proof( - &self, - request: &RemoteReadChildRequest<B::Header>, - _: StorageProof, - ) -> Result<HashMap<Vec<u8>, Option<Vec<u8>>>, ClientError> { - match self.ok { - true => Ok(request.keys - .iter() - .cloned() - .map(|k| (k, Some(vec![42]))) - .collect() - ), - false => Err(ClientError::Backend("Test error".into())), - } - } - - fn check_execution_proof( - &self, - _: &RemoteCallRequest<B::Header>, - _: StorageProof, - ) -> Result<Vec<u8>, ClientError> { - match self.ok { - true => Ok(vec![42]), - false => Err(ClientError::Backend("Test error".into())), - } - } - - fn check_changes_proof( - &self, - _: &RemoteChangesRequest<B::Header>, - _: ChangesProof<B::Header> - ) -> Result<Vec<(NumberFor<B>, u32)>, ClientError> { - match self.ok { - true => Ok(vec![(100u32.into(), 2)]), - false => Err(ClientError::Backend("Test error".into())), - } - } - - fn check_body_proof( - &self, - _: &RemoteBodyRequest<B::Header>, - body: Vec<B::Extrinsic> - ) -> Result<Vec<B::Extrinsic>, ClientError> { - match self.ok { - true => Ok(body), - false => Err(ClientError::Backend("Test error".into())), - } - } - } - - fn make_config() -> super::Config { - super::Config::new(&ProtocolId::from("foo")) - } - - fn dummy_header() -> sp_test_primitives::Header { - sp_test_primitives::Header { - parent_hash: Default::default(), - number: 0, - state_root: Default::default(), - extrinsics_root: Default::default(), - digest: Default::default(), - } - } - - struct EmptyPollParams(PeerId); - - impl PollParameters for EmptyPollParams { - type SupportedProtocolsIter = iter::Empty<Vec<u8>>; - type ListenedAddressesIter = iter::Empty<Multiaddr>; - type ExternalAddressesIter = iter::Empty<AddressRecord>; - - fn supported_protocols(&self) -> Self::SupportedProtocolsIter { - iter::empty() - } - - fn listened_addresses(&self) -> Self::ListenedAddressesIter { - iter::empty() - } - - fn external_addresses(&self) -> Self::ExternalAddressesIter { - iter::empty() - } - - fn local_peer_id(&self) -> &PeerId { - &self.0 - } - } - - fn peerset() -> (sc_peerset::Peerset, sc_peerset::PeersetHandle) { - let cfg = sc_peerset::SetConfig { - in_peers: 128, - out_peers: 128, - bootnodes: Default::default(), - reserved_only: false, - reserved_nodes: Default::default(), - }; - sc_peerset::Peerset::from_config(sc_peerset::PeersetConfig{ sets: vec![cfg] }) - } - - fn make_behaviour - ( ok: bool - , ps: sc_peerset::PeersetHandle - , cf: super::Config - ) -> LightClientHandler<Block> - { - let client = Arc::new(substrate_test_runtime_client::new()); - let checker = Arc::new(DummyFetchChecker { ok, _mark: std::marker::PhantomData }); - LightClientHandler::new(cf, client, checker, ps) - } - - fn empty_dialer() -> ConnectedPoint { - ConnectedPoint::Dialer { address: Multiaddr::empty() } - } - - fn poll(mut b: &mut LightClientHandler<Block>) -> Poll<NetworkBehaviourAction<OutboundProtocol, Void>> { - let mut p = EmptyPollParams(PeerId::random()); - match future::poll_fn(|cx| Pin::new(&mut b).poll(cx, &mut p)).now_or_never() { - Some(a) => Poll::Ready(a), - None => Poll::Pending - } - } - - #[test] - fn disconnects_from_peer_if_told() { - let peer = PeerId::random(); - let pset = peerset(); - let mut behaviour = make_behaviour(true, pset.1, make_config()); - - behaviour.inject_connection_established(&peer, &ConnectionId::new(1), &empty_dialer()); - behaviour.inject_connected(&peer); - assert_eq!(1, behaviour.peers.len()); - - behaviour.inject_connection_closed(&peer, &ConnectionId::new(1), &empty_dialer()); - behaviour.inject_disconnected(&peer); - assert_eq!(0, behaviour.peers.len()) - } - - #[test] - fn disconnects_from_peer_if_request_times_out() { - let peer0 = PeerId::random(); - let peer1 = PeerId::random(); - let pset = peerset(); - let mut behaviour = make_behaviour(true, pset.1, make_config()); - - behaviour.inject_connection_established(&peer0, &ConnectionId::new(1), &empty_dialer()); - behaviour.inject_connected(&peer0); - behaviour.inject_connection_established(&peer1, &ConnectionId::new(2), &empty_dialer()); - behaviour.inject_connected(&peer1); - - // We now know about two peers. - assert_eq!(HashSet::from_iter(&[peer0.clone(), peer1.clone()]), behaviour.peers.keys().collect::<HashSet<_>>()); - - // No requests have been made yet. - assert!(behaviour.pending_requests.is_empty()); - assert!(behaviour.outstanding.is_empty()); - - // Issue our first request! - let chan = oneshot::channel(); - let request = light::RemoteCallRequest { - block: Default::default(), - header: dummy_header(), - method: "test".into(), - call_data: vec![], - retry_count: Some(1), - }; - behaviour.request(Request::Call { request, sender: chan.0 }).unwrap(); - assert_eq!(1, behaviour.pending_requests.len()); - - // The behaviour should now attempt to send the request. - assert_matches!(poll(&mut behaviour), Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, .. }) => { - assert!(peer_id == peer0 || peer_id == peer1) - }); - - // And we should have one busy peer. - assert!({ - let (idle, busy): (Vec<_>, Vec<_>) = - behaviour.peers.iter().partition(|(_, info)| info.status == PeerStatus::Idle); - - idle.len() == 1 && busy.len() == 1 - && (idle[0].0 == &peer0 || busy[0].0 == &peer0) - && (idle[0].0 == &peer1 || busy[0].0 == &peer1) - }); - - // No more pending requests, but one should be outstanding. - assert_eq!(0, behaviour.pending_requests.len()); - assert_eq!(1, behaviour.outstanding.len()); - - // We now set back the timestamp of the outstanding request to make it expire. - let request = behaviour.outstanding.values_mut().next().unwrap(); - request.timestamp -= make_config().request_timeout; - - // Make progress, but do not expect some action. - assert_matches!(poll(&mut behaviour), Poll::Pending); - - // The request should have timed out by now and the corresponding peer be removed. - assert_eq!(1, behaviour.peers.len()); - // Since we asked for one retry, the request should be back in the pending queue. - assert_eq!(1, behaviour.pending_requests.len()); - // No other request should be ongoing. - assert_eq!(0, behaviour.outstanding.len()); - } - - #[test] - fn disconnects_from_peer_on_incorrect_response() { - let peer = PeerId::random(); - let pset = peerset(); - let mut behaviour = make_behaviour(false, pset.1, make_config()); - // ^--- Making sure the response data check fails. - - let conn = ConnectionId::new(1); - behaviour.inject_connection_established(&peer, &conn, &empty_dialer()); - behaviour.inject_connected(&peer); - assert_eq!(1, behaviour.peers.len()); - - let chan = oneshot::channel(); - let request = light::RemoteCallRequest { - block: Default::default(), - header: dummy_header(), - method: "test".into(), - call_data: vec![], - retry_count: Some(1), - }; - behaviour.request(Request::Call { request, sender: chan.0 }).unwrap(); - - assert_eq!(1, behaviour.pending_requests.len()); - assert_eq!(0, behaviour.outstanding.len()); - poll(&mut behaviour); // Make progress - assert_eq!(0, behaviour.pending_requests.len()); - assert_eq!(1, behaviour.outstanding.len()); - - let request_id = *behaviour.outstanding.keys().next().unwrap(); - - let response = { - let r = schema::v1::light::RemoteCallResponse { proof: empty_proof() }; - schema::v1::light::Response { - response: Some(schema::v1::light::response::Response::RemoteCallResponse(r)), - } - }; - - behaviour.inject_event(peer.clone(), conn, Event::Response(request_id, Response::Light(response))); - assert!(behaviour.peers.is_empty()); - - poll(&mut behaviour); // More progress - - // The request should be back in the pending queue - assert_eq!(1, behaviour.pending_requests.len()); - assert_eq!(0, behaviour.outstanding.len()); - } - - #[test] - fn disconnects_from_peer_on_unexpected_response() { - let peer = PeerId::random(); - let pset = peerset(); - let mut behaviour = make_behaviour(true, pset.1, make_config()); - - let conn = ConnectionId::new(1); - behaviour.inject_connection_established(&peer, &conn, &empty_dialer()); - behaviour.inject_connected(&peer); - assert_eq!(1, behaviour.peers.len()); - assert_eq!(0, behaviour.pending_requests.len()); - assert_eq!(0, behaviour.outstanding.len()); - - // Some unsolicited response - let response = { - let r = schema::v1::light::RemoteCallResponse { proof: empty_proof() }; - schema::v1::light::Response { - response: Some(schema::v1::light::response::Response::RemoteCallResponse(r)), - } - }; - - behaviour.inject_event(peer.clone(), conn, Event::Response(2347895932, Response::Light(response))); - - assert!(behaviour.peers.is_empty()); - poll(&mut behaviour); - assert_eq!(0, behaviour.pending_requests.len()); - assert_eq!(0, behaviour.outstanding.len()); - } - - #[test] - fn disconnects_from_peer_on_wrong_response_type() { - let peer = PeerId::random(); - let pset = peerset(); - let mut behaviour = make_behaviour(true, pset.1, make_config()); - - let conn = ConnectionId::new(1); - behaviour.inject_connection_established(&peer, &conn, &empty_dialer()); - behaviour.inject_connected(&peer); - assert_eq!(1, behaviour.peers.len()); - - let chan = oneshot::channel(); - let request = light::RemoteCallRequest { - block: Default::default(), - header: dummy_header(), - method: "test".into(), - call_data: vec![], - retry_count: Some(1), - }; - behaviour.request(Request::Call { request, sender: chan.0 }).unwrap(); - - assert_eq!(1, behaviour.pending_requests.len()); - assert_eq!(0, behaviour.outstanding.len()); - poll(&mut behaviour); // Make progress - assert_eq!(0, behaviour.pending_requests.len()); - assert_eq!(1, behaviour.outstanding.len()); - - let request_id = *behaviour.outstanding.keys().next().unwrap(); - - let response = { - let r = schema::v1::light::RemoteReadResponse { proof: empty_proof() }; // Not a RemoteCallResponse! - schema::v1::light::Response { - response: Some(schema::v1::light::response::Response::RemoteReadResponse(r)), - } - }; - - behaviour.inject_event(peer.clone(), conn, Event::Response(request_id, Response::Light(response))); - assert!(behaviour.peers.is_empty()); - - poll(&mut behaviour); // More progress - - // The request should be back in the pending queue - assert_eq!(1, behaviour.pending_requests.len()); - assert_eq!(0, behaviour.outstanding.len()); - } - - #[test] - fn receives_remote_failure_after_retry_count_failures() { - let peer1 = PeerId::random(); - let peer2 = PeerId::random(); - let peer3 = PeerId::random(); - let peer4 = PeerId::random(); - let pset = peerset(); - let mut behaviour = make_behaviour(false, pset.1, make_config()); - // ^--- Making sure the response data check fails. - - let conn1 = ConnectionId::new(1); - behaviour.inject_connection_established(&peer1, &conn1, &empty_dialer()); - behaviour.inject_connected(&peer1); - let conn2 = ConnectionId::new(2); - behaviour.inject_connection_established(&peer2, &conn2, &empty_dialer()); - behaviour.inject_connected(&peer2); - let conn3 = ConnectionId::new(3); - behaviour.inject_connection_established(&peer3, &conn3, &empty_dialer()); - behaviour.inject_connected(&peer3); - let conn4 = ConnectionId::new(3); - behaviour.inject_connection_established(&peer4, &conn4, &empty_dialer()); - behaviour.inject_connected(&peer4); - assert_eq!(4, behaviour.peers.len()); - - let mut chan = oneshot::channel(); - let request = light::RemoteCallRequest { - block: Default::default(), - header: dummy_header(), - method: "test".into(), - call_data: vec![], - retry_count: Some(3), // Attempt up to three retries. - }; - behaviour.request(Request::Call { request, sender: chan.0 }).unwrap(); - - assert_eq!(1, behaviour.pending_requests.len()); - assert_eq!(0, behaviour.outstanding.len()); - assert_matches!(poll(&mut behaviour), Poll::Ready(NetworkBehaviourAction::NotifyHandler { .. })); - assert_eq!(0, behaviour.pending_requests.len()); - assert_eq!(1, behaviour.outstanding.len()); - - for i in 1 ..= 3 { - // Construct an invalid response - let request_id = *behaviour.outstanding.keys().next().unwrap(); - let responding_peer = behaviour.outstanding.values().next().unwrap().peer.clone(); - let response = { - let r = schema::v1::light::RemoteCallResponse { proof: empty_proof() }; - schema::v1::light::Response { - response: Some(schema::v1::light::response::Response::RemoteCallResponse(r)) - } - }; - let conn = ConnectionId::new(i); - behaviour.inject_event(responding_peer, conn, Event::Response(request_id, Response::Light(response.clone()))); - assert_matches!(poll(&mut behaviour), Poll::Ready(NetworkBehaviourAction::NotifyHandler { .. })); - assert_matches!(chan.1.try_recv(), Ok(None)) - } - // Final invalid response - let request_id = *behaviour.outstanding.keys().next().unwrap(); - let responding_peer = behaviour.outstanding.values().next().unwrap().peer.clone(); - let response = { - let r = schema::v1::light::RemoteCallResponse { proof: empty_proof() }; - schema::v1::light::Response { - response: Some(schema::v1::light::response::Response::RemoteCallResponse(r)), - } - }; - behaviour.inject_event(responding_peer, conn4, Event::Response(request_id, Response::Light(response))); - assert_matches!(poll(&mut behaviour), Poll::Pending); - assert_matches!(chan.1.try_recv(), Ok(Some(Err(ClientError::RemoteFetchFailed)))) - } - - fn issue_request(request: Request<Block>) { - let peer = PeerId::random(); - let pset = peerset(); - let mut behaviour = make_behaviour(true, pset.1, make_config()); - - let conn = ConnectionId::new(1); - behaviour.inject_connection_established(&peer, &conn, &empty_dialer()); - behaviour.inject_connected(&peer); - assert_eq!(1, behaviour.peers.len()); - - let response = match request { - Request::Body { .. } => unimplemented!(), - Request::Header{..} => { - let r = schema::v1::light::RemoteHeaderResponse { - header: dummy_header().encode(), - proof: empty_proof() - }; - schema::v1::light::Response { - response: Some(schema::v1::light::response::Response::RemoteHeaderResponse(r)), - } - } - Request::Read{..} => { - let r = schema::v1::light::RemoteReadResponse { proof: empty_proof() }; - schema::v1::light::Response { - response: Some(schema::v1::light::response::Response::RemoteReadResponse(r)), - } - } - Request::ReadChild{..} => { - let r = schema::v1::light::RemoteReadResponse { proof: empty_proof() }; - schema::v1::light::Response { - response: Some(schema::v1::light::response::Response::RemoteReadResponse(r)), - } - } - Request::Call{..} => { - let r = schema::v1::light::RemoteCallResponse { proof: empty_proof() }; - schema::v1::light::Response { - response: Some(schema::v1::light::response::Response::RemoteCallResponse(r)), - } - } - Request::Changes{..} => { - let r = schema::v1::light::RemoteChangesResponse { - max: iter::repeat(1).take(32).collect(), - proof: Vec::new(), - roots: Vec::new(), - roots_proof: empty_proof() - }; - schema::v1::light::Response { - response: Some(schema::v1::light::response::Response::RemoteChangesResponse(r)), - } - } - }; - - behaviour.request(request).unwrap(); - - assert_eq!(1, behaviour.pending_requests.len()); - assert_eq!(0, behaviour.outstanding.len()); - assert_matches!(poll(&mut behaviour), Poll::Ready(NetworkBehaviourAction::NotifyHandler { .. })); - assert_eq!(0, behaviour.pending_requests.len()); - assert_eq!(1, behaviour.outstanding.len()); - assert_eq!(1, *behaviour.outstanding.keys().next().unwrap()); - - behaviour.inject_event(peer.clone(), conn, Event::Response(1, Response::Light(response))); - - poll(&mut behaviour); - - assert_eq!(0, behaviour.pending_requests.len()); - assert_eq!(0, behaviour.outstanding.len()) - } - - #[test] - fn receives_remote_call_response() { - let mut chan = oneshot::channel(); - let request = light::RemoteCallRequest { - block: Default::default(), - header: dummy_header(), - method: "test".into(), - call_data: vec![], - retry_count: None, - }; - issue_request(Request::Call { request, sender: chan.0 }); - assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_)))) - } - - #[test] - fn receives_remote_read_response() { - let mut chan = oneshot::channel(); - let request = light::RemoteReadRequest { - header: dummy_header(), - block: Default::default(), - keys: vec![b":key".to_vec()], - retry_count: None, - }; - issue_request(Request::Read { request, sender: chan.0 }); - assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_)))) - } - - #[test] - fn receives_remote_read_child_response() { - let mut chan = oneshot::channel(); - let child_info = ChildInfo::new_default(&b":child_storage:default:sub"[..]); - let request = light::RemoteReadChildRequest { - header: dummy_header(), - block: Default::default(), - storage_key: child_info.prefixed_storage_key(), - keys: vec![b":key".to_vec()], - retry_count: None, - }; - issue_request(Request::ReadChild { request, sender: chan.0 }); - assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_)))) - } - - #[test] - fn receives_remote_header_response() { - let mut chan = oneshot::channel(); - let request = light::RemoteHeaderRequest { - cht_root: Default::default(), - block: 1, - retry_count: None, - }; - issue_request(Request::Header { request, sender: chan.0 }); - assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_)))) - } - - #[test] - fn receives_remote_changes_response() { - let mut chan = oneshot::channel(); - let request = light::RemoteChangesRequest { - changes_trie_configs: vec![sp_core::ChangesTrieConfigurationRange { - zero: (0, Default::default()), - end: None, - config: Some(sp_core::ChangesTrieConfiguration::new(4, 2)), - }], - first_block: (1, Default::default()), - last_block: (100, Default::default()), - max_block: (100, Default::default()), - tries_roots: (1, Default::default(), Vec::new()), - key: Vec::new(), - storage_key: None, - retry_count: None, - }; - issue_request(Request::Changes { request, sender: chan.0 }); - assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_)))) - } - - fn send_receive(request: Request<Block>) { - // We start a swarm on the listening side which awaits incoming requests and answers them: - let local_pset = peerset(); - let local_listen_addr: libp2p::Multiaddr = libp2p::multiaddr::Protocol::Memory(rand::random()).into(); - let mut local_swarm = make_swarm(true, local_pset.1, make_config()); - Swarm::listen_on(&mut local_swarm, local_listen_addr.clone()).unwrap(); - - // We also start a swarm that makes requests and awaits responses: - let remote_pset = peerset(); - let mut remote_swarm = make_swarm(true, remote_pset.1, make_config()); - - // We now schedule a request, dial the remote and let the two swarm work it out: - remote_swarm.request(request).unwrap(); - Swarm::dial_addr(&mut remote_swarm, local_listen_addr).unwrap(); - - let future = { - let a = local_swarm.for_each(|_| future::ready(())); - let b = remote_swarm.for_each(|_| future::ready(())); - future::join(a, b).map(|_| ()) - }; - - task::spawn(future); - } - - #[test] - fn send_receive_call() { - let chan = oneshot::channel(); - let request = light::RemoteCallRequest { - block: Default::default(), - header: dummy_header(), - method: "test".into(), - call_data: vec![], - retry_count: None, - }; - send_receive(Request::Call { request, sender: chan.0 }); - assert_eq!(vec![42], task::block_on(chan.1).unwrap().unwrap()); - // ^--- from `DummyFetchChecker::check_execution_proof` - } - - #[test] - fn send_receive_read() { - let chan = oneshot::channel(); - let request = light::RemoteReadRequest { - header: dummy_header(), - block: Default::default(), - keys: vec![b":key".to_vec()], - retry_count: None - }; - send_receive(Request::Read { request, sender: chan.0 }); - assert_eq!(Some(vec![42]), task::block_on(chan.1).unwrap().unwrap().remove(&b":key"[..]).unwrap()); - // ^--- from `DummyFetchChecker::check_read_proof` - } - - #[test] - fn send_receive_read_child() { - let chan = oneshot::channel(); - let child_info = ChildInfo::new_default(&b":child_storage:default:sub"[..]); - let request = light::RemoteReadChildRequest { - header: dummy_header(), - block: Default::default(), - storage_key: child_info.prefixed_storage_key(), - keys: vec![b":key".to_vec()], - retry_count: None, - }; - send_receive(Request::ReadChild { request, sender: chan.0 }); - assert_eq!(Some(vec![42]), task::block_on(chan.1).unwrap().unwrap().remove(&b":key"[..]).unwrap()); - // ^--- from `DummyFetchChecker::check_read_child_proof` - } - - #[test] - fn send_receive_header() { - sp_tracing::try_init_simple(); - let chan = oneshot::channel(); - let request = light::RemoteHeaderRequest { - cht_root: Default::default(), - block: 1, - retry_count: None, - }; - send_receive(Request::Header { request, sender: chan.0 }); - // The remote does not know block 1: - assert_matches!(task::block_on(chan.1).unwrap(), Err(ClientError::RemoteFetchFailed)); - } - - #[test] - fn send_receive_changes() { - let chan = oneshot::channel(); - let request = light::RemoteChangesRequest { - changes_trie_configs: vec![sp_core::ChangesTrieConfigurationRange { - zero: (0, Default::default()), - end: None, - config: Some(sp_core::ChangesTrieConfiguration::new(4, 2)), - }], - first_block: (1, Default::default()), - last_block: (100, Default::default()), - max_block: (100, Default::default()), - tries_roots: (1, Default::default(), Vec::new()), - key: Vec::new(), - storage_key: None, - retry_count: None, - }; - send_receive(Request::Changes { request, sender: chan.0 }); - assert_eq!(vec![(100, 2)], task::block_on(chan.1).unwrap().unwrap()); - // ^--- from `DummyFetchChecker::check_changes_proof` - } - - #[test] - fn body_request_fields_encoded_properly() { - let (sender, _) = oneshot::channel(); - let serialized_request = serialize_request::<Block>(&Request::Body { - request: RemoteBodyRequest { - header: dummy_header(), - retry_count: None, - }, - sender, - }).unwrap(); - let deserialized_request = schema::v1::BlockRequest::decode(&serialized_request[..]).unwrap(); - assert!( - BlockAttributes::from_be_u32(deserialized_request.fields) - .unwrap() - .contains(BlockAttributes::BODY) - ); - } -} diff --git a/substrate/client/network/src/light_client_requests.rs b/substrate/client/network/src/light_client_requests.rs new file mode 100644 index 0000000000000000000000000000000000000000..f859a35f45b2461fdf5b0311357c82aa40283127 --- /dev/null +++ b/substrate/client/network/src/light_client_requests.rs @@ -0,0 +1,334 @@ +// This file is part of Substrate. + +// Copyright (C) 2020-2021 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Helpers for outgoing and incoming light client requests. + +/// For outgoing light client requests. +pub mod sender; +/// For incoming light client requests. +pub mod handler; + +use crate::config::ProtocolId; +use crate::request_responses::ProtocolConfig; + +use std::time::Duration; + +/// Generate the light client protocol name from chain specific protocol identifier. +fn generate_protocol_name(protocol_id: &ProtocolId) -> String { + let mut s = String::new(); + s.push_str("/"); + s.push_str(protocol_id.as_ref()); + s.push_str("/light/2"); + s +} + +/// Generates a [`ProtocolConfig`] for the light client request protocol, refusing incoming requests. +pub fn generate_protocol_config(protocol_id: &ProtocolId) -> ProtocolConfig { + ProtocolConfig { + name: generate_protocol_name(protocol_id).into(), + max_request_size: 1 * 1024 * 1024, + max_response_size: 16 * 1024 * 1024, + request_timeout: Duration::from_secs(15), + inbound_queue: None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::request_responses::IncomingRequest; + use crate::config::ProtocolId; + + use assert_matches::assert_matches; + use futures::executor::{block_on, LocalPool}; + use futures::task::Spawn; + use futures::{channel::oneshot, prelude::*}; + use libp2p::PeerId; + use sc_client_api::StorageProof; + use sc_client_api::light::{RemoteCallRequest, RemoteChangesRequest, RemoteHeaderRequest}; + use sc_client_api::light::{self, RemoteReadRequest, RemoteBodyRequest, ChangesProof}; + use sc_client_api::{FetchChecker, RemoteReadChildRequest}; + use sp_blockchain::Error as ClientError; + use sp_core::storage::ChildInfo; + use sp_runtime::generic::Header; + use sp_runtime::traits::{BlakeTwo256, Block as BlockT, NumberFor}; + use std::collections::HashMap; + use std::sync::Arc; + + pub struct DummyFetchChecker<B> { + pub ok: bool, + pub _mark: std::marker::PhantomData<B>, + } + + impl<B: BlockT> FetchChecker<B> for DummyFetchChecker<B> { + fn check_header_proof( + &self, + _request: &RemoteHeaderRequest<B::Header>, + header: Option<B::Header>, + _remote_proof: StorageProof, + ) -> Result<B::Header, ClientError> { + match self.ok { + true if header.is_some() => Ok(header.unwrap()), + _ => Err(ClientError::Backend("Test error".into())), + } + } + + fn check_read_proof( + &self, + request: &RemoteReadRequest<B::Header>, + _: StorageProof, + ) -> Result<HashMap<Vec<u8>, Option<Vec<u8>>>, ClientError> { + match self.ok { + true => Ok(request + .keys + .iter() + .cloned() + .map(|k| (k, Some(vec![42]))) + .collect()), + false => Err(ClientError::Backend("Test error".into())), + } + } + + fn check_read_child_proof( + &self, + request: &RemoteReadChildRequest<B::Header>, + _: StorageProof, + ) -> Result<HashMap<Vec<u8>, Option<Vec<u8>>>, ClientError> { + match self.ok { + true => Ok(request + .keys + .iter() + .cloned() + .map(|k| (k, Some(vec![42]))) + .collect()), + false => Err(ClientError::Backend("Test error".into())), + } + } + + fn check_execution_proof( + &self, + _: &RemoteCallRequest<B::Header>, + _: StorageProof, + ) -> Result<Vec<u8>, ClientError> { + match self.ok { + true => Ok(vec![42]), + false => Err(ClientError::Backend("Test error".into())), + } + } + + fn check_changes_proof( + &self, + _: &RemoteChangesRequest<B::Header>, + _: ChangesProof<B::Header>, + ) -> Result<Vec<(NumberFor<B>, u32)>, ClientError> { + match self.ok { + true => Ok(vec![(100u32.into(), 2)]), + false => Err(ClientError::Backend("Test error".into())), + } + } + + fn check_body_proof( + &self, + _: &RemoteBodyRequest<B::Header>, + body: Vec<B::Extrinsic>, + ) -> Result<Vec<B::Extrinsic>, ClientError> { + match self.ok { + true => Ok(body), + false => Err(ClientError::Backend("Test error".into())), + } + } + } + + pub fn protocol_id() -> ProtocolId { + ProtocolId::from("test") + } + + pub fn peerset() -> (sc_peerset::Peerset, sc_peerset::PeersetHandle) { + let cfg = sc_peerset::SetConfig { + in_peers: 128, + out_peers: 128, + bootnodes: Default::default(), + reserved_only: false, + reserved_nodes: Default::default(), + }; + sc_peerset::Peerset::from_config(sc_peerset::PeersetConfig { sets: vec![cfg] }) + } + + pub fn dummy_header() -> sp_test_primitives::Header { + sp_test_primitives::Header { + parent_hash: Default::default(), + number: 0, + state_root: Default::default(), + extrinsics_root: Default::default(), + digest: Default::default(), + } + } + + type Block = + sp_runtime::generic::Block<Header<u64, BlakeTwo256>, substrate_test_runtime::Extrinsic>; + + fn send_receive(request: sender::Request<Block>, pool: &LocalPool) { + let client = Arc::new(substrate_test_runtime_client::new()); + let (handler, protocol_config) = handler::LightClientRequestHandler::new(&protocol_id(), client); + pool.spawner().spawn_obj(handler.run().boxed().into()).unwrap(); + + let (_peer_set, peer_set_handle) = peerset(); + let mut sender = sender::LightClientRequestSender::<Block>::new( + &protocol_id(), + Arc::new(crate::light_client_requests::tests::DummyFetchChecker { + ok: true, + _mark: std::marker::PhantomData, + }), + peer_set_handle, + ); + sender.inject_connected(PeerId::random()); + + sender.request(request).unwrap(); + let sender::OutEvent::SendRequest { pending_response, request, .. } = block_on(sender.next()).unwrap(); + let (tx, rx) = oneshot::channel(); + block_on(protocol_config.inbound_queue.unwrap().send(IncomingRequest { + peer: PeerId::random(), + payload: request, + pending_response: tx, + })).unwrap(); + pool.spawner().spawn_obj(async move { + pending_response.send(Ok(rx.await.unwrap().result.unwrap())).unwrap(); + }.boxed().into()).unwrap(); + + pool.spawner().spawn_obj(sender.for_each(|_| future::ready(())).boxed().into()).unwrap(); + } + + #[test] + fn send_receive_call() { + let chan = oneshot::channel(); + let request = light::RemoteCallRequest { + block: Default::default(), + header: dummy_header(), + method: "test".into(), + call_data: vec![], + retry_count: None, + }; + + let mut pool = LocalPool::new(); + send_receive(sender::Request::Call { + request, + sender: chan.0, + }, &pool); + assert_eq!(vec![42], pool.run_until(chan.1).unwrap().unwrap()); + // ^--- from `DummyFetchChecker::check_execution_proof` + } + + #[test] + fn send_receive_read() { + let chan = oneshot::channel(); + let request = light::RemoteReadRequest { + header: dummy_header(), + block: Default::default(), + keys: vec![b":key".to_vec()], + retry_count: None, + }; + let mut pool = LocalPool::new(); + send_receive(sender::Request::Read { + request, + sender: chan.0, + }, &pool); + assert_eq!( + Some(vec![42]), + pool.run_until(chan.1) + .unwrap() + .unwrap() + .remove(&b":key"[..]) + .unwrap() + ); + // ^--- from `DummyFetchChecker::check_read_proof` + } + + #[test] + fn send_receive_read_child() { + let chan = oneshot::channel(); + let child_info = ChildInfo::new_default(&b":child_storage:default:sub"[..]); + let request = light::RemoteReadChildRequest { + header: dummy_header(), + block: Default::default(), + storage_key: child_info.prefixed_storage_key(), + keys: vec![b":key".to_vec()], + retry_count: None, + }; + let mut pool = LocalPool::new(); + send_receive(sender::Request::ReadChild { + request, + sender: chan.0, + }, &pool); + assert_eq!( + Some(vec![42]), + pool.run_until(chan.1) + .unwrap() + .unwrap() + .remove(&b":key"[..]) + .unwrap() + ); + // ^--- from `DummyFetchChecker::check_read_child_proof` + } + + #[test] + fn send_receive_header() { + sp_tracing::try_init_simple(); + let chan = oneshot::channel(); + let request = light::RemoteHeaderRequest { + cht_root: Default::default(), + block: 1, + retry_count: None, + }; + let mut pool = LocalPool::new(); + send_receive(sender::Request::Header { + request, + sender: chan.0, + }, &pool); + // The remote does not know block 1: + assert_matches!( + pool.run_until(chan.1).unwrap(), + Err(ClientError::RemoteFetchFailed) + ); + } + + #[test] + fn send_receive_changes() { + let chan = oneshot::channel(); + let request = light::RemoteChangesRequest { + changes_trie_configs: vec![sp_core::ChangesTrieConfigurationRange { + zero: (0, Default::default()), + end: None, + config: Some(sp_core::ChangesTrieConfiguration::new(4, 2)), + }], + first_block: (1, Default::default()), + last_block: (100, Default::default()), + max_block: (100, Default::default()), + tries_roots: (1, Default::default(), Vec::new()), + key: Vec::new(), + storage_key: None, + retry_count: None, + }; + let mut pool = LocalPool::new(); + send_receive(sender::Request::Changes { + request, + sender: chan.0, + }, &pool); + assert_eq!(vec![(100, 2)], pool.run_until(chan.1).unwrap().unwrap()); + // ^--- from `DummyFetchChecker::check_changes_proof` + } +} diff --git a/substrate/client/network/src/light_client_requests/handler.rs b/substrate/client/network/src/light_client_requests/handler.rs new file mode 100644 index 0000000000000000000000000000000000000000..08de99a0a5de430efb8a27c5e5bc26ea0e063e6b --- /dev/null +++ b/substrate/client/network/src/light_client_requests/handler.rs @@ -0,0 +1,399 @@ +// This file is part of Substrate. + +// Copyright (C) 2020-2021 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Helper for incoming light client requests. +//! +//! Handle (i.e. answer) incoming light client requests from a remote peer received via +//! [`crate::request_responses::RequestResponsesBehaviour`] with [`LightClientRequestHandler`]. + +use codec::{self, Encode, Decode}; +use crate::{ + chain::Client, + config::ProtocolId, + schema, + PeerId, +}; +use crate::request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig}; +use futures::{channel::mpsc, prelude::*}; +use prost::Message; +use sc_client_api::{ + StorageProof, + light +}; +use sc_peerset::ReputationChange; +use sp_core::{ + storage::{ChildInfo, ChildType,StorageKey, PrefixedStorageKey}, + hexdisplay::HexDisplay, +}; +use sp_runtime::{ + traits::{Block, Zero}, + generic::BlockId, +}; +use std::{ + collections::{BTreeMap}, + sync::Arc, +}; +use log::debug; + +const LOG_TARGET: &str = "light-client-request-handler"; + +/// Handler for incoming light client requests from a remote peer. +pub struct LightClientRequestHandler<B: Block> { + request_receiver: mpsc::Receiver<IncomingRequest>, + /// Blockchain client. + client: Arc<dyn Client<B>>, +} + +impl<B: Block> LightClientRequestHandler<B> { + /// Create a new [`BlockRequestHandler`]. + pub fn new( + protocol_id: &ProtocolId, + client: Arc<dyn Client<B>>, + ) -> (Self, ProtocolConfig) { + // For now due to lack of data on light client request handling in production systems, this + // value is chosen to match the block request limit. + let (tx, request_receiver) = mpsc::channel(20); + + let mut protocol_config = super::generate_protocol_config(protocol_id); + protocol_config.inbound_queue = Some(tx); + + (Self { client, request_receiver }, protocol_config) + } + + /// Run [`LightClientRequestHandler`]. + pub async fn run(mut self) { + while let Some(request) = self.request_receiver.next().await { + let IncomingRequest { peer, payload, pending_response } = request; + + match self.handle_request(peer, payload) { + Ok(response_data) => { + let response = OutgoingResponse { result: Ok(response_data), reputation_changes: Vec::new() }; + match pending_response.send(response) { + Ok(()) => debug!( + target: LOG_TARGET, + "Handled light client request from {}.", + peer, + ), + Err(_) => debug!( + target: LOG_TARGET, + "Failed to handle light client request from {}: {}", + peer, HandleRequestError::SendResponse, + ), + }; + } , + Err(e) => { + debug!( + target: LOG_TARGET, + "Failed to handle light client request from {}: {}", + peer, e, + ); + + let reputation_changes = match e { + HandleRequestError::BadRequest(_) => { + vec![ReputationChange::new(-(1 << 12), "bad request")] + } + _ => Vec::new(), + }; + + let response = OutgoingResponse { result: Err(()), reputation_changes }; + if pending_response.send(response).is_err() { + debug!( + target: LOG_TARGET, + "Failed to handle light client request from {}: {}", + peer, HandleRequestError::SendResponse, + ); + }; + }, + } + } + } + + + fn handle_request( + &mut self, + peer: PeerId, + payload: Vec<u8>, + ) -> Result<Vec<u8>, HandleRequestError> { + let request = schema::v1::light::Request::decode(&payload[..])?; + + let response = match &request.request { + Some(schema::v1::light::request::Request::RemoteCallRequest(r)) => + self.on_remote_call_request(&peer, r)?, + Some(schema::v1::light::request::Request::RemoteReadRequest(r)) => + self.on_remote_read_request(&peer, r)?, + Some(schema::v1::light::request::Request::RemoteHeaderRequest(r)) => + self.on_remote_header_request(&peer, r)?, + Some(schema::v1::light::request::Request::RemoteReadChildRequest(r)) => + self.on_remote_read_child_request(&peer, r)?, + Some(schema::v1::light::request::Request::RemoteChangesRequest(r)) => + self.on_remote_changes_request(&peer, r)?, + None => { + return Err(HandleRequestError::BadRequest("Remote request without request data.")); + } + }; + + let mut data = Vec::new(); + response.encode(&mut data)?; + + Ok(data) + } + + fn on_remote_call_request( + &mut self, + peer: &PeerId, + request: &schema::v1::light::RemoteCallRequest, + ) -> Result<schema::v1::light::Response, HandleRequestError> { + log::trace!( + "Remote call request from {} ({} at {:?}).", + peer, request.method, request.block, + ); + + let block = Decode::decode(&mut request.block.as_ref())?; + + let proof = match self.client.execution_proof( + &BlockId::Hash(block), + &request.method, &request.data, + ) { + Ok((_, proof)) => proof, + Err(e) => { + log::trace!( + "remote call request from {} ({} at {:?}) failed with: {}", + peer, request.method, request.block, e, + ); + StorageProof::empty() + } + }; + + let response = { + let r = schema::v1::light::RemoteCallResponse { proof: proof.encode() }; + schema::v1::light::response::Response::RemoteCallResponse(r) + }; + + Ok(schema::v1::light::Response { response: Some(response) }) + } + + fn on_remote_read_request( + &mut self, + peer: &PeerId, + request: &schema::v1::light::RemoteReadRequest, + ) -> Result<schema::v1::light::Response, HandleRequestError> { + if request.keys.is_empty() { + log::debug!("Invalid remote read request sent by {}.", peer); + return Err(HandleRequestError::BadRequest("Remote read request without keys.")) + } + + log::trace!( + "Remote read request from {} ({} at {:?}).", + peer, fmt_keys(request.keys.first(), request.keys.last()), request.block, + ); + + let block = Decode::decode(&mut request.block.as_ref())?; + + let proof = match self.client.read_proof( + &BlockId::Hash(block), + &mut request.keys.iter().map(AsRef::as_ref), + ) { + Ok(proof) => proof, + Err(error) => { + log::trace!( + "remote read request from {} ({} at {:?}) failed with: {}", + peer, fmt_keys(request.keys.first(), request.keys.last()), request.block, error, + ); + StorageProof::empty() + } + }; + + let response = { + let r = schema::v1::light::RemoteReadResponse { proof: proof.encode() }; + schema::v1::light::response::Response::RemoteReadResponse(r) + }; + + Ok(schema::v1::light::Response { response: Some(response) }) + } + + fn on_remote_read_child_request( + &mut self, + peer: &PeerId, + request: &schema::v1::light::RemoteReadChildRequest, + ) -> Result<schema::v1::light::Response, HandleRequestError> { + if request.keys.is_empty() { + log::debug!("Invalid remote child read request sent by {}.", peer); + return Err(HandleRequestError::BadRequest("Remove read child request without keys.")) + } + + log::trace!( + "Remote read child request from {} ({} {} at {:?}).", + peer, + HexDisplay::from(&request.storage_key), + fmt_keys(request.keys.first(), request.keys.last()), + request.block, + ); + + let block = Decode::decode(&mut request.block.as_ref())?; + + let prefixed_key = PrefixedStorageKey::new_ref(&request.storage_key); + let child_info = match ChildType::from_prefixed_key(prefixed_key) { + Some((ChildType::ParentKeyId, storage_key)) => Ok(ChildInfo::new_default(storage_key)), + None => Err(sp_blockchain::Error::InvalidChildStorageKey), + }; + let proof = match child_info.and_then(|child_info| self.client.read_child_proof( + &BlockId::Hash(block), + &child_info, + &mut request.keys.iter().map(AsRef::as_ref) + )) { + Ok(proof) => proof, + Err(error) => { + log::trace!( + "remote read child request from {} ({} {} at {:?}) failed with: {}", + peer, + HexDisplay::from(&request.storage_key), + fmt_keys(request.keys.first(), request.keys.last()), + request.block, + error, + ); + StorageProof::empty() + } + }; + + let response = { + let r = schema::v1::light::RemoteReadResponse { proof: proof.encode() }; + schema::v1::light::response::Response::RemoteReadResponse(r) + }; + + Ok(schema::v1::light::Response { response: Some(response) }) + } + + fn on_remote_header_request( + &mut self, + peer: &PeerId, + request: &schema::v1::light::RemoteHeaderRequest, + ) -> Result<schema::v1::light::Response, HandleRequestError> { + log::trace!("Remote header proof request from {} ({:?}).", peer, request.block); + + let block = Decode::decode(&mut request.block.as_ref())?; + let (header, proof) = match self.client.header_proof(&BlockId::Number(block)) { + Ok((header, proof)) => (header.encode(), proof), + Err(error) => { + log::trace!( + "Remote header proof request from {} ({:?}) failed with: {}.", + peer, request.block, error + ); + (Default::default(), StorageProof::empty()) + } + }; + + let response = { + let r = schema::v1::light::RemoteHeaderResponse { header, proof: proof.encode() }; + schema::v1::light::response::Response::RemoteHeaderResponse(r) + }; + + Ok(schema::v1::light::Response { response: Some(response) }) + } + + fn on_remote_changes_request( + &mut self, + peer: &PeerId, + request: &schema::v1::light::RemoteChangesRequest, + ) -> Result<schema::v1::light::Response, HandleRequestError> { + log::trace!( + "Remote changes proof request from {} for key {} ({:?}..{:?}).", + peer, + if !request.storage_key.is_empty() { + format!("{} : {}", HexDisplay::from(&request.storage_key), HexDisplay::from(&request.key)) + } else { + HexDisplay::from(&request.key).to_string() + }, + request.first, + request.last, + ); + + let first = Decode::decode(&mut request.first.as_ref())?; + let last = Decode::decode(&mut request.last.as_ref())?; + let min = Decode::decode(&mut request.min.as_ref())?; + let max = Decode::decode(&mut request.max.as_ref())?; + let key = StorageKey(request.key.clone()); + let storage_key = if request.storage_key.is_empty() { + None + } else { + Some(PrefixedStorageKey::new_ref(&request.storage_key)) + }; + + let proof = match self.client.key_changes_proof(first, last, min, max, storage_key, &key) { + Ok(proof) => proof, + Err(error) => { + log::trace!( + "Remote changes proof request from {} for key {} ({:?}..{:?}) failed with: {}.", + peer, + format!("{} : {}", HexDisplay::from(&request.storage_key), HexDisplay::from(&key.0)), + request.first, + request.last, + error, + ); + + light::ChangesProof::<B::Header> { + max_block: Zero::zero(), + proof: Vec::new(), + roots: BTreeMap::new(), + roots_proof: StorageProof::empty(), + } + } + }; + + let response = { + let r = schema::v1::light::RemoteChangesResponse { + max: proof.max_block.encode(), + proof: proof.proof, + roots: proof.roots.into_iter() + .map(|(k, v)| schema::v1::light::Pair { fst: k.encode(), snd: v.encode() }) + .collect(), + roots_proof: proof.roots_proof.encode(), + }; + schema::v1::light::response::Response::RemoteChangesResponse(r) + }; + + Ok(schema::v1::light::Response { response: Some(response) }) + } +} + +#[derive(derive_more::Display, derive_more::From)] +enum HandleRequestError { + #[display(fmt = "Failed to decode request: {}.", _0)] + DecodeProto(prost::DecodeError), + #[display(fmt = "Failed to encode response: {}.", _0)] + EncodeProto(prost::EncodeError), + #[display(fmt = "Failed to send response.")] + SendResponse, + /// A bad request has been received. + #[display(fmt = "bad request: {}", _0)] + BadRequest(&'static str), + /// Encoding or decoding of some data failed. + #[display(fmt = "codec error: {}", _0)] + Codec(codec::Error), +} + +fn fmt_keys(first: Option<&Vec<u8>>, last: Option<&Vec<u8>>) -> String { + if let (Some(first), Some(last)) = (first, last) { + if first == last { + HexDisplay::from(first).to_string() + } else { + format!("{}..{}", HexDisplay::from(first), HexDisplay::from(last)) + } + } else { + String::from("n/a") + } +} diff --git a/substrate/client/network/src/light_client_requests/sender.rs b/substrate/client/network/src/light_client_requests/sender.rs new file mode 100644 index 0000000000000000000000000000000000000000..652f465d6f2503c214f9eb098f7daba9abbf686b --- /dev/null +++ b/substrate/client/network/src/light_client_requests/sender.rs @@ -0,0 +1,1343 @@ +// This file is part of Substrate. + +// Copyright (C) 2020-2021 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Helper for outgoing light client requests. +//! +//! Call [`LightClientRequestSender::send_request`] to send out light client requests. It will: +//! +//! 1. Build the request. +//! +//! 2. Forward the request to [`crate::request_responses::RequestResponsesBehaviour`] via +//! [`OutEvent::SendRequest`]. +//! +//! 3. Wait for the response and forward the response via the [`oneshot::Sender`] provided earlier +//! with [`LightClientRequestSender::send_request`]. + +use codec::{self, Encode, Decode}; +use crate::{ + config::ProtocolId, + protocol::message::{BlockAttributes}, + schema, + PeerId, +}; +use crate::request_responses::{RequestFailure, OutboundFailure}; +use futures::{channel::{oneshot}, future::BoxFuture, prelude::*, stream::FuturesUnordered}; +use prost::Message; +use sc_client_api::{ + light::{ + self, RemoteBodyRequest, + } +}; +use sc_peerset::ReputationChange; +use sp_blockchain::{Error as ClientError}; +use sp_runtime::{ + traits::{Block, Header, NumberFor}, +}; +use std::{ + collections::{BTreeMap, VecDeque, HashMap}, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +mod rep { + use super::*; + + /// Reputation change for a peer when a request timed out. + pub const TIMEOUT: ReputationChange = ReputationChange::new(-(1 << 8), "light client request timeout"); + /// Reputation change for a peer when a request is refused. + pub const REFUSED: ReputationChange = ReputationChange::new(-(1 << 8), "light client request refused"); +} + +/// Configuration options for [`LightClientRequestSender`]. +#[derive(Debug, Clone)] +struct Config { + max_pending_requests: usize, + light_protocol: String, + block_protocol: String, +} + +impl Config { + /// Create a new [`LightClientRequestSender`] configuration. + pub fn new(id: &ProtocolId) -> Self { + Config { + max_pending_requests: 128, + light_protocol: super::generate_protocol_name(id), + block_protocol: crate::block_request_handler::generate_protocol_name(id), + } + } +} + +/// State machine helping to send out light client requests. +pub struct LightClientRequestSender<B: Block> { + /// This behaviour's configuration. + config: Config, + /// Verifies that received responses are correct. + checker: Arc<dyn light::FetchChecker<B>>, + /// Peer information (addresses, their best block, etc.) + peers: HashMap<PeerId, PeerInfo<B>>, + /// Pending (local) requests. + pending_requests: VecDeque<PendingRequest<B>>, + /// Requests on their way to remote peers. + sent_requests: FuturesUnordered<BoxFuture< + 'static, (SentRequest<B>, Result<Result<Vec<u8>, RequestFailure>, oneshot::Canceled>), + >>, + /// Handle to use for reporting misbehaviour of peers. + peerset: sc_peerset::PeersetHandle, +} + +/// Augments a pending light client request with metadata. +#[derive(Debug)] +struct PendingRequest<B: Block> { + /// Remaining attempts. + attempts_left: usize, + /// The actual request. + request: Request<B>, +} + +impl<B: Block> PendingRequest<B> { + fn new(req: Request<B>) -> Self { + PendingRequest { + // Number of retries + one for the initial attempt. + attempts_left: req.retries() + 1, + request: req, + } + } + + fn into_sent(self, peer_id: PeerId) -> SentRequest<B> { + SentRequest { + attempts_left: self.attempts_left, + request: self.request, + peer: peer_id, + } + } +} + +/// Augments a light client request with metadata that is currently being send to a remote. +#[derive(Debug)] +struct SentRequest<B: Block> { + /// Remaining attempts. + attempts_left: usize, + /// The actual request. + request: Request<B>, + /// The peer that the request is send to. + peer: PeerId, +} + +impl<B: Block> SentRequest<B> { + fn into_pending(self) -> PendingRequest<B> { + PendingRequest { + attempts_left: self.attempts_left, + request: self.request, + } + } +} + +impl<B: Block> Unpin for LightClientRequestSender<B> {} + +impl<B> LightClientRequestSender<B> +where + B: Block, +{ + /// Construct a new light client handler. + pub fn new( + id: &ProtocolId, + checker: Arc<dyn light::FetchChecker<B>>, + peerset: sc_peerset::PeersetHandle, + ) -> Self { + LightClientRequestSender { + config: Config::new(id), + checker, + peers: Default::default(), + pending_requests: Default::default(), + sent_requests: Default::default(), + peerset, + } + } + + /// We rely on external information about peers best blocks as we lack the + /// means to determine it ourselves. + pub fn update_best_block(&mut self, peer: &PeerId, num: NumberFor<B>) { + if let Some(info) = self.peers.get_mut(peer) { + log::trace!("new best block for {:?}: {:?}", peer, num); + info.best_block = Some(num) + } + } + + /// Issue a new light client request. + pub fn request(&mut self, req: Request<B>) -> Result<(), SendRequestError> { + if self.pending_requests.len() >= self.config.max_pending_requests { + return Err(SendRequestError::TooManyRequests) + } + self.pending_requests.push_back(PendingRequest::new(req)); + Ok(()) + } + + /// Remove the given peer. + /// + /// In-flight requests to the given peer might fail and be retried. See + /// [`<LightClientRequestSender as Stream>::poll_next`]. + fn remove_peer(&mut self, peer: PeerId) { + self.peers.remove(&peer); + } + + /// Process a local request's response from remote. + /// + /// If successful, this will give us the actual, checked data we should be + /// sending back to the client, otherwise an error. + fn on_response( + &mut self, + peer: PeerId, + request: &Request<B>, + response: Response, + ) -> Result<Reply<B>, Error> { + log::trace!("response from {}", peer); + match response { + Response::Light(r) => self.on_response_light(request, r), + Response::Block(r) => self.on_response_block(request, r), + } + } + + fn on_response_light( + &mut self, + request: &Request<B>, + response: schema::v1::light::Response, + ) -> Result<Reply<B>, Error> { + use schema::v1::light::response::Response; + match response.response { + Some(Response::RemoteCallResponse(response)) => + if let Request::Call { request , .. } = request { + let proof = Decode::decode(&mut response.proof.as_ref())?; + let reply = self.checker.check_execution_proof(request, proof)?; + Ok(Reply::VecU8(reply)) + } else { + Err(Error::UnexpectedResponse) + } + Some(Response::RemoteReadResponse(response)) => + match request { + Request::Read { request, .. } => { + let proof = Decode::decode(&mut response.proof.as_ref())?; + let reply = self.checker.check_read_proof(&request, proof)?; + Ok(Reply::MapVecU8OptVecU8(reply)) + } + Request::ReadChild { request, .. } => { + let proof = Decode::decode(&mut response.proof.as_ref())?; + let reply = self.checker.check_read_child_proof(&request, proof)?; + Ok(Reply::MapVecU8OptVecU8(reply)) + } + _ => Err(Error::UnexpectedResponse) + } + Some(Response::RemoteChangesResponse(response)) => + if let Request::Changes { request, .. } = request { + let max_block = Decode::decode(&mut response.max.as_ref())?; + let roots_proof = Decode::decode(&mut response.roots_proof.as_ref())?; + let roots = { + let mut r = BTreeMap::new(); + for pair in response.roots { + let k = Decode::decode(&mut pair.fst.as_ref())?; + let v = Decode::decode(&mut pair.snd.as_ref())?; + r.insert(k, v); + } + r + }; + let reply = self.checker.check_changes_proof(&request, light::ChangesProof { + max_block, + proof: response.proof, + roots, + roots_proof, + })?; + Ok(Reply::VecNumberU32(reply)) + } else { + Err(Error::UnexpectedResponse) + } + Some(Response::RemoteHeaderResponse(response)) => + if let Request::Header { request, .. } = request { + let header = + if response.header.is_empty() { + None + } else { + Some(Decode::decode(&mut response.header.as_ref())?) + }; + let proof = Decode::decode(&mut response.proof.as_ref())?; + let reply = self.checker.check_header_proof(&request, header, proof)?; + Ok(Reply::Header(reply)) + } else { + Err(Error::UnexpectedResponse) + } + None => Err(Error::UnexpectedResponse) + } + } + + fn on_response_block( + &mut self, + request: &Request<B>, + response: schema::v1::BlockResponse, + ) -> Result<Reply<B>, Error> { + let request = if let Request::Body { request , .. } = &request { + request + } else { + return Err(Error::UnexpectedResponse); + }; + + let body: Vec<_> = match response.blocks.into_iter().next() { + Some(b) => b.body, + None => return Err(Error::UnexpectedResponse), + }; + + let body = body.into_iter() + .map(|extrinsic| B::Extrinsic::decode(&mut &extrinsic[..])) + .collect::<Result<_, _>>()?; + + let body = self.checker.check_body_proof(&request, body)?; + Ok(Reply::Extrinsics(body)) + } + + /// Signal that the node is connected to the given peer. + pub fn inject_connected(&mut self, peer: PeerId) { + let prev_entry = self.peers.insert(peer, Default::default()); + debug_assert!( + prev_entry.is_none(), + "Expect `inject_connected` to be called for disconnected peer.", + ); + } + + /// Signal that the node disconnected from the given peer. + pub fn inject_disconnected(&mut self, peer: PeerId) { + self.remove_peer(peer) + } +} + + +impl<B: Block> Stream for LightClientRequestSender<B> { + type Item = OutEvent; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { + // If we have received responses to previously sent requests, check them and pass them on. + while let Poll::Ready(Some((sent_request, request_result))) = self.sent_requests.poll_next_unpin(cx) { + if let Some(info) = self.peers.get_mut(&sent_request.peer) { + if info.status != PeerStatus::Busy { + // If we get here, something is wrong with our internal handling of peer status + // information. At any time, a single peer processes at most one request from + // us. A malicious peer should not be able to get us here. It is our own fault + // and must be fixed! + panic!("unexpected peer status {:?} for {}", info.status, sent_request.peer); + } + + info.status = PeerStatus::Idle; // Make peer available again. + } + + let request_result = match request_result { + Ok(r) => r, + Err(oneshot::Canceled) => { + log::debug!("Oneshot for request to peer {} was canceled.", sent_request.peer); + self.remove_peer(sent_request.peer); + self.peerset.report_peer(sent_request.peer, ReputationChange::new_fatal("no response from peer")); + self.pending_requests.push_back(sent_request.into_pending()); + continue; + } + }; + + let decoded_request_result = request_result.map(|response| { + if sent_request.request.is_block_request() { + schema::v1::BlockResponse::decode(&response[..]) + .map(|r| Response::Block(r)) + } else { + schema::v1::light::Response::decode(&response[..]) + .map(|r| Response::Light(r)) + } + }); + + let response = match decoded_request_result { + Ok(Ok(response)) => response, + Ok(Err(e)) => { + log::debug!("Failed to decode response from peer {}: {:?}.", sent_request.peer, e); + self.remove_peer(sent_request.peer); + self.peerset.report_peer(sent_request.peer, ReputationChange::new_fatal("invalid response from peer")); + self.pending_requests.push_back(sent_request.into_pending()); + continue; + }, + Err(e) => { + log::debug!("Request to peer {} failed with {:?}.", sent_request.peer, e); + + match e { + RequestFailure::NotConnected => { + self.remove_peer(sent_request.peer); + self.pending_requests.push_back(sent_request.into_pending()); + } + RequestFailure::UnknownProtocol => { + debug_assert!( + false, + "Light client and block request protocol should be known when \ + sending requests.", + ); + } + RequestFailure::Refused => { + self.remove_peer(sent_request.peer); + self.peerset.report_peer( + sent_request.peer, + rep::REFUSED, + ); + self.pending_requests.push_back(sent_request.into_pending()); + } + RequestFailure::Obsolete => { + debug_assert!( + false, + "Can not receive `RequestFailure::Obsolete` after dropping the \ + response receiver.", + ); + self.pending_requests.push_back(sent_request.into_pending()); + } + RequestFailure::Network(OutboundFailure::Timeout) => { + self.remove_peer(sent_request.peer); + self.peerset.report_peer( + sent_request.peer, + rep::TIMEOUT, + ); + self.pending_requests.push_back(sent_request.into_pending()); + }, + RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => { + self.remove_peer(sent_request.peer); + self.peerset.report_peer( + sent_request.peer, + ReputationChange::new_fatal( + "peer does not support light client or block request protocol", + ), + ); + self.pending_requests.push_back(sent_request.into_pending()); + } + RequestFailure::Network(OutboundFailure::DialFailure) => { + self.remove_peer(sent_request.peer); + self.peerset.report_peer( + sent_request.peer, + ReputationChange::new_fatal( + "failed to dial peer", + ), + ); + self.pending_requests.push_back(sent_request.into_pending()); + } + RequestFailure::Network(OutboundFailure::ConnectionClosed) => { + self.remove_peer(sent_request.peer); + self.peerset.report_peer( + sent_request.peer, + ReputationChange::new_fatal( + "connection to peer closed", + ), + ); + self.pending_requests.push_back(sent_request.into_pending()); + } + } + + continue; + } + }; + + match self.on_response(sent_request.peer, &sent_request.request, response) { + Ok(reply) => sent_request.request.return_reply(Ok(reply)), + Err(Error::UnexpectedResponse) => { + log::debug!("Unexpected response from peer {}.", sent_request.peer); + self.remove_peer(sent_request.peer); + self.peerset.report_peer( + sent_request.peer, + ReputationChange::new_fatal( + "unexpected response from peer", + ), + ); + self.pending_requests.push_back(sent_request.into_pending()); + } + Err(other) => { + log::debug!("error handling response from peer {}: {}", sent_request.peer, other); + self.remove_peer(sent_request.peer); + self.peerset.report_peer( + sent_request.peer, + ReputationChange::new_fatal( + "invalid response from peer", + ), + ); + self.pending_requests.push_back(sent_request.into_pending()) + } + } + } + + // If we have a pending request to send, try to find an available peer and send it. + while let Some(mut pending_request) = self.pending_requests.pop_front() { + if pending_request.attempts_left == 0 { + pending_request.request.return_reply(Err(ClientError::RemoteFetchFailed)); + continue + } + + let protocol = if pending_request.request.is_block_request() { + self.config.block_protocol.clone() + } else { + self.config.light_protocol.clone() + }; + + // Out of all idle peers, find one who's best block is high enough, choose any idle peer + // if none exists. + let mut peer = None; + for (peer_id, peer_info) in self.peers.iter_mut() { + if peer_info.status == PeerStatus::Idle { + match peer_info.best_block { + Some(n) if n >= pending_request.request.required_block() => { + peer = Some((*peer_id, peer_info)); + break + }, + _ => peer = Some((*peer_id, peer_info)) + } + } + } + + // Break in case there is no idle peer. + let (peer_id, peer_info) = match peer { + Some((peer_id, peer_info)) => (peer_id, peer_info), + None => { + self.pending_requests.push_front(pending_request); + log::debug!("No peer available to send request to."); + + break; + } + }; + + let request_bytes = match pending_request.request.serialize_request() { + Ok(bytes) => bytes, + Err(error) => { + log::debug!("failed to serialize request: {}", error); + pending_request.request.return_reply(Err(ClientError::RemoteFetchFailed)); + continue + } + }; + + let (tx, rx) = oneshot::channel(); + + peer_info.status = PeerStatus::Busy; + + pending_request.attempts_left -= 1; + + self.sent_requests.push(async move { + (pending_request.into_sent(peer_id), rx.await) + }.boxed()); + + return Poll::Ready(Some(OutEvent::SendRequest { + target: peer_id, + request: request_bytes, + pending_response: tx, + protocol_name: protocol, + })); + } + + Poll::Pending + } +} + +/// Events returned by [`LightClientRequestSender`]. +#[derive(Debug)] +pub enum OutEvent { + /// Emit a request to be send out on the network e.g. via [`crate::request_responses`]. + SendRequest { + /// The remote peer to send the request to. + target: PeerId, + /// The encoded request. + request: Vec<u8>, + /// The [`onehsot::Sender`] channel to pass the response to. + pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>, + /// The name of the protocol to use to send the request. + protocol_name: String, + } +} + +/// Incoming response from remote. +#[derive(Debug, Clone)] +pub enum Response { + /// Incoming light response from remote. + Light(schema::v1::light::Response), + /// Incoming block response from remote. + Block(schema::v1::BlockResponse), +} + +/// Error returned by [`LightClientRequestSender::request`]. +#[derive(Debug, derive_more::Display, derive_more::From)] +pub enum SendRequestError { + /// There are currently too many pending request. + #[display(fmt = "too many pending requests")] + TooManyRequests, +} + +/// Error type to propagate errors internally. +#[derive(Debug, derive_more::Display, derive_more::From)] +enum Error { + /// The response type does not correspond to the issued request. + #[display(fmt = "unexpected response")] + UnexpectedResponse, + /// Encoding or decoding of some data failed. + #[display(fmt = "codec error: {}", _0)] + Codec(codec::Error), + /// The chain client errored. + #[display(fmt = "client error: {}", _0)] + Client(ClientError), +} + +/// The data to send back to the light client over the oneshot channel. +// +// It is unified here in order to be able to return it as a function +// result instead of delivering it to the client as a side effect of +// response processing. +#[derive(Debug)] +enum Reply<B: Block> { + VecU8(Vec<u8>), + VecNumberU32(Vec<(<B::Header as Header>::Number, u32)>), + MapVecU8OptVecU8(HashMap<Vec<u8>, Option<Vec<u8>>>), + Header(B::Header), + Extrinsics(Vec<B::Extrinsic>), +} + + +/// Information we have about some peer. +#[derive(Debug)] +struct PeerInfo<B: Block> { + best_block: Option<NumberFor<B>>, + status: PeerStatus, +} + +impl<B: Block> Default for PeerInfo<B> { + fn default() -> Self { + PeerInfo { + best_block: None, + status: PeerStatus::Idle, + } + } +} + +/// A peer is either idle or busy processing a request from us. +#[derive(Debug, Clone, PartialEq, Eq)] +enum PeerStatus { + /// The peer is available. + Idle, + /// We wait for the peer to return us a response for the given request ID. + Busy, +} + +/// The possible light client requests we support. +/// +/// The associated `oneshot::Sender` will be used to convey the result of +/// their request back to them (cf. `Reply`). +// +// This is modeled after light_dispatch.rs's `RequestData` which is not +// used because we currently only support a subset of those. +#[derive(Debug)] +pub enum Request<B: Block> { + /// Remote body request. + Body { + /// Request. + request: RemoteBodyRequest<B::Header>, + /// [`oneshot::Sender`] to return response. + sender: oneshot::Sender<Result<Vec<B::Extrinsic>, ClientError>> + }, + /// Remote header request. + Header { + /// Request. + request: light::RemoteHeaderRequest<B::Header>, + /// [`oneshot::Sender`] to return response. + sender: oneshot::Sender<Result<B::Header, ClientError>> + }, + /// Remote read request. + Read { + /// Request. + request: light::RemoteReadRequest<B::Header>, + /// [`oneshot::Sender`] to return response. + sender: oneshot::Sender<Result<HashMap<Vec<u8>, Option<Vec<u8>>>, ClientError>> + }, + /// Remote read child request. + ReadChild { + /// Request. + request: light::RemoteReadChildRequest<B::Header>, + /// [`oneshot::Sender`] to return response. + sender: oneshot::Sender<Result<HashMap<Vec<u8>, Option<Vec<u8>>>, ClientError>> + }, + /// Remote call request. + Call { + /// Request. + request: light::RemoteCallRequest<B::Header>, + /// [`oneshot::Sender`] to return response. + sender: oneshot::Sender<Result<Vec<u8>, ClientError>> + }, + /// Remote changes request. + Changes { + /// Request. + request: light::RemoteChangesRequest<B::Header>, + /// [`oneshot::Sender`] to return response. + sender: oneshot::Sender<Result<Vec<(NumberFor<B>, u32)>, ClientError>> + } +} + +impl<B: Block> Request<B> { + fn is_block_request(&self) -> bool { + matches!(self, Request::Body { .. }) + } + + fn required_block(&self) -> NumberFor<B> { + match self { + Request::Body { request, .. } => *request.header.number(), + Request::Header { request, .. } => request.block, + Request::Read { request, .. } => *request.header.number(), + Request::ReadChild { request, .. } => *request.header.number(), + Request::Call { request, .. } => *request.header.number(), + Request::Changes { request, .. } => request.max_block.0, + } + } + + fn retries(&self) -> usize { + let rc = match self { + Request::Body { request, .. } => request.retry_count, + Request::Header { request, .. } => request.retry_count, + Request::Read { request, .. } => request.retry_count, + Request::ReadChild { request, .. } => request.retry_count, + Request::Call { request, .. } => request.retry_count, + Request::Changes { request, .. } => request.retry_count, + }; + rc.unwrap_or(0) + } + + fn serialize_request(&self) -> Result<Vec<u8>, prost::EncodeError> { + let request = match self { + Request::Body { request, .. } => { + let rq = schema::v1::BlockRequest { + fields: BlockAttributes::BODY.to_be_u32(), + from_block: Some(schema::v1::block_request::FromBlock::Hash( + request.header.hash().encode(), + )), + to_block: Default::default(), + direction: schema::v1::Direction::Ascending as i32, + max_blocks: 1, + }; + + let mut buf = Vec::with_capacity(rq.encoded_len()); + rq.encode(&mut buf)?; + return Ok(buf); + } + Request::Header { request, .. } => { + let r = schema::v1::light::RemoteHeaderRequest { block: request.block.encode() }; + schema::v1::light::request::Request::RemoteHeaderRequest(r) + } + Request::Read { request, .. } => { + let r = schema::v1::light::RemoteReadRequest { + block: request.block.encode(), + keys: request.keys.clone(), + }; + schema::v1::light::request::Request::RemoteReadRequest(r) + } + Request::ReadChild { request, .. } => { + let r = schema::v1::light::RemoteReadChildRequest { + block: request.block.encode(), + storage_key: request.storage_key.clone().into_inner(), + keys: request.keys.clone(), + }; + schema::v1::light::request::Request::RemoteReadChildRequest(r) + } + Request::Call { request, .. } => { + let r = schema::v1::light::RemoteCallRequest { + block: request.block.encode(), + method: request.method.clone(), + data: request.call_data.clone(), + }; + schema::v1::light::request::Request::RemoteCallRequest(r) + } + Request::Changes { request, .. } => { + let r = schema::v1::light::RemoteChangesRequest { + first: request.first_block.1.encode(), + last: request.last_block.1.encode(), + min: request.tries_roots.1.encode(), + max: request.max_block.1.encode(), + storage_key: request.storage_key.clone().map(|s| s.into_inner()) + .unwrap_or_default(), + key: request.key.clone(), + }; + schema::v1::light::request::Request::RemoteChangesRequest(r) + } + }; + + let rq = schema::v1::light::Request { request: Some(request) }; + let mut buf = Vec::with_capacity(rq.encoded_len()); + rq.encode(&mut buf)?; + Ok(buf) + } + + fn return_reply(self, result: Result<Reply<B>, ClientError>) { + fn send<T>(item: T, sender: oneshot::Sender<T>) { + let _ = sender.send(item); // It is okay if the other end already hung up. + } + match self { + Request::Body { request, sender } => match result { + Err(e) => send(Err(e), sender), + Ok(Reply::Extrinsics(x)) => send(Ok(x), sender), + reply => log::error!("invalid reply for body request: {:?}, {:?}", reply, request), + } + Request::Header { request, sender } => match result { + Err(e) => send(Err(e), sender), + Ok(Reply::Header(x)) => send(Ok(x), sender), + reply => log::error!("invalid reply for header request: {:?}, {:?}", reply, request), + } + Request::Read { request, sender } => match result { + Err(e) => send(Err(e), sender), + Ok(Reply::MapVecU8OptVecU8(x)) => send(Ok(x), sender), + reply => log::error!("invalid reply for read request: {:?}, {:?}", reply, request), + } + Request::ReadChild { request, sender } => match result { + Err(e) => send(Err(e), sender), + Ok(Reply::MapVecU8OptVecU8(x)) => send(Ok(x), sender), + reply => log::error!("invalid reply for read child request: {:?}, {:?}", reply, request), + } + Request::Call { request, sender } => match result { + Err(e) => send(Err(e), sender), + Ok(Reply::VecU8(x)) => send(Ok(x), sender), + reply => log::error!("invalid reply for call request: {:?}, {:?}", reply, request), + } + Request::Changes { request, sender } => match result { + Err(e) => send(Err(e), sender), + Ok(Reply::VecNumberU32(x)) => send(Ok(x), sender), + reply => log::error!("invalid reply for changes request: {:?}, {:?}", reply, request), + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::light_client_requests::tests::{DummyFetchChecker, protocol_id, peerset, dummy_header}; + use crate::request_responses::OutboundFailure; + + use assert_matches::assert_matches; + use futures::channel::oneshot; + use futures::executor::block_on; + use futures::poll; + use sc_client_api::StorageProof; + use sp_core::storage::ChildInfo; + use sp_runtime::generic::Header; + use sp_runtime::traits::BlakeTwo256; + use std::collections::HashSet; + use std::iter::FromIterator; + + fn empty_proof() -> Vec<u8> { + StorageProof::empty().encode() + } + + #[test] + fn removes_peer_if_told() { + let peer = PeerId::random(); + let (_peer_set, peer_set_handle) = peerset(); + let mut sender = LightClientRequestSender::<Block>::new( + &protocol_id(), + Arc::new(DummyFetchChecker { + ok: true, + _mark: std::marker::PhantomData, + }), + peer_set_handle, + ); + + sender.inject_connected(peer); + assert_eq!(1, sender.peers.len()); + + sender.inject_disconnected(peer); + assert_eq!(0, sender.peers.len()); + } + + type Block = + sp_runtime::generic::Block<Header<u64, BlakeTwo256>, substrate_test_runtime::Extrinsic>; + + #[test] + fn body_request_fields_encoded_properly() { + let (sender, _receiver) = oneshot::channel(); + let request = Request::<Block>::Body { + request: RemoteBodyRequest { + header: dummy_header(), + retry_count: None, + }, + sender, + }; + let serialized_request = request.serialize_request().unwrap(); + let deserialized_request = schema::v1::BlockRequest::decode(&serialized_request[..]).unwrap(); + assert!(BlockAttributes::from_be_u32(deserialized_request.fields) + .unwrap() + .contains(BlockAttributes::BODY)); + } + + #[test] + fn disconnects_from_peer_if_request_times_out() { + let peer0 = PeerId::random(); + let peer1 = PeerId::random(); + + let (_peer_set, peer_set_handle) = peerset(); + let mut sender = LightClientRequestSender::<Block>::new( + &protocol_id(), + Arc::new(crate::light_client_requests::tests::DummyFetchChecker { + ok: true, + _mark: std::marker::PhantomData, + }), + peer_set_handle, + ); + + sender.inject_connected(peer0); + sender.inject_connected(peer1); + + assert_eq!( + HashSet::from_iter(&[peer0.clone(), peer1.clone()]), + sender.peers.keys().collect::<HashSet<_>>(), + "Expect knowledge of two peers." + ); + + assert!(sender.pending_requests.is_empty(), "Expect no pending request."); + assert!(sender.sent_requests.is_empty(), "Expect no sent request."); + + // Issue a request! + let chan = oneshot::channel(); + let request = light::RemoteCallRequest { + block: Default::default(), + header: dummy_header(), + method: "test".into(), + call_data: vec![], + retry_count: Some(1), + }; + sender.request(Request::Call { request, sender: chan.0 }).unwrap(); + assert_eq!(1, sender.pending_requests.len(), "Expect one pending request."); + + let OutEvent::SendRequest { target, pending_response, .. } = block_on(sender.next()).unwrap(); + assert!( + target == peer0 || target == peer1, + "Expect request to originate from known peer.", + ); + + // And we should have one busy peer. + assert!({ + let (idle, busy): (Vec<_>, Vec<_>) = sender + .peers + .iter() + .partition(|(_, info)| info.status == PeerStatus::Idle); + idle.len() == 1 + && busy.len() == 1 + && (idle[0].0 == &peer0 || busy[0].0 == &peer0) + && (idle[0].0 == &peer1 || busy[0].0 == &peer1) + }); + + assert_eq!(0, sender.pending_requests.len(), "Expect no pending request."); + assert_eq!(1, sender.sent_requests.len(), "Expect one request to be sent."); + + // Report first attempt as timed out. + pending_response.send(Err(RequestFailure::Network(OutboundFailure::Timeout))).unwrap(); + + // Expect a new request to be issued. + let OutEvent::SendRequest { pending_response, .. } = block_on(sender.next()).unwrap(); + + assert_eq!(1, sender.peers.len(), "Expect peer to be removed."); + assert_eq!(0, sender.pending_requests.len(), "Expect no request to be pending."); + assert_eq!(1, sender.sent_requests.len(), "Expect new request to be issued."); + + // Report second attempt as timed out. + pending_response.send(Err(RequestFailure::Network(OutboundFailure::Timeout))).unwrap(); + assert_matches!( + block_on(async { poll!(sender.next()) }), Poll::Pending, + "Expect sender to not issue another attempt.", + ); + assert_matches!( + block_on(chan.1).unwrap(), Err(ClientError::RemoteFetchFailed), + "Expect request failure to be reported.", + ); + assert_eq!(0, sender.peers.len(), "Expect no peer to be left"); + assert_eq!(0, sender.pending_requests.len(), "Expect no request to be pending."); + assert_eq!(0, sender.sent_requests.len(), "Expect no other request to be in progress."); + } + + #[test] + fn disconnects_from_peer_on_incorrect_response() { + let peer = PeerId::random(); + + let (_peer_set, peer_set_handle) = peerset(); + let mut sender = LightClientRequestSender::<Block>::new( + &protocol_id(), + Arc::new(crate::light_client_requests::tests::DummyFetchChecker { + ok: false, + // ^--- Making sure the response data check fails. + _mark: std::marker::PhantomData, + }), + peer_set_handle, + ); + + sender.inject_connected(peer); + assert_eq!(1, sender.peers.len(), "Expect one peer."); + + let chan = oneshot::channel(); + let request = light::RemoteCallRequest { + block: Default::default(), + header: dummy_header(), + method: "test".into(), + call_data: vec![], + retry_count: Some(1), + }; + sender + .request(Request::Call { + request, + sender: chan.0, + }) + .unwrap(); + + assert_eq!(1, sender.pending_requests.len(), "Expect one pending request."); + assert_eq!(0, sender.sent_requests.len(), "Expect zero sent requests."); + + let OutEvent::SendRequest { pending_response, .. } = block_on(sender.next()).unwrap(); + assert_eq!(0, sender.pending_requests.len(), "Expect zero pending requests."); + assert_eq!(1, sender.sent_requests.len(), "Expect one sent request."); + + let response = { + let r = schema::v1::light::RemoteCallResponse { + proof: empty_proof(), + }; + let response = schema::v1::light::Response { + response: Some(schema::v1::light::response::Response::RemoteCallResponse(r)), + }; + let mut data = Vec::new(); + response.encode(&mut data).unwrap(); + data + }; + + pending_response.send(Ok(response)).unwrap(); + + assert_matches!( + block_on(async { poll!(sender.next()) }), Poll::Pending, + "Expect sender to not issue another attempt, given that there is no peer left.", + ); + + assert!(sender.peers.is_empty(), "Expect no peers to be left."); + assert_eq!(1, sender.pending_requests.len(), "Expect request to be pending again."); + assert_eq!(0, sender.sent_requests.len(), "Expect no request to be sent."); + } + + #[test] + fn disconnects_from_peer_on_wrong_response_type() { + let peer = PeerId::random(); + let (_peer_set, peer_set_handle) = peerset(); + let mut sender = LightClientRequestSender::<Block>::new( + &protocol_id(), + Arc::new(crate::light_client_requests::tests::DummyFetchChecker { + ok: true, + _mark: std::marker::PhantomData, + }), + peer_set_handle, + ); + + sender.inject_connected(peer); + assert_eq!(1, sender.peers.len(), "Expect one peer."); + + let chan = oneshot::channel(); + let request = light::RemoteCallRequest { + block: Default::default(), + header: dummy_header(), + method: "test".into(), + call_data: vec![], + retry_count: Some(1), + }; + sender + .request(Request::Call { + request, + sender: chan.0, + }) + .unwrap(); + + assert_eq!(1, sender.pending_requests.len()); + assert_eq!(0, sender.sent_requests.len()); + let OutEvent::SendRequest { pending_response, .. } = block_on(sender.next()).unwrap(); + assert_eq!(0, sender.pending_requests.len(), "Expect zero pending requests."); + assert_eq!(1, sender.sent_requests.len(), "Expect one sent request."); + + let response = { + let r = schema::v1::light::RemoteReadResponse { + proof: empty_proof(), + }; // Not a RemoteCallResponse! + let response = schema::v1::light::Response { + response: Some(schema::v1::light::response::Response::RemoteReadResponse(r)), + }; + let mut data = Vec::new(); + response.encode(&mut data).unwrap(); + data + }; + + pending_response.send(Ok(response)).unwrap(); + assert_matches!( + block_on(async { poll!(sender.next()) }), Poll::Pending, + "Expect sender to not issue another attempt, given that there is no peer left.", + ); + + assert!(sender.peers.is_empty(), "Expect no peers to be left."); + assert_eq!(1, sender.pending_requests.len(), "Expect request to be pending again."); + assert_eq!(0, sender.sent_requests.len(), "Expect no request to be sent."); + } + + #[test] + fn receives_remote_failure_after_retry_count_failures() { + let peers = (0..4).map(|_| PeerId::random()).collect::<Vec<_>>(); + + let (_peer_set, peer_set_handle) = peerset(); + let mut sender = LightClientRequestSender::<Block>::new( + &protocol_id(), + Arc::new(crate::light_client_requests::tests::DummyFetchChecker { + ok: false, + // ^--- Making sure the response data check fails. + _mark: std::marker::PhantomData, + }), + peer_set_handle, + ); + + for peer in &peers { + sender.inject_connected(*peer); + } + assert_eq!(4, sender.peers.len(), "Expect four peers."); + + let mut chan = oneshot::channel(); + let request = light::RemoteCallRequest { + block: Default::default(), + header: dummy_header(), + method: "test".into(), + call_data: vec![], + retry_count: Some(3), // Attempt up to three retries. + }; + sender + .request(Request::Call { + request, + sender: chan.0, + }) + .unwrap(); + + assert_eq!(1, sender.pending_requests.len()); + assert_eq!(0, sender.sent_requests.len()); + let mut pending_response = match block_on(sender.next()).unwrap() { + OutEvent::SendRequest { pending_response, .. } => Some(pending_response), + }; + assert_eq!(0, sender.pending_requests.len(), "Expect zero pending requests."); + assert_eq!(1, sender.sent_requests.len(), "Expect one sent request."); + + for (i, _peer) in peers.iter().enumerate() { + // Construct an invalid response + let response = { + let r = schema::v1::light::RemoteCallResponse { + proof: empty_proof(), + }; + let response = schema::v1::light::Response { + response: Some(schema::v1::light::response::Response::RemoteCallResponse(r)), + }; + let mut data = Vec::new(); + response.encode(&mut data).unwrap(); + data + }; + pending_response.take().unwrap().send(Ok(response)).unwrap(); + + if i < 3 { + pending_response = match block_on(sender.next()).unwrap() { + OutEvent::SendRequest { pending_response, .. } => Some(pending_response), + }; + assert_matches!(chan.1.try_recv(), Ok(None)) + } else { + // Last peer and last attempt. + assert_matches!( + block_on(async { poll!(sender.next()) }), Poll::Pending, + "Expect sender to not issue another attempt, given that there is no peer left.", + ); + assert_matches!( + chan.1.try_recv(), + Ok(Some(Err(ClientError::RemoteFetchFailed))) + ) + } + } + } + + fn issue_request(request: Request<Block>) { + let peer = PeerId::random(); + + let (_peer_set, peer_set_handle) = peerset(); + let mut sender = LightClientRequestSender::<Block>::new( + &protocol_id(), + Arc::new(crate::light_client_requests::tests::DummyFetchChecker { + ok: true, + _mark: std::marker::PhantomData, + }), + peer_set_handle, + ); + + sender.inject_connected(peer); + assert_eq!(1, sender.peers.len(), "Expect one peer."); + + let response = match request { + Request::Body { .. } => unimplemented!(), + Request::Header { .. } => { + let r = schema::v1::light::RemoteHeaderResponse { + header: dummy_header().encode(), + proof: empty_proof(), + }; + schema::v1::light::Response { + response: Some(schema::v1::light::response::Response::RemoteHeaderResponse( + r, + )), + } + } + Request::Read { .. } => { + let r = schema::v1::light::RemoteReadResponse { + proof: empty_proof(), + }; + schema::v1::light::Response { + response: Some(schema::v1::light::response::Response::RemoteReadResponse(r)), + } + } + Request::ReadChild { .. } => { + let r = schema::v1::light::RemoteReadResponse { + proof: empty_proof(), + }; + schema::v1::light::Response { + response: Some(schema::v1::light::response::Response::RemoteReadResponse(r)), + } + } + Request::Call { .. } => { + let r = schema::v1::light::RemoteCallResponse { + proof: empty_proof(), + }; + schema::v1::light::Response { + response: Some(schema::v1::light::response::Response::RemoteCallResponse(r)), + } + } + Request::Changes { .. } => { + let r = schema::v1::light::RemoteChangesResponse { + max: std::iter::repeat(1).take(32).collect(), + proof: Vec::new(), + roots: Vec::new(), + roots_proof: empty_proof(), + }; + schema::v1::light::Response { + response: Some(schema::v1::light::response::Response::RemoteChangesResponse(r)), + } + } + }; + + let response = { + let mut data = Vec::new(); + response.encode(&mut data).unwrap(); + data + }; + + sender.request(request).unwrap(); + + assert_eq!(1, sender.pending_requests.len()); + assert_eq!(0, sender.sent_requests.len()); + let OutEvent::SendRequest { pending_response, .. } = block_on(sender.next()).unwrap(); + assert_eq!(0, sender.pending_requests.len()); + assert_eq!(1, sender.sent_requests.len()); + + pending_response.send(Ok(response)).unwrap(); + assert_matches!( + block_on(async { poll!(sender.next()) }), Poll::Pending, + "Expect sender to not issue another attempt, given that there is no peer left.", + ); + + assert_eq!(0, sender.pending_requests.len()); + assert_eq!(0, sender.sent_requests.len()) + } + + #[test] + fn receives_remote_call_response() { + let mut chan = oneshot::channel(); + let request = light::RemoteCallRequest { + block: Default::default(), + header: dummy_header(), + method: "test".into(), + call_data: vec![], + retry_count: None, + }; + issue_request(Request::Call { + request, + sender: chan.0, + }); + assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_)))) + } + + #[test] + fn receives_remote_read_response() { + let mut chan = oneshot::channel(); + let request = light::RemoteReadRequest { + header: dummy_header(), + block: Default::default(), + keys: vec![b":key".to_vec()], + retry_count: None, + }; + issue_request(Request::Read { + request, + sender: chan.0, + }); + assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_)))) + } + + #[test] + fn receives_remote_read_child_response() { + let mut chan = oneshot::channel(); + let child_info = ChildInfo::new_default(&b":child_storage:default:sub"[..]); + let request = light::RemoteReadChildRequest { + header: dummy_header(), + block: Default::default(), + storage_key: child_info.prefixed_storage_key(), + keys: vec![b":key".to_vec()], + retry_count: None, + }; + issue_request(Request::ReadChild { + request, + sender: chan.0, + }); + assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_)))) + } + + #[test] + fn receives_remote_header_response() { + let mut chan = oneshot::channel(); + let request = light::RemoteHeaderRequest { + cht_root: Default::default(), + block: 1, + retry_count: None, + }; + issue_request(Request::Header { + request, + sender: chan.0, + }); + assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_)))) + } + + #[test] + fn receives_remote_changes_response() { + let mut chan = oneshot::channel(); + let request = light::RemoteChangesRequest { + changes_trie_configs: vec![sp_core::ChangesTrieConfigurationRange { + zero: (0, Default::default()), + end: None, + config: Some(sp_core::ChangesTrieConfiguration::new(4, 2)), + }], + first_block: (1, Default::default()), + last_block: (100, Default::default()), + max_block: (100, Default::default()), + tries_roots: (1, Default::default(), Vec::new()), + key: Vec::new(), + storage_key: None, + retry_count: None, + }; + issue_request(Request::Changes { + request, + sender: chan.0, + }); + assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_)))) + } +} diff --git a/substrate/client/network/src/on_demand_layer.rs b/substrate/client/network/src/on_demand_layer.rs index 9ec1fb7508c3e6a4819250d0643b48cee7c00876..ef8076e8cbed7988c4c262dc79065472a59699fd 100644 --- a/substrate/client/network/src/on_demand_layer.rs +++ b/substrate/client/network/src/on_demand_layer.rs @@ -18,7 +18,7 @@ //! On-demand requests service. -use crate::light_client_handler; +use crate::light_client_requests; use futures::{channel::oneshot, prelude::*}; use parking_lot::Mutex; @@ -45,10 +45,10 @@ pub struct OnDemand<B: BlockT> { /// Note that a better alternative would be to use a MPMC queue here, and add a `poll` method /// from the `OnDemand`. However there exists no popular implementation of MPMC channels in /// asynchronous Rust at the moment - requests_queue: Mutex<Option<TracingUnboundedReceiver<light_client_handler::Request<B>>>>, + requests_queue: Mutex<Option<TracingUnboundedReceiver<light_client_requests::sender::Request<B>>>>, /// Sending side of `requests_queue`. - requests_send: TracingUnboundedSender<light_client_handler::Request<B>>, + requests_send: TracingUnboundedSender<light_client_requests::sender::Request<B>>, } @@ -149,7 +149,7 @@ where /// If this function returns `None`, that means that the receiver has already been extracted in /// the past, and therefore that something already handles the requests. pub(crate) fn extract_receiver(&self) - -> Option<TracingUnboundedReceiver<light_client_handler::Request<B>>> + -> Option<TracingUnboundedReceiver<light_client_requests::sender::Request<B>>> { self.requests_queue.lock().take() } @@ -170,7 +170,7 @@ where let (sender, receiver) = oneshot::channel(); let _ = self .requests_send - .unbounded_send(light_client_handler::Request::Header { request, sender }); + .unbounded_send(light_client_requests::sender::Request::Header { request, sender }); RemoteResponse { receiver } } @@ -178,7 +178,7 @@ where let (sender, receiver) = oneshot::channel(); let _ = self .requests_send - .unbounded_send(light_client_handler::Request::Read { request, sender }); + .unbounded_send(light_client_requests::sender::Request::Read { request, sender }); RemoteResponse { receiver } } @@ -189,7 +189,7 @@ where let (sender, receiver) = oneshot::channel(); let _ = self .requests_send - .unbounded_send(light_client_handler::Request::ReadChild { request, sender }); + .unbounded_send(light_client_requests::sender::Request::ReadChild { request, sender }); RemoteResponse { receiver } } @@ -197,7 +197,7 @@ where let (sender, receiver) = oneshot::channel(); let _ = self .requests_send - .unbounded_send(light_client_handler::Request::Call { request, sender }); + .unbounded_send(light_client_requests::sender::Request::Call { request, sender }); RemoteResponse { receiver } } @@ -208,7 +208,7 @@ where let (sender, receiver) = oneshot::channel(); let _ = self .requests_send - .unbounded_send(light_client_handler::Request::Changes { request, sender }); + .unbounded_send(light_client_requests::sender::Request::Changes { request, sender }); RemoteResponse { receiver } } @@ -216,7 +216,7 @@ where let (sender, receiver) = oneshot::channel(); let _ = self .requests_send - .unbounded_send(light_client_handler::Request::Body { request, sender }); + .unbounded_send(light_client_requests::sender::Request::Body { request, sender }); RemoteResponse { receiver } } } diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 9170644c3f409dd21f24865db06dbcf3f7b9f82c..4ac6ffe67f9092dacd19a2417d5b1a53f55b4fe5 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -281,10 +281,11 @@ impl RequestResponsesBehaviour { if let Some((protocol, _)) = self.protocols.get_mut(protocol_name) { if protocol.is_connected(target) { let request_id = protocol.send_request(target, request); - self.pending_requests.insert( + let prev_req_id = self.pending_requests.insert( (protocol_name.to_string().into(), request_id).into(), (Instant::now(), pending_response), ); + debug_assert!(prev_req_id.is_none(), "Expect request id to be unique."); } else { if pending_response.send(Err(RequestFailure::NotConnected)).is_err() { log::debug!( diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 20968c12788938e347b5fc058ab19dcbd2f1b988..58c623a8f5f1f6fa84f74789cbb9140d4c13655c 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -38,7 +38,7 @@ use crate::{ NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer, }, on_demand_layer::AlwaysBadChecker, - light_client_handler, + light_client_requests, protocol::{ self, NotifsHandlerError, @@ -254,11 +254,10 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> { params.network_config.client_version, params.network_config.node_name ); - let light_client_handler = { - let config = light_client_handler::Config::new(¶ms.protocol_id); - light_client_handler::LightClientHandler::new( - config, - params.chain, + + let light_client_request_sender = { + light_client_requests::sender::LightClientRequestSender::new( + ¶ms.protocol_id, checker, peerset_handle.clone(), ) @@ -339,9 +338,10 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> { params.role, user_agent, local_public, - light_client_handler, + light_client_request_sender, discovery_config, params.block_request_protocol_config, + params.light_client_request_protocol_config, params.network_config.request_response_protocols, ); @@ -1286,7 +1286,7 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> { /// Messages from the [`NetworkService`] that must be processed. from_service: TracingUnboundedReceiver<ServiceToWorkerMsg<B, H>>, /// Receiver for queries from the light client that must be processed. - light_client_rqs: Option<TracingUnboundedReceiver<light_client_handler::Request<B>>>, + light_client_rqs: Option<TracingUnboundedReceiver<light_client_requests::sender::Request<B>>>, /// Senders for events that happen on the network. event_streams: out_events::OutChannels, /// Prometheus network metrics. @@ -1312,10 +1312,14 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> { // Check for new incoming light client requests. if let Some(light_client_rqs) = this.light_client_rqs.as_mut() { while let Poll::Ready(Some(rq)) = light_client_rqs.poll_next_unpin(cx) { - // This can error if there are too many queued requests already. - if this.network_service.light_client_request(rq).is_err() { - log::warn!("Couldn't start light client request: too many pending requests"); + let result = this.network_service.light_client_request(rq); + match result { + Ok(()) => {}, + Err(light_client_requests::sender::SendRequestError::TooManyRequests) => { + log::warn!("Couldn't start light client request: too many pending requests"); + } } + if let Some(metrics) = this.metrics.as_ref() { metrics.issued_light_requests.inc(); } @@ -1608,11 +1612,11 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> { let reason = match cause { Some(ConnectionError::IO(_)) => "transport-error", Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A( - EitherError::A(EitherError::B(EitherError::A( - PingFailure::Timeout)))))))) => "ping-timeout", + EitherError::B(EitherError::A( + PingFailure::Timeout))))))) => "ping-timeout", Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A( - EitherError::A(EitherError::A( - NotifsHandlerError::SyncNotificationsClogged))))))) => "sync-notifications-clogged", + EitherError::A( + NotifsHandlerError::SyncNotificationsClogged)))))) => "sync-notifications-clogged", Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(_))) => "protocol-error", Some(ConnectionError::Handler(NodeHandlerWrapperError::KeepAliveTimeout)) => "keep-alive-timeout", None => "actively-closed", diff --git a/substrate/client/network/src/service/tests.rs b/substrate/client/network/src/service/tests.rs index 8f16040aee3b19111e6240cb7208869689be4a94..f88854963fb95a2363c864ce1e387b1ed7269bd1 100644 --- a/substrate/client/network/src/service/tests.rs +++ b/substrate/client/network/src/service/tests.rs @@ -18,6 +18,7 @@ use crate::{config, Event, NetworkService, NetworkWorker}; use crate::block_request_handler::BlockRequestHandler; +use crate::light_client_requests::handler::LightClientRequestHandler; use libp2p::PeerId; use futures::prelude::*; @@ -96,7 +97,16 @@ fn build_test_full_node(config: config::NetworkConfiguration) let block_request_protocol_config = { let (handler, protocol_config) = BlockRequestHandler::new( - protocol_id.clone(), + &protocol_id, + client.clone(), + ); + async_std::task::spawn(handler.run().boxed()); + protocol_config + }; + + let light_client_request_protocol_config = { + let (handler, protocol_config) = LightClientRequestHandler::new( + &protocol_id, client.clone(), ); async_std::task::spawn(handler.run().boxed()); @@ -117,6 +127,7 @@ fn build_test_full_node(config: config::NetworkConfiguration) ), metrics_registry: None, block_request_protocol_config, + light_client_request_protocol_config, }) .unwrap(); diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index 786fddeed5554f35815d818b602e90a42149f2d9..f523be857507f45c25f0f378010ad9a0e17d0fbb 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -30,6 +30,7 @@ use std::{ use libp2p::build_multiaddr; use log::trace; use sc_network::block_request_handler::{self, BlockRequestHandler}; +use sc_network::light_client_requests::{self, handler::LightClientRequestHandler}; use sp_blockchain::{ HeaderBackend, Result as ClientResult, well_known_cache_keys::{self, Id as CacheKeyId}, @@ -726,7 +727,13 @@ pub trait TestNetFactory: Sized { let protocol_id = ProtocolId::from("test-protocol-name"); let block_request_protocol_config = { - let (handler, protocol_config) = BlockRequestHandler::new(protocol_id.clone(), client.clone()); + let (handler, protocol_config) = BlockRequestHandler::new(&protocol_id, client.clone()); + self.spawn_task(handler.run().boxed()); + protocol_config + }; + + let light_client_request_protocol_config = { + let (handler, protocol_config) = LightClientRequestHandler::new(&protocol_id, client.clone()); self.spawn_task(handler.run().boxed()); protocol_config }; @@ -744,6 +751,7 @@ pub trait TestNetFactory: Sized { .unwrap_or_else(|| Box::new(DefaultBlockAnnounceValidator)), metrics_registry: None, block_request_protocol_config, + light_client_request_protocol_config, }).unwrap(); trace!(target: "test_network", "Peer identifier: {}", network.service().local_peer_id()); @@ -813,11 +821,13 @@ pub trait TestNetFactory: Sized { let protocol_id = ProtocolId::from("test-protocol-name"); - // Add block request handler. let block_request_protocol_config = block_request_handler::generate_protocol_config( - protocol_id.clone(), + &protocol_id, ); + let light_client_request_protocol_config = + light_client_requests::generate_protocol_config(&protocol_id); + let network = NetworkWorker::new(sc_network::config::Params { role: Role::Light, executor: None, @@ -830,6 +840,7 @@ pub trait TestNetFactory: Sized { block_announce_validator: Box::new(DefaultBlockAnnounceValidator), metrics_registry: None, block_request_protocol_config, + light_client_request_protocol_config, }).unwrap(); self.mut_peers(|peers| { diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 2ee95bd24d3245eda0cd774272f438b289497b40..882a6c406265002819f34c69375acba1e72513b4 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -43,6 +43,7 @@ use log::{info, warn}; use sc_network::config::{Role, OnDemand}; use sc_network::NetworkService; use sc_network::block_request_handler::{self, BlockRequestHandler}; +use sc_network::light_client_requests::{self, handler::LightClientRequestHandler}; use sp_runtime::generic::BlockId; use sp_runtime::traits::{ Block as BlockT, HashFor, Zero, BlockIdTo, @@ -869,11 +870,11 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>( let block_request_protocol_config = { if matches!(config.role, Role::Light) { // Allow outgoing requests but deny incoming requests. - block_request_handler::generate_protocol_config(protocol_id.clone()) + block_request_handler::generate_protocol_config(&protocol_id) } else { // Allow both outgoing and incoming requests. let (handler, protocol_config) = BlockRequestHandler::new( - protocol_id.clone(), + &protocol_id, client.clone(), ); spawn_handle.spawn("block_request_handler", handler.run()); @@ -881,6 +882,21 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>( } }; + let light_client_request_protocol_config = { + if matches!(config.role, Role::Light) { + // Allow outgoing requests but deny incoming requests. + light_client_requests::generate_protocol_config(&protocol_id) + } else { + // Allow both outgoing and incoming requests. + let (handler, protocol_config) = LightClientRequestHandler::new( + &protocol_id, + client.clone(), + ); + spawn_handle.spawn("light_client_request_handler", handler.run()); + protocol_config + } + }; + let network_params = sc_network::config::Params { role: config.role.clone(), executor: { @@ -898,6 +914,7 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>( block_announce_validator, metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()), block_request_protocol_config, + light_client_request_protocol_config, }; let has_bootnodes = !network_params.network_config.boot_nodes.is_empty();