diff --git a/substrate/client/network/src/behaviour.rs b/substrate/client/network/src/behaviour.rs index 3b6224e9cc5fc61b06161a2dd076e0d72c340d63..d6949b491c822c96b2848dfcccccd4d765592a7c 100644 --- a/substrate/client/network/src/behaviour.rs +++ b/substrate/client/network/src/behaviour.rs @@ -304,11 +304,11 @@ Behaviour<B, H> { impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<block_requests::Event<B>> for Behaviour<B, H> { fn inject_event(&mut self, event: block_requests::Event<B>) { match event { - block_requests::Event::AnsweredRequest { peer, response_build_time } => { + block_requests::Event::AnsweredRequest { peer, total_handling_time } => { self.events.push(BehaviourOut::AnsweredRequest { peer, protocol: self.block_requests.protocol_name().to_vec(), - build_time: response_build_time, + build_time: total_handling_time, }); }, block_requests::Event::Response { peer, original_request, response, request_duration } => { diff --git a/substrate/client/network/src/block_requests.rs b/substrate/client/network/src/block_requests.rs index e8c96cc6d8e3b78dbc5d3b1bf2f1962467ed664d..b3e79398405a398009dfec2f2e2c2a144d336619 100644 --- a/substrate/client/network/src/block_requests.rs +++ b/substrate/client/network/src/block_requests.rs @@ -74,12 +74,12 @@ pub type Error = Box<dyn std::error::Error + 'static>; /// Event generated by the block requests behaviour. #[derive(Debug)] pub enum Event<B: Block> { - /// A request came and we answered it. + /// A request came and we have successfully answered it. AnsweredRequest { /// Peer which has emitted the request. peer: PeerId, - /// Time it took to compute the response. - response_build_time: Duration, + /// Time elapsed between when we received the request and when we sent back the response. + total_handling_time: Duration, }, /// A response to a block request has arrived. @@ -190,8 +190,9 @@ pub struct BlockRequests<B: Block> { chain: Arc<dyn Client<B>>, /// List of all active connections and the requests we've sent. peers: HashMap<PeerId, Vec<Connection<B>>>, - /// Futures sending back the block request response. - outgoing: FuturesUnordered<BoxFuture<'static, ()>>, + /// Futures sending back the block request response. Returns the `PeerId` we sent back to, and + /// the total time the handling of this request took. + outgoing: FuturesUnordered<BoxFuture<'static, (PeerId, Duration)>>, /// Events to return as soon as possible from `poll`. pending_events: VecDeque<NetworkBehaviourAction<OutboundProtocol<B>, Event<B>>>, } @@ -533,9 +534,7 @@ where node_event: NodeEvent<B, NegotiatedSubstream> ) { match node_event { - NodeEvent::Request(request, mut stream) => { - let before_answer_build = Instant::now(); - + NodeEvent::Request(request, mut stream, handling_start) => { match self.on_block_request(&peer, &request) { Ok(res) => { log::trace!( @@ -551,7 +550,7 @@ where peer, e ) } else { - let future = async move { + self.outgoing.push(async move { if let Err(e) = write_one(&mut stream, data).await { log::debug!( target: "sync", @@ -559,8 +558,8 @@ where e ); } - }; - self.outgoing.push(future.boxed()) + (peer, handling_start.elapsed()) + }.boxed()); } } Err(e) => log::debug!( @@ -568,12 +567,6 @@ where "Error handling block request from peer {}: {}", peer, e ) } - - let ev = Event::AnsweredRequest { - peer: peer.clone(), - response_build_time: before_answer_build.elapsed(), - }; - self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev)); } NodeEvent::Response(original_request, response) => { log::trace!( @@ -711,7 +704,14 @@ where } } - while let Poll::Ready(Some(_)) = self.outgoing.poll_next_unpin(cx) {} + while let Poll::Ready(Some((peer, total_handling_time))) = self.outgoing.poll_next_unpin(cx) { + let ev = Event::AnsweredRequest { + peer, + total_handling_time, + }; + self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev)); + } + Poll::Pending } } @@ -719,8 +719,9 @@ where /// Output type of inbound and outbound substream upgrades. #[derive(Debug)] pub enum NodeEvent<B: Block, T> { - /// Incoming request from remote and substream to use for the response. - Request(schema::v1::BlockRequest, T), + /// Incoming request from remote, substream to use for the response, and when we started + /// handling this request. + Request(schema::v1::BlockRequest, T, Instant), /// Incoming response from remote. Response(message::BlockRequest<B>, schema::v1::BlockResponse), } @@ -760,11 +761,14 @@ where type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>; fn upgrade_inbound(self, mut s: T, _: Self::Info) -> Self::Future { + // This `Instant` will be passed around until the processing of this request is done. + let handling_start = Instant::now(); + let future = async move { let len = self.max_request_len; let vec = read_one(&mut s, len).await?; match schema::v1::BlockRequest::decode(&vec[..]) { - Ok(r) => Ok(NodeEvent::Request(r, s)), + Ok(r) => Ok(NodeEvent::Request(r, s, handling_start)), Err(e) => Err(ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e))) } };