From ff479c4e23f644af9f5cdc163eabd6c1dea4c27a Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 23 May 2019 21:13:23 +0200 Subject: [PATCH] Rework the OnDemand service (#2670) * Rework the OnDemand service * Try fix line widths --- substrate/core/network/src/config.rs | 4 +- substrate/core/network/src/lib.rs | 4 +- substrate/core/network/src/on_demand.rs | 926 ++++++++---------- substrate/core/network/src/on_demand_layer.rs | 149 +++ substrate/core/network/src/protocol.rs | 137 ++- substrate/core/network/src/service.rs | 18 +- substrate/core/network/src/test/mod.rs | 5 +- substrate/core/service/src/lib.rs | 5 +- 8 files changed, 665 insertions(+), 583 deletions(-) create mode 100644 substrate/core/network/src/on_demand_layer.rs diff --git a/substrate/core/network/src/config.rs b/substrate/core/network/src/config.rs index 05cf27ca755..a2a34780bf5 100644 --- a/substrate/core/network/src/config.rs +++ b/substrate/core/network/src/config.rs @@ -21,7 +21,7 @@ pub use network_libp2p::{NonReservedPeerMode, NetworkConfiguration, NodeKeyConfi use bitflags::bitflags; use crate::chain::{Client, FinalityProofProvider}; use parity_codec; -use crate::on_demand::OnDemandService; +use crate::on_demand_layer::OnDemand; use runtime_primitives::traits::{Block as BlockT}; use crate::service::{ExHashT, TransactionPool}; use std::sync::Arc; @@ -37,7 +37,7 @@ pub struct Params { /// Finality proof provider. pub finality_proof_provider: Option>>, /// On-demand service reference. - pub on_demand: Option>>, + pub on_demand: Option>>, /// Transaction pool. pub transaction_pool: Arc>, /// Protocol specialization. diff --git a/substrate/core/network/src/lib.rs b/substrate/core/network/src/lib.rs index 4d69767c7bd..e3ed56d5adb 100644 --- a/substrate/core/network/src/lib.rs +++ b/substrate/core/network/src/lib.rs @@ -30,6 +30,7 @@ mod protocol; mod chain; mod blocks; mod on_demand; +mod on_demand_layer; mod util; pub mod config; pub mod consensus_gossip; @@ -56,6 +57,7 @@ pub use network_libp2p::{ }; pub use message::{generic as generic_message, RequestId, Status as StatusMessage}; pub use error::Error; -pub use on_demand::{OnDemand, OnDemandService, RemoteResponse}; +pub use on_demand::AlwaysBadChecker; +pub use on_demand_layer::{OnDemand, RemoteResponse}; #[doc(hidden)] pub use runtime_primitives::traits::Block as BlockT; diff --git a/substrate/core/network/src/on_demand.rs b/substrate/core/network/src/on_demand.rs index ec4c178232f..f77b50dac6f 100644 --- a/substrate/core/network/src/on_demand.rs +++ b/substrate/core/network/src/on_demand.rs @@ -17,22 +17,18 @@ //! On-demand requests service. use std::collections::{HashMap, VecDeque}; -use std::sync::{Arc, Weak}; +use std::sync::Arc; use std::time::{Instant, Duration}; use log::{trace, info}; -use futures::{Async, Future, Poll}; -use futures::sync::oneshot::{channel, Receiver, Sender as OneShotSender}; +use futures::sync::oneshot::{Sender as OneShotSender}; use linked_hash_map::{Entry, LinkedHashMap}; -use parking_lot::Mutex; use client::error::Error as ClientError; -use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest, +use client::light::fetcher::{FetchChecker, RemoteHeaderRequest, RemoteCallRequest, RemoteReadRequest, RemoteChangesRequest, ChangesProof, RemoteReadChildRequest, RemoteBodyRequest}; use crate::message; use network_libp2p::PeerId; use crate::config::Roles; -use crate::service::Service as NetworkService; -use crate::specialization::NetworkSpecialization; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor}; /// Remote request timeout. @@ -42,97 +38,21 @@ const RETRY_COUNT: usize = 1; /// Reputation change for a peer when a request timed out. const TIMEOUT_REPUTATION_CHANGE: i32 = -(1 << 8); -/// On-demand service API. -pub trait OnDemandService: Send + Sync { - /// When new node is connected. - fn on_connect(&self, peer: PeerId, role: Roles, best_number: NumberFor); - - /// When block is announced by the peer. - fn on_block_announce(&self, peer: PeerId, best_number: NumberFor); - - /// When node is disconnected. - fn on_disconnect(&self, peer: PeerId); - - /// Maintain peers requests. - fn maintain_peers(&self); - - /// When header response is received from remote node. - fn on_remote_header_response( - &self, - peer: PeerId, - response: message::RemoteHeaderResponse - ); - - /// When read response is received from remote node. - fn on_remote_read_response(&self, peer: PeerId, response: message::RemoteReadResponse); - - /// When call response is received from remote node. - fn on_remote_call_response(&self, peer: PeerId, response: message::RemoteCallResponse); - - /// When changes response is received from remote node. - fn on_remote_changes_response( - &self, - peer: PeerId, - response: message::RemoteChangesResponse, Block::Hash> - ); - - /// When body response is received from remote node. - fn on_remote_body_response( - &self, - peer: PeerId, - response: message::BlockResponse - ); - - /// Check whether a block response is an `on_demand` response - fn is_on_demand_response(&self, peer: &PeerId, request_id: message::RequestId) -> bool; -} - -/// Trait used by the `OnDemand` service to communicate messages back to the network. +/// Trait used by the `OnDemandCore` service to communicate messages back to the network. pub trait OnDemandNetwork { /// Adjusts the reputation of the given peer. - fn report_peer(&self, who: &PeerId, reputation_change: i32); + fn report_peer(&mut self, who: &PeerId, reputation_change: i32); /// Disconnect from the given peer. Used in case of misbehaviour. - fn disconnect_peer(&self, who: &PeerId); + fn disconnect_peer(&mut self, who: &PeerId); /// Send a request to a peer. - fn send_request(&self, who: &PeerId, message: message::Message); -} - -impl> OnDemandNetwork for Weak> { - fn report_peer(&self, who: &PeerId, reputation_change: i32) { - if let Some(service) = self.upgrade() { - service.report_peer(who.clone(), reputation_change) - } - } - - fn disconnect_peer(&self, who: &PeerId) { - if let Some(service) = self.upgrade() { - service.disconnect_peer(who.clone()) - } - } - - fn send_request(&self, who: &PeerId, message: message::Message) { - if let Some(service) = self.upgrade() { - service.send_request(who.clone(), message) - } - } + fn send_request(&mut self, who: &PeerId, message: message::Message); } /// On-demand requests service. Dispatches requests to appropriate peers. -pub struct OnDemand { - core: Mutex>, - checker: Arc>, - network_interface: Mutex + Send + Sync + 'static>>>, -} - -/// On-demand remote call response. -pub struct RemoteResponse { - receiver: Receiver>, -} - -#[derive(Default)] -struct OnDemandCore { +pub struct OnDemandCore { + checker: Arc>, next_request_id: u64, pending_requests: VecDeque>, active_peers: LinkedHashMap>, @@ -147,7 +67,10 @@ struct Request { data: RequestData, } -enum RequestData { +/// One request for data made by the `Client`. +/// +/// Contains a `Sender` where to send the result. +pub(crate) enum RequestData { RemoteBody(RemoteBodyRequest, OneShotSender, ClientError>>), RemoteHeader(RemoteHeaderRequest, OneShotSender>), RemoteRead(RemoteReadRequest, OneShotSender>, ClientError>>), @@ -156,7 +79,10 @@ enum RequestData { OneShotSender>, ClientError>> ), RemoteCall(RemoteCallRequest, OneShotSender, ClientError>>), - RemoteChanges(RemoteChangesRequest, OneShotSender, u32)>, ClientError>>), + RemoteChanges( + RemoteChangesRequest, + OneShotSender, u32)>, ClientError>> + ), } enum Accept { @@ -165,103 +91,126 @@ enum Accept { Unexpected(RequestData), } -impl Future for RemoteResponse { - type Item = T; - type Error = ClientError; - - fn poll(&mut self) -> Poll { - self.receiver.poll() - .map_err(|_| ClientError::RemoteFetchCancelled.into()) - .and_then(|r| match r { - Async::Ready(Ok(ready)) => Ok(Async::Ready(ready)), - Async::Ready(Err(error)) => Err(error), - Async::NotReady => Ok(Async::NotReady), - }) +/// Dummy implementation of `FetchChecker` that always assumes that responses are bad. +/// +/// Considering that it is the responsibility of the client to build the fetcher, it can use this +/// implementation if it knows that it will never perform any request. +#[derive(Default, Clone)] +pub struct AlwaysBadChecker; + +impl FetchChecker for AlwaysBadChecker { + fn check_header_proof( + &self, + _request: &RemoteHeaderRequest, + _remote_header: Option, + _remote_proof: Vec> + ) -> Result { + Err(ClientError::Msg("AlwaysBadChecker".into())) } -} -impl OnDemand where - B::Header: HeaderT, -{ - /// Creates new on-demand service. - pub fn new(checker: Arc>) -> Self { - OnDemand { - checker, - network_interface: Mutex::new(None), - core: Mutex::new(OnDemandCore { - next_request_id: 0, - pending_requests: VecDeque::new(), - active_peers: LinkedHashMap::new(), - idle_peers: VecDeque::new(), - best_blocks: HashMap::new(), - }) - } + fn check_read_proof( + &self, + _request: &RemoteReadRequest, + _remote_proof: Vec> + ) -> Result>, ClientError> { + Err(ClientError::Msg("AlwaysBadChecker".into())) } - /// Get checker reference. - pub fn checker(&self) -> &Arc> { - &self.checker + fn check_read_child_proof( + &self, + _request: &RemoteReadChildRequest, + _remote_proof: Vec> + ) -> Result>, ClientError> { + Err(ClientError::Msg("AlwaysBadChecker".into())) + } + + fn check_execution_proof( + &self, + _request: &RemoteCallRequest, + _remote_proof: Vec> + ) -> Result, ClientError> { + Err(ClientError::Msg("AlwaysBadChecker".into())) } - /// Sets weak reference to network service. - pub fn set_network_interface(&self, network_interface: Box + Send + Sync + 'static>) { - self.network_interface.lock().replace(network_interface); + fn check_changes_proof( + &self, + _request: &RemoteChangesRequest, + _remote_proof: ChangesProof + ) -> Result, u32)>, ClientError> { + Err(ClientError::Msg("AlwaysBadChecker".into())) } - fn report_peer(&self, who: &PeerId, reputation_change: i32) { - self.network_interface - .lock() - .as_ref() - .expect("1. OnDemand is passed a network sender upon initialization of the service, 2. it should bet set by now") - .report_peer(who, reputation_change); + fn check_body_proof( + &self, + _request: &RemoteBodyRequest, + _body: Vec + ) -> Result, ClientError> { + Err(ClientError::Msg("AlwaysBadChecker".into())) } +} - fn disconnect_peer(&self, who: &PeerId) { - self.network_interface - .lock() - .as_ref() - .expect("1. OnDemand is passed a network sender upon initialization of the service, 2. it should bet set by now") - .disconnect_peer(who); +impl OnDemandCore where + B::Header: HeaderT, +{ + /// Creates new on-demand requests processer. + pub fn new(checker: Arc>) -> Self { + OnDemandCore { + checker, + next_request_id: 0, + pending_requests: VecDeque::new(), + active_peers: LinkedHashMap::new(), + idle_peers: VecDeque::new(), + best_blocks: HashMap::new(), + } } - fn send_request(&self, who: &PeerId, msg: message::Message) { - self.network_interface - .lock() - .as_ref() - .expect("1. OnDemand is passed a network sender upon initialization of the service, 2. it should bet set by now") - .send_request(who, msg); + /// Inserts a new request in the list of requests to execute. + pub(crate) fn add_request(&mut self, network: impl OnDemandNetwork, data: RequestData) { + self.insert(RETRY_COUNT, data); + self.dispatch(network); } - /// Schedule && dispatch all scheduled requests. - fn schedule_request(&self, retry_count: Option, data: RequestData, result: R) -> R { - let mut core = self.core.lock(); - core.insert(retry_count.unwrap_or(RETRY_COUNT), data); - core.dispatch(self); - result + /// Inserts a new request in the list of requests to execute. + fn insert(&mut self, retry_count: usize, data: RequestData) { + let request_id = self.next_request_id; + self.next_request_id += 1; + + self.pending_requests.push_back(Request { + id: request_id, + timestamp: Instant::now(), + retry_count, + data, + }); } /// Try to accept response from given peer. - fn accept_response) -> Accept>(&self, rtype: &str, peer: PeerId, request_id: u64, try_accept: F) { - let mut core = self.core.lock(); - let request = match core.remove(peer.clone(), request_id) { + fn accept_response( + &mut self, + rtype: &str, + mut network: impl OnDemandNetwork, + peer: PeerId, + request_id: u64, + try_accept: impl FnOnce(Request, &Arc>) -> Accept + ) { + let request = match self.remove(peer.clone(), request_id) { Some(request) => request, None => { info!("Invalid remote {} response from peer {}", rtype, peer); - self.report_peer(&peer, i32::min_value()); - self.disconnect_peer(&peer); - core.remove_peer(peer); + network.report_peer(&peer, i32::min_value()); + network.disconnect_peer(&peer); + self.remove_peer(peer); return; }, }; let retry_count = request.retry_count; - let (retry_count, retry_request_data) = match try_accept(request) { + let (retry_count, retry_request_data) = match try_accept(request, &self.checker) { Accept::Ok => (retry_count, None), Accept::CheckFailed(error, retry_request_data) => { info!("Failed to check remote {} response from peer {}: {}", rtype, peer, error); - self.report_peer(&peer, i32::min_value()); - self.disconnect_peer(&peer); - core.remove_peer(peer); + network.report_peer(&peer, i32::min_value()); + network.disconnect_peer(&peer); + self.remove_peer(peer); if retry_count > 0 { (retry_count - 1, Some(retry_request_data)) @@ -273,60 +222,78 @@ impl OnDemand where }, Accept::Unexpected(retry_request_data) => { info!("Unexpected response to remote {} from peer", rtype); - self.report_peer(&peer, i32::min_value()); - self.disconnect_peer(&peer); - core.remove_peer(peer); + network.report_peer(&peer, i32::min_value()); + network.disconnect_peer(&peer); + self.remove_peer(peer); (retry_count, Some(retry_request_data)) }, }; if let Some(request_data) = retry_request_data { - core.insert(retry_count, request_data); + self.insert(retry_count, request_data); } - core.dispatch(self); + self.dispatch(network); } -} -impl OnDemandService for OnDemand where - B: BlockT, - B::Header: HeaderT, -{ - fn on_connect(&self, peer: PeerId, role: Roles, best_number: NumberFor) { + pub fn on_connect( + &mut self, + network: impl OnDemandNetwork, + peer: PeerId, + role: Roles, + best_number: NumberFor + ) { if !role.is_full() { return; } - let mut core = self.core.lock(); - core.add_peer(peer, best_number); - core.dispatch(self); + self.idle_peers.push_back(peer.clone()); + self.best_blocks.insert(peer, best_number); + + self.dispatch(network); } - fn on_block_announce(&self, peer: PeerId, best_number: NumberFor) { - let mut core = self.core.lock(); - core.update_peer(peer, best_number); - core.dispatch(self); + pub fn on_block_announce(&mut self, network: impl OnDemandNetwork, peer: PeerId, best_number: NumberFor) { + self.best_blocks.insert(peer, best_number); + self.dispatch(network); } - fn on_disconnect(&self, peer: PeerId) { - let mut core = self.core.lock(); - core.remove_peer(peer); - core.dispatch(self); + pub fn on_disconnect(&mut self, network: impl OnDemandNetwork, peer: PeerId) { + self.remove_peer(peer); + self.dispatch(network); } - fn maintain_peers(&self) { - let mut core = self.core.lock(); - for bad_peer in core.maintain_peers() { - self.report_peer(&bad_peer, TIMEOUT_REPUTATION_CHANGE); - self.disconnect_peer(&bad_peer); + pub fn maintain_peers(&mut self, mut network: impl OnDemandNetwork) { + let now = Instant::now(); + + loop { + match self.active_peers.front() { + Some((_, request)) if now - request.timestamp >= REQUEST_TIMEOUT => (), + _ => break, + } + + let (bad_peer, request) = self.active_peers.pop_front().expect("front() is Some as checked above"); + self.pending_requests.push_front(request); + network.report_peer(&bad_peer, TIMEOUT_REPUTATION_CHANGE); + network.disconnect_peer(&bad_peer); } - core.dispatch(self); + + self.dispatch(network); } - fn on_remote_header_response(&self, peer: PeerId, response: message::RemoteHeaderResponse) { - self.accept_response("header", peer, response.id, |request| match request.data { - RequestData::RemoteHeader(request, sender) => match self.checker.check_header_proof(&request, response.header, response.proof) { + pub fn on_remote_header_response( + &mut self, + network: impl OnDemandNetwork, + peer: PeerId, + response: message::RemoteHeaderResponse + ) { + self.accept_response("header", network, peer, response.id, |request, checker| match request.data { + RequestData::RemoteHeader(request, sender) => match checker.check_header_proof( + &request, + response.header, + response.proof + ) { Ok(response) => { // we do not bother if receiver has been dropped already let _ = sender.send(Ok(response)); @@ -338,10 +305,15 @@ impl OnDemandService for OnDemand where }) } - fn on_remote_read_response(&self, peer: PeerId, response: message::RemoteReadResponse) { - self.accept_response("read", peer, response.id, |request| match request.data { + pub fn on_remote_read_response( + &mut self, + network: impl OnDemandNetwork, + peer: PeerId, + response: message::RemoteReadResponse + ) { + self.accept_response("read", network, peer, response.id, |request, checker| match request.data { RequestData::RemoteRead(request, sender) => { - match self.checker.check_read_proof(&request, response.proof) { + match checker.check_read_proof(&request, response.proof) { Ok(response) => { // we do not bother if receiver has been dropped already let _ = sender.send(Ok(response)); @@ -353,7 +325,7 @@ impl OnDemandService for OnDemand where ), }}, RequestData::RemoteReadChild(request, sender) => { - match self.checker.check_read_child_proof(&request, response.proof) { + match checker.check_read_child_proof(&request, response.proof) { Ok(response) => { // we do not bother if receiver has been dropped already let _ = sender.send(Ok(response)); @@ -368,9 +340,14 @@ impl OnDemandService for OnDemand where }) } - fn on_remote_call_response(&self, peer: PeerId, response: message::RemoteCallResponse) { - self.accept_response("call", peer, response.id, |request| match request.data { - RequestData::RemoteCall(request, sender) => match self.checker.check_execution_proof(&request, response.proof) { + pub fn on_remote_call_response( + &mut self, + network: impl OnDemandNetwork, + peer: PeerId, + response: message::RemoteCallResponse + ) { + self.accept_response("call", network, peer, response.id, |request, checker| match request.data { + RequestData::RemoteCall(request, sender) => match checker.check_execution_proof(&request, response.proof) { Ok(response) => { // we do not bother if receiver has been dropped already let _ = sender.send(Ok(response)); @@ -382,9 +359,14 @@ impl OnDemandService for OnDemand where }) } - fn on_remote_changes_response(&self, peer: PeerId, response: message::RemoteChangesResponse, B::Hash>) { - self.accept_response("changes", peer, response.id, |request| match request.data { - RequestData::RemoteChanges(request, sender) => match self.checker.check_changes_proof( + pub fn on_remote_changes_response( + &mut self, + network: impl OnDemandNetwork, + peer: PeerId, + response: message::RemoteChangesResponse, B::Hash> + ) { + self.accept_response("changes", network, peer, response.id, |request, checker| match request.data { + RequestData::RemoteChanges(request, sender) => match checker.check_changes_proof( &request, ChangesProof { max_block: response.max, proof: response.proof, @@ -402,8 +384,13 @@ impl OnDemandService for OnDemand where }) } - fn on_remote_body_response(&self, peer: PeerId, response: message::BlockResponse) { - self.accept_response("body", peer, response.id, |request| match request.data { + pub fn on_remote_body_response( + &mut self, + network: impl OnDemandNetwork, + peer: PeerId, + response: message::BlockResponse + ) { + self.accept_response("body", network, peer, response.id, |request, checker| match request.data { RequestData::RemoteBody(request, sender) => { let mut bodies: Vec<_> = response .blocks @@ -420,7 +407,7 @@ impl OnDemandService for OnDemand where } let body = bodies.remove(0); - match self.checker.check_body_proof(&request, body) { + match checker.check_body_proof(&request, body) { Ok(body) => { let _ = sender.send(Ok(body)); Accept::Ok @@ -432,83 +419,21 @@ impl OnDemandService for OnDemand where }) } - fn is_on_demand_response(&self, peer: &PeerId, request_id: message::RequestId) -> bool { - let core = self.core.lock(); - core.is_pending_request(&peer, request_id) - } -} - -impl Fetcher for OnDemand where - B: BlockT, - B::Header: HeaderT, -{ - type RemoteHeaderResult = RemoteResponse; - type RemoteReadResult = RemoteResponse>>; - type RemoteCallResult = RemoteResponse>; - type RemoteChangesResult = RemoteResponse, u32)>>; - type RemoteBodyResult = RemoteResponse>; - - fn remote_header(&self, request: RemoteHeaderRequest) -> Self::RemoteHeaderResult { - let (sender, receiver) = channel(); - self.schedule_request(request.retry_count.clone(), RequestData::RemoteHeader(request, sender), - RemoteResponse { receiver }) - } - - fn remote_read(&self, request: RemoteReadRequest) -> Self::RemoteReadResult { - let (sender, receiver) = channel(); - self.schedule_request( - request.retry_count.clone(), - RequestData::RemoteRead(request, sender), - RemoteResponse { receiver } - ) - } - - fn remote_read_child( - &self, - request: RemoteReadChildRequest - ) -> Self::RemoteReadResult { - let (sender, receiver) = channel(); - self.schedule_request( - request.retry_count.clone(), - RequestData::RemoteReadChild(request, sender), - RemoteResponse { receiver } - ) - } - - fn remote_call(&self, request: RemoteCallRequest) -> Self::RemoteCallResult { - let (sender, receiver) = channel(); - self.schedule_request(request.retry_count.clone(), RequestData::RemoteCall(request, sender), - RemoteResponse { receiver }) - } - - fn remote_changes(&self, request: RemoteChangesRequest) -> Self::RemoteChangesResult { - let (sender, receiver) = channel(); - self.schedule_request(request.retry_count.clone(), RequestData::RemoteChanges(request, sender), - RemoteResponse { receiver }) - } - - fn remote_body(&self, request: RemoteBodyRequest) -> Self::RemoteBodyResult { - let (sender, receiver) = channel(); - self.schedule_request(request.retry_count.clone(), RequestData::RemoteBody(request, sender), - RemoteResponse { receiver }) - } -} - -impl OnDemandCore where - B: BlockT, - B::Header: HeaderT, -{ - fn is_pending_request(&self, peer: &PeerId, request_id: message::RequestId) -> bool { + pub fn is_on_demand_response(&self, peer: &PeerId, request_id: message::RequestId) -> bool { self.active_peers.get(&peer).map_or(false, |r| r.id == request_id) } - pub fn add_peer(&mut self, peer: PeerId, best_number: NumberFor) { - self.idle_peers.push_back(peer.clone()); - self.best_blocks.insert(peer, best_number); - } - - pub fn update_peer(&mut self, peer: PeerId, best_number: NumberFor) { - self.best_blocks.insert(peer, best_number); + fn remove(&mut self, peer: PeerId, id: u64) -> Option> { + match self.active_peers.entry(peer.clone()) { + Entry::Occupied(entry) => match entry.get().id == id { + true => { + self.idle_peers.push_back(peer); + Some(entry.remove()) + }, + false => None, + }, + Entry::Vacant(_) => None, + } } pub fn remove_peer(&mut self, peer: PeerId) { @@ -524,48 +449,8 @@ impl OnDemandCore where } } - pub fn maintain_peers(&mut self) -> Vec { - let now = Instant::now(); - let mut bad_peers = Vec::new(); - loop { - match self.active_peers.front() { - Some((_, request)) if now - request.timestamp >= REQUEST_TIMEOUT => (), - _ => return bad_peers, - } - - let (bad_peer, request) = self.active_peers.pop_front().expect("front() is Some as checked above"); - self.pending_requests.push_front(request); - bad_peers.push(bad_peer); - } - } - - pub fn insert(&mut self, retry_count: usize, data: RequestData) { - let request_id = self.next_request_id; - self.next_request_id += 1; - - self.pending_requests.push_back(Request { - id: request_id, - timestamp: Instant::now(), - retry_count, - data, - }); - } - - pub fn remove(&mut self, peer: PeerId, id: u64) -> Option> { - match self.active_peers.entry(peer.clone()) { - Entry::Occupied(entry) => match entry.get().id == id { - true => { - self.idle_peers.push_back(peer); - Some(entry.remove()) - }, - false => None, - }, - Entry::Vacant(_) => None, - } - } - - pub fn dispatch(&mut self, on_demand: &OnDemand) { - + /// Dispatches pending requests. + fn dispatch(&mut self, mut network: impl OnDemandNetwork) { let mut last_peer = self.idle_peers.back().cloned(); let mut unhandled_requests = VecDeque::new(); @@ -610,7 +495,7 @@ impl OnDemandCore where let mut request = self.pending_requests.pop_front().expect("checked in loop condition; qed"); request.timestamp = Instant::now(); trace!(target: "sync", "Dispatching remote request {} to peer {}", request.id, peer); - on_demand.send_request(&peer, request.message()); + network.send_request(&peer, request.message()); self.active_peers.insert(peer, request); } @@ -619,7 +504,7 @@ impl OnDemandCore where } impl Request { - pub fn required_block(&self) -> NumberFor { + fn required_block(&self) -> NumberFor { match self.data { RequestData::RemoteHeader(ref data, _) => data.block, RequestData::RemoteRead(ref data, _) => *data.header.number(), @@ -630,7 +515,7 @@ impl Request { } } - pub fn message(&self) -> message::Message { + fn message(&self) -> message::Message { match self.data { RequestData::RemoteHeader(ref data, _) => message::generic::Message::RemoteHeaderRequest(message::RemoteHeaderRequest { @@ -682,7 +567,7 @@ impl Request { } impl RequestData { - pub fn fail(self, error: ClientError) { + fn fail(self, error: ClientError) { // don't care if anyone is listening match self { RequestData::RemoteHeader(_, sender) => { let _ = sender.send(Err(error)); }, @@ -698,21 +583,20 @@ impl RequestData { #[cfg(test)] pub mod tests { use std::collections::HashSet; - use std::sync::{Arc, Mutex}; + use std::sync::Arc; use std::time::Instant; - use futures::Future; + use futures::{Future, sync::oneshot}; use runtime_primitives::traits::{Block as BlockT, NumberFor}; use client::{error::{Error as ClientError, Result as ClientResult}}; - use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest, + use client::light::fetcher::{FetchChecker, RemoteHeaderRequest, ChangesProof, RemoteCallRequest, RemoteReadRequest, RemoteReadChildRequest, RemoteChangesRequest, RemoteBodyRequest}; use crate::config::Roles; use crate::message; use network_libp2p::PeerId; - use super::{REQUEST_TIMEOUT, OnDemand, OnDemandNetwork, OnDemandService}; + use super::{REQUEST_TIMEOUT, OnDemandCore, OnDemandNetwork, RequestData}; use test_client::runtime::{changes_trie_config, Block, Extrinsic, Header}; - pub struct DummyExecutor; struct DummyFetchChecker { ok: bool } impl FetchChecker for DummyFetchChecker { @@ -776,19 +660,21 @@ pub mod tests { } } - fn dummy(ok: bool) -> (Arc, Arc>) { - let executor = Arc::new(DummyExecutor); - let service = Arc::new(OnDemand::new(Arc::new(DummyFetchChecker { ok }))); - (executor, service) + fn dummy(ok: bool) -> OnDemandCore { + OnDemandCore::new(Arc::new(DummyFetchChecker { ok })) } - fn total_peers(on_demand: &OnDemand) -> usize { - let core = on_demand.core.lock(); - core.idle_peers.len() + core.active_peers.len() + fn total_peers(on_demand: &OnDemandCore) -> usize { + on_demand.idle_peers.len() + on_demand.active_peers.len() } - fn receive_call_response(on_demand: &OnDemand, peer: PeerId, id: message::RequestId) { - on_demand.on_remote_call_response(peer, message::RemoteCallResponse { + fn receive_call_response( + network_interface: impl OnDemandNetwork, + on_demand: &mut OnDemandCore, + peer: PeerId, + id: message::RequestId + ) { + on_demand.on_remote_call_response(network_interface, peer, message::RemoteCallResponse { id: id, proof: vec![vec![2]], }); @@ -806,152 +692,149 @@ pub mod tests { #[derive(Default)] struct DummyNetwork { - disconnected_peers: Mutex>, + disconnected_peers: HashSet, } - impl OnDemandNetwork for Arc { - fn report_peer(&self, _: &PeerId, _: i32) {} - fn disconnect_peer(&self, who: &PeerId) { - self.disconnected_peers.lock().unwrap().insert(who.clone()); + impl<'a, B: BlockT> OnDemandNetwork for &'a mut DummyNetwork { + fn report_peer(&mut self, _: &PeerId, _: i32) {} + fn disconnect_peer(&mut self, who: &PeerId) { + self.disconnected_peers.insert(who.clone()); } - fn send_request(&self, _: &PeerId, _: message::Message) {} + fn send_request(&mut self, _: &PeerId, _: message::Message) {} } - fn assert_disconnected_peer(dummy: Arc) { - assert_eq!(dummy.disconnected_peers.lock().unwrap().len(), 1); + fn assert_disconnected_peer(dummy: &DummyNetwork) { + assert_eq!(dummy.disconnected_peers.len(), 1); } #[test] fn knows_about_peers_roles() { - let (_, on_demand) = dummy(true); + let mut network_interface = DummyNetwork::default(); + let mut on_demand = dummy(true); let peer0 = PeerId::random(); let peer1 = PeerId::random(); let peer2 = PeerId::random(); - on_demand.on_connect(peer0, Roles::LIGHT, 1000); - on_demand.on_connect(peer1.clone(), Roles::FULL, 2000); - on_demand.on_connect(peer2.clone(), Roles::AUTHORITY, 3000); - assert_eq!(vec![peer1.clone(), peer2.clone()], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); - assert_eq!(on_demand.core.lock().best_blocks.get(&peer1), Some(&2000)); - assert_eq!(on_demand.core.lock().best_blocks.get(&peer2), Some(&3000)); + on_demand.on_connect(&mut network_interface, peer0, Roles::LIGHT, 1000); + on_demand.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 2000); + on_demand.on_connect(&mut network_interface, peer2.clone(), Roles::AUTHORITY, 3000); + assert_eq!(vec![peer1.clone(), peer2.clone()], on_demand.idle_peers.iter().cloned().collect::>()); + assert_eq!(on_demand.best_blocks.get(&peer1), Some(&2000)); + assert_eq!(on_demand.best_blocks.get(&peer2), Some(&3000)); } #[test] fn disconnects_from_idle_peer() { let peer0 = PeerId::random(); - let (_, on_demand) = dummy(true); - on_demand.on_connect(peer0.clone(), Roles::FULL, 100); - assert_eq!(1, total_peers(&*on_demand)); - assert!(!on_demand.core.lock().best_blocks.is_empty()); + let mut network_interface = DummyNetwork::default(); + let mut on_demand = dummy(true); + on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 100); + assert_eq!(1, total_peers(&on_demand)); + assert!(!on_demand.best_blocks.is_empty()); - on_demand.on_disconnect(peer0); - assert_eq!(0, total_peers(&*on_demand)); - assert!(on_demand.core.lock().best_blocks.is_empty()); + on_demand.on_disconnect(&mut network_interface, peer0); + assert_eq!(0, total_peers(&on_demand)); + assert!(on_demand.best_blocks.is_empty()); } #[test] fn disconnects_from_timeouted_peer() { - let (_x, on_demand) = dummy(true); - let network_interface = Arc::new(DummyNetwork::default()); + let mut on_demand = dummy(true); + let mut network_interface = DummyNetwork::default(); let peer0 = PeerId::random(); let peer1 = PeerId::random(); - on_demand.set_network_interface(Box::new(network_interface.clone())); - on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); - on_demand.on_connect(peer1.clone(), Roles::FULL, 1000); - assert_eq!(vec![peer0.clone(), peer1.clone()], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); - assert!(on_demand.core.lock().active_peers.is_empty()); + on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); + on_demand.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 1000); + assert_eq!(vec![peer0.clone(), peer1.clone()], on_demand.idle_peers.iter().cloned().collect::>()); + assert!(on_demand.active_peers.is_empty()); - on_demand.remote_call(RemoteCallRequest { + on_demand.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { block: Default::default(), header: dummy_header(), method: "test".into(), call_data: vec![], retry_count: None, - }); - assert_eq!(vec![peer1.clone()], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); - assert_eq!(vec![peer0.clone()], on_demand.core.lock().active_peers.keys().cloned().collect::>()); - - on_demand.core.lock().active_peers[&peer0].timestamp = Instant::now() - REQUEST_TIMEOUT - REQUEST_TIMEOUT; - on_demand.maintain_peers(); - assert!(on_demand.core.lock().idle_peers.is_empty()); - assert_eq!(vec![peer1.clone()], on_demand.core.lock().active_peers.keys().cloned().collect::>()); - assert_disconnected_peer(network_interface); + }, oneshot::channel().0)); + assert_eq!(vec![peer1.clone()], on_demand.idle_peers.iter().cloned().collect::>()); + assert_eq!(vec![peer0.clone()], on_demand.active_peers.keys().cloned().collect::>()); + + on_demand.active_peers[&peer0].timestamp = Instant::now() - REQUEST_TIMEOUT - REQUEST_TIMEOUT; + on_demand.maintain_peers(&mut network_interface); + assert!(on_demand.idle_peers.is_empty()); + assert_eq!(vec![peer1.clone()], on_demand.active_peers.keys().cloned().collect::>()); + assert_disconnected_peer(&network_interface); } #[test] fn disconnects_from_peer_on_response_with_wrong_id() { - let (_x, on_demand) = dummy(true); + let mut on_demand = dummy(true); let peer0 = PeerId::random(); - let network_interface = Arc::new(DummyNetwork::default()); - on_demand.set_network_interface(Box::new(network_interface.clone())); - on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); + let mut network_interface = DummyNetwork::default(); + on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); - on_demand.remote_call(RemoteCallRequest { + on_demand.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { block: Default::default(), header: dummy_header(), method: "test".into(), call_data: vec![], retry_count: None, - }); - receive_call_response(&*on_demand, peer0, 1); - assert_disconnected_peer(network_interface); - assert_eq!(on_demand.core.lock().pending_requests.len(), 1); + }, oneshot::channel().0)); + receive_call_response(&mut network_interface, &mut on_demand, peer0, 1); + assert_disconnected_peer(&network_interface); + assert_eq!(on_demand.pending_requests.len(), 1); } #[test] fn disconnects_from_peer_on_incorrect_response() { - let (_x, on_demand) = dummy(false); - let network_interface = Arc::new(DummyNetwork::default()); + let mut on_demand = dummy(false); + let mut network_interface = DummyNetwork::default(); let peer0 = PeerId::random(); - on_demand.set_network_interface(Box::new(network_interface.clone())); - on_demand.remote_call(RemoteCallRequest { + on_demand.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { block: Default::default(), header: dummy_header(), method: "test".into(), call_data: vec![], retry_count: Some(1), - }); + }, oneshot::channel().0)); - on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); - receive_call_response(&*on_demand, peer0.clone(), 0); - assert_disconnected_peer(network_interface); - assert_eq!(on_demand.core.lock().pending_requests.len(), 1); + on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); + receive_call_response(&mut network_interface, &mut on_demand, peer0.clone(), 0); + assert_disconnected_peer(&network_interface); + assert_eq!(on_demand.pending_requests.len(), 1); } #[test] fn disconnects_from_peer_on_unexpected_response() { - let (_x, on_demand) = dummy(true); - let network_interface = Arc::new(DummyNetwork::default()); + let mut on_demand = dummy(true); + let mut network_interface = DummyNetwork::default(); let peer0 = PeerId::random(); - on_demand.set_network_interface(Box::new(network_interface.clone())); - on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); + on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); - receive_call_response(&*on_demand, peer0, 0); - assert_disconnected_peer(network_interface); + receive_call_response(&mut network_interface, &mut on_demand, peer0, 0); + assert_disconnected_peer(&network_interface); } #[test] fn disconnects_from_peer_on_wrong_response_type() { - let (_x, on_demand) = dummy(false); + let mut on_demand = dummy(false); let peer0 = PeerId::random(); - let network_interface = Arc::new(DummyNetwork::default()); - on_demand.set_network_interface(Box::new(network_interface.clone())); - on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); + let mut network_interface = DummyNetwork::default(); + on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); - on_demand.remote_call(RemoteCallRequest { + on_demand.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { block: Default::default(), header: dummy_header(), method: "test".into(), call_data: vec![], retry_count: Some(1), - }); + }, oneshot::channel().0)); - on_demand.on_remote_read_response(peer0.clone(), message::RemoteReadResponse { + on_demand.on_remote_read_response(&mut network_interface, peer0.clone(), message::RemoteReadResponse { id: 0, proof: vec![vec![2]], }); - assert_disconnected_peer(network_interface); - assert_eq!(on_demand.core.lock().pending_requests.len(), 1); + assert_disconnected_peer(&network_interface); + assert_eq!(on_demand.pending_requests.len(), 1); } #[test] @@ -960,26 +843,26 @@ pub mod tests { let retry_count = 2; let peer_ids = (0 .. retry_count + 1).map(|_| PeerId::random()).collect::>(); - let (_x, on_demand) = dummy(false); - let network_interface = Arc::new(DummyNetwork::default()); - on_demand.set_network_interface(Box::new(network_interface.clone())); + let mut on_demand = dummy(false); + let mut network_interface = DummyNetwork::default(); for i in 0..retry_count+1 { - on_demand.on_connect(peer_ids[i].clone(), Roles::FULL, 1000); + on_demand.on_connect(&mut network_interface, peer_ids[i].clone(), Roles::FULL, 1000); } let sync = Arc::new((Mutex::new(0), Mutex::new(0), Condvar::new())); let thread_sync = sync.clone(); - let response = on_demand.remote_call(RemoteCallRequest { + let (tx, response) = oneshot::channel(); + on_demand.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { block: Default::default(), header: dummy_header(), method: "test".into(), call_data: vec![], retry_count: Some(retry_count) - }); + }, tx)); let thread = ::std::thread::spawn(move || { let &(ref current, ref finished_at, ref finished) = &*thread_sync; - let _ = response.wait().unwrap_err(); + let _ = response.wait().unwrap().unwrap_err(); *finished_at.lock() = *current.lock(); finished.notify_one(); }); @@ -988,7 +871,7 @@ pub mod tests { for i in 0..retry_count+1 { let mut current = current.lock(); *current = *current + 1; - receive_call_response(&*on_demand, peer_ids[i].clone(), i as u64); + receive_call_response(&mut network_interface, &mut on_demand, peer_ids[i].clone(), i as u64); } let mut finished_at = finished_at.lock(); @@ -1000,48 +883,48 @@ pub mod tests { #[test] fn receives_remote_call_response() { - let (_x, on_demand) = dummy(true); - let network_interface = Arc::new(DummyNetwork::default()); + let mut on_demand = dummy(true); + let mut network_interface = DummyNetwork::default(); let peer0 = PeerId::random(); - on_demand.set_network_interface(Box::new(network_interface.clone())); - on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); + on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); - let response = on_demand.remote_call(RemoteCallRequest { + let (tx, response) = oneshot::channel(); + on_demand.add_request(&mut network_interface, RequestData::RemoteCall(RemoteCallRequest { block: Default::default(), header: dummy_header(), method: "test".into(), call_data: vec![], retry_count: None, - }); + }, tx)); let thread = ::std::thread::spawn(move || { - let result = response.wait().unwrap(); + let result = response.wait().unwrap().unwrap(); assert_eq!(result, vec![42]); }); - receive_call_response(&*on_demand, peer0.clone(), 0); + receive_call_response(&mut network_interface, &mut on_demand, peer0.clone(), 0); thread.join().unwrap(); } #[test] fn receives_remote_read_response() { - let (_x, on_demand) = dummy(true); - let network_interface = Arc::new(DummyNetwork::default()); + let mut on_demand = dummy(true); + let mut network_interface = DummyNetwork::default(); let peer0 = PeerId::random(); - on_demand.set_network_interface(Box::new(network_interface.clone())); - on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); + on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); - let response = on_demand.remote_read(RemoteReadRequest { + let (tx, response) = oneshot::channel(); + on_demand.add_request(&mut network_interface, RequestData::RemoteRead(RemoteReadRequest { header: dummy_header(), block: Default::default(), key: b":key".to_vec(), retry_count: None, - }); + }, tx)); let thread = ::std::thread::spawn(move || { - let result = response.wait().unwrap(); + let result = response.wait().unwrap().unwrap(); assert_eq!(result, Some(vec![42])); }); - on_demand.on_remote_read_response(peer0.clone(), message::RemoteReadResponse { + on_demand.on_remote_read_response(&mut network_interface, peer0.clone(), message::RemoteReadResponse { id: 0, proof: vec![vec![2]], }); @@ -1050,25 +933,25 @@ pub mod tests { #[test] fn receives_remote_read_child_response() { - let (_x, on_demand) = dummy(true); - let network_interface = Arc::new(DummyNetwork::default()); + let mut on_demand = dummy(true); + let mut network_interface = DummyNetwork::default(); let peer0 = PeerId::random(); - on_demand.set_network_interface(Box::new(network_interface.clone())); - on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); + on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); - let response = on_demand.remote_read_child(RemoteReadChildRequest { + let (tx, response) = oneshot::channel(); + on_demand.add_request(&mut network_interface, RequestData::RemoteReadChild(RemoteReadChildRequest { header: dummy_header(), block: Default::default(), storage_key: b":child_storage:sub".to_vec(), key: b":key".to_vec(), retry_count: None, - }); + }, tx)); let thread = ::std::thread::spawn(move || { - let result = response.wait().unwrap(); + let result = response.wait().unwrap().unwrap(); assert_eq!(result, Some(vec![42])); }); - on_demand.on_remote_read_response( + on_demand.on_remote_read_response(&mut network_interface, peer0.clone(), message::RemoteReadResponse { id: 0, proof: vec![vec![2]], @@ -1078,19 +961,19 @@ pub mod tests { #[test] fn receives_remote_header_response() { - let (_x, on_demand) = dummy(true); - let network_interface = Arc::new(DummyNetwork::default()); + let mut on_demand = dummy(true); + let mut network_interface = DummyNetwork::default(); let peer0 = PeerId::random(); - on_demand.set_network_interface(Box::new(network_interface.clone())); - on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); + on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); - let response = on_demand.remote_header(RemoteHeaderRequest { + let (tx, response) = oneshot::channel(); + on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { cht_root: Default::default(), block: 1, retry_count: None, - }); + }, tx)); let thread = ::std::thread::spawn(move || { - let result = response.wait().unwrap(); + let result = response.wait().unwrap().unwrap(); assert_eq!( result.hash(), "6443a0b46e0412e626363028115a9f2c\ @@ -1098,7 +981,7 @@ pub mod tests { ); }); - on_demand.on_remote_header_response(peer0.clone(), message::RemoteHeaderResponse { + on_demand.on_remote_header_response(&mut network_interface, peer0.clone(), message::RemoteHeaderResponse { id: 0, header: Some(Header { parent_hash: Default::default(), @@ -1114,13 +997,13 @@ pub mod tests { #[test] fn receives_remote_changes_response() { - let (_x, on_demand) = dummy(true); - let network_interface = Arc::new(DummyNetwork::default()); + let mut on_demand = dummy(true); + let mut network_interface = DummyNetwork::default(); let peer0 = PeerId::random(); - on_demand.set_network_interface(Box::new(network_interface.clone())); - on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); + on_demand.on_connect(&mut network_interface, peer0.clone(), Roles::FULL, 1000); - let response = on_demand.remote_changes(RemoteChangesRequest { + let (tx, response) = oneshot::channel(); + on_demand.add_request(&mut network_interface, RequestData::RemoteChanges(RemoteChangesRequest { changes_trie_config: changes_trie_config(), first_block: (1, Default::default()), last_block: (100, Default::default()), @@ -1128,13 +1011,13 @@ pub mod tests { tries_roots: (1, Default::default(), vec![]), key: vec![], retry_count: None, - }); + }, tx)); let thread = ::std::thread::spawn(move || { - let result = response.wait().unwrap(); + let result = response.wait().unwrap().unwrap(); assert_eq!(result, vec![(100, 2)]); }); - on_demand.on_remote_changes_response(peer0.clone(), message::RemoteChangesResponse { + on_demand.on_remote_changes_response(&mut network_interface, peer0.clone(), message::RemoteChangesResponse { id: 0, max: 1000, proof: vec![vec![2]], @@ -1146,53 +1029,52 @@ pub mod tests { #[test] fn does_not_sends_request_to_peer_who_has_no_required_block() { - let (_x, on_demand) = dummy(true); - let network_interface = Arc::new(DummyNetwork::default()); + let mut on_demand = dummy(true); + let mut network_interface = DummyNetwork::default(); let peer1 = PeerId::random(); let peer2 = PeerId::random(); - on_demand.set_network_interface(Box::new(network_interface.clone())); - on_demand.on_connect(peer1.clone(), Roles::FULL, 100); + on_demand.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 100); - on_demand.remote_header(RemoteHeaderRequest { + on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { cht_root: Default::default(), block: 200, retry_count: None, - }); - on_demand.remote_header(RemoteHeaderRequest { + }, oneshot::channel().0)); + on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { cht_root: Default::default(), block: 250, retry_count: None, - }); - on_demand.remote_header(RemoteHeaderRequest { + }, oneshot::channel().0)); + on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { cht_root: Default::default(), block: 250, retry_count: None, - }); + }, oneshot::channel().0)); - on_demand.on_connect(peer2.clone(), Roles::FULL, 150); + on_demand.on_connect(&mut network_interface, peer2.clone(), Roles::FULL, 150); - assert_eq!(vec![peer1.clone(), peer2.clone()], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); - assert_eq!(on_demand.core.lock().pending_requests.len(), 3); + assert_eq!(vec![peer1.clone(), peer2.clone()], on_demand.idle_peers.iter().cloned().collect::>()); + assert_eq!(on_demand.pending_requests.len(), 3); - on_demand.on_block_announce(peer1.clone(), 250); + on_demand.on_block_announce(&mut network_interface, peer1.clone(), 250); - assert_eq!(vec![peer2.clone()], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); - assert_eq!(on_demand.core.lock().pending_requests.len(), 2); + assert_eq!(vec![peer2.clone()], on_demand.idle_peers.iter().cloned().collect::>()); + assert_eq!(on_demand.pending_requests.len(), 2); - on_demand.on_block_announce(peer2.clone(), 250); + on_demand.on_block_announce(&mut network_interface, peer2.clone(), 250); - assert!(!on_demand.core.lock().idle_peers.iter().any(|_| true)); - assert_eq!(on_demand.core.lock().pending_requests.len(), 1); + assert!(!on_demand.idle_peers.iter().any(|_| true)); + assert_eq!(on_demand.pending_requests.len(), 1); - on_demand.on_remote_header_response(peer1.clone(), message::RemoteHeaderResponse { + on_demand.on_remote_header_response(&mut network_interface, peer1.clone(), message::RemoteHeaderResponse { id: 0, header: Some(dummy_header()), proof: vec![], }); - assert!(!on_demand.core.lock().idle_peers.iter().any(|_| true)); - assert_eq!(on_demand.core.lock().pending_requests.len(), 0); + assert!(!on_demand.idle_peers.iter().any(|_| true)); + assert_eq!(on_demand.pending_requests.len(), 0); } #[test] @@ -1200,73 +1082,70 @@ pub mod tests { // this test is a regression for a bug where the dispatch function would // loop forever after dispatching a request to the last peer, since the // last peer was not updated - let (_x, on_demand) = dummy(true); - let network_interface = Arc::new(DummyNetwork::default()); + let mut on_demand = dummy(true); + let mut network_interface = DummyNetwork::default(); let peer1 = PeerId::random(); let peer2 = PeerId::random(); let peer3 = PeerId::random(); - on_demand.set_network_interface(Box::new(network_interface.clone())); - on_demand.remote_header(RemoteHeaderRequest { + on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { cht_root: Default::default(), block: 250, retry_count: None, - }); - on_demand.remote_header(RemoteHeaderRequest { + }, oneshot::channel().0)); + on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { cht_root: Default::default(), block: 250, retry_count: None, - }); + }, oneshot::channel().0)); - on_demand.on_connect(peer1.clone(), Roles::FULL, 200); - on_demand.on_connect(peer2.clone(), Roles::FULL, 200); - on_demand.on_connect(peer3.clone(), Roles::FULL, 250); + on_demand.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 200); + on_demand.on_connect(&mut network_interface, peer2.clone(), Roles::FULL, 200); + on_demand.on_connect(&mut network_interface, peer3.clone(), Roles::FULL, 250); - assert_eq!(vec![peer1.clone(), peer2.clone()], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); - assert_eq!(on_demand.core.lock().pending_requests.len(), 1); + assert_eq!(vec![peer1.clone(), peer2.clone()], on_demand.idle_peers.iter().cloned().collect::>()); + assert_eq!(on_demand.pending_requests.len(), 1); } #[test] fn tries_to_send_all_pending_requests() { - let (_x, on_demand) = dummy(true); - let network_interface = Arc::new(DummyNetwork::default()); + let mut on_demand = dummy(true); + let mut network_interface = DummyNetwork::default(); let peer1 = PeerId::random(); - on_demand.set_network_interface(Box::new(network_interface.clone())); - on_demand.remote_header(RemoteHeaderRequest { + on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { cht_root: Default::default(), block: 300, retry_count: None, - }); - on_demand.remote_header(RemoteHeaderRequest { + }, oneshot::channel().0)); + on_demand.add_request(&mut network_interface, RequestData::RemoteHeader(RemoteHeaderRequest { cht_root: Default::default(), block: 250, retry_count: None, - }); + }, oneshot::channel().0)); - on_demand.on_connect(peer1.clone(), Roles::FULL, 250); + on_demand.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 250); - assert!(on_demand.core.lock().idle_peers.iter().cloned().collect::>().is_empty()); - assert_eq!(on_demand.core.lock().pending_requests.len(), 1); + assert!(on_demand.idle_peers.iter().cloned().collect::>().is_empty()); + assert_eq!(on_demand.pending_requests.len(), 1); } #[test] fn remote_body_with_one_block_body_should_succeed() { - let (_x, on_demand) = dummy(true); - let network_interface = Arc::new(DummyNetwork::default()); + let mut on_demand = dummy(true); + let mut network_interface = DummyNetwork::default(); let peer1 = PeerId::random(); - on_demand.set_network_interface(Box::new(network_interface.clone())); let header = dummy_header(); - on_demand.on_connect(peer1.clone(), Roles::FULL, 250); + on_demand.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 250); - on_demand.remote_body(RemoteBodyRequest { + on_demand.add_request(&mut network_interface, RequestData::RemoteBody(RemoteBodyRequest { header: header.clone(), retry_count: None, - }); + }, oneshot::channel().0)); - assert!(on_demand.core.lock().pending_requests.is_empty()); - assert_eq!(on_demand.core.lock().active_peers.len(), 1); + assert!(on_demand.pending_requests.is_empty()); + assert_eq!(on_demand.active_peers.len(), 1); let block = message::BlockData:: { hash: primitives::H256::random(), @@ -1282,29 +1161,28 @@ pub mod tests { blocks: vec![block], }; - on_demand.on_remote_body_response(peer1.clone(), response); + on_demand.on_remote_body_response(&mut network_interface, peer1.clone(), response); - assert!(on_demand.core.lock().active_peers.is_empty()); - assert_eq!(on_demand.core.lock().idle_peers.len(), 1); + assert!(on_demand.active_peers.is_empty()); + assert_eq!(on_demand.idle_peers.len(), 1); } #[test] fn remote_body_with_three_bodies_should_fail() { - let (_x, on_demand) = dummy(true); - let network_interface = Arc::new(DummyNetwork::default()); + let mut on_demand = dummy(true); + let mut network_interface = DummyNetwork::default(); let peer1 = PeerId::random(); - on_demand.set_network_interface(Box::new(network_interface.clone())); let header = dummy_header(); - on_demand.on_connect(peer1.clone(), Roles::FULL, 250); + on_demand.on_connect(&mut network_interface, peer1.clone(), Roles::FULL, 250); - on_demand.remote_body(RemoteBodyRequest { + on_demand.add_request(&mut network_interface, RequestData::RemoteBody(RemoteBodyRequest { header: header.clone(), retry_count: None, - }); + }, oneshot::channel().0)); - assert!(on_demand.core.lock().pending_requests.is_empty()); - assert_eq!(on_demand.core.lock().active_peers.len(), 1); + assert!(on_demand.pending_requests.is_empty()); + assert_eq!(on_demand.active_peers.len(), 1); let response = { let blocks: Vec<_> = (0..3).map(|_| message::BlockData:: { @@ -1322,8 +1200,8 @@ pub mod tests { } }; - on_demand.on_remote_body_response(peer1.clone(), response); - assert!(on_demand.core.lock().active_peers.is_empty()); - assert!(on_demand.core.lock().idle_peers.is_empty(), "peer should be disconnected after bad response"); + on_demand.on_remote_body_response(&mut network_interface, peer1.clone(), response); + assert!(on_demand.active_peers.is_empty()); + assert!(on_demand.idle_peers.is_empty(), "peer should be disconnected after bad response"); } } diff --git a/substrate/core/network/src/on_demand_layer.rs b/substrate/core/network/src/on_demand_layer.rs new file mode 100644 index 00000000000..95f9f4d67b4 --- /dev/null +++ b/substrate/core/network/src/on_demand_layer.rs @@ -0,0 +1,149 @@ +// Copyright 2017-2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! On-demand requests service. + +use crate::on_demand::RequestData; +use std::sync::Arc; +use futures::{prelude::*, sync::mpsc, sync::oneshot}; +use parking_lot::Mutex; +use client::error::Error as ClientError; +use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest, + RemoteCallRequest, RemoteReadRequest, RemoteChangesRequest, + RemoteReadChildRequest, RemoteBodyRequest}; +use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor}; + +/// Implements the `Fetcher` trait of the client. Makes it possible for the light client to perform +/// network requests for some state. +/// +/// This implementation stores all the requests in a queue. The network, in parallel, is then +/// responsible for pulling elements out of that queue and fulfilling them. +pub struct OnDemand { + /// Objects that checks whether what has been retrieved is correct. + checker: Arc>, + + /// Queue of requests. Set to `Some` at initialization, then extracted by the network. + /// + /// Note that a better alternative would be to use a MPMC queue here, and add a `poll` method + /// from the `OnDemand`. However there exists no popular implementation of MPMC channels in + /// asynchronous Rust at the moment + requests_queue: Mutex>>>, + + /// Sending side of `requests_queue`. + requests_send: mpsc::UnboundedSender>, +} + +impl OnDemand where + B::Header: HeaderT, +{ + /// Creates new on-demand service. + pub fn new(checker: Arc>) -> Self { + let (requests_send, requests_queue) = mpsc::unbounded(); + let requests_queue = Mutex::new(Some(requests_queue)); + + OnDemand { + checker, + requests_queue, + requests_send, + } + } + + /// Get checker reference. + pub fn checker(&self) -> &Arc> { + &self.checker + } + + /// Extracts the queue of requests. + /// + /// Whenever one of the methods of the `Fetcher` trait is called, an element is pushed on this + /// channel. + /// + /// If this function returns `None`, that means that the receiver has already been extracted in + /// the past, and therefore that something already handles the requests. + pub(crate) fn extract_receiver(&self) -> Option>> { + self.requests_queue.lock().take() + } +} + +impl Fetcher for OnDemand where + B: BlockT, + B::Header: HeaderT, +{ + type RemoteHeaderResult = RemoteResponse; + type RemoteReadResult = RemoteResponse>>; + type RemoteCallResult = RemoteResponse>; + type RemoteChangesResult = RemoteResponse, u32)>>; + type RemoteBodyResult = RemoteResponse>; + + fn remote_header(&self, request: RemoteHeaderRequest) -> Self::RemoteHeaderResult { + let (sender, receiver) = oneshot::channel(); + let _ = self.requests_send.unbounded_send(RequestData::RemoteHeader(request, sender)); + RemoteResponse { receiver } + } + + fn remote_read(&self, request: RemoteReadRequest) -> Self::RemoteReadResult { + let (sender, receiver) = oneshot::channel(); + let _ = self.requests_send.unbounded_send(RequestData::RemoteRead(request, sender)); + RemoteResponse { receiver } + } + + fn remote_read_child( + &self, + request: RemoteReadChildRequest + ) -> Self::RemoteReadResult { + let (sender, receiver) = oneshot::channel(); + let _ = self.requests_send.unbounded_send(RequestData::RemoteReadChild(request, sender)); + RemoteResponse { receiver } + } + + fn remote_call(&self, request: RemoteCallRequest) -> Self::RemoteCallResult { + let (sender, receiver) = oneshot::channel(); + let _ = self.requests_send.unbounded_send(RequestData::RemoteCall(request, sender)); + RemoteResponse { receiver } + } + + fn remote_changes(&self, request: RemoteChangesRequest) -> Self::RemoteChangesResult { + let (sender, receiver) = oneshot::channel(); + let _ = self.requests_send.unbounded_send(RequestData::RemoteChanges(request, sender)); + RemoteResponse { receiver } + } + + fn remote_body(&self, request: RemoteBodyRequest) -> Self::RemoteBodyResult { + let (sender, receiver) = oneshot::channel(); + let _ = self.requests_send.unbounded_send(RequestData::RemoteBody(request, sender)); + RemoteResponse { receiver } + } +} + +/// Future for an on-demand remote call response. +pub struct RemoteResponse { + receiver: oneshot::Receiver>, +} + +impl Future for RemoteResponse { + type Item = T; + type Error = ClientError; + + fn poll(&mut self) -> Poll { + self.receiver.poll() + .map_err(|_| ClientError::RemoteFetchCancelled.into()) + .and_then(|r| match r { + Async::Ready(Ok(ready)) => Ok(Async::Ready(ready)), + Async::Ready(Err(error)) => Err(error), + Async::NotReady => Ok(Async::NotReady), + }) + } +} diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index c9e56362d11..b3102e588e0 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -30,7 +30,7 @@ use crate::message::{ }; use crate::message::generic::{Message as GenericMessage, ConsensusMessage}; use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; -use crate::on_demand::OnDemandService; +use crate::on_demand::{OnDemandCore, OnDemandNetwork, RequestData}; use crate::specialization::NetworkSpecialization; use crate::sync::{ChainSync, Context as SyncContext, Status as SyncStatus, SyncState}; use crate::service::{TransactionPool, ExHashT}; @@ -41,7 +41,7 @@ use std::sync::Arc; use std::{cmp, num::NonZeroUsize, time}; use log::{trace, debug, warn, error}; use crate::chain::{Client, FinalityProofProvider}; -use client::light::fetcher::ChangesProof; +use client::light::fetcher::{FetchChecker, ChangesProof}; use crate::{error, util::LruHashSet}; const REQUEST_TIMEOUT_SEC: u64 = 40; @@ -83,7 +83,8 @@ pub struct Protocol, H: ExHashT> { /// Interval at which we call `propagate_extrinsics`. propagate_timeout: tokio::timer::Interval, config: ProtocolConfig, - on_demand: Option>>, + /// Handler for on-demand requests. + on_demand_core: OnDemandCore, genesis_hash: B::Hash, sync: ChainSync, specialization: S, @@ -159,6 +160,20 @@ pub trait NetworkOut { fn send_message(&mut self, who: PeerId, message: Message); } +impl<'a, 'b, B: BlockT> OnDemandNetwork for &'a mut &'b mut dyn NetworkOut { + fn report_peer(&mut self, who: &PeerId, reputation: i32) { + NetworkOut::report_peer(**self, who.clone(), reputation) + } + + fn disconnect_peer(&mut self, who: &PeerId) { + NetworkOut::disconnect_peer(**self, who.clone()) + } + + fn send_request(&mut self, who: &PeerId, message: Message) { + NetworkOut::send_message(**self, who.clone(), message) + } +} + /// Context for a network-specific handler. pub trait Context { /// Adjusts the reputation of the peer. Use this to point out that a peer has been malign or @@ -263,7 +278,7 @@ impl, H: ExHashT> Protocol { pub fn new( config: ProtocolConfig, chain: Arc>, - on_demand: Option>>, + checker: Arc>, specialization: S, ) -> error::Result> { let info = chain.info()?; @@ -276,7 +291,7 @@ impl, H: ExHashT> Protocol { peers: HashMap::new(), chain, }, - on_demand, + on_demand_core: OnDemandCore::new(checker), genesis_hash: info.chain.genesis_hash, sync, specialization: specialization, @@ -307,6 +322,13 @@ impl, H: ExHashT> Protocol { self.sync.status().is_offline() } + /// Starts a new data demand request. + /// + /// The parameter contains a `Sender` where the result, once received, must be sent. + pub(crate) fn add_on_demand_request(&mut self, mut network_out: &mut dyn NetworkOut, rq: RequestData) { + self.on_demand_core.add_request(&mut network_out, rq); + } + pub fn poll( &mut self, network_out: &mut dyn NetworkOut, @@ -324,7 +346,7 @@ impl, H: ExHashT> Protocol { } fn is_on_demand_response(&self, who: &PeerId, response_id: message::RequestId) -> bool { - self.on_demand.as_ref().map_or(false, |od| od.is_on_demand_response(&who, response_id)) + self.on_demand_core.is_on_demand_response(&who, response_id) } fn handle_response( @@ -378,7 +400,7 @@ impl, H: ExHashT> Protocol { GenericMessage::BlockResponse(r) => { // Note, this is safe because only `ordinary bodies` and `remote bodies` are received in this matter. if self.is_on_demand_response(&who, r.id) { - self.on_remote_body_response(who, r); + self.on_remote_body_response(network_out, who, r); } else { if let Some(request) = self.handle_response(network_out, who.clone(), &r) { let outcome = self.on_block_response(network_out, who.clone(), request, r); @@ -394,13 +416,20 @@ impl, H: ExHashT> Protocol { GenericMessage::Transactions(m) => self.on_extrinsics(network_out, transaction_pool, who, m), GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(network_out, who, request), - GenericMessage::RemoteCallResponse(response) => self.on_remote_call_response(who, response), - GenericMessage::RemoteReadRequest(request) => self.on_remote_read_request(network_out, who, request), - GenericMessage::RemoteReadResponse(response) => self.on_remote_read_response(who, response), - GenericMessage::RemoteHeaderRequest(request) => self.on_remote_header_request(network_out, who, request), - GenericMessage::RemoteHeaderResponse(response) => self.on_remote_header_response(who, response), - GenericMessage::RemoteChangesRequest(request) => self.on_remote_changes_request(network_out, who, request), - GenericMessage::RemoteChangesResponse(response) => self.on_remote_changes_response(who, response), + GenericMessage::RemoteCallResponse(response) => + self.on_remote_call_response(network_out, who, response), + GenericMessage::RemoteReadRequest(request) => + self.on_remote_read_request(network_out, who, request), + GenericMessage::RemoteReadResponse(response) => + self.on_remote_read_response(network_out, who, response), + GenericMessage::RemoteHeaderRequest(request) => + self.on_remote_header_request(network_out, who, request), + GenericMessage::RemoteHeaderResponse(response) => + self.on_remote_header_response(network_out, who, response), + GenericMessage::RemoteChangesRequest(request) => + self.on_remote_changes_request(network_out, who, request), + GenericMessage::RemoteChangesResponse(response) => + self.on_remote_changes_response(network_out, who, response), GenericMessage::FinalityProofRequest(request) => self.on_finality_proof_request(network_out, who, request, finality_proof_provider), GenericMessage::FinalityProofResponse(response) => @@ -480,7 +509,7 @@ impl, H: ExHashT> Protocol { } /// Called by peer when it is disconnecting - pub fn on_peer_disconnected(&mut self, network_out: &mut dyn NetworkOut, peer: PeerId, debug_info: String) { + pub fn on_peer_disconnected(&mut self, mut network_out: &mut dyn NetworkOut, peer: PeerId, debug_info: String) { trace!(target: "sync", "Disconnecting {}: {}", peer, debug_info); // lock all the the peer lists so that add/remove peer events are in order let removed = { @@ -494,7 +523,7 @@ impl, H: ExHashT> Protocol { } self.sync.peer_disconnected(&mut context, peer.clone()); self.specialization.on_disconnect(&mut context, peer.clone()); - self.on_demand.as_ref().map(|s| s.on_disconnect(peer)); + self.on_demand_core.on_disconnect(&mut network_out, peer); } } @@ -514,7 +543,12 @@ impl, H: ExHashT> Protocol { } } - fn on_block_request(&mut self, network_out: &mut dyn NetworkOut, peer: PeerId, request: message::BlockRequest) { + fn on_block_request( + &mut self, + network_out: &mut dyn NetworkOut, + peer: PeerId, + request: message::BlockRequest + ) { trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?}", request.id, peer, @@ -643,13 +677,11 @@ impl, H: ExHashT> Protocol { /// Perform time based maintenance. /// /// > **Note**: This method normally doesn't have to be called except for testing purposes. - pub fn tick(&mut self, network_out: &mut dyn NetworkOut) { + pub fn tick(&mut self, mut network_out: &mut dyn NetworkOut) { self.consensus_gossip.tick(&mut ProtocolContext::new(&mut self.context_data, network_out)); self.maintain_peers(network_out); self.sync.tick(&mut ProtocolContext::new(&mut self.context_data, network_out)); - self.on_demand - .as_ref() - .map(|s| s.maintain_peers()); + self.on_demand_core.maintain_peers(&mut network_out); } fn maintain_peers(&mut self, network_out: &mut dyn NetworkOut) { @@ -681,7 +713,7 @@ impl, H: ExHashT> Protocol { } /// Called by peer to report status - fn on_status_message(&mut self, network_out: &mut dyn NetworkOut, who: PeerId, status: message::Status) { + fn on_status_message(&mut self, mut network_out: &mut dyn NetworkOut, who: PeerId, status: message::Status) { trace!(target: "sync", "New peer {} {:?}", who, status); let protocol_version = { if self.context_data.peers.contains_key(&who) { @@ -756,10 +788,8 @@ impl, H: ExHashT> Protocol { status.version }; + self.on_demand_core.on_connect(&mut network_out, who.clone(), status.roles, status.best_number); let mut context = ProtocolContext::new(&mut self.context_data, network_out); - self.on_demand - .as_ref() - .map(|s| s.on_connect(who.clone(), status.roles, status.best_number)); self.sync.new_peer(&mut context, who.clone()); if protocol_version > 2 { self.consensus_gossip.new_peer(&mut context, who.clone(), status.roles); @@ -875,7 +905,7 @@ impl, H: ExHashT> Protocol { fn on_block_announce( &mut self, - network_out: &mut dyn NetworkOut, + mut network_out: &mut dyn NetworkOut, who: PeerId, announce: message::BlockAnnounce ) { @@ -886,9 +916,7 @@ impl, H: ExHashT> Protocol { peer.known_blocks.insert(hash.clone()); } } - self.on_demand - .as_ref() - .map(|s| s.on_block_announce(who.clone(), *header.number())); + self.on_demand_core.on_block_announce(&mut network_out, who.clone(), *header.number()); self.sync.on_block_announce( &mut ProtocolContext::new(&mut self.context_data, network_out), who.clone(), @@ -1029,7 +1057,12 @@ impl, H: ExHashT> Protocol { /// Request a finality proof for the given block. /// /// Queues a new finality proof request and tries to dispatch all pending requests. - pub fn request_finality_proof(&mut self, network_out: &mut dyn NetworkOut, hash: &B::Hash, number: NumberFor) { + pub fn request_finality_proof( + &mut self, + network_out: &mut dyn NetworkOut, + hash: &B::Hash, + number: NumberFor + ) { let mut context = ProtocolContext::new(&mut self.context_data, network_out); self.sync.request_finality_proof(&hash, number, &mut context); } @@ -1042,11 +1075,14 @@ impl, H: ExHashT> Protocol { self.sync.finality_proof_import_result(request_block, finalization_result) } - fn on_remote_call_response(&mut self, who: PeerId, response: message::RemoteCallResponse) { + fn on_remote_call_response( + &mut self, + mut network_out: &mut dyn NetworkOut, + who: PeerId, + response: message::RemoteCallResponse + ) { trace!(target: "sync", "Remote call response {} from {}", response.id, who); - self.on_demand - .as_ref() - .map(|s| s.on_remote_call_response(who, response)); + self.on_demand_core.on_remote_call_response(&mut network_out, who, response); } fn on_remote_read_request( @@ -1079,11 +1115,15 @@ impl, H: ExHashT> Protocol { }), ); } - fn on_remote_read_response(&mut self, who: PeerId, response: message::RemoteReadResponse) { + + fn on_remote_read_response( + &mut self, + mut network_out: &mut dyn NetworkOut, + who: PeerId, + response: message::RemoteReadResponse + ) { trace!(target: "sync", "Remote read response {} from {}", response.id, who); - self.on_demand - .as_ref() - .map(|s| s.on_remote_read_response(who, response)); + self.on_demand_core.on_remote_read_response(&mut network_out, who, response); } fn on_remote_header_request( @@ -1119,13 +1159,12 @@ impl, H: ExHashT> Protocol { fn on_remote_header_response( &mut self, + mut network_out: &mut dyn NetworkOut, who: PeerId, response: message::RemoteHeaderResponse, ) { trace!(target: "sync", "Remote header proof response {} from {}", response.id, who); - self.on_demand - .as_ref() - .map(|s| s.on_remote_header_response(who, response)); + self.on_demand_core.on_remote_header_response(&mut network_out, who, response); } fn on_remote_changes_request( @@ -1182,6 +1221,7 @@ impl, H: ExHashT> Protocol { fn on_remote_changes_response( &mut self, + mut network_out: &mut dyn NetworkOut, who: PeerId, response: message::RemoteChangesResponse, B::Hash>, ) { @@ -1190,9 +1230,7 @@ impl, H: ExHashT> Protocol { who, response.max ); - self.on_demand - .as_ref() - .map(|s| s.on_remote_changes_response(who, response)); + self.on_demand_core.on_remote_changes_response(&mut network_out, who, response); } fn on_finality_proof_request( @@ -1250,10 +1288,13 @@ impl, H: ExHashT> Protocol { } } - fn on_remote_body_response(&self, peer: PeerId, response: message::BlockResponse) { - self.on_demand - .as_ref() - .map(|od| od.on_remote_body_response(peer, response)); + fn on_remote_body_response( + &mut self, + mut network_out: &mut dyn NetworkOut, + peer: PeerId, + response: message::BlockResponse + ) { + self.on_demand_core.on_remote_body_response(&mut network_out, peer, response); } } diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 4516609c5e7..b2e56a83bd2 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -29,9 +29,11 @@ use peerset::PeersetHandle; use consensus::import_queue::{ImportQueue, Link, SharedFinalityProofRequestBuilder}; use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId}; +use crate::AlwaysBadChecker; use crate::chain::FinalityProofProvider; use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; use crate::message::Message; +use crate::on_demand::RequestData; use crate::protocol::{self, Context, CustomMessageOutcome, Protocol, ConnectedPeer}; use crate::protocol::{ProtocolStatus, PeerInfo, NetworkOut}; use crate::config::Params; @@ -216,7 +218,8 @@ impl> Service { let protocol = Protocol::new( params.config, params.chain, - params.on_demand, + params.on_demand.as_ref().map(|od| od.checker().clone()) + .unwrap_or(Arc::new(AlwaysBadChecker)), params.specialization, )?; let versions: Vec<_> = ((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect(); @@ -234,6 +237,7 @@ impl> Service { status_sinks.clone(), params.network_config, registered, + params.on_demand.and_then(|od| od.extract_receiver()), )?; let service = Arc::new(Service { @@ -531,6 +535,7 @@ fn start_thread, H: ExHashT>( status_sinks: Arc>>>>, config: NetworkConfiguration, registered: RegisteredProtocol>, + on_demand_in: Option>>, ) -> Result<((oneshot::Sender<()>, thread::JoinHandle<()>), Arc>>>, PeersetHandle), Error> { // Start the main service. let (service, peerset) = match start_service(config, registered) { @@ -558,7 +563,8 @@ fn start_thread, H: ExHashT>( network_port, protocol_rx, status_sinks, - peerset_clone + peerset_clone, + on_demand_in ) .select(close_rx.then(|_| Ok(()))) .map(|(val, _)| val) @@ -589,6 +595,7 @@ fn run_thread, H: ExHashT>( mut protocol_rx: mpsc::UnboundedReceiver>, status_sinks: Arc>>>>, peerset: PeersetHandle, + mut on_demand_in: Option>>, ) -> impl Future { // Implementation of `protocol::NetworkOut` using the available local variables. struct Ctxt<'a, B: BlockT>(&'a mut NetworkService>, &'a PeersetHandle); @@ -628,6 +635,13 @@ fn run_thread, H: ExHashT>( Err(err) => void::unreachable(err), } + // Check for new incoming on-demand requests. + if let Some(on_demand_in) = on_demand_in.as_mut() { + while let Ok(Async::Ready(Some(rq))) = on_demand_in.poll() { + protocol.add_on_demand_request(&mut Ctxt(&mut network_service.lock(), &peerset), rq); + } + } + loop { match network_port.poll() { Ok(Async::NotReady) => break, diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 7cbbbf13704..13d18a0d44d 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -24,6 +24,7 @@ mod sync; use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; +use crate::AlwaysBadChecker; use log::trace; use crate::chain::FinalityProofProvider; use client::{self, ClientInfo, BlockchainEvents, FinalityNotifications}; @@ -956,7 +957,7 @@ pub trait TestNetFactory: Sized { let protocol = Protocol::new( config.clone(), client.clone(), - None, + Arc::new(AlwaysBadChecker), specialization, ).unwrap(); @@ -1010,7 +1011,7 @@ pub trait TestNetFactory: Sized { let protocol = Protocol::new( config, client.clone(), - None, + Arc::new(AlwaysBadChecker), specialization, ).unwrap(); diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index bc257dcd260..81e153b7aee 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -182,7 +182,7 @@ impl Service { network_config: config.network.clone(), chain: client.clone(), finality_proof_provider, - on_demand: on_demand.as_ref().map(|d| d.clone() as _), + on_demand, transaction_pool: transaction_pool_adapter.clone() as _, specialization: network_protocol, }; @@ -202,9 +202,6 @@ impl Service { let has_bootnodes = !network_params.network_config.boot_nodes.is_empty(); let network = network::Service::new(network_params, protocol_id, import_queue)?; - if let Some(on_demand) = on_demand.as_ref() { - on_demand.set_network_interface(Box::new(Arc::downgrade(&network))); - } let inherents_pool = Arc::new(InherentsPool::default()); let offchain_workers = if config.offchain_worker { -- GitLab