From 385c4ddf69144842e944bf2b5e43cc0f2e787678 Mon Sep 17 00:00:00 2001 From: Pierre Krieger <pierre.krieger1708@gmail.com> Date: Fri, 16 Oct 2020 13:07:05 +0200 Subject: [PATCH] No longer actively open legacy substreams (#7076) * Allow remotes to not open a legacy substream * No longer actively open legacy substreams * Misc fixes * Line width * Special case first protocol as the one bearing the handshake * Legacy opening state no longer keeps connection alive * Remove now-unused code * Simplify inject_dial_upgrade_error * [chaos:basic] * [chaos:basic] * [chaos:basic] --- .../src/protocol/generic_proto/behaviour.rs | 21 --- .../protocol/generic_proto/handler/group.rs | 72 +++------ .../protocol/generic_proto/handler/legacy.rs | 140 ++++++------------ 3 files changed, 61 insertions(+), 172 deletions(-) diff --git a/substrate/client/network/src/protocol/generic_proto/behaviour.rs b/substrate/client/network/src/protocol/generic_proto/behaviour.rs index f0461ac9826..7b62b154016 100644 --- a/substrate/client/network/src/protocol/generic_proto/behaviour.rs +++ b/substrate/client/network/src/protocol/generic_proto/behaviour.rs @@ -1349,27 +1349,6 @@ impl NetworkBehaviour for GenericProto { self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); } - - // Don't do anything for non-severe errors except report them. - NotifsHandlerOut::ProtocolError { is_severe, ref error } if !is_severe => { - debug!(target: "sub-libp2p", "Handler({:?}) => Benign protocol error: {:?}", - source, error) - } - - NotifsHandlerOut::ProtocolError { error, .. } => { - debug!(target: "sub-libp2p", - "Handler({:?}) => Severe protocol error: {:?}", - source, error); - // A severe protocol error happens when we detect a "bad" peer, such as a peer on - // a different chain, or a peer that doesn't speak the same protocol(s). We - // decrease the peer's reputation, hence lowering the chances we try this peer - // again in the short term. - self.peerset.report_peer( - source.clone(), - sc_peerset::ReputationChange::new(i32::min_value(), "Protocol error") - ); - self.disconnect_peer_inner(&source, Some(Duration::from_secs(5))); - } } } diff --git a/substrate/client/network/src/protocol/generic_proto/handler/group.rs b/substrate/client/network/src/protocol/generic_proto/handler/group.rs index f355fba60fb..fbfdb1cb6ab 100644 --- a/substrate/client/network/src/protocol/generic_proto/handler/group.rs +++ b/substrate/client/network/src/protocol/generic_proto/handler/group.rs @@ -53,8 +53,8 @@ use crate::protocol::generic_proto::{ }; use bytes::BytesMut; -use libp2p::core::{either::{EitherError, EitherOutput}, ConnectedPoint, PeerId}; -use libp2p::core::upgrade::{EitherUpgrade, UpgradeError, SelectUpgrade, InboundUpgrade, OutboundUpgrade}; +use libp2p::core::{either::EitherOutput, ConnectedPoint, PeerId}; +use libp2p::core::upgrade::{UpgradeError, SelectUpgrade, InboundUpgrade, OutboundUpgrade}; use libp2p::swarm::{ ProtocolsHandler, ProtocolsHandlerEvent, IntoProtocolsHandler, @@ -70,7 +70,7 @@ use futures::{ }; use log::{debug, error}; use parking_lot::{Mutex, RwLock}; -use std::{borrow::Cow, error, io, str, sync::Arc, task::{Context, Poll}}; +use std::{borrow::Cow, str, sync::Arc, task::{Context, Poll}}; /// Number of pending notifications in asynchronous contexts. /// See [`NotificationsSink::reserve_notification`] for context. @@ -230,14 +230,6 @@ pub enum NotifsHandlerOut { /// Message that has been received. message: BytesMut, }, - - /// An error has happened on the protocol level with this node. - ProtocolError { - /// If true the error is severe, such as a protocol violation. - is_severe: bool, - /// The error that happened. - error: Box<dyn error::Error + Send + Sync>, - }, } /// Sink connected directly to the node background task. Allows sending notifications to the peer. @@ -401,9 +393,9 @@ impl ProtocolsHandler for NotifsHandler { type OutEvent = NotifsHandlerOut; type Error = NotifsHandlerError; type InboundProtocol = SelectUpgrade<UpgradeCollec<NotificationsIn>, RegisteredProtocol>; - type OutboundProtocol = EitherUpgrade<NotificationsOut, RegisteredProtocol>; - // Index within the `out_handlers`; None for legacy - type OutboundOpenInfo = Option<usize>; + type OutboundProtocol = NotificationsOut; + // Index within the `out_handlers` + type OutboundOpenInfo = usize; type InboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> { @@ -433,13 +425,7 @@ impl ProtocolsHandler for NotifsHandler { out: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output, num: Self::OutboundOpenInfo ) { - match (out, num) { - (EitherOutput::First(out), Some(num)) => - self.out_handlers[num].0.inject_fully_negotiated_outbound(out, ()), - (EitherOutput::Second(out), None) => - self.legacy.inject_fully_negotiated_outbound(out, ()), - _ => error!("inject_fully_negotiated_outbound called with wrong parameters"), - } + self.out_handlers[num].0.inject_fully_negotiated_outbound(out, ()) } fn inject_event(&mut self, message: NotifsHandlerIn) { @@ -488,45 +474,30 @@ impl ProtocolsHandler for NotifsHandler { fn inject_dial_upgrade_error( &mut self, - num: Option<usize>, - err: ProtocolsHandlerUpgrErr<EitherError<NotificationsHandshakeError, io::Error>> + num: usize, + err: ProtocolsHandlerUpgrErr<NotificationsHandshakeError> ) { - match (err, num) { - (ProtocolsHandlerUpgrErr::Timeout, Some(num)) => + match err { + ProtocolsHandlerUpgrErr::Timeout => self.out_handlers[num].0.inject_dial_upgrade_error( (), ProtocolsHandlerUpgrErr::Timeout ), - (ProtocolsHandlerUpgrErr::Timeout, None) => - self.legacy.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timeout), - (ProtocolsHandlerUpgrErr::Timer, Some(num)) => + ProtocolsHandlerUpgrErr::Timer => self.out_handlers[num].0.inject_dial_upgrade_error( (), ProtocolsHandlerUpgrErr::Timer ), - (ProtocolsHandlerUpgrErr::Timer, None) => - self.legacy.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timer), - (ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), Some(num)) => + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => self.out_handlers[num].0.inject_dial_upgrade_error( (), ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) ), - (ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), None) => - self.legacy.inject_dial_upgrade_error( - (), - ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) - ), - (ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(err))), Some(num)) => + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)) => self.out_handlers[num].0.inject_dial_upgrade_error( (), ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)) ), - (ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(err))), None) => - self.legacy.inject_dial_upgrade_error( - (), - ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)) - ), - _ => error!("inject_dial_upgrade_error called with bad parameters"), } } @@ -632,12 +603,8 @@ impl ProtocolsHandler for NotifsHandler { if self.pending_handshake.is_none() { while let Poll::Ready(ev) = self.legacy.poll(cx) { match ev { - ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol } => - return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: protocol - .map_upgrade(EitherUpgrade::B) - .map_info(|()| None) - }), + ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, .. } => + match *protocol.info() {}, ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { received_handshake, .. @@ -663,10 +630,6 @@ impl ProtocolsHandler for NotifsHandler { NotifsHandlerOut::CustomMessage { message } )) }, - ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError { is_severe, error }) => - return Poll::Ready(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::ProtocolError { is_severe, error } - )), ProtocolsHandlerEvent::Close(err) => return Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::Legacy(err))), } @@ -723,8 +686,7 @@ impl ProtocolsHandler for NotifsHandler { ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol } => return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol: protocol - .map_upgrade(EitherUpgrade::A) - .map_info(|()| Some(handler_num)) + .map_info(|()| handler_num), }), ProtocolsHandlerEvent::Close(err) => void::unreachable(err), diff --git a/substrate/client/network/src/protocol/generic_proto/handler/legacy.rs b/substrate/client/network/src/protocol/generic_proto/handler/legacy.rs index d17b5e612da..40409355378 100644 --- a/substrate/client/network/src/protocol/generic_proto/handler/legacy.rs +++ b/substrate/client/network/src/protocol/generic_proto/handler/legacy.rs @@ -30,8 +30,8 @@ use libp2p::swarm::{ }; use log::{debug, error}; use smallvec::{smallvec, SmallVec}; -use std::{borrow::Cow, collections::VecDeque, error, fmt, io, mem, time::Duration}; -use std::{pin::Pin, task::{Context, Poll}}; +use std::{borrow::Cow, collections::VecDeque, convert::Infallible, error, fmt, io, mem}; +use std::{pin::Pin, task::{Context, Poll}, time::Duration}; /// Implements the `IntoProtocolsHandler` trait of libp2p. /// @@ -108,10 +108,9 @@ impl IntoProtocolsHandler for LegacyProtoHandlerProto { self.protocol.clone() } - fn into_handler(self, remote_peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler { + fn into_handler(self, remote_peer_id: &PeerId, _: &ConnectedPoint) -> Self::Handler { LegacyProtoHandler { protocol: self.protocol, - endpoint: connected_point.clone(), remote_peer_id: remote_peer_id.clone(), state: ProtocolState::Init { substreams: SmallVec::new(), @@ -134,15 +133,13 @@ pub struct LegacyProtoHandler { /// any influence on the behaviour. remote_peer_id: PeerId, - /// Whether we are the connection dialer or listener. Used to determine who, between the local - /// node and the remote node, has priority. - endpoint: ConnectedPoint, - /// Queue of events to send to the outside. /// /// This queue must only ever be modified to insert elements at the back, or remove the first /// element. - events_queue: VecDeque<ProtocolsHandlerEvent<RegisteredProtocol, (), LegacyProtoHandlerOut, ConnectionKillError>>, + events_queue: VecDeque< + ProtocolsHandlerEvent<RegisteredProtocol, Infallible, LegacyProtoHandlerOut, ConnectionKillError> + >, } /// State of the handler. @@ -156,12 +153,9 @@ enum ProtocolState { init_deadline: Delay, }, - /// Handler is opening a substream in order to activate itself. + /// Handler is ready to accept incoming substreams. /// If we are in this state, we haven't sent any `CustomProtocolOpen` yet. - Opening { - /// Deadline after which the opening is abnormally long. - deadline: Delay, - }, + Opening, /// Normal operating mode. Contains the substreams that are open. /// If we are in this state, we have sent a `CustomProtocolOpen` message to the outside. @@ -229,14 +223,6 @@ pub enum LegacyProtoHandlerOut { /// Message that has been received. message: BytesMut, }, - - /// An error has happened on the protocol level with this node. - ProtocolError { - /// If true the error is severe, such as a protocol violation. - is_severe: bool, - /// The error that happened. - error: Box<dyn error::Error + Send + Sync>, - }, } impl LegacyProtoHandler { @@ -251,14 +237,7 @@ impl LegacyProtoHandler { ProtocolState::Init { substreams: mut incoming, .. } => { if incoming.is_empty() { - if let ConnectedPoint::Dialer { .. } = self.endpoint { - self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(self.protocol.clone(), ()), - }); - } - ProtocolState::Opening { - deadline: Delay::new(Duration::from_secs(60)) - } + ProtocolState::Opening } else { let event = LegacyProtoHandlerOut::CustomProtocolOpen { version: incoming[0].0.protocol_version(), @@ -316,7 +295,7 @@ impl LegacyProtoHandler { /// Polls the state for events. Optionally returns an event to produce. #[must_use] fn poll_state(&mut self, cx: &mut Context) - -> Option<ProtocolsHandlerEvent<RegisteredProtocol, (), LegacyProtoHandlerOut, ConnectionKillError>> { + -> Option<ProtocolsHandlerEvent<RegisteredProtocol, Infallible, LegacyProtoHandlerOut, ConnectionKillError>> { match mem::replace(&mut self.state, ProtocolState::Poisoned) { ProtocolState::Poisoned => { error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state", @@ -340,21 +319,9 @@ impl LegacyProtoHandler { None } - ProtocolState::Opening { mut deadline } => { - match Pin::new(&mut deadline).poll(cx) { - Poll::Ready(()) => { - let event = LegacyProtoHandlerOut::ProtocolError { - is_severe: true, - error: "Timeout when opening protocol".to_string().into(), - }; - self.state = ProtocolState::KillAsap; - Some(ProtocolsHandlerEvent::Custom(event)) - }, - Poll::Pending => { - self.state = ProtocolState::Opening { deadline }; - None - }, - } + ProtocolState::Opening => { + self.state = ProtocolState::Opening; + None } ProtocolState::Normal { mut substreams, mut shutdown } => { @@ -423,27 +390,35 @@ impl LegacyProtoHandler { // If `reenable` is `true`, that means we should open the substreams system again // after all the substreams are closed. if reenable && shutdown.is_empty() { - self.state = ProtocolState::Opening { - deadline: Delay::new(Duration::from_secs(60)) - }; - Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(self.protocol.clone(), ()), - }) + self.state = ProtocolState::Opening; } else { self.state = ProtocolState::Disabled { shutdown, reenable }; - None } + None } ProtocolState::KillAsap => None, } } +} + +impl ProtocolsHandler for LegacyProtoHandler { + type InEvent = LegacyProtoHandlerIn; + type OutEvent = LegacyProtoHandlerOut; + type Error = ConnectionKillError; + type InboundProtocol = RegisteredProtocol; + type OutboundProtocol = RegisteredProtocol; + type OutboundOpenInfo = Infallible; + type InboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> { + SubstreamProtocol::new(self.protocol.clone(), ()) + } - /// Called by `inject_fully_negotiated_inbound` and `inject_fully_negotiated_outbound`. - fn inject_fully_negotiated( + fn inject_fully_negotiated_inbound( &mut self, - mut substream: RegisteredProtocolSubstream<NegotiatedSubstream>, - received_handshake: Vec<u8>, + (mut substream, received_handshake): <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output, + (): () ) { self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) { ProtocolState::Poisoned => { @@ -487,35 +462,13 @@ impl LegacyProtoHandler { ProtocolState::KillAsap => ProtocolState::KillAsap, }; } -} - -impl ProtocolsHandler for LegacyProtoHandler { - type InEvent = LegacyProtoHandlerIn; - type OutEvent = LegacyProtoHandlerOut; - type Error = ConnectionKillError; - type InboundProtocol = RegisteredProtocol; - type OutboundProtocol = RegisteredProtocol; - type OutboundOpenInfo = (); - type InboundOpenInfo = (); - - fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> { - SubstreamProtocol::new(self.protocol.clone(), ()) - } - - fn inject_fully_negotiated_inbound( - &mut self, - (substream, handshake): <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output, - (): () - ) { - self.inject_fully_negotiated(substream, handshake); - } fn inject_fully_negotiated_outbound( &mut self, - (substream, handshake): <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output, - _: Self::OutboundOpenInfo + _: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output, + unreachable: Self::OutboundOpenInfo ) { - self.inject_fully_negotiated(substream, handshake); + match unreachable {} } fn inject_event(&mut self, message: LegacyProtoHandlerIn) { @@ -525,24 +478,19 @@ impl ProtocolsHandler for LegacyProtoHandler { } } - fn inject_dial_upgrade_error(&mut self, _: (), err: ProtocolsHandlerUpgrErr<io::Error>) { - let is_severe = match err { - ProtocolsHandlerUpgrErr::Upgrade(_) => true, - _ => false, - }; - - self.events_queue.push_back(ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError { - is_severe, - error: Box::new(err), - })); + fn inject_dial_upgrade_error( + &mut self, + unreachable: Self::OutboundOpenInfo, + _: ProtocolsHandlerUpgrErr<io::Error> + ) { + match unreachable {} } fn connection_keep_alive(&self) -> KeepAlive { match self.state { - ProtocolState::Init { .. } | ProtocolState::Opening { .. } | - ProtocolState::Normal { .. } => KeepAlive::Yes, - ProtocolState::Disabled { .. } | ProtocolState::Poisoned | - ProtocolState::KillAsap => KeepAlive::No, + ProtocolState::Init { .. } | ProtocolState::Normal { .. } => KeepAlive::Yes, + ProtocolState::Opening { .. } | ProtocolState::Disabled { .. } | + ProtocolState::Poisoned | ProtocolState::KillAsap => KeepAlive::No, } } -- GitLab