diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index c17bd30b0b70e7d29a86e7ee3d6cb15e3aed7e29..dddd6cddadc93e107826586af5ac7a33a262b493 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -6231,6 +6231,7 @@ dependencies = [ "nohash-hasher", "parity-scale-codec", "parking_lot 0.10.0", + "pin-project", "prost", "prost-build", "quickcheck", @@ -8564,6 +8565,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b7ffb36714206d2f5f05d61a2bc350415c642f2c54433f0ebf829afbe41d570" dependencies = [ "bytes 0.5.4", + "futures 0.3.4", "futures_codec", ] diff --git a/substrate/client/finality-grandpa/src/communication/mod.rs b/substrate/client/finality-grandpa/src/communication/mod.rs index 050a3c8642fde81056af71f6076fd5534f9c8a6d..b5600c1c0d8975c0666e5934c11e4435348d2ed0 100644 --- a/substrate/client/finality-grandpa/src/communication/mod.rs +++ b/substrate/client/finality-grandpa/src/communication/mod.rs @@ -65,6 +65,7 @@ mod periodic; pub(crate) mod tests; pub use sp_finality_grandpa::GRANDPA_ENGINE_ID; +pub const GRANDPA_PROTOCOL_NAME: &[u8] = b"/paritytech/grandpa/1"; // cost scalars for reporting peers. mod cost { @@ -185,7 +186,12 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> { ); let validator = Arc::new(validator); - let gossip_engine = GossipEngine::new(service.clone(), GRANDPA_ENGINE_ID, validator.clone()); + let gossip_engine = GossipEngine::new( + service.clone(), + GRANDPA_ENGINE_ID, + GRANDPA_PROTOCOL_NAME, + validator.clone() + ); { // register all previous votes with the gossip service so that they're diff --git a/substrate/client/finality-grandpa/src/communication/tests.rs b/substrate/client/finality-grandpa/src/communication/tests.rs index 5506512b531d19ff64e439d9430f59d9eeb79e67..96761a2f3c07c567aaefdc3f0540dee3c0cabb1c 100644 --- a/substrate/client/finality-grandpa/src/communication/tests.rs +++ b/substrate/client/finality-grandpa/src/communication/tests.rs @@ -25,7 +25,7 @@ use std::sync::Arc; use sp_keyring::Ed25519Keyring; use parity_scale_codec::Encode; use sp_runtime::{ConsensusEngineId, traits::NumberFor}; -use std::{pin::Pin, task::{Context, Poll}}; +use std::{borrow::Cow, pin::Pin, task::{Context, Poll}}; use crate::environment::SharedVoterSetState; use sp_finality_grandpa::{AuthorityList, GRANDPA_ENGINE_ID}; use super::gossip::{self, GossipValidator}; @@ -61,7 +61,7 @@ impl sc_network_gossip::Network<Block> for TestNetwork { let _ = self.sender.unbounded_send(Event::WriteNotification(who, message)); } - fn register_notifications_protocol(&self, _: ConsensusEngineId) {} + fn register_notifications_protocol(&self, _: ConsensusEngineId, _: Cow<'static, [u8]>) {} fn announce(&self, block: Hash, _associated_data: Vec<u8>) { let _ = self.sender.unbounded_send(Event::Announce(block)); diff --git a/substrate/client/finality-grandpa/src/lib.rs b/substrate/client/finality-grandpa/src/lib.rs index 36b57024c9644877bd038dc92456760f488465c4..650b59dfff662a513836d5cf53d30e9ee9c5d3ce 100644 --- a/substrate/client/finality-grandpa/src/lib.rs +++ b/substrate/client/finality-grandpa/src/lib.rs @@ -886,7 +886,10 @@ pub fn setup_disabled_grandpa<B, E, Block: BlockT, RA, N>( // We register the GRANDPA protocol so that we don't consider it an anomaly // to receive GRANDPA messages on the network. We don't process the // messages. - network.register_notifications_protocol(communication::GRANDPA_ENGINE_ID); + network.register_notifications_protocol( + communication::GRANDPA_ENGINE_ID, + From::from(communication::GRANDPA_PROTOCOL_NAME), + ); Ok(()) } diff --git a/substrate/client/network-gossip/src/bridge.rs b/substrate/client/network-gossip/src/bridge.rs index 7968e59d0704ef617e3efd7d1144c539c42b7a9b..c911766aba40a4f41b0588ae2c33a9a51d5daab4 100644 --- a/substrate/client/network-gossip/src/bridge.rs +++ b/substrate/client/network-gossip/src/bridge.rs @@ -24,7 +24,7 @@ use futures::{prelude::*, channel::mpsc}; use libp2p::PeerId; use parking_lot::Mutex; use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; -use std::{pin::Pin, sync::Arc, task::{Context, Poll}}; +use std::{borrow::Cow, pin::Pin, sync::Arc, task::{Context, Poll}}; /// Wraps around an implementation of the `Network` crate and provides gossiping capabilities on /// top of it. @@ -48,6 +48,7 @@ impl<B: BlockT> GossipEngine<B> { pub fn new<N: Network<B> + Send + Clone + 'static>( mut network: N, engine_id: ConsensusEngineId, + protocol_name: impl Into<Cow<'static, [u8]>>, validator: Arc<dyn Validator<B>>, ) -> Self where B: 'static { let mut state_machine = ConsensusGossip::new(); @@ -56,7 +57,7 @@ impl<B: BlockT> GossipEngine<B> { // might miss events. let network_event_stream = network.event_stream(); - network.register_notifications_protocol(engine_id); + network.register_notifications_protocol(engine_id, protocol_name.into()); state_machine.register_validator(&mut network, engine_id, validator); let inner = Arc::new(Mutex::new(GossipEngineInner { diff --git a/substrate/client/network-gossip/src/lib.rs b/substrate/client/network-gossip/src/lib.rs index c4f057a775f47e6c5258ce97878096874fe81b66..abb3f32972b0d4d29197106126656a1eec81436b 100644 --- a/substrate/client/network-gossip/src/lib.rs +++ b/substrate/client/network-gossip/src/lib.rs @@ -61,7 +61,7 @@ pub use self::validator::{DiscardAll, MessageIntent, Validator, ValidatorContext use futures::prelude::*; use sc_network::{specialization::NetworkSpecialization, Event, ExHashT, NetworkService, PeerId, ReputationChange}; use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; -use std::{pin::Pin, sync::Arc}; +use std::{borrow::Cow, pin::Pin, sync::Arc}; mod bridge; mod state_machine; @@ -86,7 +86,8 @@ pub trait Network<B: BlockT> { /// See the documentation of [`NetworkService:register_notifications_protocol`] for more information. fn register_notifications_protocol( &self, - engine_id: ConsensusEngineId + engine_id: ConsensusEngineId, + protocol_name: Cow<'static, [u8]>, ); /// Notify everyone we're connected to that we have the given block. @@ -116,8 +117,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Network<B> for Arc<Netw fn register_notifications_protocol( &self, engine_id: ConsensusEngineId, + protocol_name: Cow<'static, [u8]>, ) { - NetworkService::register_notifications_protocol(self, engine_id) + NetworkService::register_notifications_protocol(self, engine_id, protocol_name) } fn announce(&self, block: B::Hash, associated_data: Vec<u8>) { diff --git a/substrate/client/network/Cargo.toml b/substrate/client/network/Cargo.toml index e878e47e693f8dfb222d8b6a332914a5f82763d6..4c2f2d0c314dc9fa5423d7533ac80425e8ebc61e 100644 --- a/substrate/client/network/Cargo.toml +++ b/substrate/client/network/Cargo.toml @@ -36,6 +36,7 @@ sc-block-builder = { version = "0.8", path = "../block-builder" } sc-client = { version = "0.8", path = "../" } sc-client-api = { version = "2.0.0", path = "../api" } sc-peerset = { version = "2.0.0", path = "../peerset" } +pin-project = "0.4.6" serde = { version = "1.0.101", features = ["derive"] } serde_json = "1.0.41" slog = { version = "2.5.2", features = ["nested-values"] } @@ -51,7 +52,7 @@ sp-runtime = { version = "2.0.0", path = "../../primitives/runtime" } substrate-test-client = { version = "2.0.0", optional = true, path = "../../test-utils/client" } substrate-test-runtime-client = { version = "2.0.0", optional = true, path = "../../test-utils/runtime/client" } thiserror = "1" -unsigned-varint = { version = "0.3.0", features = ["futures-codec"] } +unsigned-varint = { version = "0.3.1", features = ["futures", "futures-codec"] } void = "1.0.2" zeroize = "1.0.0" diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs index d5e7ae6252ca147e59da50be6c22b8acb7d772f0..fe75649baca3cdc85f307b25f02b0070c680f3e7 100644 --- a/substrate/client/network/src/protocol.rs +++ b/substrate/client/network/src/protocol.rs @@ -15,10 +15,10 @@ // along with Substrate. If not, see <http://www.gnu.org/licenses/>. use crate::{DiscoveryNetBehaviour, config::ProtocolId}; -use legacy_proto::{LegacyProto, LegacyProtoOut}; use crate::utils::interval; use bytes::{Bytes, BytesMut}; use futures::prelude::*; +use generic_proto::{GenericProto, GenericProtoOut}; use libp2p::{Multiaddr, PeerId}; use libp2p::core::{ConnectedPoint, nodes::listeners::ListenerId}; use libp2p::swarm::{ProtocolsHandler, IntoProtocolsHandler}; @@ -36,13 +36,14 @@ use sp_runtime::traits::{ }; use sp_arithmetic::traits::SaturatedConversion; use message::{BlockAnnounce, BlockAttributes, Direction, FromBlock, Message, RequestId}; -use message::generic::{Message as GenericMessage, ConsensusMessage}; +use message::generic::Message as GenericMessage; use light_dispatch::{LightDispatch, LightDispatchNetwork, RequestData}; use specialization::NetworkSpecialization; use sync::{ChainSync, SyncState}; use crate::service::{TransactionPool, ExHashT}; use crate::config::{BoxFinalityProofRequestBuilder, Roles}; use rustc_hex::ToHex; +use std::borrow::Cow; use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::Arc; use std::fmt::Write; @@ -64,7 +65,7 @@ pub mod api { } } -mod legacy_proto; +mod generic_proto; mod util; pub mod block_requests; @@ -158,9 +159,11 @@ pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> { /// When asked for a proof of finality, we use this struct to build one. finality_proof_provider: Option<Arc<dyn FinalityProofProvider<B>>>, /// Handles opening the unique substream and sending and receiving raw messages. - behaviour: LegacyProto, - /// List of notification protocols that have been registered. - registered_notif_protocols: HashSet<ConsensusEngineId>, + behaviour: GenericProto, + /// For each legacy gossiping engine ID, the corresponding new protocol name. + protocol_name_by_engine: HashMap<ConsensusEngineId, Cow<'static, [u8]>>, + /// For each protocol name, the legacy gossiping engine ID. + protocol_engine_by_name: HashMap<Cow<'static, [u8]>, ConsensusEngineId>, } #[derive(Default)] @@ -207,7 +210,7 @@ pub struct PeerInfo<B: BlockT> { } struct LightDispatchIn<'a> { - behaviour: &'a mut LegacyProto, + behaviour: &'a mut GenericProto, peerset: sc_peerset::PeersetHandle, } @@ -347,7 +350,7 @@ pub trait Context<B: BlockT> { /// Protocol context. struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> { - behaviour: &'a mut LegacyProto, + behaviour: &'a mut GenericProto, context_data: &'a mut ContextData<B, H>, peerset_handle: &'a sc_peerset::PeersetHandle, } @@ -355,7 +358,7 @@ struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> { impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> { fn new( context_data: &'a mut ContextData<B, H>, - behaviour: &'a mut LegacyProto, + behaviour: &'a mut GenericProto, peerset_handle: &'a sc_peerset::PeersetHandle, ) -> Self { ProtocolContext { context_data, peerset_handle, behaviour } @@ -442,7 +445,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { let (peerset, peerset_handle) = sc_peerset::Peerset::from_config(peerset_config); let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::<Vec<u8>>(); - let behaviour = LegacyProto::new(protocol_id, versions, peerset); + let behaviour = GenericProto::new(protocol_id, versions, peerset); let protocol = Protocol { tick_timeout: Box::pin(interval(TICK_TIMEOUT)), @@ -463,7 +466,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { finality_proof_provider, peerset_handle: peerset_handle.clone(), behaviour, - registered_notif_protocols: HashSet::new(), + protocol_name_by_engine: HashMap::new(), + protocol_engine_by_name: HashMap::new(), }; Ok((protocol, peerset_handle)) @@ -646,7 +650,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { GenericMessage::RemoteReadChildRequest(request) => self.on_remote_read_child_request(who, request), GenericMessage::Consensus(msg) => - return if self.registered_notif_protocols.contains(&msg.engine_id) { + return if self.protocol_name_by_engine.contains_key(&msg.engine_id) { CustomMessageOutcome::NotificationsReceived { remote: who.clone(), messages: vec![(msg.engine_id, From::from(msg.data))], @@ -659,7 +663,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { let messages = messages .into_iter() .filter_map(|msg| { - if self.registered_notif_protocols.contains(&msg.engine_id) { + if self.protocol_name_by_engine.contains_key(&msg.engine_id) { Some((msg.engine_id, From::from(msg.data))) } else { warn!(target: "sync", "Received message on non-registered protocol: {:?}", msg.engine_id); @@ -1060,7 +1064,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { // Notify all the notification protocols as open. CustomMessageOutcome::NotificationStreamOpened { remote: who, - protocols: self.registered_notif_protocols.iter().cloned().collect(), + protocols: self.protocol_name_by_engine.keys().cloned().collect(), roles: info.roles, } } @@ -1075,18 +1079,15 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { engine_id: ConsensusEngineId, message: impl Into<Vec<u8>> ) { - if !self.registered_notif_protocols.contains(&engine_id) { + if let Some(protocol_name) = self.protocol_name_by_engine.get(&engine_id) { + self.behaviour.write_notification(&target, engine_id, protocol_name.clone(), message); + } else { error!( target: "sub-libp2p", "Sending a notification with a protocol that wasn't registered: {:?}", engine_id ); } - - self.send_message(&target, GenericMessage::Consensus(ConsensusMessage { - engine_id, - data: message.into(), - })); } /// Registers a new notifications protocol. @@ -1096,9 +1097,14 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { pub fn register_notifications_protocol( &mut self, engine_id: ConsensusEngineId, + protocol_name: impl Into<Cow<'static, [u8]>>, ) -> Vec<event::Event> { - if !self.registered_notif_protocols.insert(engine_id) { - error!(target: "sub-libp2p", "Notifications protocol already registered: {:?}", engine_id); + let protocol_name = protocol_name.into(); + if self.protocol_name_by_engine.insert(engine_id, protocol_name.clone()).is_some() { + error!(target: "sub-libp2p", "Notifications protocol already registered: {:?}", protocol_name); + } else { + self.behaviour.register_notif_protocol(protocol_name.clone(), engine_id, Vec::new()); + self.protocol_engine_by_name.insert(protocol_name, engine_id); } // Registering a protocol while we already have open connections isn't great, but for now @@ -1833,7 +1839,7 @@ pub enum CustomMessageOutcome<B: BlockT> { } fn send_request<B: BlockT, H: ExHashT>( - behaviour: &mut LegacyProto, + behaviour: &mut GenericProto, stats: &mut HashMap<&'static str, PacketStats>, peers: &mut HashMap<PeerId, Peer<B, H>>, who: &PeerId, @@ -1854,7 +1860,7 @@ fn send_request<B: BlockT, H: ExHashT>( } fn send_message<B: BlockT>( - behaviour: &mut LegacyProto, + behaviour: &mut GenericProto, stats: &mut HashMap<&'static str, PacketStats>, who: &PeerId, message: Message<B>, @@ -1868,7 +1874,7 @@ fn send_message<B: BlockT>( impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviour for Protocol<B, S, H> { - type ProtocolsHandler = <LegacyProto as NetworkBehaviour>::ProtocolsHandler; + type ProtocolsHandler = <GenericProto as NetworkBehaviour>::ProtocolsHandler; type OutEvent = CustomMessageOutcome<B>; fn new_handler(&mut self) -> Self::ProtocolsHandler { @@ -1954,25 +1960,21 @@ Protocol<B, S, H> { }; let outcome = match event { - LegacyProtoOut::CustomProtocolOpen { peer_id, version, .. } => { - debug_assert!( - version <= CURRENT_VERSION as u8 - && version >= MIN_VERSION as u8 - ); + GenericProtoOut::CustomProtocolOpen { peer_id, .. } => { self.on_peer_connected(peer_id.clone()); CustomMessageOutcome::None } - LegacyProtoOut::CustomProtocolClosed { peer_id, .. } => { + GenericProtoOut::CustomProtocolClosed { peer_id, .. } => { self.on_peer_disconnected(peer_id.clone()); // Notify all the notification protocols as closed. CustomMessageOutcome::NotificationStreamClosed { remote: peer_id, - protocols: self.registered_notif_protocols.iter().cloned().collect(), + protocols: self.protocol_name_by_engine.keys().cloned().collect(), } }, - LegacyProtoOut::CustomMessage { peer_id, message } => + GenericProtoOut::CustomMessage { peer_id, message } => self.on_custom_message(peer_id, message), - LegacyProtoOut::Clogged { peer_id, messages } => { + GenericProtoOut::Clogged { peer_id, messages } => { debug!(target: "sync", "{} clogging messages:", messages.len()); for msg in messages.into_iter().take(5) { let message: Option<Message<B>> = Decode::decode(&mut &msg[..]).ok(); diff --git a/substrate/client/network/src/protocol/legacy_proto.rs b/substrate/client/network/src/protocol/generic_proto.rs similarity index 86% rename from substrate/client/network/src/protocol/legacy_proto.rs rename to substrate/client/network/src/protocol/generic_proto.rs index 434782f7d5065de6372fa651f1ed1ac29b0209aa..f703287f386fdcf9aebcfc8d3fa3d0c5971c7eff 100644 --- a/substrate/client/network/src/protocol/legacy_proto.rs +++ b/substrate/client/network/src/protocol/generic_proto.rs @@ -17,10 +17,10 @@ //! Implementation of libp2p's `NetworkBehaviour` trait that opens a single substream with the //! remote and then allows any communication with them. //! -//! The `Protocol` struct uses `LegacyProto` in order to open substreams with the rest of the +//! The `Protocol` struct uses `GenericProto` in order to open substreams with the rest of the //! network, then performs the Substrate protocol handling on top. -pub use self::behaviour::{LegacyProto, LegacyProtoOut}; +pub use self::behaviour::{GenericProto, GenericProtoOut}; mod behaviour; mod handler; diff --git a/substrate/client/network/src/protocol/legacy_proto/behaviour.rs b/substrate/client/network/src/protocol/generic_proto/behaviour.rs similarity index 86% rename from substrate/client/network/src/protocol/legacy_proto/behaviour.rs rename to substrate/client/network/src/protocol/generic_proto/behaviour.rs index 69c89be9a36957f808773a76d54e8b43ea195251..24e96681a084afcb71b9f41b5f78670a384bc80e 100644 --- a/substrate/client/network/src/protocol/legacy_proto/behaviour.rs +++ b/substrate/client/network/src/protocol/generic_proto/behaviour.rs @@ -15,9 +15,12 @@ // along with Substrate. If not, see <http://www.gnu.org/licenses/>. use crate::{DiscoveryNetBehaviour, config::ProtocolId}; -use crate::protocol::legacy_proto::handler::{CustomProtoHandlerProto, CustomProtoHandlerOut, CustomProtoHandlerIn}; -use crate::protocol::legacy_proto::upgrade::RegisteredProtocol; +use crate::protocol::message::generic::{Message as GenericMessage, ConsensusMessage}; +use crate::protocol::generic_proto::handler::{NotifsHandlerProto, NotifsHandlerOut, NotifsHandlerIn}; +use crate::protocol::generic_proto::upgrade::RegisteredProtocol; + use bytes::BytesMut; +use codec::Encode as _; use fnv::FnvHashMap; use futures::prelude::*; use libp2p::core::{ConnectedPoint, Multiaddr, PeerId}; @@ -25,16 +28,32 @@ use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use log::{debug, error, trace, warn}; use rand::distributions::{Distribution as _, Uniform}; use smallvec::SmallVec; -use std::{borrow::Cow, collections::hash_map::Entry, cmp, error, mem, pin::Pin}; -use std::time::Duration; -use wasm_timer::Instant; +use sp_runtime::ConsensusEngineId; +use std::{borrow::Cow, collections::hash_map::Entry, cmp}; +use std::{error, mem, pin::Pin, str, time::Duration}; use std::task::{Context, Poll}; +use wasm_timer::Instant; /// Network behaviour that handles opening substreams for custom protocols with other nodes. /// +/// ## Legacy vs new protocol +/// +/// The `GenericProto` behaves as following: +/// +/// - Whenever a connection is established, we open a single substream (called "legay protocol" in +/// the source code). This substream name depends on the `protocol_id` and `versions` passed at +/// initialization. If the remote refuses this substream, we close the connection. +/// +/// - For each registered protocol, we also open an additional substream for this protocol. If the +/// remote refuses this substream, then it's fine. +/// +/// - Whenever we want to send a message, we can call either `send_packet` to force the legacy +/// substream, or `write_notification` to indicate a registered protocol. If the registered +/// protocol was refused or isn't supported by the remote, we always use the legacy instead. +/// /// ## How it works /// -/// The role of the `LegacyProto` is to synchronize the following components: +/// The role of the `GenericProto` is to synchronize the following components: /// /// - The libp2p swarm that opens new connections and reports disconnects. /// - The connection handler (see `handler.rs`) that handles individual connections. @@ -60,9 +79,12 @@ use std::task::{Context, Poll}; /// Note that this "banning" system is not an actual ban. If a "banned" node tries to connect to /// us, we accept the connection. The "banning" system is only about delaying dialing attempts. /// -pub struct LegacyProto { - /// List of protocols to open with peers. Never modified. - protocol: RegisteredProtocol, +pub struct GenericProto { + /// Legacy protocol to open with peers. Never modified. + legacy_protocol: RegisteredProtocol, + + /// Notification protocols. Entries are only ever added and not removed. + notif_protocols: Vec<(Cow<'static, [u8]>, ConsensusEngineId, Vec<u8>)>, /// Receiver for instructions about who to connect to or disconnect from. peerset: sc_peerset::Peerset, @@ -79,7 +101,7 @@ pub struct LegacyProto { next_incoming_index: sc_peerset::IncomingIndex, /// Events to produce from `poll()`. - events: SmallVec<[NetworkBehaviourAction<CustomProtoHandlerIn, LegacyProtoOut>; 4]>, + events: SmallVec<[NetworkBehaviourAction<NotifsHandlerIn, GenericProtoOut>; 4]>, } /// State of a peer we're connected to. @@ -183,13 +205,11 @@ struct IncomingPeer { incoming_id: sc_peerset::IncomingIndex, } -/// Event that can be emitted by the `LegacyProto`. +/// Event that can be emitted by the `GenericProto`. #[derive(Debug)] -pub enum LegacyProtoOut { +pub enum GenericProtoOut { /// Opened a custom protocol with the remote. CustomProtocolOpen { - /// Version of the protocol that has been opened. - version: u8, /// Id of the node we have opened a connection with. peer_id: PeerId, /// Endpoint used for this custom protocol. @@ -205,6 +225,8 @@ pub enum LegacyProtoOut { }, /// Receives a message on a custom protocol substream. + /// + /// Also concerns received notifications for the notifications API. CustomMessage { /// Id of the peer the message came from. peer_id: PeerId, @@ -222,17 +244,18 @@ pub enum LegacyProtoOut { }, } -impl LegacyProto { +impl GenericProto { /// Creates a `CustomProtos`. pub fn new( protocol: impl Into<ProtocolId>, versions: &[u8], peerset: sc_peerset::Peerset, ) -> Self { - let protocol = RegisteredProtocol::new(protocol, versions); + let legacy_protocol = RegisteredProtocol::new(protocol, versions); - LegacyProto { - protocol, + GenericProto { + legacy_protocol, + notif_protocols: Vec::new(), peerset, peers: FnvHashMap::default(), incoming: SmallVec::new(), @@ -241,6 +264,19 @@ impl LegacyProto { } } + /// Registers a new notifications protocol. + /// + /// You are very strongly encouraged to call this method very early on. Any open connection + /// will retain the protocols that were registered then, and not any new one. + pub fn register_notif_protocol( + &mut self, + protocol_name: impl Into<Cow<'static, [u8]>>, + engine_id: ConsensusEngineId, + handshake_msg: impl Into<Vec<u8>> + ) { + self.notif_protocols.push((protocol_name.into(), engine_id, handshake_msg.into())); + } + /// Returns the list of all the peers we have an open channel to. pub fn open_peers<'a>(&'a self) -> impl Iterator<Item = &'a PeerId> + 'a { self.peers.iter().filter(|(_, state)| state.is_open()).map(|(id, _)| id) @@ -292,7 +328,7 @@ impl LegacyProto { debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", peer_id); self.events.push(NetworkBehaviourAction::SendEvent { peer_id: peer_id.clone(), - event: CustomProtoHandlerIn::Disable, + event: NotifsHandlerIn::Disable, }); let banned_until = ban.map(|dur| Instant::now() + dur); *entry.into_mut() = PeerState::Disabled { open, connected_point, banned_until } @@ -313,7 +349,7 @@ impl LegacyProto { debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", peer_id); self.events.push(NetworkBehaviourAction::SendEvent { peer_id: peer_id.clone(), - event: CustomProtoHandlerIn::Disable, + event: NotifsHandlerIn::Disable, }); let banned_until = ban.map(|dur| Instant::now() + dur); *entry.into_mut() = PeerState::Disabled { open: false, connected_point, banned_until } @@ -339,6 +375,44 @@ impl LegacyProto { } } + /// Sends a notification to a peer. + /// + /// Has no effect if the custom protocol is not open with the given peer. + /// + /// Also note that even if we have a valid open substream, it may in fact be already closed + /// without us knowing, in which case the packet will not be received. + /// + /// > **Note**: Ideally the `engine_id` parameter wouldn't be necessary. See the documentation + /// > of [`NotifsHandlerIn`] for more information. + pub fn write_notification( + &mut self, + target: &PeerId, + engine_id: ConsensusEngineId, + protocol_name: Cow<'static, [u8]>, + message: impl Into<Vec<u8>>, + ) { + if !self.is_open(target) { + return; + } + + trace!( + target: "sub-libp2p", + "External API => Notification for {:?} with protocol {:?}", + target, + str::from_utf8(&protocol_name) + ); + trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target); + + self.events.push(NetworkBehaviourAction::SendEvent { + peer_id: target.clone(), + event: NotifsHandlerIn::SendNotification { + message: message.into(), + engine_id, + protocol_name, + }, + }); + } + /// Sends a message to a peer. /// /// Has no effect if the custom protocol is not open with the given peer. @@ -354,7 +428,7 @@ impl LegacyProto { trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target); self.events.push(NetworkBehaviourAction::SendEvent { peer_id: target.clone(), - event: CustomProtoHandlerIn::SendCustomMessage { + event: NotifsHandlerIn::SendLegacy { message, } }); @@ -416,7 +490,7 @@ impl LegacyProto { debug!(target: "sub-libp2p", "Handler({:?}) <= Enable", occ_entry.key()); self.events.push(NetworkBehaviourAction::SendEvent { peer_id: occ_entry.key().clone(), - event: CustomProtoHandlerIn::Enable, + event: NotifsHandlerIn::Enable, }); *occ_entry.into_mut() = PeerState::Enabled { connected_point, open }; }, @@ -434,7 +508,7 @@ impl LegacyProto { debug!(target: "sub-libp2p", "Handler({:?}) <= Enable", occ_entry.key()); self.events.push(NetworkBehaviourAction::SendEvent { peer_id: occ_entry.key().clone(), - event: CustomProtoHandlerIn::Enable, + event: NotifsHandlerIn::Enable, }); *occ_entry.into_mut() = PeerState::Enabled { connected_point, open: false }; }, @@ -491,7 +565,7 @@ impl LegacyProto { debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", entry.key()); self.events.push(NetworkBehaviourAction::SendEvent { peer_id: entry.key().clone(), - event: CustomProtoHandlerIn::Disable, + event: NotifsHandlerIn::Disable, }); *entry.into_mut() = PeerState::Disabled { open, connected_point, banned_until: None } }, @@ -555,7 +629,7 @@ impl LegacyProto { debug!(target: "sub-libp2p", "Handler({:?}) <= Enable", incoming.peer_id); self.events.push(NetworkBehaviourAction::SendEvent { peer_id: incoming.peer_id, - event: CustomProtoHandlerIn::Enable, + event: NotifsHandlerIn::Enable, }); *state = PeerState::Enabled { open: false, connected_point }; @@ -597,13 +671,13 @@ impl LegacyProto { debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", incoming.peer_id); self.events.push(NetworkBehaviourAction::SendEvent { peer_id: incoming.peer_id, - event: CustomProtoHandlerIn::Disable, + event: NotifsHandlerIn::Disable, }); *state = PeerState::Disabled { open: false, connected_point, banned_until: None }; } } -impl DiscoveryNetBehaviour for LegacyProto { +impl DiscoveryNetBehaviour for GenericProto { fn add_discovered_nodes(&mut self, peer_ids: impl Iterator<Item = PeerId>) { self.peerset.discovered(peer_ids.into_iter().map(|peer_id| { debug!(target: "sub-libp2p", "PSM <= Discovered({:?})", peer_id); @@ -612,12 +686,12 @@ impl DiscoveryNetBehaviour for LegacyProto { } } -impl NetworkBehaviour for LegacyProto { - type ProtocolsHandler = CustomProtoHandlerProto; - type OutEvent = LegacyProtoOut; +impl NetworkBehaviour for GenericProto { + type ProtocolsHandler = NotifsHandlerProto; + type OutEvent = GenericProtoOut; fn new_handler(&mut self) -> Self::ProtocolsHandler { - CustomProtoHandlerProto::new(self.protocol.clone()) + NotifsHandlerProto::new(self.legacy_protocol.clone(), self.notif_protocols.clone()) } fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> { @@ -634,7 +708,7 @@ impl NetworkBehaviour for LegacyProto { debug!(target: "sub-libp2p", "Handler({:?}) <= Enable", peer_id); self.events.push(NetworkBehaviourAction::SendEvent { peer_id: peer_id.clone(), - event: CustomProtoHandlerIn::Enable, + event: NotifsHandlerIn::Enable, }); *st = PeerState::Enabled { open: false, connected_point }; } @@ -677,7 +751,7 @@ impl NetworkBehaviour for LegacyProto { debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", peer_id); self.events.push(NetworkBehaviourAction::SendEvent { peer_id: peer_id.clone(), - event: CustomProtoHandlerIn::Disable, + event: NotifsHandlerIn::Disable, }); *st = PeerState::Disabled { open: false, connected_point, banned_until }; } @@ -707,7 +781,7 @@ impl NetworkBehaviour for LegacyProto { } if open { debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id); - let event = LegacyProtoOut::CustomProtocolClosed { + let event = GenericProtoOut::CustomProtocolClosed { peer_id: peer_id.clone(), reason: "Disconnected by libp2p".into(), }; @@ -724,7 +798,7 @@ impl NetworkBehaviour for LegacyProto { self.peers.insert(peer_id.clone(), PeerState::Banned { until: timer_deadline }); if open { debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id); - let event = LegacyProtoOut::CustomProtocolClosed { + let event = GenericProtoOut::CustomProtocolClosed { peer_id: peer_id.clone(), reason: "Disconnected by libp2p".into(), }; @@ -746,7 +820,7 @@ impl NetworkBehaviour for LegacyProto { if open { debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id); - let event = LegacyProtoOut::CustomProtocolClosed { + let event = GenericProtoOut::CustomProtocolClosed { peer_id: peer_id.clone(), reason: "Disconnected by libp2p".into(), }; @@ -817,10 +891,10 @@ impl NetworkBehaviour for LegacyProto { fn inject_node_event( &mut self, source: PeerId, - event: CustomProtoHandlerOut, + event: NotifsHandlerOut, ) { match event { - CustomProtoHandlerOut::CustomProtocolClosed { reason } => { + NotifsHandlerOut::Closed { reason } => { debug!(target: "sub-libp2p", "Handler({:?}) => Closed: {}", source, reason); let mut entry = if let Entry::Occupied(entry) = self.peers.entry(source.clone()) { @@ -831,7 +905,7 @@ impl NetworkBehaviour for LegacyProto { }; debug!(target: "sub-libp2p", "External API <= Closed({:?})", source); - let event = LegacyProtoOut::CustomProtocolClosed { + let event = GenericProtoOut::CustomProtocolClosed { reason, peer_id: source.clone(), }; @@ -847,7 +921,7 @@ impl NetworkBehaviour for LegacyProto { debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", source); self.events.push(NetworkBehaviourAction::SendEvent { peer_id: source.clone(), - event: CustomProtoHandlerIn::Disable, + event: NotifsHandlerIn::Disable, }); *entry.into_mut() = PeerState::Disabled { @@ -873,8 +947,8 @@ impl NetworkBehaviour for LegacyProto { } } - CustomProtoHandlerOut::CustomProtocolOpen { version } => { - debug!(target: "sub-libp2p", "Handler({:?}) => Open: version {:?}", source, version); + NotifsHandlerOut::Open => { + debug!(target: "sub-libp2p", "Handler({:?}) => Open", source); let endpoint = match self.peers.get_mut(&source) { Some(PeerState::Enabled { ref mut open, ref connected_point }) | Some(PeerState::DisabledPendingEnable { ref mut open, ref connected_point, .. }) | @@ -889,8 +963,7 @@ impl NetworkBehaviour for LegacyProto { }; debug!(target: "sub-libp2p", "External API <= Open({:?})", source); - let event = LegacyProtoOut::CustomProtocolOpen { - version, + let event = GenericProtoOut::CustomProtocolOpen { peer_id: source, endpoint, }; @@ -898,11 +971,11 @@ impl NetworkBehaviour for LegacyProto { self.events.push(NetworkBehaviourAction::GenerateEvent(event)); } - CustomProtoHandlerOut::CustomMessage { message } => { + NotifsHandlerOut::CustomMessage { message } => { debug_assert!(self.is_open(&source)); trace!(target: "sub-libp2p", "Handler({:?}) => Message", source); trace!(target: "sub-libp2p", "External API <= Message({:?})", source); - let event = LegacyProtoOut::CustomMessage { + let event = GenericProtoOut::CustomMessage { peer_id: source, message, }; @@ -910,25 +983,50 @@ impl NetworkBehaviour for LegacyProto { self.events.push(NetworkBehaviourAction::GenerateEvent(event)); } - CustomProtoHandlerOut::Clogged { messages } => { + NotifsHandlerOut::Notification { protocol_name, engine_id, message } => { + debug_assert!(self.is_open(&source)); + trace!( + target: "sub-libp2p", + "Handler({:?}) => Notification({:?})", + source, + str::from_utf8(&protocol_name) + ); + trace!(target: "sub-libp2p", "External API <= Message({:?})", source); + let event = GenericProtoOut::CustomMessage { + peer_id: source, + message: { + let message = GenericMessage::<(), (), (), ()>::Consensus(ConsensusMessage { + engine_id, + data: message.to_vec(), + }); + + // Note that we clone `message` here. + From::from(&message.encode()[..]) + }, + }; + + self.events.push(NetworkBehaviourAction::GenerateEvent(event)); + } + + NotifsHandlerOut::Clogged { messages } => { debug_assert!(self.is_open(&source)); trace!(target: "sub-libp2p", "Handler({:?}) => Clogged", source); trace!(target: "sub-libp2p", "External API <= Clogged({:?})", source); warn!(target: "sub-libp2p", "Queue of packets to send to {:?} is \ pretty large", source); - self.events.push(NetworkBehaviourAction::GenerateEvent(LegacyProtoOut::Clogged { + self.events.push(NetworkBehaviourAction::GenerateEvent(GenericProtoOut::Clogged { peer_id: source, messages, })); } // Don't do anything for non-severe errors except report them. - CustomProtoHandlerOut::ProtocolError { is_severe, ref error } if !is_severe => { + NotifsHandlerOut::ProtocolError { is_severe, ref error } if !is_severe => { debug!(target: "sub-libp2p", "Handler({:?}) => Benign protocol error: {:?}", source, error) } - CustomProtoHandlerOut::ProtocolError { error, .. } => { + NotifsHandlerOut::ProtocolError { error, .. } => { debug!(target: "sub-libp2p", "Handler({:?}) => Severe protocol error: {:?}", source, error); // A severe protocol error happens when we detect a "bad" node, such as a node on @@ -950,7 +1048,7 @@ impl NetworkBehaviour for LegacyProto { _params: &mut impl PollParameters, ) -> Poll< NetworkBehaviourAction< - CustomProtoHandlerIn, + NotifsHandlerIn, Self::OutEvent, >, > { @@ -1005,7 +1103,7 @@ impl NetworkBehaviour for LegacyProto { debug!(target: "sub-libp2p", "Handler({:?}) <= Enable now that ban has expired", peer_id); self.events.push(NetworkBehaviourAction::SendEvent { peer_id: peer_id.clone(), - event: CustomProtoHandlerIn::Enable, + event: NotifsHandlerIn::Enable, }); *peer_state = PeerState::Enabled { connected_point, open }; } diff --git a/substrate/client/network/src/protocol/generic_proto/handler.rs b/substrate/client/network/src/protocol/generic_proto/handler.rs new file mode 100644 index 0000000000000000000000000000000000000000..e97176cfbbfbb98cfa6f1f1d2f25dc9f5f777990 --- /dev/null +++ b/substrate/client/network/src/protocol/generic_proto/handler.rs @@ -0,0 +1,22 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see <http://www.gnu.org/licenses/>. + +pub use self::group::{NotifsHandlerProto, NotifsHandler, NotifsHandlerIn, NotifsHandlerOut}; + +mod group; +mod legacy; +mod notif_in; +mod notif_out; diff --git a/substrate/client/network/src/protocol/generic_proto/handler/group.rs b/substrate/client/network/src/protocol/generic_proto/handler/group.rs new file mode 100644 index 0000000000000000000000000000000000000000..d6d9919d3e14df0c2261731cddb0811aa291f920 --- /dev/null +++ b/substrate/client/network/src/protocol/generic_proto/handler/group.rs @@ -0,0 +1,523 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see <http://www.gnu.org/licenses/>. + +//! Implementations of the `IntoProtocolsHandler` and `ProtocolsHandler` traits for both incoming +//! and outgoing substreams for all gossiping protocols together. +//! +//! This is the main implementation of `ProtocolsHandler` in this crate, that handles all the +//! protocols that are Substrate-related and outside of the scope of libp2p. +//! +//! # Usage +//! +//! The handler can be in one of the following states: `Initial`, `Enabled`, `Disabled`. +//! +//! The `Initial` state is the state that the handler initially is in. It is a temporary state +//! during which the user must either enable or disable the handler. After that, the handler stays +//! either enabled or disabled. +//! +//! On the wire, we try to open the following substreams: +//! +//! - One substream for each notification protocol passed as parameter to the +//! `NotifsHandlerProto::new` function. +//! - One "legacy" substream used for anything non-related to gossiping, and used as a fallback +//! in case the notification protocol can't be opened. +//! +//! When the handler is in the `Enabled` state, we immediately open and try to maintain all the +//! aforementioned substreams. When the handler is in the `Disabled` state, we immediately close +//! (or abort opening) all these substreams. It is intended that in the future we allow states in +//! which some protocols are open and not others. Symmetrically, we allow incoming +//! Substrate-related substreams if and only if we are in the `Enabled` state. +//! +//! The user has the choice between sending a message with `SendNotification`, to send a +//! notification, and `SendLegacy`, to send any other kind of message. +//! + +use crate::protocol::generic_proto::{ + handler::legacy::{LegacyProtoHandler, LegacyProtoHandlerProto, LegacyProtoHandlerIn, LegacyProtoHandlerOut}, + handler::notif_in::{NotifsInHandlerProto, NotifsInHandler, NotifsInHandlerIn, NotifsInHandlerOut}, + handler::notif_out::{NotifsOutHandlerProto, NotifsOutHandler, NotifsOutHandlerIn, NotifsOutHandlerOut}, + upgrade::{NotificationsIn, NotificationsOut, NotificationsHandshakeError, RegisteredProtocol, UpgradeCollec}, +}; +use crate::protocol::message::generic::{Message as GenericMessage, ConsensusMessage}; + +use bytes::BytesMut; +use codec::Encode as _; +use libp2p::core::{either::{EitherError, EitherOutput}, ConnectedPoint, PeerId}; +use libp2p::core::upgrade::{EitherUpgrade, UpgradeError, SelectUpgrade, InboundUpgrade, OutboundUpgrade}; +use libp2p::swarm::{ + ProtocolsHandler, ProtocolsHandlerEvent, + IntoProtocolsHandler, + KeepAlive, + ProtocolsHandlerUpgrErr, + SubstreamProtocol, + NegotiatedSubstream, +}; +use log::error; +use sp_runtime::ConsensusEngineId; +use std::{borrow::Cow, error, io, task::{Context, Poll}}; + +/// Implements the `IntoProtocolsHandler` trait of libp2p. +/// +/// Every time a connection with a remote starts, an instance of this struct is created and +/// sent to a background task dedicated to this connection. Once the connection is established, +/// it is turned into a [`NotifsHandler`]. +/// +/// See the documentation at the module level for more information. +pub struct NotifsHandlerProto { + /// Prototypes for handlers for inbound substreams. + in_handlers: Vec<(NotifsInHandlerProto, ConsensusEngineId)>, + + /// Prototypes for handlers for outbound substreams. + out_handlers: Vec<(NotifsOutHandlerProto, ConsensusEngineId)>, + + /// Prototype for handler for backwards-compatibility. + legacy: LegacyProtoHandlerProto, +} + +/// The actual handler once the connection has been established. +/// +/// See the documentation at the module level for more information. +pub struct NotifsHandler { + /// Handlers for inbound substreams. + in_handlers: Vec<(NotifsInHandler, ConsensusEngineId)>, + + /// Handlers for outbound substreams. + out_handlers: Vec<(NotifsOutHandler, ConsensusEngineId)>, + + /// Handler for backwards-compatibility. + legacy: LegacyProtoHandler, + + /// State of this handler. + enabled: EnabledState, + + /// If we receive inbound substream requests while in initialization mode, + /// we push the corresponding index here and process them when the handler + /// gets enabled/disabled. + pending_in: Vec<usize>, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +enum EnabledState { + Initial, + Enabled, + Disabled, +} + +impl IntoProtocolsHandler for NotifsHandlerProto { + type Handler = NotifsHandler; + + fn inbound_protocol(&self) -> SelectUpgrade<UpgradeCollec<NotificationsIn>, RegisteredProtocol> { + let in_handlers = self.in_handlers.iter() + .map(|(h, _)| h.inbound_protocol()) + .collect::<UpgradeCollec<_>>(); + + SelectUpgrade::new(in_handlers, self.legacy.inbound_protocol()) + } + + fn into_handler(self, remote_peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler { + NotifsHandler { + in_handlers: self.in_handlers + .into_iter() + .map(|(p, e)| (p.into_handler(remote_peer_id, connected_point), e)) + .collect(), + out_handlers: self.out_handlers + .into_iter() + .map(|(p, e)| (p.into_handler(remote_peer_id, connected_point), e)) + .collect(), + legacy: self.legacy.into_handler(remote_peer_id, connected_point), + enabled: EnabledState::Initial, + pending_in: Vec::new(), + } + } +} + +/// Event that can be received by a `NotifsHandler`. +#[derive(Debug)] +pub enum NotifsHandlerIn { + /// The node should start using custom protocols. + Enable, + + /// The node should stop using custom protocols. + Disable, + + /// Sends a message through the custom protocol substream. + /// + /// > **Note**: This must **not** be an encoded `ConsensusMessage` message. + SendLegacy { + /// The message to send. + message: Vec<u8>, + }, + + /// Sends a notifications message. + SendNotification { + /// Name of the protocol for the message. + /// + /// Must match one of the registered protocols. For backwards-compatibility reasons, if + /// the remote doesn't support this protocol, we use the legacy substream to send a + /// `ConsensusMessage` message. + protocol_name: Cow<'static, [u8]>, + + /// The engine ID to use, in case we need to send this message over the legacy substream. + /// + /// > **Note**: Ideally this field wouldn't be necessary, and we would deduce the engine + /// > ID from the existing handlers. However, it is possible (especially in test + /// > situations) that we open connections before all the notification protocols + /// > have been registered, in which case we always rely on the legacy substream. + engine_id: ConsensusEngineId, + + /// The message to send. + message: Vec<u8>, + }, +} + +/// Event that can be emitted by a `NotifsHandler`. +#[derive(Debug)] +pub enum NotifsHandlerOut { + /// Opened the substreams with the remote. + Open, + + /// Closed the substreams with the remote. + Closed { + /// Reason why the substream closed, for diagnostic purposes. + reason: Cow<'static, str>, + }, + + /// Received a non-gossiping message on the legacy substream. + CustomMessage { + /// Message that has been received. + /// + /// Keep in mind that this can be a `ConsensusMessage` message, which then contains a + /// notification. + message: BytesMut, + }, + + /// Received a message on a custom protocol substream. + Notification { + /// Engine corresponding to the message. + protocol_name: Cow<'static, [u8]>, + + /// For legacy reasons, the name to use if we had received the message from the legacy + /// substream. + engine_id: ConsensusEngineId, + + /// Message that has been received. + /// + /// If `protocol_name` is `None`, this decodes to a `Message`. If `protocol_name` is `Some`, + /// this is directly a gossiping message. + message: BytesMut, + }, + + /// A substream to the remote is clogged. The send buffer is very large, and we should print + /// a diagnostic message and/or avoid sending more data. + Clogged { + /// Copy of the messages that are within the buffer, for further diagnostic. + messages: Vec<Vec<u8>>, + }, + + /// An error has happened on the protocol level with this node. + ProtocolError { + /// If true the error is severe, such as a protocol violation. + is_severe: bool, + /// The error that happened. + error: Box<dyn error::Error + Send + Sync>, + }, +} + +impl NotifsHandlerProto { + /// Builds a new handler. + pub fn new(legacy: RegisteredProtocol, list: impl Into<Vec<(Cow<'static, [u8]>, ConsensusEngineId, Vec<u8>)>>) -> Self { + let list = list.into(); + + NotifsHandlerProto { + in_handlers: list.clone().into_iter().map(|(p, e, _)| (NotifsInHandlerProto::new(p), e)).collect(), + out_handlers: list.clone().into_iter().map(|(p, e, _)| (NotifsOutHandlerProto::new(p), e)).collect(), + legacy: LegacyProtoHandlerProto::new(legacy), + } + } +} + +impl ProtocolsHandler for NotifsHandler { + type InEvent = NotifsHandlerIn; + type OutEvent = NotifsHandlerOut; + type Error = EitherError< + EitherError< + <NotifsInHandler as ProtocolsHandler>::Error, + <NotifsOutHandler as ProtocolsHandler>::Error, + >, + <LegacyProtoHandler as ProtocolsHandler>::Error, + >; + type InboundProtocol = SelectUpgrade<UpgradeCollec<NotificationsIn>, RegisteredProtocol>; + type OutboundProtocol = EitherUpgrade<NotificationsOut, RegisteredProtocol>; + // Index within the `out_handlers`; None for legacy + type OutboundOpenInfo = Option<usize>; + + fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> { + let in_handlers = self.in_handlers.iter() + .map(|h| h.0.listen_protocol().into_upgrade().1) + .collect::<UpgradeCollec<_>>(); + + let proto = SelectUpgrade::new(in_handlers, self.legacy.listen_protocol().into_upgrade().1); + SubstreamProtocol::new(proto) + } + + fn inject_fully_negotiated_inbound( + &mut self, + out: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output + ) { + match out { + EitherOutput::First((out, num)) => + self.in_handlers[num].0.inject_fully_negotiated_inbound(out), + EitherOutput::Second(out) => + self.legacy.inject_fully_negotiated_inbound(out), + } + } + + fn inject_fully_negotiated_outbound( + &mut self, + out: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output, + num: Self::OutboundOpenInfo + ) { + match (out, num) { + (EitherOutput::First(out), Some(num)) => + self.out_handlers[num].0.inject_fully_negotiated_outbound(out, ()), + (EitherOutput::Second(out), None) => + self.legacy.inject_fully_negotiated_outbound(out, ()), + _ => error!("inject_fully_negotiated_outbound called with wrong parameters"), + } + } + + fn inject_event(&mut self, message: NotifsHandlerIn) { + match message { + NotifsHandlerIn::Enable => { + self.enabled = EnabledState::Enabled; + self.legacy.inject_event(LegacyProtoHandlerIn::Enable); + for (handler, _) in &mut self.out_handlers { + handler.inject_event(NotifsOutHandlerIn::Enable { + initial_message: vec![] + }); + } + for num in self.pending_in.drain(..) { + self.in_handlers[num].0.inject_event(NotifsInHandlerIn::Accept(vec![])); + } + }, + NotifsHandlerIn::Disable => { + self.legacy.inject_event(LegacyProtoHandlerIn::Disable); + // The notifications protocols start in the disabled state. If we were in the + // "Initial" state, then we shouldn't disable the notifications protocols again. + if self.enabled != EnabledState::Initial { + for (handler, _) in &mut self.out_handlers { + handler.inject_event(NotifsOutHandlerIn::Disable); + } + } + self.enabled = EnabledState::Disabled; + for num in self.pending_in.drain(..) { + self.in_handlers[num].0.inject_event(NotifsInHandlerIn::Refuse); + } + }, + NotifsHandlerIn::SendLegacy { message } => + self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { message }), + NotifsHandlerIn::SendNotification { message, engine_id, protocol_name } => { + for (handler, ngn_id) in &mut self.out_handlers { + if handler.protocol_name() != &protocol_name[..] { + break; + } + + if handler.is_open() { + handler.inject_event(NotifsOutHandlerIn::Send(message)); + return; + } else { + debug_assert_eq!(engine_id, *ngn_id); + } + } + + let message = GenericMessage::<(), (), (), ()>::Consensus(ConsensusMessage { + engine_id, + data: message, + }); + + self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { + message: message.encode() + }); + }, + } + } + + fn inject_dial_upgrade_error( + &mut self, + num: Option<usize>, + err: ProtocolsHandlerUpgrErr<EitherError<NotificationsHandshakeError, io::Error>> + ) { + match (err, num) { + (ProtocolsHandlerUpgrErr::Timeout, Some(num)) => + self.out_handlers[num].0.inject_dial_upgrade_error( + (), + ProtocolsHandlerUpgrErr::Timeout + ), + (ProtocolsHandlerUpgrErr::Timeout, None) => + self.legacy.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timeout), + (ProtocolsHandlerUpgrErr::Timer, Some(num)) => + self.out_handlers[num].0.inject_dial_upgrade_error( + (), + ProtocolsHandlerUpgrErr::Timer + ), + (ProtocolsHandlerUpgrErr::Timer, None) => + self.legacy.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timer), + (ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), Some(num)) => + self.out_handlers[num].0.inject_dial_upgrade_error( + (), + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) + ), + (ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), None) => + self.legacy.inject_dial_upgrade_error( + (), + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) + ), + (ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(err))), Some(num)) => + self.out_handlers[num].0.inject_dial_upgrade_error( + (), + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)) + ), + (ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(err))), None) => + self.legacy.inject_dial_upgrade_error( + (), + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)) + ), + _ => error!("inject_dial_upgrade_error called with bad parameters"), + } + } + + fn connection_keep_alive(&self) -> KeepAlive { + // Iterate over each handler and return the maximum value. + + let mut ret = self.legacy.connection_keep_alive(); + if ret.is_yes() { + return KeepAlive::Yes; + } + + for (handler, _) in &self.in_handlers { + let val = handler.connection_keep_alive(); + if val.is_yes() { + return KeepAlive::Yes; + } + if ret < val { ret = val; } + } + + for (handler, _) in &self.out_handlers { + let val = handler.connection_keep_alive(); + if val.is_yes() { + return KeepAlive::Yes; + } + if ret < val { ret = val; } + } + + ret + } + + fn poll( + &mut self, + cx: &mut Context, + ) -> Poll< + ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error> + > { + for (handler_num, (handler, engine_id)) in self.in_handlers.iter_mut().enumerate() { + while let Poll::Ready(ev) = handler.poll(cx) { + match ev { + ProtocolsHandlerEvent::OutboundSubstreamRequest { .. } => + error!("Incoming substream handler tried to open a substream"), + ProtocolsHandlerEvent::Close(err) => void::unreachable(err), + ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) => + match self.enabled { + EnabledState::Initial => self.pending_in.push(handler_num), + EnabledState::Enabled => + handler.inject_event(NotifsInHandlerIn::Accept(vec![])), + EnabledState::Disabled => + handler.inject_event(NotifsInHandlerIn::Refuse), + }, + ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {}, + ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => { + // Note that right now the legacy substream has precedence over + // everything. If it is not open, then we consider that nothing is open. + if self.legacy.is_open() { + let msg = NotifsHandlerOut::Notification { + message, + engine_id: *engine_id, + protocol_name: handler.protocol_name().to_owned().into(), + }; + return Poll::Ready(ProtocolsHandlerEvent::Custom(msg)); + } + }, + } + } + } + + for (handler_num, (handler, _)) in self.out_handlers.iter_mut().enumerate() { + while let Poll::Ready(ev) = handler.poll(cx) { + match ev { + ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } => + return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: protocol.map_upgrade(EitherUpgrade::A), + info: Some(handler_num), + }), + ProtocolsHandlerEvent::Close(err) => void::unreachable(err), + + // At the moment we don't actually care whether any notifications protocol + // opens or closes. + // Whether our communications with the remote are open or closed entirely + // depends on the legacy substream, because as long as we are open the user of + // this struct might try to send legacy protocol messages which we need to + // deliver for things to work properly. + ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Open { .. }) => {}, + ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Closed) => {}, + ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Refused) => {}, + } + } + } + + while let Poll::Ready(ev) = self.legacy.poll(cx) { + match ev { + ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } => + return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: protocol.map_upgrade(EitherUpgrade::B), + info: None, + }), + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { .. }) => + return Poll::Ready(ProtocolsHandlerEvent::Custom( + NotifsHandlerOut::Open + )), + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { reason }) => + return Poll::Ready(ProtocolsHandlerEvent::Custom( + NotifsHandlerOut::Closed { reason } + )), + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomMessage { message }) => + return Poll::Ready(ProtocolsHandlerEvent::Custom( + NotifsHandlerOut::CustomMessage { message } + )), + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::Clogged { messages }) => + return Poll::Ready(ProtocolsHandlerEvent::Custom( + NotifsHandlerOut::Clogged { messages } + )), + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError { is_severe, error }) => + return Poll::Ready(ProtocolsHandlerEvent::Custom( + NotifsHandlerOut::ProtocolError { is_severe, error } + )), + ProtocolsHandlerEvent::Close(err) => + return Poll::Ready(ProtocolsHandlerEvent::Close(EitherError::B(err))), + } + } + + Poll::Pending + } +} diff --git a/substrate/client/network/src/protocol/legacy_proto/handler.rs b/substrate/client/network/src/protocol/generic_proto/handler/legacy.rs similarity index 90% rename from substrate/client/network/src/protocol/legacy_proto/handler.rs rename to substrate/client/network/src/protocol/generic_proto/handler/legacy.rs index e3490993dd46d05549e656b124883f1976a4d062..a2d2fc9246d1c79b761d5e5eb1ce375fba33b49a 100644 --- a/substrate/client/network/src/protocol/legacy_proto/handler.rs +++ b/substrate/client/network/src/protocol/generic_proto/handler/legacy.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see <http://www.gnu.org/licenses/>. -use super::upgrade::{RegisteredProtocol, RegisteredProtocolEvent, RegisteredProtocolSubstream}; +use crate::protocol::generic_proto::upgrade::{RegisteredProtocol, RegisteredProtocolEvent, RegisteredProtocolSubstream}; use bytes::BytesMut; use futures::prelude::*; use futures_timer::Delay; @@ -37,7 +37,7 @@ use std::{pin::Pin, task::{Context, Poll}}; /// /// Every time a connection with a remote starts, an instance of this struct is created and /// sent to a background task dedicated to this connection. Once the connection is established, -/// it is turned into a `CustomProtoHandler`. It then handles all communications that are specific +/// it is turned into a `LegacyProtoHandler`. It then handles all communications that are specific /// to Substrate on that single connection. /// /// Note that there can be multiple instance of this struct simultaneously for same peer. However @@ -87,29 +87,29 @@ use std::{pin::Pin, task::{Context, Poll}}; /// We consider that we are now "closed" if the remote closes all the existing substreams. /// Re-opening it can then be performed by closing all active substream and re-opening one. /// -pub struct CustomProtoHandlerProto { +pub struct LegacyProtoHandlerProto { /// Configuration for the protocol upgrade to negotiate. protocol: RegisteredProtocol, } -impl CustomProtoHandlerProto { - /// Builds a new `CustomProtoHandlerProto`. +impl LegacyProtoHandlerProto { + /// Builds a new `LegacyProtoHandlerProto`. pub fn new(protocol: RegisteredProtocol) -> Self { - CustomProtoHandlerProto { + LegacyProtoHandlerProto { protocol, } } } -impl IntoProtocolsHandler for CustomProtoHandlerProto { - type Handler = CustomProtoHandler; +impl IntoProtocolsHandler for LegacyProtoHandlerProto { + type Handler = LegacyProtoHandler; fn inbound_protocol(&self) -> RegisteredProtocol { self.protocol.clone() } fn into_handler(self, remote_peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler { - CustomProtoHandler { + LegacyProtoHandler { protocol: self.protocol, endpoint: connected_point.to_endpoint(), remote_peer_id: remote_peer_id.clone(), @@ -123,7 +123,7 @@ impl IntoProtocolsHandler for CustomProtoHandlerProto { } /// The actual handler once the connection has been established. -pub struct CustomProtoHandler { +pub struct LegacyProtoHandler { /// Configuration for the protocol upgrade to negotiate. protocol: RegisteredProtocol, @@ -142,7 +142,7 @@ pub struct CustomProtoHandler { /// /// This queue must only ever be modified to insert elements at the back, or remove the first /// element. - events_queue: SmallVec<[ProtocolsHandlerEvent<RegisteredProtocol, (), CustomProtoHandlerOut, ConnectionKillError>; 16]>, + events_queue: SmallVec<[ProtocolsHandlerEvent<RegisteredProtocol, (), LegacyProtoHandlerOut, ConnectionKillError>; 16]>, } /// State of the handler. @@ -195,9 +195,9 @@ enum ProtocolState { Poisoned, } -/// Event that can be received by a `CustomProtoHandler`. +/// Event that can be received by a `LegacyProtoHandler`. #[derive(Debug)] -pub enum CustomProtoHandlerIn { +pub enum LegacyProtoHandlerIn { /// The node should start using custom protocols. Enable, @@ -211,9 +211,9 @@ pub enum CustomProtoHandlerIn { }, } -/// Event that can be emitted by a `CustomProtoHandler`. +/// Event that can be emitted by a `LegacyProtoHandler`. #[derive(Debug)] -pub enum CustomProtoHandlerOut { +pub enum LegacyProtoHandlerOut { /// Opened a custom protocol with the remote. CustomProtocolOpen { /// Version of the protocol that has been opened. @@ -248,7 +248,19 @@ pub enum CustomProtoHandlerOut { }, } -impl CustomProtoHandler { +impl LegacyProtoHandler { + /// Returns true if the legacy substream is currently open. + pub fn is_open(&self) -> bool { + match &self.state { + ProtocolState::Init { substreams, .. } => !substreams.is_empty(), + ProtocolState::Opening { .. } => false, + ProtocolState::Normal { substreams, .. } => !substreams.is_empty(), + ProtocolState::Disabled { .. } => false, + ProtocolState::KillAsap => false, + ProtocolState::Poisoned => false, + } + } + /// Enables the handler. fn enable(&mut self) { self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) { @@ -271,7 +283,7 @@ impl CustomProtoHandler { } } else { - let event = CustomProtoHandlerOut::CustomProtocolOpen { + let event = LegacyProtoHandlerOut::CustomProtocolOpen { version: incoming[0].protocol_version() }; self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); @@ -325,7 +337,7 @@ impl CustomProtoHandler { /// Polls the state for events. Optionally returns an event to produce. #[must_use] fn poll_state(&mut self, cx: &mut Context) - -> Option<ProtocolsHandlerEvent<RegisteredProtocol, (), CustomProtoHandlerOut, ConnectionKillError>> { + -> Option<ProtocolsHandlerEvent<RegisteredProtocol, (), LegacyProtoHandlerOut, ConnectionKillError>> { match mem::replace(&mut self.state, ProtocolState::Poisoned) { ProtocolState::Poisoned => { error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state", @@ -352,7 +364,7 @@ impl CustomProtoHandler { match Pin::new(&mut deadline).poll(cx) { Poll::Ready(()) => { deadline = Delay::new(Duration::from_secs(60)); - let event = CustomProtoHandlerOut::ProtocolError { + let event = LegacyProtoHandlerOut::ProtocolError { is_severe: true, error: "Timeout when opening protocol".to_string().into(), }; @@ -372,7 +384,7 @@ impl CustomProtoHandler { match Pin::new(&mut substream).poll_next(cx) { Poll::Pending => substreams.push(substream), Poll::Ready(Some(Ok(RegisteredProtocolEvent::Message(message)))) => { - let event = CustomProtoHandlerOut::CustomMessage { + let event = LegacyProtoHandlerOut::CustomMessage { message }; substreams.push(substream); @@ -380,7 +392,7 @@ impl CustomProtoHandler { return Some(ProtocolsHandlerEvent::Custom(event)); }, Poll::Ready(Some(Ok(RegisteredProtocolEvent::Clogged { messages }))) => { - let event = CustomProtoHandlerOut::Clogged { + let event = LegacyProtoHandlerOut::Clogged { messages, }; substreams.push(substream); @@ -390,7 +402,7 @@ impl CustomProtoHandler { Poll::Ready(None) => { shutdown.push(substream); if substreams.is_empty() { - let event = CustomProtoHandlerOut::CustomProtocolClosed { + let event = LegacyProtoHandlerOut::CustomProtocolClosed { reason: "All substreams have been closed by the remote".into(), }; self.state = ProtocolState::Disabled { @@ -402,7 +414,7 @@ impl CustomProtoHandler { } Poll::Ready(Some(Err(err))) => { if substreams.is_empty() { - let event = CustomProtoHandlerOut::CustomProtocolClosed { + let event = LegacyProtoHandlerOut::CustomProtocolClosed { reason: format!("Error on the last substream: {:?}", err).into(), }; self.state = ProtocolState::Disabled { @@ -466,7 +478,7 @@ impl CustomProtoHandler { } ProtocolState::Opening { .. } => { - let event = CustomProtoHandlerOut::CustomProtocolOpen { + let event = LegacyProtoHandlerOut::CustomProtocolOpen { version: substream.protocol_version() }; self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); @@ -503,9 +515,9 @@ impl CustomProtoHandler { } } -impl ProtocolsHandler for CustomProtoHandler { - type InEvent = CustomProtoHandlerIn; - type OutEvent = CustomProtoHandlerOut; +impl ProtocolsHandler for LegacyProtoHandler { + type InEvent = LegacyProtoHandlerIn; + type OutEvent = LegacyProtoHandlerOut; type Error = ConnectionKillError; type InboundProtocol = RegisteredProtocol; type OutboundProtocol = RegisteredProtocol; @@ -530,11 +542,11 @@ impl ProtocolsHandler for CustomProtoHandler { self.inject_fully_negotiated(proto); } - fn inject_event(&mut self, message: CustomProtoHandlerIn) { + fn inject_event(&mut self, message: LegacyProtoHandlerIn) { match message { - CustomProtoHandlerIn::Disable => self.disable(), - CustomProtoHandlerIn::Enable => self.enable(), - CustomProtoHandlerIn::SendCustomMessage { message } => + LegacyProtoHandlerIn::Disable => self.disable(), + LegacyProtoHandlerIn::Enable => self.enable(), + LegacyProtoHandlerIn::SendCustomMessage { message } => self.send_message(message), } } @@ -546,7 +558,7 @@ impl ProtocolsHandler for CustomProtoHandler { _ => false, }; - self.events_queue.push(ProtocolsHandlerEvent::Custom(CustomProtoHandlerOut::ProtocolError { + self.events_queue.push(ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError { is_severe, error: Box::new(err), })); @@ -587,9 +599,9 @@ impl ProtocolsHandler for CustomProtoHandler { } } -impl fmt::Debug for CustomProtoHandler { +impl fmt::Debug for LegacyProtoHandler { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { - f.debug_struct("CustomProtoHandler") + f.debug_struct("LegacyProtoHandler") .finish() } } diff --git a/substrate/client/network/src/protocol/generic_proto/handler/notif_in.rs b/substrate/client/network/src/protocol/generic_proto/handler/notif_in.rs new file mode 100644 index 0000000000000000000000000000000000000000..4e16fb1af419f4aebb2748b09c477119f1165309 --- /dev/null +++ b/substrate/client/network/src/protocol/generic_proto/handler/notif_in.rs @@ -0,0 +1,256 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see <http://www.gnu.org/licenses/>. + +//! Implementations of the `IntoProtocolsHandler` and `ProtocolsHandler` traits for ingoing +//! substreams for a single gossiping protocol. +//! +//! > **Note**: Each instance corresponds to a single protocol. In order to support multiple +//! > protocols, you need to create multiple instances and group them. +//! + +use crate::protocol::generic_proto::upgrade::{NotificationsIn, NotificationsInSubstream}; +use bytes::BytesMut; +use futures::prelude::*; +use libp2p::core::{ConnectedPoint, PeerId}; +use libp2p::core::upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade}; +use libp2p::swarm::{ + ProtocolsHandler, ProtocolsHandlerEvent, + IntoProtocolsHandler, + KeepAlive, + ProtocolsHandlerUpgrErr, + SubstreamProtocol, + NegotiatedSubstream, +}; +use log::{error, warn}; +use smallvec::SmallVec; +use std::{borrow::Cow, fmt, pin::Pin, str, task::{Context, Poll}}; + +/// Implements the `IntoProtocolsHandler` trait of libp2p. +/// +/// Every time a connection with a remote starts, an instance of this struct is created and +/// sent to a background task dedicated to this connection. Once the connection is established, +/// it is turned into a [`NotifsInHandler`]. +pub struct NotifsInHandlerProto { + /// Configuration for the protocol upgrade to negotiate. + in_protocol: NotificationsIn, +} + +/// The actual handler once the connection has been established. +pub struct NotifsInHandler { + /// Configuration for the protocol upgrade to negotiate for inbound substreams. + in_protocol: NotificationsIn, + + /// Substream that is open with the remote. + substream: Option<NotificationsInSubstream<NegotiatedSubstream>>, + + /// If the substream is opened and closed rapidly, we can emit several `OpenRequest` and + /// `Closed` messages in a row without the handler having time to respond with `Accept` or + /// `Refuse`. + /// + /// In order to keep the state consistent, we increment this variable every time an + /// `OpenRequest` is emitted and decrement it every time an `Accept` or `Refuse` is received. + pending_accept_refuses: usize, + + /// Queue of events to send to the outside. + /// + /// This queue is only ever modified to insert elements at the back, or remove the first + /// element. + events_queue: SmallVec<[ProtocolsHandlerEvent<DeniedUpgrade, (), NotifsInHandlerOut, void::Void>; 16]>, +} + +/// Event that can be received by a `NotifsInHandler`. +#[derive(Debug)] +pub enum NotifsInHandlerIn { + /// Can be sent back as a response to an `OpenRequest`. Contains the status message to send + /// to the remote. + /// + /// After sending this to the handler, the substream is now considered open and `Notif` events + /// can be received. + Accept(Vec<u8>), + + /// Can be sent back as a response to an `OpenRequest`. + Refuse, +} + +/// Event that can be emitted by a `NotifsInHandler`. +#[derive(Debug)] +pub enum NotifsInHandlerOut { + /// The remote wants to open a substream. Contains the initial message sent by the remote + /// when the substream has been opened. + /// + /// Every time this event is emitted, a corresponding `Accepted` or `Refused` **must** be sent + /// back even if a `Closed` is received. + OpenRequest(Vec<u8>), + + /// The notifications substream has been closed by the remote. In order to avoid race + /// conditions, this does **not** cancel any previously-sent `OpenRequest`. + Closed, + + /// Received a message on the notifications substream. + /// + /// Can only happen after an `Accept` and before a `Closed`. + Notif(BytesMut), +} + +impl NotifsInHandlerProto { + /// Builds a new `NotifsInHandlerProto`. + pub fn new( + protocol_name: impl Into<Cow<'static, [u8]>> + ) -> Self { + NotifsInHandlerProto { + in_protocol: NotificationsIn::new(protocol_name), + } + } +} + +impl IntoProtocolsHandler for NotifsInHandlerProto { + type Handler = NotifsInHandler; + + fn inbound_protocol(&self) -> NotificationsIn { + self.in_protocol.clone() + } + + fn into_handler(self, _: &PeerId, _: &ConnectedPoint) -> Self::Handler { + NotifsInHandler { + in_protocol: self.in_protocol, + substream: None, + pending_accept_refuses: 0, + events_queue: SmallVec::new(), + } + } +} + +impl NotifsInHandler { + /// Returns the name of the protocol that we accept. + pub fn protocol_name(&self) -> &[u8] { + self.in_protocol.protocol_name() + } +} + +impl ProtocolsHandler for NotifsInHandler { + type InEvent = NotifsInHandlerIn; + type OutEvent = NotifsInHandlerOut; + type Error = void::Void; + type InboundProtocol = NotificationsIn; + type OutboundProtocol = DeniedUpgrade; + type OutboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> { + SubstreamProtocol::new(self.in_protocol.clone()) + } + + fn inject_fully_negotiated_inbound( + &mut self, + (msg, proto): <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output + ) { + if self.substream.is_some() { + warn!( + target: "sub-libp2p", + "Received duplicate inbound notifications substream for {:?}", + str::from_utf8(self.in_protocol.protocol_name()), + ); + return; + } + + self.substream = Some(proto); + self.events_queue.push(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(msg))); + self.pending_accept_refuses = self.pending_accept_refuses + .checked_add(1) + .unwrap_or_else(|| { + error!(target: "sub-libp2p", "Overflow in pending_accept_refuses"); + usize::max_value() + }); + } + + fn inject_fully_negotiated_outbound( + &mut self, + out: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output, + _: Self::OutboundOpenInfo + ) { + // We never emit any outgoing substream. + void::unreachable(out) + } + + fn inject_event(&mut self, message: NotifsInHandlerIn) { + self.pending_accept_refuses = match self.pending_accept_refuses.checked_sub(1) { + Some(v) => v, + None => { + error!( + target: "sub-libp2p", + "Inconsistent state: received Accept/Refuse when no pending request exists" + ); + return; + } + }; + + // If we send multiple `OpenRequest`s in a row, we will receive back multiple + // `Accept`/`Refuse` messages. All of them are obsolete except the last one. + if self.pending_accept_refuses != 0 { + return; + } + + match (message, self.substream.as_mut()) { + (NotifsInHandlerIn::Accept(message), Some(sub)) => sub.send_handshake(message), + (NotifsInHandlerIn::Accept(_), None) => {}, + (NotifsInHandlerIn::Refuse, _) => self.substream = None, + } + } + + fn inject_dial_upgrade_error(&mut self, _: (), _: ProtocolsHandlerUpgrErr<void::Void>) { + error!(target: "sub-libp2p", "Received dial upgrade error in inbound-only handler"); + } + + fn connection_keep_alive(&self) -> KeepAlive { + if self.substream.is_some() { + KeepAlive::Yes + } else { + KeepAlive::No + } + } + + fn poll( + &mut self, + cx: &mut Context, + ) -> Poll< + ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error> + > { + // Flush the events queue if necessary. + if !self.events_queue.is_empty() { + let event = self.events_queue.remove(0); + return Poll::Ready(event) + } + + match self.substream.as_mut().map(|s| Stream::poll_next(Pin::new(s), cx)) { + None | Some(Poll::Pending) => {}, + Some(Poll::Ready(Some(Ok(msg)))) => + return Poll::Ready(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(msg))), + Some(Poll::Ready(None)) | Some(Poll::Ready(Some(Err(_)))) => { + self.substream = None; + return Poll::Ready(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed)); + }, + } + + Poll::Pending + } +} + +impl fmt::Debug for NotifsInHandler { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.debug_struct("NotifsInHandler") + .field("substream_open", &self.substream.is_some()) + .finish() + } +} diff --git a/substrate/client/network/src/protocol/generic_proto/handler/notif_out.rs b/substrate/client/network/src/protocol/generic_proto/handler/notif_out.rs new file mode 100644 index 0000000000000000000000000000000000000000..8c64491d997171df73606c2852765c10c9f3f21b --- /dev/null +++ b/substrate/client/network/src/protocol/generic_proto/handler/notif_out.rs @@ -0,0 +1,395 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see <http://www.gnu.org/licenses/>. + +//! Implementations of the `IntoProtocolsHandler` and `ProtocolsHandler` traits for outgoing +//! substreams of a single gossiping protocol. +//! +//! > **Note**: Each instance corresponds to a single protocol. In order to support multiple +//! > protocols, you need to create multiple instances and group them. +//! + +use crate::protocol::generic_proto::upgrade::{NotificationsOut, NotificationsOutSubstream, NotificationsHandshakeError}; +use futures::prelude::*; +use libp2p::core::{ConnectedPoint, PeerId}; +use libp2p::core::upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade}; +use libp2p::swarm::{ + ProtocolsHandler, ProtocolsHandlerEvent, + IntoProtocolsHandler, + KeepAlive, + ProtocolsHandlerUpgrErr, + SubstreamProtocol, + NegotiatedSubstream, +}; +use log::error; +use smallvec::SmallVec; +use std::{borrow::Cow, fmt, mem, pin::Pin, task::{Context, Poll}, time::Duration}; +use wasm_timer::Instant; + +/// Maximum duration to open a substream and receive the handshake message. After that, we +/// consider that we failed to open the substream. +const OPEN_TIMEOUT: Duration = Duration::from_secs(10); +/// After successfully establishing a connection with the remote, we keep the connection open for +/// at least this amount of time in order to give the rest of the code the chance to notify us to +/// open substreams. +const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5); + +/// Implements the `IntoProtocolsHandler` trait of libp2p. +/// +/// Every time a connection with a remote starts, an instance of this struct is created and +/// sent to a background task dedicated to this connection. Once the connection is established, +/// it is turned into a [`NotifsOutHandler`]. +/// +/// See the documentation of [`NotifsOutHandler`] for more information. +pub struct NotifsOutHandlerProto { + /// Name of the protocol to negotiate. + protocol_name: Cow<'static, [u8]>, +} + +impl NotifsOutHandlerProto { + /// Builds a new [`NotifsOutHandlerProto`]. Will use the given protocol name for the + /// notifications substream. + pub fn new(protocol_name: impl Into<Cow<'static, [u8]>>) -> Self { + NotifsOutHandlerProto { + protocol_name: protocol_name.into(), + } + } +} + +impl IntoProtocolsHandler for NotifsOutHandlerProto { + type Handler = NotifsOutHandler; + + fn inbound_protocol(&self) -> DeniedUpgrade { + DeniedUpgrade + } + + fn into_handler(self, _: &PeerId, _: &ConnectedPoint) -> Self::Handler { + NotifsOutHandler { + protocol_name: self.protocol_name, + when_connection_open: Instant::now(), + state: State::Disabled, + events_queue: SmallVec::new(), + } + } +} + +/// Handler for an outbound notification substream. +/// +/// When a connection is established, this handler starts in the "disabled" state, meaning that +/// no substream will be open. +/// +/// One can try open a substream by sending an [`NotifsOutHandlerIn::Enable`] message to the +/// handler. Once done, the handler will try to establish then maintain an outbound substream with +/// the remote for the purpose of sending notifications to it. +pub struct NotifsOutHandler { + /// Name of the protocol to negotiate. + protocol_name: Cow<'static, [u8]>, + + /// Relationship with the node we're connected to. + state: State, + + /// When the connection with the remote has been successfully established. + when_connection_open: Instant, + + /// Queue of events to send to the outside. + /// + /// This queue must only ever be modified to insert elements at the back, or remove the first + /// element. + events_queue: SmallVec<[ProtocolsHandlerEvent<NotificationsOut, (), NotifsOutHandlerOut, void::Void>; 16]>, +} + +/// Our relationship with the node we're connected to. +enum State { + /// The handler is disabled and idle. No substream is open. + Disabled, + + /// The handler is disabled. A substream is still open and needs to be closed. + /// + /// > **Important**: Having this state means that `poll_close` has been called at least once, + /// > but the `Sink` API is unclear about whether or not the stream can then + /// > be recovered. Because of that, we must never switch from the + /// > `DisabledOpen` state to the `Open` state while keeping the same substream. + DisabledOpen(NotificationsOutSubstream<NegotiatedSubstream>), + + /// The handler is disabled but we are still trying to open a substream with the remote. + /// + /// If the handler gets enabled again, we can immediately switch to `Opening`. + DisabledOpening, + + /// The handler is enabled and we are trying to open a substream with the remote. + Opening { + /// The initial message that we sent. Necessary if we need to re-open a substream. + initial_message: Vec<u8>, + }, + + /// The handler is enabled. We have tried opening a substream in the past but the remote + /// refused it. + Refused, + + /// The handler is enabled and substream is open. + Open { + /// Substream that is currently open. + substream: NotificationsOutSubstream<NegotiatedSubstream>, + /// The initial message that we sent. Necessary if we need to re-open a substream. + initial_message: Vec<u8>, + }, + + /// Poisoned state. Shouldn't be found in the wild. + Poisoned, +} + +/// Event that can be received by a `NotifsOutHandler`. +#[derive(Debug)] +pub enum NotifsOutHandlerIn { + /// Enables the notifications substream for this node. The handler will try to maintain a + /// substream with the remote. + Enable { + /// Initial message to send to remote nodes when we open substreams. + initial_message: Vec<u8>, + }, + + /// Disables the notifications substream for this node. This is the default state. + Disable, + + /// Sends a message on the notifications substream. Ignored if the substream isn't open. + /// + /// It is only valid to send this if the notifications substream has been enabled. + Send(Vec<u8>), +} + +/// Event that can be emitted by a `NotifsOutHandler`. +#[derive(Debug)] +pub enum NotifsOutHandlerOut { + /// The notifications substream has been accepted by the remote. + Open { + /// Handshake message sent by the remote after we opened the substream. + handshake: Vec<u8>, + }, + + /// The notifications substream has been closed by the remote. + Closed, + + /// We tried to open a notifications substream, but the remote refused it. + /// + /// Can only happen if we're in a closed state. + Refused, +} + +impl NotifsOutHandler { + /// Returns true if the substream is currently open. + pub fn is_open(&self) -> bool { + match &self.state { + State::Disabled => false, + State::DisabledOpening => false, + State::DisabledOpen(_) => true, + State::Opening { .. } => false, + State::Refused => false, + State::Open { .. } => true, + State::Poisoned => false, + } + } + + /// Returns the name of the protocol that we negotiate. + pub fn protocol_name(&self) -> &[u8] { + &self.protocol_name + } +} + +impl ProtocolsHandler for NotifsOutHandler { + type InEvent = NotifsOutHandlerIn; + type OutEvent = NotifsOutHandlerOut; + type Error = void::Void; + type InboundProtocol = DeniedUpgrade; + type OutboundProtocol = NotificationsOut; + type OutboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> { + SubstreamProtocol::new(DeniedUpgrade) + } + + fn inject_fully_negotiated_inbound( + &mut self, + proto: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output + ) { + // We should never reach here. `proto` is a `Void`. + void::unreachable(proto) + } + + fn inject_fully_negotiated_outbound( + &mut self, + (handshake_msg, substream): <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output, + _: () + ) { + match mem::replace(&mut self.state, State::Poisoned) { + State::Opening { initial_message } => { + let ev = NotifsOutHandlerOut::Open { handshake: handshake_msg }; + self.events_queue.push(ProtocolsHandlerEvent::Custom(ev)); + self.state = State::Open { substream, initial_message }; + }, + // If the handler was disabled while we were negotiating the protocol, immediately + // close it. + State::DisabledOpening => self.state = State::DisabledOpen(substream), + + // Any other situation should never happen. + State::Disabled | State::Refused | State::Open { .. } | State::DisabledOpen(_) => + error!("State mismatch in notifications handler: substream already open"), + State::Poisoned => error!("Notifications handler in a poisoned state"), + } + } + + fn inject_event(&mut self, message: NotifsOutHandlerIn) { + match message { + NotifsOutHandlerIn::Enable { initial_message } => { + match mem::replace(&mut self.state, State::Poisoned) { + State::Disabled => { + let proto = NotificationsOut::new(self.protocol_name.clone(), initial_message.clone()); + self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(proto).with_timeout(OPEN_TIMEOUT), + info: (), + }); + self.state = State::Opening { initial_message }; + }, + State::DisabledOpening => self.state = State::Opening { initial_message }, + State::DisabledOpen(mut sub) => { + // As documented above, in this state we have already called `poll_close` + // once on the substream, and it is unclear whether the substream can then + // be recovered. When in doubt, let's drop the existing substream and + // open a new one. + if sub.close().now_or_never().is_none() { + log::warn!( + target: "sub-libp2p", + "Improperly closed outbound notifications substream" + ); + } + + let proto = NotificationsOut::new(self.protocol_name.clone(), initial_message.clone()); + self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(proto).with_timeout(OPEN_TIMEOUT), + info: (), + }); + self.state = State::Opening { initial_message }; + }, + State::Opening { .. } | State::Refused | State::Open { .. } => + error!("Tried to enable notifications handler that was already enabled"), + State::Poisoned => error!("Notifications handler in a poisoned state"), + } + } + + NotifsOutHandlerIn::Disable => { + match mem::replace(&mut self.state, State::Poisoned) { + State::Disabled | State::DisabledOpen(_) | State::DisabledOpening => + error!("Tried to disable notifications handler that was already disabled"), + State::Opening { .. } => self.state = State::DisabledOpening, + State::Refused => self.state = State::Disabled, + State::Open { substream, .. } => self.state = State::DisabledOpen(substream), + State::Poisoned => error!("Notifications handler in a poisoned state"), + } + } + + NotifsOutHandlerIn::Send(msg) => + if let State::Open { substream, .. } = &mut self.state { + if let Some(Ok(_)) = substream.send(msg).now_or_never() { + } else { + log::warn!( + target: "sub-libp2p", + "Failed to push message to queue, dropped it" + ); + } + } else { + // This is an API misuse. + log::warn!( + target: "sub-libp2p", + "Tried to send a notification on a disabled handler" + ); + }, + } + } + + fn inject_dial_upgrade_error(&mut self, _: (), _: ProtocolsHandlerUpgrErr<NotificationsHandshakeError>) { + match mem::replace(&mut self.state, State::Poisoned) { + State::Disabled => {}, + State::DisabledOpen(_) | State::Refused | State::Open { .. } => + error!("State mismatch in NotificationsOut"), + State::Opening { .. } => { + self.state = State::Refused; + let ev = NotifsOutHandlerOut::Refused; + self.events_queue.push(ProtocolsHandlerEvent::Custom(ev)); + }, + State::DisabledOpening => self.state = State::Disabled, + State::Poisoned => error!("Notifications handler in a poisoned state"), + } + } + + fn connection_keep_alive(&self) -> KeepAlive { + match self.state { + // We have a small grace period of `INITIAL_KEEPALIVE_TIME` during which we keep the + // connection open no matter what, in order to avoid closing and reopening + // connections all the time. + State::Disabled | State::DisabledOpen(_) | State::DisabledOpening => + KeepAlive::Until(self.when_connection_open + INITIAL_KEEPALIVE_TIME), + State::Opening { .. } | State::Open { .. } => KeepAlive::Yes, + State::Refused | State::Poisoned => KeepAlive::No, + } + } + + fn poll( + &mut self, + cx: &mut Context, + ) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>> { + // Flush the events queue if necessary. + if !self.events_queue.is_empty() { + let event = self.events_queue.remove(0); + return Poll::Ready(event); + } + + match &mut self.state { + State::Open { substream, initial_message } => + match Sink::poll_flush(Pin::new(substream), cx) { + Poll::Pending | Poll::Ready(Ok(())) => {}, + Poll::Ready(Err(_)) => { + // We try to re-open a substream. + let initial_message = mem::replace(initial_message, Vec::new()); + self.state = State::Opening { initial_message: initial_message.clone() }; + let proto = NotificationsOut::new(self.protocol_name.clone(), initial_message); + self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(proto).with_timeout(OPEN_TIMEOUT), + info: (), + }); + return Poll::Ready(ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Closed)); + } + }, + + State::DisabledOpen(sub) => match Sink::poll_close(Pin::new(sub), cx) { + Poll::Pending => {}, + Poll::Ready(Ok(())) | Poll::Ready(Err(_)) => { + self.state = State::Disabled; + return Poll::Ready(ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Closed)); + }, + }, + + _ => {} + } + + Poll::Pending + } +} + +impl fmt::Debug for NotifsOutHandler { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.debug_struct("NotifsOutHandler") + .field("open", &self.is_open()) + .finish() + } +} diff --git a/substrate/client/network/src/protocol/legacy_proto/tests.rs b/substrate/client/network/src/protocol/generic_proto/tests.rs similarity index 92% rename from substrate/client/network/src/protocol/legacy_proto/tests.rs rename to substrate/client/network/src/protocol/generic_proto/tests.rs index 89b0854d9081f82c80e8f0dab8da0e138ebb4fca..b331b3c2378c3e641a9a2485df6c0c5884c2d7e4 100644 --- a/substrate/client/network/src/protocol/legacy_proto/tests.rs +++ b/substrate/client/network/src/protocol/generic_proto/tests.rs @@ -26,7 +26,7 @@ use libp2p::{PeerId, Multiaddr, Transport}; use rand::seq::SliceRandom; use std::{error, io, task::Context, task::Poll, time::Duration}; use crate::message::Message; -use crate::protocol::legacy_proto::{LegacyProto, LegacyProtoOut}; +use crate::protocol::generic_proto::{GenericProto, GenericProtoOut}; use sp_test_primitives::Block; /// Builds two nodes that have each other as bootstrap nodes. @@ -81,7 +81,7 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) { }); let behaviour = CustomProtoWithAddr { - inner: LegacyProto::new(&b"test"[..], &[1], peerset), + inner: GenericProto::new(&b"test"[..], &[1], peerset), addrs: addrs .iter() .enumerate() @@ -111,12 +111,12 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) { /// Wraps around the `CustomBehaviour` network behaviour, and adds hardcoded node addresses to it. struct CustomProtoWithAddr { - inner: LegacyProto, + inner: GenericProto, addrs: Vec<(PeerId, Multiaddr)>, } impl std::ops::Deref for CustomProtoWithAddr { - type Target = LegacyProto; + type Target = GenericProto; fn deref(&self) -> &Self::Target { &self.inner @@ -130,8 +130,8 @@ impl std::ops::DerefMut for CustomProtoWithAddr { } impl NetworkBehaviour for CustomProtoWithAddr { - type ProtocolsHandler = <LegacyProto as NetworkBehaviour>::ProtocolsHandler; - type OutEvent = <LegacyProto as NetworkBehaviour>::OutEvent; + type ProtocolsHandler = <GenericProto as NetworkBehaviour>::ProtocolsHandler; + type OutEvent = <GenericProto as NetworkBehaviour>::OutEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { self.inner.new_handler() @@ -223,7 +223,7 @@ fn two_nodes_transfer_lots_of_packets() { let fut1 = future::poll_fn(move |cx| -> Poll<()> { loop { match ready!(service1.poll_next_unpin(cx)) { - Some(LegacyProtoOut::CustomProtocolOpen { peer_id, .. }) => { + Some(GenericProtoOut::CustomProtocolOpen { peer_id, .. }) => { for n in 0 .. NUM_PACKETS { service1.send_packet( &peer_id, @@ -240,8 +240,8 @@ fn two_nodes_transfer_lots_of_packets() { let fut2 = future::poll_fn(move |cx| { loop { match ready!(service2.poll_next_unpin(cx)) { - Some(LegacyProtoOut::CustomProtocolOpen { .. }) => {}, - Some(LegacyProtoOut::CustomMessage { message, .. }) => { + Some(GenericProtoOut::CustomProtocolOpen { .. }) => {}, + Some(GenericProtoOut::CustomMessage { message, .. }) => { match Message::<Block>::decode(&mut &message[..]).unwrap() { Message::<Block>::ChainSpecific(message) => { assert_eq!(message.len(), 1); @@ -285,7 +285,7 @@ fn basic_two_nodes_requests_in_parallel() { let fut1 = future::poll_fn(move |cx| -> Poll<()> { loop { match ready!(service1.poll_next_unpin(cx)) { - Some(LegacyProtoOut::CustomProtocolOpen { peer_id, .. }) => { + Some(GenericProtoOut::CustomProtocolOpen { peer_id, .. }) => { for msg in to_send.drain(..) { service1.send_packet(&peer_id, msg.encode()); } @@ -298,8 +298,8 @@ fn basic_two_nodes_requests_in_parallel() { let fut2 = future::poll_fn(move |cx| { loop { match ready!(service2.poll_next_unpin(cx)) { - Some(LegacyProtoOut::CustomProtocolOpen { .. }) => {}, - Some(LegacyProtoOut::CustomMessage { message, .. }) => { + Some(GenericProtoOut::CustomProtocolOpen { .. }) => {}, + Some(GenericProtoOut::CustomMessage { message, .. }) => { let pos = to_receive.iter().position(|m| m.encode() == message).unwrap(); to_receive.remove(pos); if to_receive.is_empty() { @@ -335,7 +335,7 @@ fn reconnect_after_disconnect() { let mut service1_not_ready = false; match service1.poll_next_unpin(cx) { - Poll::Ready(Some(LegacyProtoOut::CustomProtocolOpen { .. })) => { + Poll::Ready(Some(GenericProtoOut::CustomProtocolOpen { .. })) => { match service1_state { ServiceState::NotConnected => { service1_state = ServiceState::FirstConnec; @@ -347,7 +347,7 @@ fn reconnect_after_disconnect() { ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(), } }, - Poll::Ready(Some(LegacyProtoOut::CustomProtocolClosed { .. })) => { + Poll::Ready(Some(GenericProtoOut::CustomProtocolClosed { .. })) => { match service1_state { ServiceState::FirstConnec => service1_state = ServiceState::Disconnected, ServiceState::ConnectedAgain| ServiceState::NotConnected | @@ -359,7 +359,7 @@ fn reconnect_after_disconnect() { } match service2.poll_next_unpin(cx) { - Poll::Ready(Some(LegacyProtoOut::CustomProtocolOpen { .. })) => { + Poll::Ready(Some(GenericProtoOut::CustomProtocolOpen { .. })) => { match service2_state { ServiceState::NotConnected => { service2_state = ServiceState::FirstConnec; @@ -371,7 +371,7 @@ fn reconnect_after_disconnect() { ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(), } }, - Poll::Ready(Some(LegacyProtoOut::CustomProtocolClosed { .. })) => { + Poll::Ready(Some(GenericProtoOut::CustomProtocolClosed { .. })) => { match service2_state { ServiceState::FirstConnec => service2_state = ServiceState::Disconnected, ServiceState::ConnectedAgain| ServiceState::NotConnected | diff --git a/substrate/client/network/src/protocol/generic_proto/upgrade.rs b/substrate/client/network/src/protocol/generic_proto/upgrade.rs new file mode 100644 index 0000000000000000000000000000000000000000..36f826336532619479b94876776c65e8636f8c57 --- /dev/null +++ b/substrate/client/network/src/protocol/generic_proto/upgrade.rs @@ -0,0 +1,35 @@ +// Copyright 2018-2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see <http://www.gnu.org/licenses/>. + +pub use self::collec::UpgradeCollec; +pub use self::legacy::{ + RegisteredProtocol, + RegisteredProtocolEvent, + RegisteredProtocolName, + RegisteredProtocolSubstream +}; +pub use self::notifications::{ + NotificationsIn, + NotificationsInSubstream, + NotificationsOut, + NotificationsOutSubstream, + NotificationsHandshakeError, + NotificationsOutError, +}; + +mod collec; +mod legacy; +mod notifications; diff --git a/substrate/client/network/src/protocol/generic_proto/upgrade/collec.rs b/substrate/client/network/src/protocol/generic_proto/upgrade/collec.rs new file mode 100644 index 0000000000000000000000000000000000000000..f8d199974940fb1abe6d41829d728243dd73af23 --- /dev/null +++ b/substrate/client/network/src/protocol/generic_proto/upgrade/collec.rs @@ -0,0 +1,97 @@ +// Copyright 2018-2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::prelude::*; +use libp2p::core::upgrade::{InboundUpgrade, ProtocolName, UpgradeInfo}; +use std::{iter::FromIterator, pin::Pin, task::{Context, Poll}, vec}; + +// TODO: move this to libp2p => https://github.com/libp2p/rust-libp2p/issues/1445 + +/// Upgrade that combines multiple upgrades of the same type into one. Supports all the protocols +/// supported by either sub-upgrade. +#[derive(Debug, Clone)] +pub struct UpgradeCollec<T>(pub Vec<T>); + +impl<T> From<Vec<T>> for UpgradeCollec<T> { + fn from(list: Vec<T>) -> Self { + UpgradeCollec(list) + } +} + +impl<T> FromIterator<T> for UpgradeCollec<T> { + fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self { + UpgradeCollec(iter.into_iter().collect()) + } +} + +impl<T: UpgradeInfo> UpgradeInfo for UpgradeCollec<T> { + type Info = ProtoNameWithUsize<T::Info>; + type InfoIter = vec::IntoIter<Self::Info>; + + fn protocol_info(&self) -> Self::InfoIter { + self.0.iter().enumerate() + .flat_map(|(n, p)| + p.protocol_info().into_iter().map(move |i| ProtoNameWithUsize(i, n))) + .collect::<Vec<_>>() + .into_iter() + } +} + +impl<T, C> InboundUpgrade<C> for UpgradeCollec<T> +where + T: InboundUpgrade<C>, +{ + type Output = (T::Output, usize); + type Error = (T::Error, usize); + type Future = FutWithUsize<T::Future>; + + fn upgrade_inbound(mut self, sock: C, info: Self::Info) -> Self::Future { + let fut = self.0.remove(info.1).upgrade_inbound(sock, info.0); + FutWithUsize(fut, info.1) + } +} + +/// Groups a `ProtocolName` with a `usize`. +#[derive(Debug, Clone)] +pub struct ProtoNameWithUsize<T>(T, usize); + +impl<T: ProtocolName> ProtocolName for ProtoNameWithUsize<T> { + fn protocol_name(&self) -> &[u8] { + self.0.protocol_name() + } +} + +/// Equivalent to `fut.map_ok(|v| (v, num)).map_err(|e| (e, num))`, where `fut` and `num` are +/// the two fields of this struct. +#[pin_project::pin_project] +pub struct FutWithUsize<T>(#[pin] T, usize); + +impl<T: Future<Output = Result<O, E>>, O, E> Future for FutWithUsize<T> { + type Output = Result<(O, usize), (E, usize)>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let this = self.project(); + match Future::poll(this.0, cx) { + Poll::Ready(Ok(v)) => Poll::Ready(Ok((v, *this.1))), + Poll::Ready(Err(e)) => Poll::Ready(Err((e, *this.1))), + Poll::Pending => Poll::Pending, + } + } +} diff --git a/substrate/client/network/src/protocol/legacy_proto/upgrade.rs b/substrate/client/network/src/protocol/generic_proto/upgrade/legacy.rs similarity index 100% rename from substrate/client/network/src/protocol/legacy_proto/upgrade.rs rename to substrate/client/network/src/protocol/generic_proto/upgrade/legacy.rs diff --git a/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs b/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs new file mode 100644 index 0000000000000000000000000000000000000000..ddc07b5d6f3d6b2ffefe8fec47cee78768a1495a --- /dev/null +++ b/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs @@ -0,0 +1,622 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see <http://www.gnu.org/licenses/>. + +/// Notifications protocol. +/// +/// The Substrate notifications protocol consists in the following: +/// +/// - Node A opens a substream to node B and sends a message which contains some protocol-specific +/// higher-level logic. This message is prefixed with a variable-length integer message length. +/// This message can be empty, in which case `0` is sent. +/// - If node B accepts the substream, it sends back a message with the same properties. +/// Afterwards, the sending side of B is closed. +/// - If instead B refuses the connection (which typically happens because no empty slot is +/// available), then it immediately closes the substream without sending back anything. +/// - Node A can then send notifications to B, prefixed with a variable-length integer indicating +/// the length of the message. +/// - Node A closes its writing side if it doesn't want the notifications substream anymore. +/// +/// Notification substreams are unidirectional. If A opens a substream with B, then B is +/// encouraged but not required to open a substream to A as well. +/// + +use bytes::BytesMut; +use futures::{prelude::*, ready}; +use futures_codec::Framed; +use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade}; +use log::error; +use std::{borrow::Cow, collections::VecDeque, io, iter, mem, pin::Pin, task::{Context, Poll}}; +use unsigned_varint::codec::UviBytes; + +/// Maximum allowed size of the two handshake messages, in bytes. +const MAX_HANDSHAKE_SIZE: usize = 1024; +/// Maximum number of buffered messages before we consider the remote unresponsive and kill the +/// substream. +const MAX_PENDING_MESSAGES: usize = 256; + +/// Upgrade that accepts a substream, sends back a status message, then becomes a unidirectional +/// stream of messages. +#[derive(Debug, Clone)] +pub struct NotificationsIn { + /// Protocol name to use when negotiating the substream. + protocol_name: Cow<'static, [u8]>, +} + +/// Upgrade that opens a substream, waits for the remote to accept by sending back a status +/// message, then becomes a unidirectional sink of data. +#[derive(Debug, Clone)] +pub struct NotificationsOut { + /// Protocol name to use when negotiating the substream. + protocol_name: Cow<'static, [u8]>, + /// Message to send when we start the handshake. + initial_message: Vec<u8>, +} + +/// A substream for incoming notification messages. +/// +/// When creating, this struct starts in a state in which we must first send back a handshake +/// message to the remote. No message will come before this has been done. +#[pin_project::pin_project] +pub struct NotificationsInSubstream<TSubstream> { + #[pin] + socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>, + handshake: NotificationsInSubstreamHandshake, +} + +/// State of the handshake sending back process. +enum NotificationsInSubstreamHandshake { + /// Waiting for the user to give us the handshake message. + NotSent, + /// User gave us the handshake message. Trying to push it in the socket. + PendingSend(Vec<u8>), + /// Handshake message was pushed in the socket. Still need to flush. + Close, + /// Handshake message successfully sent. + Sent, +} + +/// A substream for outgoing notification messages. +#[pin_project::pin_project] +pub struct NotificationsOutSubstream<TSubstream> { + /// Substream where to send messages. + #[pin] + socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>, + /// Queue of messages waiting to be sent. + messages_queue: VecDeque<Vec<u8>>, + /// If true, we need to flush `socket`. + need_flush: bool, +} + +impl NotificationsIn { + /// Builds a new potential upgrade. + pub fn new(protocol_name: impl Into<Cow<'static, [u8]>>) -> Self { + NotificationsIn { + protocol_name: protocol_name.into(), + } + } + + /// Returns the name of the protocol that we accept. + pub fn protocol_name(&self) -> &[u8] { + &self.protocol_name + } +} + +impl UpgradeInfo for NotificationsIn { + type Info = Cow<'static, [u8]>; + type InfoIter = iter::Once<Self::Info>; + + fn protocol_info(&self) -> Self::InfoIter { + iter::once(self.protocol_name.clone()) + } +} + +impl<TSubstream> InboundUpgrade<TSubstream> for NotificationsIn +where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + type Output = (Vec<u8>, NotificationsInSubstream<TSubstream>); + type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>; + type Error = NotificationsHandshakeError; + + fn upgrade_inbound( + self, + mut socket: TSubstream, + _: Self::Info, + ) -> Self::Future { + Box::pin(async move { + let initial_message_len = unsigned_varint::aio::read_usize(&mut socket).await?; + if initial_message_len > MAX_HANDSHAKE_SIZE { + return Err(NotificationsHandshakeError::TooLarge { + requested: initial_message_len, + max: MAX_HANDSHAKE_SIZE, + }); + } + + let mut initial_message = vec![0u8; initial_message_len]; + if !initial_message.is_empty() { + socket.read(&mut initial_message).await?; + } + + let substream = NotificationsInSubstream { + socket: Framed::new(socket, UviBytes::default()), + handshake: NotificationsInSubstreamHandshake::NotSent, + }; + + Ok((initial_message, substream)) + }) + } +} + +impl<TSubstream> NotificationsInSubstream<TSubstream> +where TSubstream: AsyncRead + AsyncWrite, +{ + /// Sends the handshake in order to inform the remote that we accept the substream. + pub fn send_handshake(&mut self, message: impl Into<Vec<u8>>) { + match self.handshake { + NotificationsInSubstreamHandshake::NotSent => {} + _ => { + error!(target: "sub-libp2p", "Tried to send handshake twice"); + return; + } + } + + self.handshake = NotificationsInSubstreamHandshake::PendingSend(message.into()); + } +} + +impl<TSubstream> Stream for NotificationsInSubstream<TSubstream> +where TSubstream: AsyncRead + AsyncWrite + Unpin, +{ + type Item = Result<BytesMut, io::Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { + let mut this = self.project(); + + // This `Stream` implementation first tries to send back the handshake if necessary. + loop { + match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) { + NotificationsInSubstreamHandshake::Sent => + return Stream::poll_next(this.socket.as_mut(), cx), + NotificationsInSubstreamHandshake::NotSent => + return Poll::Pending, + NotificationsInSubstreamHandshake::PendingSend(msg) => + match Sink::poll_ready(this.socket.as_mut(), cx) { + Poll::Ready(_) => { + *this.handshake = NotificationsInSubstreamHandshake::Close; + match Sink::start_send(this.socket.as_mut(), io::Cursor::new(msg)) { + Ok(()) => {}, + Err(err) => return Poll::Ready(Some(Err(err))), + } + }, + Poll::Pending => + *this.handshake = NotificationsInSubstreamHandshake::PendingSend(msg), + }, + NotificationsInSubstreamHandshake::Close => + match Sink::poll_close(this.socket.as_mut(), cx)? { + Poll::Ready(()) => + *this.handshake = NotificationsInSubstreamHandshake::Sent, + Poll::Pending => + *this.handshake = NotificationsInSubstreamHandshake::Close, + }, + } + } + } +} + +impl NotificationsOut { + /// Builds a new potential upgrade. + pub fn new(protocol_name: impl Into<Cow<'static, [u8]>>, initial_message: impl Into<Vec<u8>>) -> Self { + let initial_message = initial_message.into(); + if initial_message.len() > MAX_HANDSHAKE_SIZE { + error!(target: "sub-libp2p", "Outbound networking handshake is above allowed protocol limit"); + } + + NotificationsOut { + protocol_name: protocol_name.into(), + initial_message, + } + } +} + +impl UpgradeInfo for NotificationsOut { + type Info = Cow<'static, [u8]>; + type InfoIter = iter::Once<Self::Info>; + + fn protocol_info(&self) -> Self::InfoIter { + iter::once(self.protocol_name.clone()) + } +} + +impl<TSubstream> OutboundUpgrade<TSubstream> for NotificationsOut +where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + type Output = (Vec<u8>, NotificationsOutSubstream<TSubstream>); + type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>; + type Error = NotificationsHandshakeError; + + fn upgrade_outbound( + self, + mut socket: TSubstream, + _: Self::Info, + ) -> Self::Future { + Box::pin(async move { + upgrade::write_with_len_prefix(&mut socket, &self.initial_message).await?; + + // Reading handshake. + let handshake_len = unsigned_varint::aio::read_usize(&mut socket).await?; + if handshake_len > MAX_HANDSHAKE_SIZE { + return Err(NotificationsHandshakeError::TooLarge { + requested: handshake_len, + max: MAX_HANDSHAKE_SIZE, + }); + } + + let mut handshake = vec![0u8; handshake_len]; + if !handshake.is_empty() { + socket.read(&mut handshake).await?; + } + + Ok((handshake, NotificationsOutSubstream { + socket: Framed::new(socket, UviBytes::default()), + messages_queue: VecDeque::with_capacity(MAX_PENDING_MESSAGES), + need_flush: false, + })) + }) + } +} + +impl<TSubstream> Sink<Vec<u8>> for NotificationsOutSubstream<TSubstream> + where TSubstream: AsyncRead + AsyncWrite + Unpin, +{ + type Error = NotificationsOutError; + + fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn start_send(mut self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> { + if self.messages_queue.len() >= MAX_PENDING_MESSAGES { + return Err(NotificationsOutError::Clogged); + } + + self.messages_queue.push_back(item); + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { + let mut this = self.project(); + + while !this.messages_queue.is_empty() { + match Sink::poll_ready(this.socket.as_mut(), cx) { + Poll::Ready(Err(err)) => return Poll::Ready(Err(From::from(err))), + Poll::Ready(Ok(())) => { + let msg = this.messages_queue.pop_front() + .expect("checked for !is_empty above; qed"); + Sink::start_send(this.socket.as_mut(), io::Cursor::new(msg))?; + *this.need_flush = true; + }, + Poll::Pending => return Poll::Pending, + } + } + + if *this.need_flush { + match Sink::poll_flush(this.socket.as_mut(), cx) { + Poll::Ready(Err(err)) => return Poll::Ready(Err(From::from(err))), + Poll::Ready(Ok(())) => *this.need_flush = false, + Poll::Pending => return Poll::Pending, + } + } + + Poll::Ready(Ok(())) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { + ready!(Sink::poll_flush(self.as_mut(), cx))?; + let this = self.project(); + match Sink::poll_close(this.socket, cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Ready(Err(err)) => Poll::Ready(Err(From::from(err))), + Poll::Pending => Poll::Pending, + } + } +} + +/// Error generated by sending on a notifications out substream. +#[derive(Debug, derive_more::From, derive_more::Display)] +pub enum NotificationsHandshakeError { + /// I/O error on the substream. + Io(io::Error), + + /// Initial message or handshake was too large. + #[display(fmt = "Initial message or handshake was too large: {}", requested)] + TooLarge { + /// Size requested by the remote. + requested: usize, + /// Maximum allowed, + max: usize, + }, + + /// Error while decoding the variable-length integer. + VarintDecode(unsigned_varint::decode::Error), +} + +impl From<unsigned_varint::io::ReadError> for NotificationsHandshakeError { + fn from(err: unsigned_varint::io::ReadError) -> Self { + match err { + unsigned_varint::io::ReadError::Io(err) => NotificationsHandshakeError::Io(err), + unsigned_varint::io::ReadError::Decode(err) => NotificationsHandshakeError::VarintDecode(err), + _ => { + log::warn!("Unrecognized varint decoding error"); + NotificationsHandshakeError::Io(From::from(io::ErrorKind::InvalidData)) + } + } + } +} + +/// Error generated by sending on a notifications out substream. +#[derive(Debug, derive_more::From, derive_more::Display)] +pub enum NotificationsOutError { + /// I/O error on the substream. + Io(io::Error), + + /// Remote doesn't process our messages quickly enough. + /// + /// > **Note**: This is not necessarily the remote's fault, and could also be caused by the + /// > local node sending data too quickly. Properly doing back-pressure, however, + /// > would require a deep refactoring effort in Substrate as a whole. + Clogged, +} + +#[cfg(test)] +mod tests { + use super::{NotificationsIn, NotificationsOut}; + + use async_std::net::{TcpListener, TcpStream}; + use futures::{prelude::*, channel::oneshot}; + use libp2p::core::upgrade; + use std::pin::Pin; + + #[test] + fn basic_works() { + const PROTO_NAME: &'static [u8] = b"/test/proto/1"; + let (listener_addr_tx, listener_addr_rx) = oneshot::channel(); + + let client = async_std::task::spawn(async move { + let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap(); + let (handshake, mut substream) = upgrade::apply_outbound( + socket, + NotificationsOut::new(PROTO_NAME, &b"initial message"[..]), + upgrade::Version::V1 + ).await.unwrap(); + + assert_eq!(handshake, b"hello world"); + substream.send(b"test message".to_vec()).await.unwrap(); + }); + + async_std::task::block_on(async move { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + listener_addr_tx.send(listener.local_addr().unwrap()).unwrap(); + + let (socket, _) = listener.accept().await.unwrap(); + let (initial_message, mut substream) = upgrade::apply_inbound( + socket, + NotificationsIn::new(PROTO_NAME) + ).await.unwrap(); + + assert_eq!(initial_message, b"initial message"); + substream.send_handshake(&b"hello world"[..]); + + let msg = substream.next().await.unwrap().unwrap(); + assert_eq!(msg.as_ref(), b"test message"); + }); + + async_std::task::block_on(client); + } + + #[test] + fn empty_handshake() { + // Check that everything still works when the handshake messages are empty. + + const PROTO_NAME: &'static [u8] = b"/test/proto/1"; + let (listener_addr_tx, listener_addr_rx) = oneshot::channel(); + + let client = async_std::task::spawn(async move { + let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap(); + let (handshake, mut substream) = upgrade::apply_outbound( + socket, + NotificationsOut::new(PROTO_NAME, vec![]), + upgrade::Version::V1 + ).await.unwrap(); + + assert!(handshake.is_empty()); + substream.send(Default::default()).await.unwrap(); + }); + + async_std::task::block_on(async move { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + listener_addr_tx.send(listener.local_addr().unwrap()).unwrap(); + + let (socket, _) = listener.accept().await.unwrap(); + let (initial_message, mut substream) = upgrade::apply_inbound( + socket, + NotificationsIn::new(PROTO_NAME) + ).await.unwrap(); + + assert!(initial_message.is_empty()); + substream.send_handshake(vec![]); + + let msg = substream.next().await.unwrap().unwrap(); + assert!(msg.as_ref().is_empty()); + }); + + async_std::task::block_on(client); + } + + #[test] + fn refused() { + const PROTO_NAME: &'static [u8] = b"/test/proto/1"; + let (listener_addr_tx, listener_addr_rx) = oneshot::channel(); + + let client = async_std::task::spawn(async move { + let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap(); + let outcome = upgrade::apply_outbound( + socket, + NotificationsOut::new(PROTO_NAME, &b"hello"[..]), + upgrade::Version::V1 + ).await; + + // Despite the protocol negotiation being successfully conducted on the listener + // side, we have to receive an error here because the listener didn't send the + // handshake. + assert!(outcome.is_err()); + }); + + async_std::task::block_on(async move { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + listener_addr_tx.send(listener.local_addr().unwrap()).unwrap(); + + let (socket, _) = listener.accept().await.unwrap(); + let (initial_msg, substream) = upgrade::apply_inbound( + socket, + NotificationsIn::new(PROTO_NAME) + ).await.unwrap(); + + assert_eq!(initial_msg, b"hello"); + + // We successfully upgrade to the protocol, but then close the substream. + drop(substream); + }); + + async_std::task::block_on(client); + } + + #[test] + fn large_initial_message_refused() { + const PROTO_NAME: &'static [u8] = b"/test/proto/1"; + let (listener_addr_tx, listener_addr_rx) = oneshot::channel(); + + let client = async_std::task::spawn(async move { + let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap(); + let ret = upgrade::apply_outbound( + socket, + // We check that an initial message that is too large gets refused. + NotificationsOut::new(PROTO_NAME, (0..32768).map(|_| 0).collect::<Vec<_>>()), + upgrade::Version::V1 + ).await; + assert!(ret.is_err()); + }); + + async_std::task::block_on(async move { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + listener_addr_tx.send(listener.local_addr().unwrap()).unwrap(); + + let (socket, _) = listener.accept().await.unwrap(); + let ret = upgrade::apply_inbound( + socket, + NotificationsIn::new(PROTO_NAME) + ).await; + assert!(ret.is_err()); + }); + + async_std::task::block_on(client); + } + + #[test] + fn large_handshake_refused() { + const PROTO_NAME: &'static [u8] = b"/test/proto/1"; + let (listener_addr_tx, listener_addr_rx) = oneshot::channel(); + + let client = async_std::task::spawn(async move { + let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap(); + let ret = upgrade::apply_outbound( + socket, + NotificationsOut::new(PROTO_NAME, &b"initial message"[..]), + upgrade::Version::V1 + ).await; + assert!(ret.is_err()); + }); + + async_std::task::block_on(async move { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + listener_addr_tx.send(listener.local_addr().unwrap()).unwrap(); + + let (socket, _) = listener.accept().await.unwrap(); + let (initial_message, mut substream) = upgrade::apply_inbound( + socket, + NotificationsIn::new(PROTO_NAME) + ).await.unwrap(); + assert_eq!(initial_message, b"initial message"); + + // We check that a handshake that is too large gets refused. + substream.send_handshake((0..32768).map(|_| 0).collect::<Vec<_>>()); + let _ = substream.next().await; + }); + + async_std::task::block_on(client); + } + + #[test] + fn buffer_is_full_closes_connection() { + const PROTO_NAME: &'static [u8] = b"/test/proto/1"; + let (listener_addr_tx, listener_addr_rx) = oneshot::channel(); + + let client = async_std::task::spawn(async move { + let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap(); + let (handshake, mut substream) = upgrade::apply_outbound( + socket, + NotificationsOut::new(PROTO_NAME, vec![]), + upgrade::Version::V1 + ).await.unwrap(); + + assert!(handshake.is_empty()); + + // Push an item and flush so that the test works. + substream.send(b"hello world".to_vec()).await.unwrap(); + + for _ in 0..32768 { + // Push an item on the sink without flushing until an error happens because the + // buffer is full. + let message = b"hello world!".to_vec(); + if future::poll_fn(|cx| Sink::poll_ready(Pin::new(&mut substream), cx)).await.is_err() { + return Ok(()); + } + if Sink::start_send(Pin::new(&mut substream), message).is_err() { + return Ok(()); + } + } + + Err(()) + }); + + async_std::task::block_on(async move { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + listener_addr_tx.send(listener.local_addr().unwrap()).unwrap(); + + let (socket, _) = listener.accept().await.unwrap(); + let (initial_message, mut substream) = upgrade::apply_inbound( + socket, + NotificationsIn::new(PROTO_NAME) + ).await.unwrap(); + + assert!(initial_message.is_empty()); + substream.send_handshake(vec![]); + + // Process one message so that the handshake and all works. + let _ = substream.next().await.unwrap().unwrap(); + + client.await.unwrap(); + }); + } +} diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 5674d841b32f3789339329c6db1ceeefd910e655..26facd98af99696aaa813ef5071657a2b57379b8 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -25,7 +25,7 @@ //! The methods of the [`NetworkService`] are implemented by sending a message over a channel, //! which is then processed by [`NetworkWorker::poll`]. -use std::{collections::{HashMap, HashSet}, fs, marker::PhantomData, io, path::Path}; +use std::{borrow::Cow, collections::{HashMap, HashSet}, fs, marker::PhantomData, io, path::Path}; use std::sync::{Arc, atomic::{AtomicBool, AtomicUsize, Ordering}}; use std::pin::Pin; use std::task::Poll; @@ -490,9 +490,11 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic pub fn register_notifications_protocol( &self, engine_id: ConsensusEngineId, + protocol_name: impl Into<Cow<'static, [u8]>>, ) { let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::RegisterNotifProtocol { engine_id, + protocol_name: protocol_name.into(), }); } @@ -710,6 +712,7 @@ enum ServiceToWorkerMsg<B: BlockT, H: ExHashT, S: NetworkSpecialization<B>> { }, RegisterNotifProtocol { engine_id: ConsensusEngineId, + protocol_name: Cow<'static, [u8]>, }, DisconnectPeer(PeerId), } @@ -791,8 +794,8 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne this.event_streams.push(sender), ServiceToWorkerMsg::WriteNotification { message, engine_id, target } => this.network_service.user_protocol_mut().write_notification(target, engine_id, message), - ServiceToWorkerMsg::RegisterNotifProtocol { engine_id } => { - let events = this.network_service.user_protocol_mut().register_notifications_protocol(engine_id); + ServiceToWorkerMsg::RegisterNotifProtocol { engine_id, protocol_name } => { + let events = this.network_service.user_protocol_mut().register_notifications_protocol(engine_id, protocol_name); for event in events { this.event_streams.retain(|sender| sender.unbounded_send(event.clone()).is_ok()); }