From 3628998d3c38f8132eb67deb5714d78671d291aa Mon Sep 17 00:00:00 2001 From: Robert Klotzner <eskimor@users.noreply.github.com> Date: Tue, 2 Feb 2021 20:52:12 +0100 Subject: [PATCH] Add a send_request function to NetworkService (#8008) * Add a `send_request` to `NetworkService`. This function delivers responses via a provided sender and also allows for sending requests to currently not connected peers. * Document caveats of send_request better. * Fix compilation in certain cases. * Update docs + introduce IfDisconnected enum for more readable function calls. * Doc fix. * Rename send_request to detached_request. * Whitespace fix - arrrgh * Update client/network/src/service.rs spaces/tabs Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com> * Update client/network/src/request_responses.rs Documentation fix Co-authored-by: Roman Borschel <romanb@users.noreply.github.com> * Update client/network/src/service.rs Typo. Co-authored-by: Roman Borschel <romanb@users.noreply.github.com> * Update client/network/src/service.rs Better docs. Co-authored-by: Roman Borschel <romanb@users.noreply.github.com> * Update client/network/src/service.rs Typo. Co-authored-by: Roman Borschel <romanb@users.noreply.github.com> * Update client/network/src/service.rs Doc improvements. Co-authored-by: Roman Borschel <romanb@users.noreply.github.com> * Remove error in logs on dialing a peer. This is now valid behaviour. * Rename detached_request to start_request. As suggested by @romanb. * Fix merged master. * Fix too long lines. Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com> Co-authored-by: Roman Borschel <romanb@users.noreply.github.com> --- substrate/client/network/src/behaviour.rs | 25 ++++++--- .../client/network/src/request_responses.rs | 32 ++++++++++-- substrate/client/network/src/service.rs | 51 ++++++++++++++----- 3 files changed, 85 insertions(+), 23 deletions(-) diff --git a/substrate/client/network/src/behaviour.rs b/substrate/client/network/src/behaviour.rs index a34f6e0960c..7e134f8e699 100644 --- a/substrate/client/network/src/behaviour.rs +++ b/substrate/client/network/src/behaviour.rs @@ -45,6 +45,7 @@ use std::{ pub use crate::request_responses::{ ResponseFailure, InboundFailure, RequestFailure, OutboundFailure, RequestId, + IfDisconnected }; /// General behaviour of the network. Combines all protocols together. @@ -248,8 +249,9 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> { protocol: &str, request: Vec<u8>, pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>, + connect: IfDisconnected, ) { - self.request_responses.send_request(target, protocol, request, pending_response) + self.request_responses.send_request(target, protocol, request, pending_response, connect) } /// Returns a shared reference to the user protocol. @@ -317,7 +319,7 @@ Behaviour<B, H> { } self.request_responses.send_request( - &target, &self.block_request_protocol_name, buf, pending_response, + &target, &self.block_request_protocol_name, buf, pending_response, IfDisconnected::ImmediateError, ); }, CustomMessageOutcome::NotificationStreamOpened { remote, protocol, roles, notifications_sink } => { @@ -454,11 +456,22 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> { _: &mut impl PollParameters, ) -> Poll<NetworkBehaviourAction<TEv, BehaviourOut<B>>> { use light_client_requests::sender::OutEvent; - while let Poll::Ready(Some(event)) = self.light_client_request_sender.poll_next_unpin(cx) { + while let Poll::Ready(Some(event)) = + self.light_client_request_sender.poll_next_unpin(cx) + { match event { - OutEvent::SendRequest { target, request, pending_response, protocol_name } => { - self.request_responses.send_request(&target, &protocol_name, request, pending_response) - } + OutEvent::SendRequest { + target, + request, + pending_response, + protocol_name, + } => self.request_responses.send_request( + &target, + &protocol_name, + request, + pending_response, + IfDisconnected::ImmediateError, + ), } } diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 4ac6ffe67f9..4d478ea7afd 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -196,6 +196,25 @@ impl From<(Cow<'static, str>, RequestId)> for ProtocolRequestId { } } +/// When sending a request, what to do on a disconnected recipient. +pub enum IfDisconnected { + /// Try to connect to the peer. + TryConnect, + /// Just fail if the destination is not yet connected. + ImmediateError, +} + +/// Convenience functions for `IfDisconnected`. +impl IfDisconnected { + /// Shall we connect to a disconnected peer? + pub fn should_connect(self) -> bool { + match self { + Self::TryConnect => true, + Self::ImmediateError => false, + } + } +} + /// Implementation of `NetworkBehaviour` that provides support for request-response protocols. pub struct RequestResponsesBehaviour { /// The multiple sub-protocols, by name. @@ -269,17 +288,19 @@ impl RequestResponsesBehaviour { /// Initiates sending a request. /// - /// An error is returned if we are not connected to the target peer or if the protocol doesn't - /// match one that has been registered. + /// If there is no established connection to the target peer, the behavior is determined by the choice of `connect`. + /// + /// An error is returned if the protocol doesn't match one that has been registered. pub fn send_request( &mut self, target: &PeerId, protocol_name: &str, request: Vec<u8>, pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>, + connect: IfDisconnected, ) { if let Some((protocol, _)) = self.protocols.get_mut(protocol_name) { - if protocol.is_connected(target) { + if protocol.is_connected(target) || connect.should_connect() { let request_id = protocol.send_request(target, request); let prev_req_id = self.pending_requests.insert( (protocol_name.to_string().into(), request_id).into(), @@ -489,7 +510,6 @@ impl NetworkBehaviour for RequestResponsesBehaviour { return Poll::Ready(NetworkBehaviourAction::DialAddress { address }) } NetworkBehaviourAction::DialPeer { peer_id, condition } => { - log::error!("The request-response isn't supposed to start dialing peers"); return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition, @@ -949,6 +969,7 @@ mod tests { protocol_name, b"this is a request".to_vec(), sender, + IfDisconnected::ImmediateError, ); assert!(response_receiver.is_none()); response_receiver = Some(receiver); @@ -1037,6 +1058,7 @@ mod tests { protocol_name, b"this is a request".to_vec(), sender, + IfDisconnected::ImmediateError, ); assert!(response_receiver.is_none()); response_receiver = Some(receiver); @@ -1179,12 +1201,14 @@ mod tests { protocol_name_1, b"this is a request".to_vec(), sender_1, + IfDisconnected::ImmediateError, ); swarm_1.send_request( &peer_id, protocol_name_2, b"this is a request".to_vec(), sender_2, + IfDisconnected::ImmediateError, ); assert!(response_receivers.is_none()); response_receivers = Some((receiver_1, receiver_2)); diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index cb1cc4f3b77..46d36aff902 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -98,7 +98,7 @@ use std::{ task::Poll, }; -pub use behaviour::{ResponseFailure, InboundFailure, RequestFailure, OutboundFailure}; +pub use behaviour::{ResponseFailure, InboundFailure, RequestFailure, OutboundFailure, IfDisconnected}; mod metrics; mod out_events; @@ -812,9 +812,10 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> { /// notifications should remain the default ways of communicating information. For example, a /// peer can announce something through a notification, after which the recipient can obtain /// more information by performing a request. - /// As such, this function is meant to be called only with peers we are already connected to. - /// Calling this method with a `target` we are not connected to will *not* attempt to connect - /// to said peer. + /// As such, call this function with `IfDisconnected::ImmediateError` for `connect`. This way you + /// will get an error immediately for disconnected peers, instead of waiting for a potentially very + /// long connection attempt, which would suggest that something is wrong anyway, as you are + /// supposed to be connected because of the notification protocol. /// /// No limit or throttling of concurrent outbound requests per peer and protocol are enforced. /// Such restrictions, if desired, need to be enforced at the call site(s). @@ -826,15 +827,12 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> { &self, target: PeerId, protocol: impl Into<Cow<'static, str>>, - request: Vec<u8> + request: Vec<u8>, + connect: IfDisconnected, ) -> Result<Vec<u8>, RequestFailure> { let (tx, rx) = oneshot::channel(); - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request { - target, - protocol: protocol.into(), - request, - pending_response: tx - }); + + self.start_request(target, protocol, request, tx, connect); match rx.await { Ok(v) => v, @@ -845,6 +843,32 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> { } } + /// Variation of `request` which starts a request whose response is delivered on a provided channel. + /// + /// Instead of blocking and waiting for a reply, this function returns immediately, sending + /// responses via the passed in sender. This alternative API exists to make it easier to + /// integrate with message passing APIs. + /// + /// Keep in mind that the connected receiver might receive a `Canceled` event in case of a + /// closing connection. This is expected behaviour. With `request` you would get a + /// `RequestFailure::Network(OutboundFailure::ConnectionClosed)` in that case. + pub fn start_request( + &self, + target: PeerId, + protocol: impl Into<Cow<'static, str>>, + request: Vec<u8>, + tx: oneshot::Sender<Result<Vec<u8>, RequestFailure>>, + connect: IfDisconnected, + ) { + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request { + target, + protocol: protocol.into(), + request, + pending_response: tx, + connect, + }); + } + /// You may call this when new transactons are imported by the transaction pool. /// /// All transactions will be fetched from the `TransactionPool` that was passed at @@ -1262,6 +1286,7 @@ enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> { protocol: Cow<'static, str>, request: Vec<u8>, pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>, + connect: IfDisconnected, }, DisconnectPeer(PeerId, Cow<'static, str>), NewBestBlockImported(B::Hash, NumberFor<B>), @@ -1385,8 +1410,8 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> { this.network_service.user_protocol_mut().set_sync_fork_request(peer_ids, &hash, number), ServiceToWorkerMsg::EventStream(sender) => this.event_streams.push(sender), - ServiceToWorkerMsg::Request { target, protocol, request, pending_response } => { - this.network_service.send_request(&target, &protocol, request, pending_response); + ServiceToWorkerMsg::Request { target, protocol, request, pending_response, connect } => { + this.network_service.send_request(&target, &protocol, request, pending_response, connect); }, ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) => this.network_service.user_protocol_mut().disconnect_peer(&who, &protocol_name), -- GitLab