diff --git a/substrate/client/network/src/protocol/generic_proto/behaviour.rs b/substrate/client/network/src/protocol/generic_proto/behaviour.rs index b3c209eb0c0c6aa0810612f08ba5b9b6e6162246..cf6188726daae6f6a17d6741e7cbd4418cdbf031 100644 --- a/substrate/client/network/src/protocol/generic_proto/behaviour.rs +++ b/substrate/client/network/src/protocol/generic_proto/behaviour.rs @@ -126,6 +126,18 @@ pub struct GenericProto { /// List of peers in our state. peers: FnvHashMap<PeerId, PeerState>, + /// The elements in `peers` occasionally contain `Delay` objects that we would normally have + /// to be polled one by one. In order to avoid doing so, as an optimization, every `Delay` is + /// instead put inside of `delays` and reference by a [`DelayId`]. This stream + /// yields `PeerId`s whose `DelayId` is potentially ready. + /// + /// By design, we never remove elements from this list. Elements are removed only when the + /// `Delay` triggers. As such, this stream may produce obsolete elements. + delays: stream::FuturesUnordered<Pin<Box<dyn Future<Output = (DelayId, PeerId)> + Send>>>, + + /// [`DelayId`] to assign to the next delay. + next_delay_id: DelayId, + /// List of incoming messages we have sent to the peer set manager and that are waiting for an /// answer. incoming: SmallVec<[IncomingPeer; 6]>, @@ -141,6 +153,10 @@ pub struct GenericProto { queue_size_report: Option<HistogramVec>, } +/// Identifier for a delay firing. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +struct DelayId(u64); + /// State of a peer we're connected to. #[derive(Debug)] enum PeerState { @@ -158,8 +174,8 @@ enum PeerState { /// The peerset requested that we connect to this peer. We are currently not connected. PendingRequest { - /// When to actually start dialing. - timer: futures_timer::Delay, + /// When to actually start dialing. References an entry in `delays`. + timer: DelayId, /// When the `timer` will trigger. timer_deadline: Instant, }, @@ -183,8 +199,8 @@ enum PeerState { DisabledPendingEnable { /// The connections that are currently open for custom protocol traffic. open: SmallVec<[ConnectionId; crate::MAX_CONNECTIONS_PER_PEER]>, - /// When to enable this remote. - timer: futures_timer::Delay, + /// When to enable this remote. References an entry in `delays`. + timer: DelayId, /// When the `timer` will trigger. timer_deadline: Instant, }, @@ -338,6 +354,8 @@ impl GenericProto { notif_protocols: Vec::new(), peerset, peers: FnvHashMap::default(), + delays: Default::default(), + next_delay_id: DelayId(0), incoming: SmallVec::new(), next_incoming_index: sc_peerset::IncomingIndex(0), events: VecDeque::new(), @@ -627,10 +645,20 @@ impl GenericProto { match mem::replace(occ_entry.get_mut(), PeerState::Poisoned) { PeerState::Banned { ref until } if *until > now => { + let peer_id = occ_entry.key().clone(); debug!(target: "sub-libp2p", "PSM => Connect({:?}): Will start to connect at \ - until {:?}", occ_entry.key(), until); + until {:?}", peer_id, until); + + let delay_id = self.next_delay_id; + self.next_delay_id.0 += 1; + let delay = futures_timer::Delay::new(*until - now); + self.delays.push(async move { + delay.await; + (delay_id, peer_id) + }.boxed()); + *occ_entry.into_mut() = PeerState::PendingRequest { - timer: futures_timer::Delay::new(*until - now), + timer: delay_id, timer_deadline: *until, }; }, @@ -649,11 +677,21 @@ impl GenericProto { open, banned_until: Some(ref banned) } if *banned > now => { + let peer_id = occ_entry.key().clone(); debug!(target: "sub-libp2p", "PSM => Connect({:?}): But peer is banned until {:?}", - occ_entry.key(), banned); + peer_id, banned); + + let delay_id = self.next_delay_id; + self.next_delay_id.0 += 1; + let delay = futures_timer::Delay::new(*banned - now); + self.delays.push(async move { + delay.await; + (delay_id, peer_id) + }.boxed()); + *occ_entry.into_mut() = PeerState::DisabledPendingEnable { open, - timer: futures_timer::Delay::new(*banned - now), + timer: delay_id, timer_deadline: *banned, }; }, @@ -1363,34 +1401,37 @@ impl NetworkBehaviour for GenericProto { } } - for (peer_id, peer_state) in self.peers.iter_mut() { - match peer_state { - PeerState::PendingRequest { timer, .. } => { - if let Poll::Pending = Pin::new(timer).poll(cx) { - continue; - } + while let Poll::Ready(Some((delay_id, peer_id))) = + Pin::new(&mut self.delays).poll_next(cx) { + let peer_state = match self.peers.get_mut(&peer_id) { + Some(s) => s, + // We intentionally never remove elements from `delays`, and it may + // thus contain peers which are now gone. This is a normal situation. + None => continue, + }; + match peer_state { + PeerState::PendingRequest { timer, .. } if *timer == delay_id => { debug!(target: "sub-libp2p", "Libp2p <= Dial {:?} now that ban has expired", peer_id); self.events.push_back(NetworkBehaviourAction::DialPeer { - peer_id: peer_id.clone(), + peer_id, condition: DialPeerCondition::Disconnected }); *peer_state = PeerState::Requested; } - PeerState::DisabledPendingEnable { timer, open, .. } => { - if let Poll::Pending = Pin::new(timer).poll(cx) { - continue; - } - + PeerState::DisabledPendingEnable { timer, open, .. } if *timer == delay_id => { debug!(target: "sub-libp2p", "Handler({:?}) <= Enable (ban expired)", peer_id); self.events.push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: peer_id.clone(), + peer_id, handler: NotifyHandler::All, event: NotifsHandlerIn::Enable, }); *peer_state = PeerState::Enabled { open: mem::replace(open, Default::default()) }; } + + // We intentionally never remove elements from `delays`, and it may + // thus contain obsolete entries. This is a normal situation. _ => {}, } }