diff --git a/substrate/client/network/src/behaviour.rs b/substrate/client/network/src/behaviour.rs index d6949b491c822c96b2848dfcccccd4d765592a7c..171acc9562ae648185f367a2c51e5503d285ebc9 100644 --- a/substrate/client/network/src/behaviour.rs +++ b/substrate/client/network/src/behaviour.rs @@ -29,7 +29,7 @@ use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollPa use log::debug; use sp_consensus::{BlockOrigin, import_queue::{IncomingBlock, Origin}}; use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId, Justification}; -use std::{borrow::Cow, iter, task::{Context, Poll}, time::Duration}; +use std::{borrow::Cow, collections::VecDeque, iter, task::{Context, Poll}, time::Duration}; /// General behaviour of the network. Combines all protocols together. #[derive(NetworkBehaviour)] @@ -51,7 +51,7 @@ pub struct Behaviour<B: BlockT, H: ExHashT> { /// Queue of events to produce for the outside. #[behaviour(ignore)] - events: Vec<BehaviourOut<B>>, + events: VecDeque<BehaviourOut<B>>, /// Role of our local node, as originally passed from the configuration. #[behaviour(ignore)] @@ -118,7 +118,7 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> { block_requests, finality_proof_requests, light_client_handler, - events: Vec::new(), + events: VecDeque::new(), role, } } @@ -183,7 +183,7 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> { engine_id, role, }; - self.events.push(BehaviourOut::Event(ev)); + self.events.push_back(BehaviourOut::Event(ev)); } } @@ -241,26 +241,26 @@ Behaviour<B, H> { fn inject_event(&mut self, event: CustomMessageOutcome<B>) { match event { CustomMessageOutcome::BlockImport(origin, blocks) => - self.events.push(BehaviourOut::BlockImport(origin, blocks)), + self.events.push_back(BehaviourOut::BlockImport(origin, blocks)), CustomMessageOutcome::JustificationImport(origin, hash, nb, justification) => - self.events.push(BehaviourOut::JustificationImport(origin, hash, nb, justification)), + self.events.push_back(BehaviourOut::JustificationImport(origin, hash, nb, justification)), CustomMessageOutcome::FinalityProofImport(origin, hash, nb, proof) => - self.events.push(BehaviourOut::FinalityProofImport(origin, hash, nb, proof)), + self.events.push_back(BehaviourOut::FinalityProofImport(origin, hash, nb, proof)), CustomMessageOutcome::BlockRequest { target, request } => { match self.block_requests.send_request(&target, request) { block_requests::SendRequestOutcome::Ok => { - self.events.push(BehaviourOut::RequestStarted { + self.events.push_back(BehaviourOut::RequestStarted { peer: target, protocol: self.block_requests.protocol_name().to_vec(), }); }, block_requests::SendRequestOutcome::Replaced { request_duration, .. } => { - self.events.push(BehaviourOut::RequestFinished { + self.events.push_back(BehaviourOut::RequestFinished { peer: target.clone(), protocol: self.block_requests.protocol_name().to_vec(), request_duration, }); - self.events.push(BehaviourOut::RequestStarted { + self.events.push_back(BehaviourOut::RequestStarted { peer: target, protocol: self.block_requests.protocol_name().to_vec(), }); @@ -275,7 +275,7 @@ Behaviour<B, H> { CustomMessageOutcome::NotificationStreamOpened { remote, protocols, roles } => { let role = reported_roles_to_observed_role(&self.role, &remote, roles); for engine_id in protocols { - self.events.push(BehaviourOut::Event(Event::NotificationStreamOpened { + self.events.push_back(BehaviourOut::Event(Event::NotificationStreamOpened { remote: remote.clone(), engine_id, role: role.clone(), @@ -284,14 +284,14 @@ Behaviour<B, H> { }, CustomMessageOutcome::NotificationStreamClosed { remote, protocols } => for engine_id in protocols { - self.events.push(BehaviourOut::Event(Event::NotificationStreamClosed { + self.events.push_back(BehaviourOut::Event(Event::NotificationStreamClosed { remote: remote.clone(), engine_id, })); }, CustomMessageOutcome::NotificationsReceived { remote, messages } => { let ev = Event::NotificationsReceived { remote, messages }; - self.events.push(BehaviourOut::Event(ev)); + self.events.push_back(BehaviourOut::Event(ev)); }, CustomMessageOutcome::PeerNewBest(peer_id, number) => { self.light_client_handler.update_best_block(&peer_id, number); @@ -305,14 +305,14 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<block_requests::Event<B fn inject_event(&mut self, event: block_requests::Event<B>) { match event { block_requests::Event::AnsweredRequest { peer, total_handling_time } => { - self.events.push(BehaviourOut::AnsweredRequest { + self.events.push_back(BehaviourOut::AnsweredRequest { peer, protocol: self.block_requests.protocol_name().to_vec(), build_time: total_handling_time, }); }, block_requests::Event::Response { peer, original_request, response, request_duration } => { - self.events.push(BehaviourOut::RequestFinished { + self.events.push_back(BehaviourOut::RequestFinished { peer: peer.clone(), protocol: self.block_requests.protocol_name().to_vec(), request_duration, @@ -324,7 +324,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<block_requests::Event<B // There doesn't exist any mechanism to report cancellations yet. // We would normally disconnect the node, but this event happens as the result of // a disconnect, so there's nothing more to do. - self.events.push(BehaviourOut::RequestFinished { + self.events.push_back(BehaviourOut::RequestFinished { peer, protocol: self.block_requests.protocol_name().to_vec(), request_duration, @@ -333,7 +333,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<block_requests::Event<B block_requests::Event::RequestTimeout { peer, request_duration, .. } => { // There doesn't exist any mechanism to report timeouts yet, so we process them by // disconnecting the node. - self.events.push(BehaviourOut::RequestFinished { + self.events.push_back(BehaviourOut::RequestFinished { peer: peer.clone(), protocol: self.block_requests.protocol_name().to_vec(), request_duration, @@ -396,20 +396,20 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<DiscoveryOut> self.substrate.add_discovered_nodes(iter::once(peer_id)); } DiscoveryOut::ValueFound(results) => { - self.events.push(BehaviourOut::Event(Event::Dht(DhtEvent::ValueFound(results)))); + self.events.push_back(BehaviourOut::Event(Event::Dht(DhtEvent::ValueFound(results)))); } DiscoveryOut::ValueNotFound(key) => { - self.events.push(BehaviourOut::Event(Event::Dht(DhtEvent::ValueNotFound(key)))); + self.events.push_back(BehaviourOut::Event(Event::Dht(DhtEvent::ValueNotFound(key)))); } DiscoveryOut::ValuePut(key) => { - self.events.push(BehaviourOut::Event(Event::Dht(DhtEvent::ValuePut(key)))); + self.events.push_back(BehaviourOut::Event(Event::Dht(DhtEvent::ValuePut(key)))); } DiscoveryOut::ValuePutFailed(key) => { - self.events.push(BehaviourOut::Event(Event::Dht(DhtEvent::ValuePutFailed(key)))); + self.events.push_back(BehaviourOut::Event(Event::Dht(DhtEvent::ValuePutFailed(key)))); } DiscoveryOut::RandomKademliaStarted(protocols) => { for protocol in protocols { - self.events.push(BehaviourOut::RandomKademliaStarted(protocol)); + self.events.push_back(BehaviourOut::RandomKademliaStarted(protocol)); } } } @@ -418,8 +418,8 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<DiscoveryOut> impl<B: BlockT, H: ExHashT> Behaviour<B, H> { fn poll<TEv>(&mut self, _: &mut Context, _: &mut impl PollParameters) -> Poll<NetworkBehaviourAction<TEv, BehaviourOut<B>>> { - if !self.events.is_empty() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))) + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) } Poll::Pending