From 98a88a7d424fa409c6ad496467a94ebdca1c93ae Mon Sep 17 00:00:00 2001 From: Pierre Krieger <pierre.krieger1708@gmail.com> Date: Wed, 20 Mar 2019 09:29:58 +0100 Subject: [PATCH] Start the handler init timer later (#2041) --- .../src/custom_proto/behaviour.rs | 12 ++-- .../src/custom_proto/handler.rs | 70 ++++++++++++++----- 2 files changed, 57 insertions(+), 25 deletions(-) diff --git a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs b/substrate/core/network-libp2p/src/custom_proto/behaviour.rs index 473d2200ffb..c8b931e4fb9 100644 --- a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs +++ b/substrate/core/network-libp2p/src/custom_proto/behaviour.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see <http://www.gnu.org/licenses/>. -use crate::custom_proto::handler::{CustomProtoHandler, CustomProtoHandlerOut, CustomProtoHandlerIn}; +use crate::custom_proto::handler::{CustomProtoHandlerProto, CustomProtoHandlerOut, CustomProtoHandlerIn}; use crate::custom_proto::topology::NetTopology; use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol}; use crate::{NetworkConfiguration, NonReservedPeerMode}; @@ -22,7 +22,7 @@ use crate::parse_str_addr; use fnv::{FnvHashMap, FnvHashSet}; use futures::prelude::*; use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; -use libp2p::core::{protocols_handler::ProtocolsHandler, Endpoint, Multiaddr, PeerId}; +use libp2p::core::{Endpoint, Multiaddr, PeerId}; use log::{debug, trace, warn}; use smallvec::SmallVec; use std::{cmp, error, io, marker::PhantomData, path::Path, time::Duration, time::Instant}; @@ -433,11 +433,11 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage, { - type ProtocolsHandler = CustomProtoHandler<TMessage, TSubstream>; + type ProtocolsHandler = CustomProtoHandlerProto<TMessage, TSubstream>; type OutEvent = CustomProtoOut<TMessage>; fn new_handler(&mut self) -> Self::ProtocolsHandler { - CustomProtoHandler::new(self.protocol.clone()) + CustomProtoHandlerProto::new(self.protocol.clone()) } fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> { @@ -575,7 +575,7 @@ where fn inject_node_event( &mut self, source: PeerId, - event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent, + event: CustomProtoHandlerOut<TMessage>, ) { match event { CustomProtoHandlerOut::CustomProtocolClosed { result } => { @@ -639,7 +639,7 @@ where params: &mut PollParameters, ) -> Async< NetworkBehaviourAction< - <Self::ProtocolsHandler as ProtocolsHandler>::InEvent, + CustomProtoHandlerIn<TMessage>, Self::OutEvent, >, > { diff --git a/substrate/core/network-libp2p/src/custom_proto/handler.rs b/substrate/core/network-libp2p/src/custom_proto/handler.rs index a9db9fcb99b..1b3c51babae 100644 --- a/substrate/core/network-libp2p/src/custom_proto/handler.rs +++ b/substrate/core/network-libp2p/src/custom_proto/handler.rs @@ -18,23 +18,25 @@ use crate::custom_proto::upgrade::{CustomMessage, CustomMessageId, RegisteredPro use crate::custom_proto::upgrade::{RegisteredProtocolEvent, RegisteredProtocolSubstream}; use futures::prelude::*; use libp2p::core::{ - Endpoint, ProtocolsHandler, ProtocolsHandlerEvent, + PeerId, Endpoint, ProtocolsHandler, ProtocolsHandlerEvent, + protocols_handler::IntoProtocolsHandler, protocols_handler::KeepAlive, protocols_handler::ProtocolsHandlerUpgrErr, upgrade::{InboundUpgrade, OutboundUpgrade} }; use log::{debug, error, warn}; use smallvec::{smallvec, SmallVec}; -use std::{error, fmt, io, mem, time::Duration, time::Instant}; +use std::{error, fmt, io, marker::PhantomData, mem, time::Duration, time::Instant}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_timer::Delay; use void::Void; -/// Implements the `ProtocolsHandler` trait of libp2p. +/// Implements the `IntoProtocolsHandler` trait of libp2p. /// -/// Every time a connection with a remote is established, an instance of this struct is created and -/// sent to a background task dedicated to this connection. It handles all communications that are -/// specific to Substrate. +/// Every time a connection with a remote starts, an instance of this struct is created and +/// sent to a background task dedicated to this connection. Once the connection is established, +/// it is turned into a `CustomProtoHandler`. It then handles all communications that are specific +/// to Substrate on that connection. /// /// Note that there can be multiple instance of this struct simultaneously for same peer. However /// if that happens, only one main instance can communicate with the outer layers of the code. @@ -62,6 +64,49 @@ use void::Void; /// happens on one substream, we consider that we are disconnected. Re-enabling is performed by /// opening an outbound substream. /// +pub struct CustomProtoHandlerProto<TMessage, TSubstream> { + /// Configuration for the protocol upgrade to negotiate. + protocol: RegisteredProtocol<TMessage>, + + /// Marker to pin the generic type. + marker: PhantomData<TSubstream>, +} + +impl<TMessage, TSubstream> CustomProtoHandlerProto<TMessage, TSubstream> +where + TSubstream: AsyncRead + AsyncWrite, + TMessage: CustomMessage, +{ + /// Builds a new `CustomProtoHandlerProto`. + pub fn new(protocol: RegisteredProtocol<TMessage>) -> Self { + CustomProtoHandlerProto { + protocol, + marker: PhantomData, + } + } +} + +impl<TMessage, TSubstream> IntoProtocolsHandler for CustomProtoHandlerProto<TMessage, TSubstream> +where + TSubstream: AsyncRead + AsyncWrite, + TMessage: CustomMessage, +{ + type Handler = CustomProtoHandler<TMessage, TSubstream>; + + fn into_handler(self, _: &PeerId) -> Self::Handler { + CustomProtoHandler { + protocol: self.protocol, + state: ProtocolState::Init { + substreams: SmallVec::new(), + init_deadline: Delay::new(Instant::now() + Duration::from_secs(5)) + }, + events_queue: SmallVec::new(), + warm_up_end: Instant::now() + Duration::from_secs(5), + } + } +} + +/// The actual handler once the connection has been established. pub struct CustomProtoHandler<TMessage, TSubstream> { /// Configuration for the protocol upgrade to negotiate. protocol: RegisteredProtocol<TMessage>, @@ -311,19 +356,6 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage, { - /// Builds a new `CustomProtoHandler`. - pub fn new(protocol: RegisteredProtocol<TMessage>) -> Self { - CustomProtoHandler { - protocol, - state: ProtocolState::Init { - substreams: SmallVec::new(), - init_deadline: Delay::new(Instant::now() + Duration::from_secs(5)) - }, - events_queue: SmallVec::new(), - warm_up_end: Instant::now() + Duration::from_secs(5), - } - } - /// Enables the handler. fn enable(&mut self, endpoint: Endpoint) { self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) { -- GitLab