diff --git a/substrate/client/network/src/protocol/generic_proto/behaviour.rs b/substrate/client/network/src/protocol/generic_proto/behaviour.rs index 32cf417ec49af74dd23a7a193c7e03b5c606ba63..9d48d40bdfaff4cd80af52237bcf5428af0c4f7e 100644 --- a/substrate/client/network/src/protocol/generic_proto/behaviour.rs +++ b/substrate/client/network/src/protocol/generic_proto/behaviour.rs @@ -34,7 +34,7 @@ use prometheus_endpoint::HistogramVec; use rand::distributions::{Distribution as _, Uniform}; use smallvec::SmallVec; use std::task::{Context, Poll}; -use std::{borrow::Cow, cmp, collections::hash_map::Entry}; +use std::{borrow::Cow, cmp, collections::{hash_map::Entry, VecDeque}}; use std::{error, mem, pin::Pin, str, time::Duration}; use wasm_timer::Instant; @@ -135,7 +135,7 @@ pub struct GenericProto { next_incoming_index: sc_peerset::IncomingIndex, /// Events to produce from `poll()`. - events: SmallVec<[NetworkBehaviourAction<NotifsHandlerIn, GenericProtoOut>; 4]>, + events: VecDeque<NetworkBehaviourAction<NotifsHandlerIn, GenericProtoOut>>, /// If `Some`, report the message queue sizes on this `Histogram`. queue_size_report: Option<HistogramVec>, @@ -340,7 +340,7 @@ impl GenericProto { peers: FnvHashMap::default(), incoming: SmallVec::new(), next_incoming_index: sc_peerset::IncomingIndex(0), - events: SmallVec::new(), + events: VecDeque::new(), queue_size_report, } } @@ -374,7 +374,7 @@ impl GenericProto { // Send an event to all the peers we're connected to, updating the handshake message. for (peer_id, _) in self.peers.iter().filter(|(_, state)| state.is_connected()) { - self.events.push(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: peer_id.clone(), handler: NotifyHandler::All, event: NotifsHandlerIn::UpdateHandshake { @@ -446,7 +446,7 @@ impl GenericProto { debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); self.peerset.dropped(peer_id.clone()); debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", peer_id); - self.events.push(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: peer_id.clone(), handler: NotifyHandler::All, event: NotifsHandlerIn::Disable, @@ -471,7 +471,7 @@ impl GenericProto { inc.alive = false; debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", peer_id); - self.events.push(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: peer_id.clone(), handler: NotifyHandler::All, event: NotifsHandlerIn::Disable, @@ -562,7 +562,7 @@ impl GenericProto { ); trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target); - self.events.push(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: target.clone(), handler: NotifyHandler::One(conn), event: NotifsHandlerIn::SendNotification { @@ -592,7 +592,7 @@ impl GenericProto { trace!(target: "sub-libp2p", "External API => Packet for {:?}", target); trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target); - self.events.push(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: target.clone(), handler: NotifyHandler::One(conn), event: NotifsHandlerIn::SendLegacy { @@ -614,7 +614,7 @@ impl GenericProto { // If there's no entry in `self.peers`, start dialing. debug!(target: "sub-libp2p", "PSM => Connect({:?}): Starting to connect", entry.key()); debug!(target: "sub-libp2p", "Libp2p <= Dial {:?}", entry.key()); - self.events.push(NetworkBehaviourAction::DialPeer { + self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id: entry.key().clone(), condition: DialPeerCondition::Disconnected }); @@ -638,7 +638,7 @@ impl GenericProto { PeerState::Banned { .. } => { debug!(target: "sub-libp2p", "PSM => Connect({:?}): Starting to connect", occ_entry.key()); debug!(target: "sub-libp2p", "Libp2p <= Dial {:?}", occ_entry.key()); - self.events.push(NetworkBehaviourAction::DialPeer { + self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id: occ_entry.key().clone(), condition: DialPeerCondition::Disconnected }); @@ -662,7 +662,7 @@ impl GenericProto { debug!(target: "sub-libp2p", "PSM => Connect({:?}): Enabling connections.", occ_entry.key()); debug!(target: "sub-libp2p", "Handler({:?}) <= Enable", occ_entry.key()); - self.events.push(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: occ_entry.key().clone(), handler: NotifyHandler::All, event: NotifsHandlerIn::Enable, @@ -681,7 +681,7 @@ impl GenericProto { incoming for incoming peer") } debug!(target: "sub-libp2p", "Handler({:?}) <= Enable", occ_entry.key()); - self.events.push(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: occ_entry.key().clone(), handler: NotifyHandler::All, event: NotifsHandlerIn::Enable, @@ -746,7 +746,7 @@ impl GenericProto { PeerState::Enabled { open } => { debug!(target: "sub-libp2p", "PSM => Drop({:?}): Disabling connections.", entry.key()); debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", entry.key()); - self.events.push(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: entry.key().clone(), handler: NotifyHandler::All, event: NotifsHandlerIn::Disable, @@ -801,7 +801,7 @@ impl GenericProto { debug!(target: "sub-libp2p", "PSM => Accept({:?}, {:?}): Enabling connections.", index, incoming.peer_id); debug!(target: "sub-libp2p", "Handler({:?}) <= Enable", incoming.peer_id); - self.events.push(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: incoming.peer_id, handler: NotifyHandler::All, event: NotifsHandlerIn::Enable, @@ -834,7 +834,7 @@ impl GenericProto { debug!(target: "sub-libp2p", "PSM => Reject({:?}, {:?}): Rejecting connections.", index, incoming.peer_id); debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", incoming.peer_id); - self.events.push(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: incoming.peer_id, handler: NotifyHandler::All, event: NotifsHandlerIn::Disable, @@ -881,7 +881,7 @@ impl NetworkBehaviour for GenericProto { peer_id, endpoint ); *st = PeerState::Enabled { open: SmallVec::new() }; - self.events.push(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: peer_id.clone(), handler: NotifyHandler::One(*conn), event: NotifsHandlerIn::Enable @@ -925,7 +925,7 @@ impl NetworkBehaviour for GenericProto { "Libp2p => Connected({},{:?}): Not requested by PSM, disabling.", peer_id, endpoint); *st = PeerState::Disabled { open: SmallVec::new(), banned_until }; - self.events.push(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: peer_id.clone(), handler: NotifyHandler::One(*conn), event: NotifsHandlerIn::Disable @@ -941,7 +941,7 @@ impl NetworkBehaviour for GenericProto { (PeerState::Enabled { .. }, _) => { debug!(target: "sub-libp2p", "Handler({},{:?}) <= Enable secondary connection", peer_id, conn); - self.events.push(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: peer_id.clone(), handler: NotifyHandler::One(*conn), event: NotifsHandlerIn::Enable @@ -951,7 +951,7 @@ impl NetworkBehaviour for GenericProto { (PeerState::Disabled { .. }, _) | (PeerState::DisabledPendingEnable { .. }, _) => { debug!(target: "sub-libp2p", "Handler({},{:?}) <= Disable secondary connection", peer_id, conn); - self.events.push(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: peer_id.clone(), handler: NotifyHandler::One(*conn), event: NotifsHandlerIn::Disable @@ -979,7 +979,7 @@ impl NetworkBehaviour for GenericProto { reason: "Disconnected by libp2p".into(), }; - self.events.push(NetworkBehaviourAction::GenerateEvent(event)); + self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); } } _ => {} @@ -1144,7 +1144,7 @@ impl NetworkBehaviour for GenericProto { debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", source); debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", source); self.peerset.dropped(source.clone()); - self.events.push(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: source.clone(), handler: NotifyHandler::All, event: NotifsHandlerIn::Disable, @@ -1216,7 +1216,7 @@ impl NetworkBehaviour for GenericProto { reason, peer_id: source, }; - self.events.push(NetworkBehaviourAction::GenerateEvent(event)); + self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); } else { debug!(target: "sub-libp2p", "Secondary connection closed custom protocol."); } @@ -1254,7 +1254,7 @@ impl NetworkBehaviour for GenericProto { if first { debug!(target: "sub-libp2p", "External API <= Open({:?})", source); let event = GenericProtoOut::CustomProtocolOpen { peer_id: source }; - self.events.push(NetworkBehaviourAction::GenerateEvent(event)); + self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); } else { debug!(target: "sub-libp2p", "Secondary connection opened custom protocol."); } @@ -1269,7 +1269,7 @@ impl NetworkBehaviour for GenericProto { message, }; - self.events.push(NetworkBehaviourAction::GenerateEvent(event)); + self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); } NotifsHandlerOut::Notification { protocol_name, message } => { @@ -1287,7 +1287,7 @@ impl NetworkBehaviour for GenericProto { message, }; - self.events.push(NetworkBehaviourAction::GenerateEvent(event)); + self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); } NotifsHandlerOut::Clogged { messages } => { @@ -1296,7 +1296,7 @@ impl NetworkBehaviour for GenericProto { trace!(target: "sub-libp2p", "External API <= Clogged({:?})", source); warn!(target: "sub-libp2p", "Queue of packets to send to {:?} is \ pretty large", source); - self.events.push(NetworkBehaviourAction::GenerateEvent(GenericProtoOut::Clogged { + self.events.push_back(NetworkBehaviourAction::GenerateEvent(GenericProtoOut::Clogged { peer_id: source, messages, })); @@ -1335,6 +1335,10 @@ impl NetworkBehaviour for GenericProto { Self::OutEvent, >, > { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(event); + } + // Poll for instructions from the peerset. // Note that the peerset is a *best effort* crate, and we have to use defensive programming. loop { @@ -1368,7 +1372,7 @@ impl NetworkBehaviour for GenericProto { } debug!(target: "sub-libp2p", "Libp2p <= Dial {:?} now that ban has expired", peer_id); - self.events.push(NetworkBehaviourAction::DialPeer { + self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id: peer_id.clone(), condition: DialPeerCondition::Disconnected }); @@ -1390,7 +1394,7 @@ impl NetworkBehaviour for GenericProto { } debug!(target: "sub-libp2p", "Handler({:?}) <= Enable (ban expired)", peer_id); - self.events.push(NetworkBehaviourAction::NotifyHandler { + self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: peer_id.clone(), handler: NotifyHandler::All, event: NotifsHandlerIn::Enable, @@ -1402,8 +1406,8 @@ impl NetworkBehaviour for GenericProto { } } - if !self.events.is_empty() { - return Poll::Ready(self.events.remove(0)) + if let Some(event) = self.events.pop_front() { + return Poll::Ready(event); } Poll::Pending diff --git a/substrate/client/network/src/protocol/generic_proto/handler/legacy.rs b/substrate/client/network/src/protocol/generic_proto/handler/legacy.rs index e51b37139b82bec6bad05e14bd82a5b7e4966e06..c7de2d265e96fdf66f567366bd10fd6cf8ece265 100644 --- a/substrate/client/network/src/protocol/generic_proto/handler/legacy.rs +++ b/substrate/client/network/src/protocol/generic_proto/handler/legacy.rs @@ -30,7 +30,7 @@ use libp2p::swarm::{ }; use log::{debug, error}; use smallvec::{smallvec, SmallVec}; -use std::{borrow::Cow, error, fmt, io, mem, time::Duration}; +use std::{borrow::Cow, collections::VecDeque, error, fmt, io, mem, time::Duration}; use std::{pin::Pin, task::{Context, Poll}}; /// Implements the `IntoProtocolsHandler` trait of libp2p. @@ -117,7 +117,7 @@ impl IntoProtocolsHandler for LegacyProtoHandlerProto { substreams: SmallVec::new(), init_deadline: Delay::new(Duration::from_secs(20)) }, - events_queue: SmallVec::new(), + events_queue: VecDeque::new(), } } } @@ -142,7 +142,7 @@ pub struct LegacyProtoHandler { /// /// This queue must only ever be modified to insert elements at the back, or remove the first /// element. - events_queue: SmallVec<[ProtocolsHandlerEvent<RegisteredProtocol, (), LegacyProtoHandlerOut, ConnectionKillError>; 16]>, + events_queue: VecDeque<ProtocolsHandlerEvent<RegisteredProtocol, (), LegacyProtoHandlerOut, ConnectionKillError>>, } /// State of the handler. @@ -277,7 +277,7 @@ impl LegacyProtoHandler { ProtocolState::Init { substreams: incoming, .. } => { if incoming.is_empty() { if let ConnectedPoint::Dialer { .. } = self.endpoint { - self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest { + self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(self.protocol.clone()), info: (), }); @@ -290,7 +290,7 @@ impl LegacyProtoHandler { version: incoming[0].protocol_version(), endpoint: self.endpoint.clone() }; - self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); + self.events_queue.push_back(ProtocolsHandlerEvent::Custom(event)); ProtocolState::Normal { substreams: incoming.into_iter().collect(), shutdown: SmallVec::new() @@ -488,7 +488,7 @@ impl LegacyProtoHandler { version: substream.protocol_version(), endpoint: self.endpoint.clone() }; - self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); + self.events_queue.push_back(ProtocolsHandlerEvent::Custom(event)); ProtocolState::Normal { substreams: smallvec![substream], shutdown: SmallVec::new() @@ -565,7 +565,7 @@ impl ProtocolsHandler for LegacyProtoHandler { _ => false, }; - self.events_queue.push(ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError { + self.events_queue.push_back(ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError { is_severe, error: Box::new(err), })); @@ -587,8 +587,7 @@ impl ProtocolsHandler for LegacyProtoHandler { ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error> > { // Flush the events queue if necessary. - if !self.events_queue.is_empty() { - let event = self.events_queue.remove(0); + if let Some(event) = self.events_queue.pop_front() { return Poll::Ready(event) } diff --git a/substrate/client/network/src/protocol/generic_proto/handler/notif_in.rs b/substrate/client/network/src/protocol/generic_proto/handler/notif_in.rs index ef4644c548a4fd355993c4ffdfafc8c614b42f12..be78fb970e90b2524fd1478f48462e159dd1b28d 100644 --- a/substrate/client/network/src/protocol/generic_proto/handler/notif_in.rs +++ b/substrate/client/network/src/protocol/generic_proto/handler/notif_in.rs @@ -37,8 +37,7 @@ use libp2p::swarm::{ NegotiatedSubstream, }; use log::{error, warn}; -use smallvec::SmallVec; -use std::{borrow::Cow, fmt, pin::Pin, task::{Context, Poll}}; +use std::{borrow::Cow, collections::VecDeque, fmt, pin::Pin, task::{Context, Poll}}; /// Implements the `IntoProtocolsHandler` trait of libp2p. /// @@ -70,7 +69,7 @@ pub struct NotifsInHandler { /// /// This queue is only ever modified to insert elements at the back, or remove the first /// element. - events_queue: SmallVec<[ProtocolsHandlerEvent<DeniedUpgrade, (), NotifsInHandlerOut, void::Void>; 16]>, + events_queue: VecDeque<ProtocolsHandlerEvent<DeniedUpgrade, (), NotifsInHandlerOut, void::Void>>, } /// Event that can be received by a `NotifsInHandler`. @@ -130,7 +129,7 @@ impl IntoProtocolsHandler for NotifsInHandlerProto { in_protocol: self.in_protocol, substream: None, pending_accept_refuses: 0, - events_queue: SmallVec::new(), + events_queue: VecDeque::new(), } } } @@ -160,7 +159,7 @@ impl ProtocolsHandler for NotifsInHandler { ) { // If a substream already exists, we drop it and replace it with the new incoming one. if self.substream.is_some() { - self.events_queue.push(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed)); + self.events_queue.push_back(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed)); } // Note that we drop the existing substream, which will send an equivalent to a TCP "RST" @@ -171,7 +170,7 @@ impl ProtocolsHandler for NotifsInHandler { // and we can't close "more" than that anyway. self.substream = Some(proto); - self.events_queue.push(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(msg))); + self.events_queue.push_back(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(msg))); self.pending_accept_refuses = self.pending_accept_refuses .checked_add(1) .unwrap_or_else(|| { @@ -233,8 +232,7 @@ impl ProtocolsHandler for NotifsInHandler { ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error> > { // Flush the events queue if necessary. - if !self.events_queue.is_empty() { - let event = self.events_queue.remove(0); + if let Some(event) = self.events_queue.pop_front() { return Poll::Ready(event) } diff --git a/substrate/client/network/src/protocol/generic_proto/handler/notif_out.rs b/substrate/client/network/src/protocol/generic_proto/handler/notif_out.rs index b5d6cd61ada2a9e54bfa5873e0f7992ab1dfcfcb..6b97ad67e34c696f58996a8c13f983f0486b84fa 100644 --- a/substrate/client/network/src/protocol/generic_proto/handler/notif_out.rs +++ b/substrate/client/network/src/protocol/generic_proto/handler/notif_out.rs @@ -35,8 +35,7 @@ use libp2p::swarm::{ }; use log::{debug, warn, error}; use prometheus_endpoint::Histogram; -use smallvec::SmallVec; -use std::{borrow::Cow, fmt, mem, pin::Pin, task::{Context, Poll}, time::Duration}; +use std::{borrow::Cow, collections::VecDeque, fmt, mem, pin::Pin, task::{Context, Poll}, time::Duration}; use wasm_timer::Instant; /// Maximum duration to open a substream and receive the handshake message. After that, we @@ -85,7 +84,7 @@ impl IntoProtocolsHandler for NotifsOutHandlerProto { when_connection_open: Instant::now(), queue_size_report: self.queue_size_report, state: State::Disabled, - events_queue: SmallVec::new(), + events_queue: VecDeque::new(), peer_id: peer_id.clone(), } } @@ -116,7 +115,7 @@ pub struct NotifsOutHandler { /// /// This queue must only ever be modified to insert elements at the back, or remove the first /// element. - events_queue: SmallVec<[ProtocolsHandlerEvent<NotificationsOut, (), NotifsOutHandlerOut, void::Void>; 16]>, + events_queue: VecDeque<ProtocolsHandlerEvent<NotificationsOut, (), NotifsOutHandlerOut, void::Void>>, /// Who we are connected to. peer_id: PeerId, @@ -247,7 +246,7 @@ impl ProtocolsHandler for NotifsOutHandler { match mem::replace(&mut self.state, State::Poisoned) { State::Opening { initial_message } => { let ev = NotifsOutHandlerOut::Open { handshake: handshake_msg }; - self.events_queue.push(ProtocolsHandlerEvent::Custom(ev)); + self.events_queue.push_back(ProtocolsHandlerEvent::Custom(ev)); self.state = State::Open { substream, initial_message }; }, // If the handler was disabled while we were negotiating the protocol, immediately @@ -267,7 +266,7 @@ impl ProtocolsHandler for NotifsOutHandler { match mem::replace(&mut self.state, State::Poisoned) { State::Disabled => { let proto = NotificationsOut::new(self.protocol_name.clone(), initial_message.clone()); - self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest { + self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(proto).with_timeout(OPEN_TIMEOUT), info: (), }); @@ -287,7 +286,7 @@ impl ProtocolsHandler for NotifsOutHandler { } let proto = NotificationsOut::new(self.protocol_name.clone(), initial_message.clone()); - self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest { + self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(proto).with_timeout(OPEN_TIMEOUT), info: (), }); @@ -347,7 +346,7 @@ impl ProtocolsHandler for NotifsOutHandler { State::Opening { .. } => { self.state = State::Refused; let ev = NotifsOutHandlerOut::Refused; - self.events_queue.push(ProtocolsHandlerEvent::Custom(ev)); + self.events_queue.push_back(ProtocolsHandlerEvent::Custom(ev)); }, State::DisabledOpening => self.state = State::Disabled, State::Poisoned => error!("â˜Žï¸ Notifications handler in a poisoned state"), @@ -371,9 +370,8 @@ impl ProtocolsHandler for NotifsOutHandler { cx: &mut Context, ) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>> { // Flush the events queue if necessary. - if !self.events_queue.is_empty() { - let event = self.events_queue.remove(0); - return Poll::Ready(event); + if let Some(event) = self.events_queue.pop_front() { + return Poll::Ready(event) } match &mut self.state { @@ -385,7 +383,7 @@ impl ProtocolsHandler for NotifsOutHandler { let initial_message = mem::replace(initial_message, Vec::new()); self.state = State::Opening { initial_message: initial_message.clone() }; let proto = NotificationsOut::new(self.protocol_name.clone(), initial_message); - self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest { + self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(proto).with_timeout(OPEN_TIMEOUT), info: (), });