From 818976d98e7ae8dfe216817a2eae4a0f99c7dd48 Mon Sep 17 00:00:00 2001 From: Aaro Altonen <48052676+altonen@users.noreply.github.com> Date: Tue, 18 Apr 2023 10:47:36 +0300 Subject: [PATCH] Poll the substream validation before polling `Notifications` (#13934) * Poll the substream validation before polling `Notifications` In tests, it can happen that `Notifications` doesn't produce any events which causes `poll()` to return `Poll::Pending` and the substream validation futures won't get polled. Poll the futures before calling `Notifications` so results for substream validations are received even if `Notifications` is not producing any events. * Remove `pending_messages` * Remove unused import --- substrate/client/network/src/protocol.rs | 44 +++++++++--------------- 1 file changed, 16 insertions(+), 28 deletions(-) diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs index a7e6f36ef62..0075e856e75 100644 --- a/substrate/client/network/src/protocol.rs +++ b/substrate/client/network/src/protocol.rs @@ -40,7 +40,7 @@ use sc_utils::mpsc::TracingUnboundedSender; use sp_runtime::traits::Block as BlockT; use std::{ - collections::{HashMap, HashSet, VecDeque}, + collections::{HashMap, HashSet}, future::Future, iter, pin::Pin, @@ -77,8 +77,6 @@ type PendingSyncSubstreamValidation = // Lock must always be taken in order declared here. pub struct Protocol<B: BlockT> { - /// Pending list of messages to return from `poll` as a priority. - pending_messages: VecDeque<CustomMessageOutcome>, /// Used to report reputation changes. peerset_handle: sc_peerset::PeersetHandle, /// Handles opening the unique substream and sending and receiving raw messages. @@ -181,7 +179,6 @@ impl<B: BlockT> Protocol<B> { }; let protocol = Self { - pending_messages: VecDeque::new(), peerset_handle: peerset_handle.clone(), behaviour, notification_protocols: iter::once(block_announces_protocol.notifications_protocol) @@ -409,8 +406,21 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> { cx: &mut std::task::Context, params: &mut impl PollParameters, ) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> { - if let Some(message) = self.pending_messages.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message)) + while let Poll::Ready(Some(validation_result)) = + self.sync_substream_validations.poll_next_unpin(cx) + { + match validation_result { + Ok((peer, roles)) => { + self.peers.insert(peer, roles); + }, + Err(peer) => { + log::debug!( + target: "sub-libp2p", + "`SyncingEngine` rejected stream" + ); + self.behaviour.disconnect_peer(&peer, HARDCODED_PEERSETS_SYNC); + }, + } } let event = match self.behaviour.poll(cx, params) { @@ -430,23 +440,6 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> { return Poll::Ready(NetworkBehaviourAction::CloseConnection { peer_id, connection }), }; - while let Poll::Ready(Some(validation_result)) = - self.sync_substream_validations.poll_next_unpin(cx) - { - match validation_result { - Ok((peer, roles)) => { - self.peers.insert(peer, roles); - }, - Err(peer) => { - log::debug!( - target: "sub-libp2p", - "`SyncingEngine` rejected stream" - ); - self.behaviour.disconnect_peer(&peer, HARDCODED_PEERSETS_SYNC); - }, - } - } - let outcome = match event { NotificationsOut::CustomProtocolOpen { peer_id, @@ -509,7 +502,6 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> { ) { Ok(handshake) => { let roles = handshake.roles; - self.peers.insert(peer_id, roles); let (tx, rx) = oneshot::channel(); let _ = self.tx.unbounded_send( @@ -644,10 +636,6 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(outcome)) } - if let Some(message) = self.pending_messages.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message)) - } - // This block can only be reached if an event was pulled from the behaviour and that // resulted in `CustomMessageOutcome::None`. Since there might be another pending // message from the behaviour, the task is scheduled again. -- GitLab