diff --git a/substrate/core/network-libp2p/src/behaviour.rs b/substrate/core/network-libp2p/src/behaviour.rs index 38ec22b0ac13bc98e0db6391c139f08d155c6a1a..c71df6f7c51328c2afd64e5e589ba87d3bc033a6 100644 --- a/substrate/core/network-libp2p/src/behaviour.rs +++ b/substrate/core/network-libp2p/src/behaviour.rs @@ -14,8 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see <http://www.gnu.org/licenses/>. -use crate::custom_proto::{CustomProtos, CustomProtosOut, RegisteredProtocols}; -use crate::{NetworkConfiguration, ProtocolId}; +use crate::custom_proto::{CustomProto, CustomProtoOut, RegisteredProtocol}; +use crate::NetworkConfiguration; use futures::prelude::*; use libp2p::NetworkBehaviour; use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, PublicKey}; @@ -37,7 +37,7 @@ pub struct Behaviour<TMessage, TSubstream> { /// Periodically ping nodes, and close the connection if it's unresponsive. ping: Ping<TSubstream>, /// Custom protocols (dot, bbq, sub, etc.). - custom_protocols: CustomProtos<TMessage, TSubstream>, + custom_protocols: CustomProto<TMessage, TSubstream>, /// Discovers nodes of the network. Defined below. discovery: DiscoveryBehaviour<TSubstream>, /// Periodically identifies the remote and responds to incoming requests. @@ -51,7 +51,7 @@ pub struct Behaviour<TMessage, TSubstream> { impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> { /// Builds a new `Behaviour`. // TODO: redundancy between config and local_public_key (https://github.com/libp2p/rust-libp2p/issues/745) - pub fn new(config: &NetworkConfiguration, local_public_key: PublicKey, protocols: RegisteredProtocols<TMessage>) -> Self { + pub fn new(config: &NetworkConfiguration, local_public_key: PublicKey, protocols: RegisteredProtocol<TMessage>) -> Self { let identify = { let proto_version = "/substrate/1.0".to_string(); let user_agent = format!("{} ({})", config.client_version, config.node_name); @@ -59,7 +59,7 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> { }; let local_peer_id = local_public_key.into_peer_id(); - let custom_protocols = CustomProtos::new(config, &local_peer_id, protocols); + let custom_protocols = CustomProto::new(config, &local_peer_id, protocols); Behaviour { ping: Ping::new(), @@ -70,15 +70,15 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> { } } - /// Sends a message to a peer using the given custom protocol. + /// Sends a message to a peer. /// /// Has no effect if the custom protocol is not open with the given peer. /// /// Also note that even we have a valid open substream, it may in fact be already closed /// without us knowing, in which case the packet will not be received. #[inline] - pub fn send_custom_message(&mut self, target: &PeerId, protocol_id: ProtocolId, data: TMessage) { - self.custom_protocols.send_packet(target, protocol_id, data) + pub fn send_custom_message(&mut self, target: &PeerId, data: TMessage) { + self.custom_protocols.send_packet(target, data) } /// Returns the number of peers in the topology. @@ -149,9 +149,9 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> { self.custom_protocols.is_enabled(peer_id) } - /// Returns the list of protocols we have open with the given peer. - pub fn open_protocols<'a>(&'a self, peer_id: &'a PeerId) -> impl Iterator<Item = ProtocolId> + 'a { - self.custom_protocols.open_protocols(peer_id) + /// Returns true if we have an open protocol with the given peer. + pub fn is_open(&self, peer_id: &PeerId) -> bool { + self.custom_protocols.is_open(peer_id) } /// Disconnects the custom protocols from a peer. @@ -184,8 +184,6 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> { pub enum BehaviourOut<TMessage> { /// Opened a custom protocol with the remote. CustomProtocolOpen { - /// Identifier of the protocol. - protocol_id: ProtocolId, /// Version of the protocol that has been opened. version: u8, /// Id of the node we have opened a connection with. @@ -198,8 +196,6 @@ pub enum BehaviourOut<TMessage> { CustomProtocolClosed { /// Id of the peer we were connected to. peer_id: PeerId, - /// Identifier of the protocol. - protocol_id: ProtocolId, /// Reason why the substream closed. If `Ok`, then it's a graceful exit (EOF). result: io::Result<()>, }, @@ -208,8 +204,6 @@ pub enum BehaviourOut<TMessage> { CustomMessage { /// Id of the peer the message came from. peer_id: PeerId, - /// Protocol which generated the message. - protocol_id: ProtocolId, /// Message that has been received. message: TMessage, }, @@ -218,8 +212,6 @@ pub enum BehaviourOut<TMessage> { Clogged { /// Id of the peer the message came from. peer_id: PeerId, - /// Protocol which generated the message. - protocol_id: ProtocolId, /// Copy of the messages that are within the buffer, for further diagnostic. messages: Vec<TMessage>, }, @@ -241,20 +233,20 @@ pub enum BehaviourOut<TMessage> { }, } -impl<TMessage> From<CustomProtosOut<TMessage>> for BehaviourOut<TMessage> { - fn from(other: CustomProtosOut<TMessage>) -> BehaviourOut<TMessage> { +impl<TMessage> From<CustomProtoOut<TMessage>> for BehaviourOut<TMessage> { + fn from(other: CustomProtoOut<TMessage>) -> BehaviourOut<TMessage> { match other { - CustomProtosOut::CustomProtocolOpen { protocol_id, version, peer_id, endpoint } => { - BehaviourOut::CustomProtocolOpen { protocol_id, version, peer_id, endpoint } + CustomProtoOut::CustomProtocolOpen { version, peer_id, endpoint } => { + BehaviourOut::CustomProtocolOpen { version, peer_id, endpoint } } - CustomProtosOut::CustomProtocolClosed { protocol_id, peer_id, result } => { - BehaviourOut::CustomProtocolClosed { protocol_id, peer_id, result } + CustomProtoOut::CustomProtocolClosed { peer_id, result } => { + BehaviourOut::CustomProtocolClosed { peer_id, result } } - CustomProtosOut::CustomMessage { protocol_id, peer_id, message } => { - BehaviourOut::CustomMessage { protocol_id, peer_id, message } + CustomProtoOut::CustomMessage { peer_id, message } => { + BehaviourOut::CustomMessage { peer_id, message } } - CustomProtosOut::Clogged { protocol_id, peer_id, messages } => { - BehaviourOut::Clogged { protocol_id, peer_id, messages } + CustomProtoOut::Clogged { peer_id, messages } => { + BehaviourOut::Clogged { peer_id, messages } } } } @@ -266,8 +258,8 @@ impl<TMessage, TSubstream> NetworkBehaviourEventProcess<void::Void> for Behaviou } } -impl<TMessage, TSubstream> NetworkBehaviourEventProcess<CustomProtosOut<TMessage>> for Behaviour<TMessage, TSubstream> { - fn inject_event(&mut self, event: CustomProtosOut<TMessage>) { +impl<TMessage, TSubstream> NetworkBehaviourEventProcess<CustomProtoOut<TMessage>> for Behaviour<TMessage, TSubstream> { + fn inject_event(&mut self, event: CustomProtoOut<TMessage>) { self.events.push(event.into()); } } diff --git a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs b/substrate/core/network-libp2p/src/custom_proto/behaviour.rs index ab64b42dafe21e5bb1a12de6a2502d7d28bf2957..473d2200ffbc6db4310ad2d947db17ae1c533575 100644 --- a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs +++ b/substrate/core/network-libp2p/src/custom_proto/behaviour.rs @@ -14,10 +14,10 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see <http://www.gnu.org/licenses/>. -use crate::custom_proto::handler::{CustomProtosHandler, CustomProtosHandlerOut, CustomProtosHandlerIn}; +use crate::custom_proto::handler::{CustomProtoHandler, CustomProtoHandlerOut, CustomProtoHandlerIn}; use crate::custom_proto::topology::NetTopology; -use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocols}; -use crate::{NetworkConfiguration, NonReservedPeerMode, ProtocolId}; +use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol}; +use crate::{NetworkConfiguration, NonReservedPeerMode}; use crate::parse_str_addr; use fnv::{FnvHashMap, FnvHashSet}; use futures::prelude::*; @@ -35,22 +35,22 @@ const NODES_FILE: &str = "nodes.json"; const PEER_DISABLE_DURATION: Duration = Duration::from_secs(5 * 60); /// Network behaviour that handles opening substreams for custom protocols with other nodes. -pub struct CustomProtos<TMessage, TSubstream> { +pub struct CustomProto<TMessage, TSubstream> { /// List of protocols to open with peers. Never modified. - registered_protocols: RegisteredProtocols<TMessage>, + protocol: RegisteredProtocol<TMessage>, /// Topology of the network. topology: NetTopology, - /// List of custom protocols that we have open with remotes. - open_protocols: Vec<(PeerId, ProtocolId)>, + /// List of peers for which the custom protocol is open. + opened_peers: FnvHashSet<PeerId>, /// List of peer handlers that were enabled. /// /// Note that it is possible for a peer to be in the shutdown process, in which case it will - /// not be in this list but will be present in `open_protocols`. + /// not be in this list but will be present in `opened_peers`. /// It is also possible that we have *just* enabled a peer, in which case it will be in this - /// list but not in `open_protocols`. + /// list but not in `opened_peers`. enabled_peers: FnvHashSet<PeerId>, /// Maximum number of incoming non-reserved connections, taken from the config. Never modified. @@ -76,19 +76,17 @@ pub struct CustomProtos<TMessage, TSubstream> { next_connect_to_nodes: Delay, /// Events to produce from `poll()`. - events: SmallVec<[NetworkBehaviourAction<CustomProtosHandlerIn<TMessage>, CustomProtosOut<TMessage>>; 4]>, + events: SmallVec<[NetworkBehaviourAction<CustomProtoHandlerIn<TMessage>, CustomProtoOut<TMessage>>; 4]>, /// Marker to pin the generics. marker: PhantomData<TSubstream>, } -/// Event that can be emitted by the `CustomProtos`. +/// Event that can be emitted by the `CustomProto`. #[derive(Debug)] -pub enum CustomProtosOut<TMessage> { +pub enum CustomProtoOut<TMessage> { /// Opened a custom protocol with the remote. CustomProtocolOpen { - /// Identifier of the protocol. - protocol_id: ProtocolId, /// Version of the protocol that has been opened. version: u8, /// Id of the node we have opened a connection with. @@ -101,8 +99,6 @@ pub enum CustomProtosOut<TMessage> { CustomProtocolClosed { /// Id of the peer we were connected to. peer_id: PeerId, - /// Identifier of the protocol. - protocol_id: ProtocolId, /// Reason why the substream closed. If `Ok`, then it's a graceful exit (EOF). result: io::Result<()>, }, @@ -111,8 +107,6 @@ pub enum CustomProtosOut<TMessage> { CustomMessage { /// Id of the peer the message came from. peer_id: PeerId, - /// Protocol which generated the message. - protocol_id: ProtocolId, /// Message that has been received. message: TMessage, }, @@ -122,16 +116,14 @@ pub enum CustomProtosOut<TMessage> { Clogged { /// Id of the peer which is clogged. peer_id: PeerId, - /// Protocol which has a problem. - protocol_id: ProtocolId, /// Copy of the messages that are within the buffer, for further diagnostic. messages: Vec<TMessage>, }, } -impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> { - /// Creates a `CustomProtos`. - pub fn new(config: &NetworkConfiguration, local_peer_id: &PeerId, registered_protocols: RegisteredProtocols<TMessage>) -> Self { +impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> { + /// Creates a `CustomProto`. + pub fn new(config: &NetworkConfiguration, local_peer_id: &PeerId, protocol: RegisteredProtocol<TMessage>) -> Self { // Initialize the topology of the network. let mut topology = if let Some(ref path) = config.net_config_path { let path = Path::new(path).join(NODES_FILE); @@ -157,11 +149,8 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> { .saturating_add(max_outgoing_connections) .saturating_add(4); // We add an arbitrary number for reserved peers slots - // Expected maximum number of substreams. - let open_protos_cap = connec_cap.saturating_mul(registered_protocols.len()); - - CustomProtos { - registered_protocols, + CustomProto { + protocol, topology, max_incoming_connections, max_outgoing_connections, @@ -169,7 +158,7 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> { connected_peers: Default::default(), reserved_peers: Default::default(), banned_peers: Vec::new(), - open_protocols: Vec::with_capacity(open_protos_cap), + opened_peers: FnvHashSet::with_capacity_and_hasher(connec_cap, Default::default()), enabled_peers: FnvHashSet::with_capacity_and_hasher(connec_cap, Default::default()), next_connect_to_nodes: Delay::new(Instant::now()), events: SmallVec::new(), @@ -232,7 +221,7 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> { } events.push(NetworkBehaviourAction::SendEvent { peer_id: peer_id.clone(), - event: CustomProtosHandlerIn::Disable, + event: CustomProtoHandlerIn::Disable, }); false }) @@ -248,7 +237,7 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> { if self.enabled_peers.remove(peer) { self.events.push(NetworkBehaviourAction::SendEvent { peer_id: peer.clone(), - event: CustomProtosHandlerIn::Disable, + event: CustomProtoHandlerIn::Disable, }); } } @@ -273,7 +262,7 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> { if self.enabled_peers.remove(&peer_id) { self.events.push(NetworkBehaviourAction::SendEvent { peer_id, - event: CustomProtosHandlerIn::Disable, + event: CustomProtoHandlerIn::Disable, }); } } @@ -288,25 +277,21 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> { self.enabled_peers.contains(peer_id) } - /// Returns the list of protocols we have open with the given peer. - pub fn open_protocols<'a>(&'a self, peer_id: &'a PeerId) -> impl Iterator<Item = ProtocolId> + 'a { - self.open_protocols - .iter() - .filter(move |(p, _)| p == peer_id) - .map(|(_, proto)| *proto) + /// Returns true if we have opened a protocol with the given peer. + pub fn is_open(&self, peer_id: &PeerId) -> bool { + self.opened_peers.contains(peer_id) } - /// Sends a message to a peer using the given custom protocol. + /// Sends a message to a peer. /// /// Has no effect if the custom protocol is not open with the given peer. /// /// Also note that even we have a valid open substream, it may in fact be already closed /// without us knowing, in which case the packet will not be received. - pub fn send_packet(&mut self, target: &PeerId, protocol_id: ProtocolId, message: TMessage) { + pub fn send_packet(&mut self, target: &PeerId, message: TMessage) { self.events.push(NetworkBehaviourAction::SendEvent { peer_id: target.clone(), - event: CustomProtosHandlerIn::SendCustomMessage { - protocol: protocol_id, + event: CustomProtoHandlerIn::SendCustomMessage { message, } }); @@ -408,7 +393,7 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> { num_to_open -= 1; self.events.push(NetworkBehaviourAction::SendEvent { peer_id: peer_id.clone(), - event: CustomProtosHandlerIn::Enable(Endpoint::Dialer), + event: CustomProtoHandlerIn::Enable(Endpoint::Dialer), }); } @@ -443,16 +428,16 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> { } } -impl<TMessage, TSubstream> NetworkBehaviour for CustomProtos<TMessage, TSubstream> +impl<TMessage, TSubstream> NetworkBehaviour for CustomProto<TMessage, TSubstream> where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage, { - type ProtocolsHandler = CustomProtosHandler<TMessage, TSubstream>; - type OutEvent = CustomProtosOut<TMessage>; + type ProtocolsHandler = CustomProtoHandler<TMessage, TSubstream>; + type OutEvent = CustomProtoOut<TMessage>; fn new_handler(&mut self) -> Self::ProtocolsHandler { - CustomProtosHandler::new(self.registered_protocols.clone()) + CustomProtoHandler::new(self.protocol.clone()) } fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> { @@ -470,7 +455,7 @@ where debug!(target: "sub-libp2p", "Ignoring {:?} because we're in reserved mode", peer_id); self.events.push(NetworkBehaviourAction::SendEvent { peer_id: peer_id.clone(), - event: CustomProtosHandlerIn::Disable, + event: CustomProtoHandlerIn::Disable, }); return } @@ -482,7 +467,7 @@ where debug!(target: "sub-libp2p", "Ignoring banned peer {:?}", peer_id); self.events.push(NetworkBehaviourAction::SendEvent { peer_id: peer_id.clone(), - event: CustomProtosHandlerIn::Disable, + event: CustomProtoHandlerIn::Disable, }); return } @@ -501,7 +486,7 @@ where if num_outgoing == self.max_outgoing_connections { self.events.push(NetworkBehaviourAction::SendEvent { peer_id: peer_id.clone(), - event: CustomProtosHandlerIn::Disable, + event: CustomProtoHandlerIn::Disable, }); return } @@ -518,7 +503,7 @@ where we're full", peer_id); self.events.push(NetworkBehaviourAction::SendEvent { peer_id: peer_id.clone(), - event: CustomProtosHandlerIn::Disable, + event: CustomProtoHandlerIn::Disable, }); return } @@ -533,13 +518,13 @@ where trace!(target: "sub-libp2p", "Enabling custom protocols with {:?} (active)", peer_id); self.events.push(NetworkBehaviourAction::SendEvent { peer_id: peer_id.clone(), - event: CustomProtosHandlerIn::Enable(Endpoint::Dialer), + event: CustomProtoHandlerIn::Enable(Endpoint::Dialer), }); } else { trace!(target: "sub-libp2p", "Enabling custom protocols with {:?} (passive)", peer_id); self.events.push(NetworkBehaviourAction::SendEvent { peer_id: peer_id.clone(), - event: CustomProtosHandlerIn::Enable(Endpoint::Listener), + event: CustomProtoHandlerIn::Enable(Endpoint::Listener), }); } @@ -553,11 +538,8 @@ where self.topology.set_disconnected(peer_id, &endpoint); - while let Some(pos) = self.open_protocols.iter().position(|(p, _)| p == peer_id) { - let (_, protocol_id) = self.open_protocols.remove(pos); - - let event = CustomProtosOut::CustomProtocolClosed { - protocol_id, + if self.opened_peers.remove(&peer_id) { + let event = CustomProtoOut::CustomProtocolClosed { peer_id: peer_id.clone(), result: Ok(()), }; @@ -596,35 +578,23 @@ where event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent, ) { match event { - CustomProtosHandlerOut::CustomProtocolClosed { protocol_id, result } => { - let pos = self.open_protocols.iter().position(|(s, p)| - s == &source && p == &protocol_id - ); - - if let Some(pos) = pos { - self.open_protocols.remove(pos); - } else { - debug_assert!(false, "Couldn't find protocol in open_protocols"); - } + CustomProtoHandlerOut::CustomProtocolClosed { result } => { + self.opened_peers.remove(&source); - let event = CustomProtosOut::CustomProtocolClosed { - protocol_id, + let event = CustomProtoOut::CustomProtocolClosed { result, peer_id: source, }; self.events.push(NetworkBehaviourAction::GenerateEvent(event)); } - CustomProtosHandlerOut::CustomProtocolOpen { protocol_id, version } => { - debug_assert!(!self.open_protocols.iter().any(|(s, p)| - s == &source && p == &protocol_id - )); - self.open_protocols.push((source.clone(), protocol_id)); + CustomProtoHandlerOut::CustomProtocolOpen { version } => { + debug_assert!(!self.is_open(&source)); + self.opened_peers.insert(source.clone()); let endpoint = self.connected_peers.get(&source) .expect("We only receive events from connected nodes; QED").clone(); - let event = CustomProtosOut::CustomProtocolOpen { - protocol_id, + let event = CustomProtoOut::CustomProtocolOpen { version, peer_id: source, endpoint, @@ -632,38 +602,32 @@ where self.events.push(NetworkBehaviourAction::GenerateEvent(event)); } - CustomProtosHandlerOut::CustomMessage { protocol_id, message } => { - debug_assert!(self.open_protocols.iter().any(|(s, p)| - s == &source && p == &protocol_id - )); - let event = CustomProtosOut::CustomMessage { + CustomProtoHandlerOut::CustomMessage { message } => { + debug_assert!(self.is_open(&source)); + let event = CustomProtoOut::CustomMessage { peer_id: source, - protocol_id, message, }; self.events.push(NetworkBehaviourAction::GenerateEvent(event)); } - CustomProtosHandlerOut::Clogged { protocol_id, messages } => { - debug_assert!(self.open_protocols.iter().any(|(s, p)| - s == &source && p == &protocol_id - )); - warn!(target: "sub-libp2p", "Queue of packets to send to {:?} (protocol: {:?}) is \ - pretty large", source, protocol_id); - self.events.push(NetworkBehaviourAction::GenerateEvent(CustomProtosOut::Clogged { + CustomProtoHandlerOut::Clogged { messages } => { + debug_assert!(self.is_open(&source)); + warn!(target: "sub-libp2p", "Queue of packets to send to {:?} is \ + pretty large", source); + self.events.push(NetworkBehaviourAction::GenerateEvent(CustomProtoOut::Clogged { peer_id: source, - protocol_id, messages, })); } - CustomProtosHandlerOut::ProtocolError { protocol_id, error, is_severe } => { + CustomProtoHandlerOut::ProtocolError { error, is_severe } => { if is_severe { - warn!(target: "sub-libp2p", "Network misbehaviour from {:?} with protocol \ - {:?}: {:?}", source, protocol_id, error); + warn!(target: "sub-libp2p", "Network misbehaviour from {:?}: {:?}", + source, error); self.ban_peer(source); } else { - debug!(target: "sub-libp2p", "Network misbehaviour from {:?} with protocol \ - {:?}: {:?}", source, protocol_id, error); + debug!(target: "sub-libp2p", "Network misbehaviour from {:?}: {:?}", + source, error); self.disconnect_peer(&source); } } diff --git a/substrate/core/network-libp2p/src/custom_proto/handler.rs b/substrate/core/network-libp2p/src/custom_proto/handler.rs index 947dc2ddd4130515214e8b3ac27963f8b624aaa8..a9db9fcb99bc2a374a2f73f07e0bc1d91637fdb9 100644 --- a/substrate/core/network-libp2p/src/custom_proto/handler.rs +++ b/substrate/core/network-libp2p/src/custom_proto/handler.rs @@ -14,8 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see <http://www.gnu.org/licenses/>. -use crate::ProtocolId; -use crate::custom_proto::upgrade::{CustomMessage, CustomMessageId, RegisteredProtocol, RegisteredProtocols}; +use crate::custom_proto::upgrade::{CustomMessage, CustomMessageId, RegisteredProtocol}; use crate::custom_proto::upgrade::{RegisteredProtocolEvent, RegisteredProtocolSubstream}; use futures::prelude::*; use libp2p::core::{ @@ -63,32 +62,26 @@ use void::Void; /// happens on one substream, we consider that we are disconnected. Re-enabling is performed by /// opening an outbound substream. /// -pub struct CustomProtosHandler<TMessage, TSubstream> { - /// Fields individual to each protocol that we support. - protocols: SmallVec<[PerProtocol<TMessage, TSubstream>; 1]>, +pub struct CustomProtoHandler<TMessage, TSubstream> { + /// Configuration for the protocol upgrade to negotiate. + protocol: RegisteredProtocol<TMessage>, + + /// State of the communications with the remote. + state: ProtocolState<TMessage, TSubstream>, /// Queue of events to send to the outside. /// /// This queue must only ever be modified to insert elements at the back, or remove the first /// element. - events_queue: SmallVec<[ProtocolsHandlerEvent<RegisteredProtocol<TMessage>, ProtocolId, CustomProtosHandlerOut<TMessage>>; 16]>, + events_queue: SmallVec<[ProtocolsHandlerEvent<RegisteredProtocol<TMessage>, (), CustomProtoHandlerOut<TMessage>>; 16]>, /// We have a warm-up period after creating the handler during which we don't shut down the /// connection. warm_up_end: Instant, } -/// Fields individual to each protocol that we support. -struct PerProtocol<TMessage, TSubstream> { - /// Configuration for the protocol upgrade to negotiate. - protocol: RegisteredProtocol<TMessage>, - - /// State of the communications with the remote. - state: PerProtocolState<TMessage, TSubstream>, -} - -/// State of the handler for a specific protocol. -enum PerProtocolState<TMessage, TSubstream> { +/// State of the handler. +enum ProtocolState<TMessage, TSubstream> { /// Waiting for the behaviour to tell the handler whether it is enabled or disabled. Init { /// List of substreams opened by the remote but that haven't been processed yet. @@ -160,44 +153,203 @@ struct PerProtocolNormalState<TMessage, TSubstream> { shutdown: SmallVec<[RegisteredProtocolSubstream<TMessage, TSubstream>; 4]>, } -impl<TMessage, TSubstream> PerProtocol<TMessage, TSubstream> +impl<TMessage, TSubstream> PerProtocolNormalState<TMessage, TSubstream> where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite { - /// Enables the protocol. Returns an optional event to emit. - /// Must be passed the endpoint of the connection. - #[must_use] - fn enable(&mut self, endpoint: Endpoint) - -> Option<ProtocolsHandlerEvent<RegisteredProtocol<TMessage>, ProtocolId, CustomProtosHandlerOut<TMessage>>> { + /// Polls for things that are new. Same API constraints as `Future::poll()`. + /// Optionally returns the event to produce. + /// You must pass the `protocol_id` as we need have to inject it in the returned event. + /// API note: Ideally we wouldn't need to be passed a `ProtocolId`, and we would return a + /// different enum that doesn't contain any `protocol_id`, and the caller would inject + /// the ID itself, but that's a ton of code for not much gain. + fn poll(&mut self) -> Option<CustomProtoHandlerOut<TMessage>> { + for n in (0..self.pending_response.len()).rev() { + let (request_id, mut substream) = self.pending_response.swap_remove(n); + match substream.poll() { + Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => { + if message.request_id() == CustomMessageId::Response(request_id) { + let event = CustomProtoHandlerOut::CustomMessage { + message + }; + self.shutdown.push(substream); + return Some(event); + } else { + self.shutdown.push(substream); + let event = CustomProtoHandlerOut::ProtocolError { + is_severe: true, + error: format!("Request ID doesn't match substream: expected {:?}, \ + got {:?}", request_id, message.request_id()).into(), + }; + return Some(event); + } + }, + Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { .. }))) => + unreachable!("Cannot receive Clogged message with new protocol version; QED"), + Ok(Async::NotReady) => + self.pending_response.push((request_id, substream)), + Ok(Async::Ready(None)) => { + self.shutdown.push(substream); + let event = CustomProtoHandlerOut::ProtocolError { + is_severe: false, + error: format!("Request ID {:?} didn't receive an answer", request_id).into(), + }; + return Some(event); + } + Err(err) => { + self.shutdown.push(substream); + let event = CustomProtoHandlerOut::ProtocolError { + is_severe: false, + error: format!("Error while waiting for an answer for {:?}: {}", + request_id, err).into(), + }; + return Some(event); + } + } + } - let return_value; + for n in (0..self.incoming_substreams.len()).rev() { + let mut substream = self.incoming_substreams.swap_remove(n); + match substream.poll() { + Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => { + return match message.request_id() { + CustomMessageId::Request(id) => { + self.pending_send_back.push((id, substream)); + Some(CustomProtoHandlerOut::CustomMessage { + message + }) + } + CustomMessageId::OneWay => { + self.shutdown.push(substream); + Some(CustomProtoHandlerOut::CustomMessage { + message + }) + } + _ => { + self.shutdown.push(substream); + Some(CustomProtoHandlerOut::ProtocolError { + is_severe: true, + error: format!("Received response in new substream").into(), + }) + } + } + }, + Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { .. }))) => + unreachable!("Cannot receive Clogged message with new protocol version; QED"), + Ok(Async::NotReady) => + self.incoming_substreams.push(substream), + Ok(Async::Ready(None)) => {} + Err(err) => { + self.shutdown.push(substream); + return Some(CustomProtoHandlerOut::ProtocolError { + is_severe: false, + error: format!("Error in incoming substream: {}", err).into(), + }); + } + } + } + + shutdown_list(&mut self.shutdown); + None + } +} - self.state = match mem::replace(&mut self.state, PerProtocolState::Poisoned) { - PerProtocolState::Poisoned => { +/// Event that can be received by a `CustomProtoHandler`. +#[derive(Debug)] +pub enum CustomProtoHandlerIn<TMessage> { + /// The node should start using custom protocols. Contains whether we are the dialer or the + /// listener of the connection. + Enable(Endpoint), + + /// The node should stop using custom protocols. + Disable, + + /// Sends a message through a custom protocol substream. + SendCustomMessage { + /// The message to send. + message: TMessage, + }, +} + +/// Event that can be emitted by a `CustomProtoHandler`. +#[derive(Debug)] +pub enum CustomProtoHandlerOut<TMessage> { + /// Opened a custom protocol with the remote. + CustomProtocolOpen { + /// Version of the protocol that has been opened. + version: u8, + }, + + /// Closed a custom protocol with the remote. + CustomProtocolClosed { + /// Reason why the substream closed. If `Ok`, then it's a graceful exit (EOF). + result: io::Result<()>, + }, + + /// Receives a message on a custom protocol substream. + CustomMessage { + /// Message that has been received. + message: TMessage, + }, + + /// A substream to the remote is clogged. The send buffer is very large, and we should print + /// a diagnostic message and/or avoid sending more data. + Clogged { + /// Copy of the messages that are within the buffer, for further diagnostic. + messages: Vec<TMessage>, + }, + + /// An error has happened on the protocol level with this node. + ProtocolError { + /// If true the error is severe, such as a protocol violation. + is_severe: bool, + /// The error that happened. + error: Box<dyn error::Error + Send + Sync>, + }, +} + +impl<TMessage, TSubstream> CustomProtoHandler<TMessage, TSubstream> +where + TSubstream: AsyncRead + AsyncWrite, + TMessage: CustomMessage, +{ + /// Builds a new `CustomProtoHandler`. + pub fn new(protocol: RegisteredProtocol<TMessage>) -> Self { + CustomProtoHandler { + protocol, + state: ProtocolState::Init { + substreams: SmallVec::new(), + init_deadline: Delay::new(Instant::now() + Duration::from_secs(5)) + }, + events_queue: SmallVec::new(), + warm_up_end: Instant::now() + Duration::from_secs(5), + } + } + + /// Enables the handler. + fn enable(&mut self, endpoint: Endpoint) { + self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) { + ProtocolState::Poisoned => { error!(target: "sub-libp2p", "Handler is in poisoned state"); - return_value = None; - PerProtocolState::Poisoned + ProtocolState::Poisoned } - PerProtocolState::Init { substreams: incoming, .. } => { + ProtocolState::Init { substreams: incoming, .. } => { if incoming.is_empty() { if let Endpoint::Dialer = endpoint { - return_value = Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { + self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade: self.protocol.clone(), - info: self.protocol.id(), + info: (), }); - } else { - return_value = None; } - PerProtocolState::Opening { + ProtocolState::Opening { deadline: Delay::new(Instant::now() + Duration::from_secs(60)) } } else if incoming.iter().any(|s| s.is_multiplex()) { - let event = CustomProtosHandlerOut::CustomProtocolOpen { - protocol_id: self.protocol.id(), + let event = CustomProtoHandlerOut::CustomProtocolOpen { version: incoming[0].protocol_version() }; - return_value = Some(ProtocolsHandlerEvent::Custom(event)); - PerProtocolState::Normal(PerProtocolNormalState { + self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); + ProtocolState::Normal(PerProtocolNormalState { outgoing_substream: None, incoming_substreams: incoming.into_iter().collect(), pending_response: SmallVec::new(), @@ -207,12 +359,11 @@ where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite { }) } else { - let event = CustomProtosHandlerOut::CustomProtocolOpen { - protocol_id: self.protocol.id(), + let event = CustomProtoHandlerOut::CustomProtocolOpen { version: incoming[0].protocol_version() }; - return_value = Some(ProtocolsHandlerEvent::Custom(event)); - PerProtocolState::BackCompat { + self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); + ProtocolState::BackCompat { substream: incoming.into_iter().next() .expect("We have a check above that incoming isn't empty; QED"), shutdown: SmallVec::new() @@ -220,51 +371,48 @@ where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite { } } - st @ PerProtocolState::Opening { .. } => { return_value = None; st } - st @ PerProtocolState::BackCompat { .. } => { return_value = None; st } - st @ PerProtocolState::Normal { .. } => { return_value = None; st } - PerProtocolState::Disabled { shutdown, .. } => { - return_value = None; - PerProtocolState::Disabled { shutdown, reenable: true } + st @ ProtocolState::Opening { .. } => st, + st @ ProtocolState::BackCompat { .. } => st, + st @ ProtocolState::Normal { .. } => st, + ProtocolState::Disabled { shutdown, .. } => { + ProtocolState::Disabled { shutdown, reenable: true } } - }; - - return_value + } } - /// Disables the protocol. Returns `true` if the protocol was closed, `false` if it was already - /// closed or not open yet. - fn disable(&mut self) -> bool { - let mut return_value = false; - - self.state = match mem::replace(&mut self.state, PerProtocolState::Poisoned) { - PerProtocolState::Poisoned => { + /// Disables the handler. + fn disable(&mut self) { + self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) { + ProtocolState::Poisoned => { error!(target: "sub-libp2p", "Handler is in poisoned state"); - PerProtocolState::Poisoned + ProtocolState::Poisoned } - PerProtocolState::Init { substreams: mut shutdown, .. } => { + ProtocolState::Init { substreams: mut shutdown, .. } => { for s in &mut shutdown { s.shutdown(); } - PerProtocolState::Disabled { shutdown, reenable: false } + ProtocolState::Disabled { shutdown, reenable: false } } - PerProtocolState::Opening { .. } => { - PerProtocolState::Disabled { shutdown: SmallVec::new(), reenable: false } + ProtocolState::Opening { .. } => { + ProtocolState::Disabled { shutdown: SmallVec::new(), reenable: false } } - PerProtocolState::BackCompat { mut substream, mut shutdown } => { + ProtocolState::BackCompat { mut substream, mut shutdown } => { substream.shutdown(); shutdown.push(substream); - return_value = true; - PerProtocolState::Disabled { + let event = CustomProtoHandlerOut::CustomProtocolClosed { + result: Ok(()) + }; + self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); + ProtocolState::Disabled { shutdown: shutdown.into_iter().collect(), reenable: false } } - PerProtocolState::Normal(state) => { + ProtocolState::Normal(state) => { let mut out: SmallVec<[_; 6]> = SmallVec::new(); out.extend(state.outgoing_substream.into_iter()); out.extend(state.incoming_substreams.into_iter()); @@ -274,31 +422,31 @@ where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite { s.shutdown(); } out.extend(state.shutdown.into_iter()); - return_value = true; - PerProtocolState::Disabled { shutdown: out, reenable: false } + let event = CustomProtoHandlerOut::CustomProtocolClosed { + result: Ok(()) + }; + self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); + ProtocolState::Disabled { shutdown: out, reenable: false } } - PerProtocolState::Disabled { shutdown, .. } => - PerProtocolState::Disabled { shutdown, reenable: false }, + ProtocolState::Disabled { shutdown, .. } => + ProtocolState::Disabled { shutdown, reenable: false }, }; - - return_value } /// Polls the state for events. Optionally returns an event to produce. #[must_use] - fn poll(&mut self) - -> Option<ProtocolsHandlerEvent<RegisteredProtocol<TMessage>, ProtocolId, CustomProtosHandlerOut<TMessage>>> { - + fn poll_state(&mut self) + -> Option<ProtocolsHandlerEvent<RegisteredProtocol<TMessage>, (), CustomProtoHandlerOut<TMessage>>> { let return_value; - self.state = match mem::replace(&mut self.state, PerProtocolState::Poisoned) { - PerProtocolState::Poisoned => { + self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) { + ProtocolState::Poisoned => { error!(target: "sub-libp2p", "Handler is in poisoned state; shutting down"); return_value = None; - PerProtocolState::Poisoned + ProtocolState::Poisoned } - PerProtocolState::Init { substreams, mut init_deadline } => { + ProtocolState::Init { substreams, mut init_deadline } => { match init_deadline.poll() { Ok(Async::Ready(())) => error!(target: "sub-libp2p", "Handler initialization process is too long"), @@ -307,74 +455,69 @@ where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite { } return_value = None; - PerProtocolState::Init { substreams, init_deadline } + ProtocolState::Init { substreams, init_deadline } } - PerProtocolState::Opening { mut deadline } => { + ProtocolState::Opening { mut deadline } => { match deadline.poll() { Ok(Async::Ready(())) => { deadline.reset(Instant::now() + Duration::from_secs(60)); - let event = CustomProtosHandlerOut::ProtocolError { - protocol_id: self.protocol.id(), + let event = CustomProtoHandlerOut::ProtocolError { is_severe: false, error: "Timeout when opening protocol".to_string().into(), }; return_value = Some(ProtocolsHandlerEvent::Custom(event)); - PerProtocolState::Opening { deadline } + ProtocolState::Opening { deadline } }, Ok(Async::NotReady) => { return_value = None; - PerProtocolState::Opening { deadline } + ProtocolState::Opening { deadline } }, Err(_) => { error!(target: "sub-libp2p", "Tokio timer has errored"); deadline.reset(Instant::now() + Duration::from_secs(60)); return_value = None; - PerProtocolState::Opening { deadline } + ProtocolState::Opening { deadline } }, } } - PerProtocolState::BackCompat { mut substream, shutdown } => { + ProtocolState::BackCompat { mut substream, shutdown } => { match substream.poll() { Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => { - let event = CustomProtosHandlerOut::CustomMessage { - protocol_id: substream.protocol_id(), + let event = CustomProtoHandlerOut::CustomMessage { message }; return_value = Some(ProtocolsHandlerEvent::Custom(event)); - PerProtocolState::BackCompat { substream, shutdown } + ProtocolState::BackCompat { substream, shutdown } }, Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { messages }))) => { - let event = CustomProtosHandlerOut::Clogged { - protocol_id: substream.protocol_id(), + let event = CustomProtoHandlerOut::Clogged { messages, }; return_value = Some(ProtocolsHandlerEvent::Custom(event)); - PerProtocolState::BackCompat { substream, shutdown } + ProtocolState::BackCompat { substream, shutdown } } Ok(Async::NotReady) => { return_value = None; - PerProtocolState::BackCompat { substream, shutdown } + ProtocolState::BackCompat { substream, shutdown } } Ok(Async::Ready(None)) => { - let event = CustomProtosHandlerOut::CustomProtocolClosed { - protocol_id: substream.protocol_id(), + let event = CustomProtoHandlerOut::CustomProtocolClosed { result: Ok(()) }; return_value = Some(ProtocolsHandlerEvent::Custom(event)); - PerProtocolState::Disabled { + ProtocolState::Disabled { shutdown: shutdown.into_iter().collect(), reenable: false } } Err(err) => { - let event = CustomProtosHandlerOut::CustomProtocolClosed { - protocol_id: substream.protocol_id(), + let event = CustomProtoHandlerOut::CustomProtocolClosed { result: Err(err), }; return_value = Some(ProtocolsHandlerEvent::Custom(event)); - PerProtocolState::Disabled { + ProtocolState::Disabled { shutdown: shutdown.into_iter().collect(), reenable: false } @@ -382,291 +525,66 @@ where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite { } } - PerProtocolState::Normal(mut norm_state) => { - if let Some(event) = norm_state.poll(self.protocol.id()) { + ProtocolState::Normal(mut norm_state) => { + if let Some(event) = norm_state.poll() { return_value = Some(ProtocolsHandlerEvent::Custom(event)); } else { return_value = None; } - PerProtocolState::Normal(norm_state) + ProtocolState::Normal(norm_state) } - PerProtocolState::Disabled { mut shutdown, reenable } => { + ProtocolState::Disabled { mut shutdown, reenable } => { shutdown_list(&mut shutdown); // If `reenable` is `true`, that means we should open the substreams system again // after all the substreams are closed. if reenable && shutdown.is_empty() { return_value = Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade: self.protocol.clone(), - info: self.protocol.id(), + info: (), }); - PerProtocolState::Opening { + ProtocolState::Opening { deadline: Delay::new(Instant::now() + Duration::from_secs(60)) } } else { return_value = None; - PerProtocolState::Disabled { shutdown, reenable } + ProtocolState::Disabled { shutdown, reenable } } } }; return_value } -} - -impl<TMessage, TSubstream> PerProtocolNormalState<TMessage, TSubstream> -where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite { - /// Polls for things that are new. Same API constraints as `Future::poll()`. - /// Optionally returns the event to produce. - /// You must pass the `protocol_id` as we need have to inject it in the returned event. - /// API note: Ideally we wouldn't need to be passed a `ProtocolId`, and we would return a - /// different enum that doesn't contain any `protocol_id`, and the caller would inject - /// the ID itself, but that's a ton of code for not much gain. - fn poll(&mut self, protocol_id: ProtocolId) -> Option<CustomProtosHandlerOut<TMessage>> { - for n in (0..self.pending_response.len()).rev() { - let (request_id, mut substream) = self.pending_response.swap_remove(n); - match substream.poll() { - Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => { - if message.request_id() == CustomMessageId::Response(request_id) { - let event = CustomProtosHandlerOut::CustomMessage { - protocol_id: substream.protocol_id(), - message - }; - self.shutdown.push(substream); - return Some(event); - } else { - self.shutdown.push(substream); - let event = CustomProtosHandlerOut::ProtocolError { - protocol_id, - is_severe: true, - error: format!("Request ID doesn't match substream: expected {:?}, \ - got {:?}", request_id, message.request_id()).into(), - }; - return Some(event); - } - }, - Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { .. }))) => - unreachable!("Cannot receive Clogged message with new protocol version; QED"), - Ok(Async::NotReady) => - self.pending_response.push((request_id, substream)), - Ok(Async::Ready(None)) => { - self.shutdown.push(substream); - let event = CustomProtosHandlerOut::ProtocolError { - protocol_id, - is_severe: false, - error: format!("Request ID {:?} didn't receive an answer", request_id).into(), - }; - return Some(event); - } - Err(err) => { - self.shutdown.push(substream); - let event = CustomProtosHandlerOut::ProtocolError { - protocol_id, - is_severe: false, - error: format!("Error while waiting for an answer for {:?}: {}", - request_id, err).into(), - }; - return Some(event); - } - } - } - - for n in (0..self.incoming_substreams.len()).rev() { - let mut substream = self.incoming_substreams.swap_remove(n); - match substream.poll() { - Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => { - let protocol_id = substream.protocol_id(); - if let CustomMessageId::Request(id) = message.request_id() { - self.pending_send_back.push((id, substream)); - return Some(CustomProtosHandlerOut::CustomMessage { - protocol_id, - message - }); - } else if let CustomMessageId::OneWay = message.request_id() { - self.shutdown.push(substream); - return Some(CustomProtosHandlerOut::CustomMessage { - protocol_id, - message - }); - } else { - self.shutdown.push(substream); - return Some(CustomProtosHandlerOut::ProtocolError { - protocol_id, - is_severe: true, - error: format!("Received response in new substream").into(), - }); - } - }, - Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { .. }))) => - unreachable!("Cannot receive Clogged message with new protocol version; QED"), - Ok(Async::NotReady) => - self.incoming_substreams.push(substream), - Ok(Async::Ready(None)) => {} - Err(err) => { - self.shutdown.push(substream); - return Some(CustomProtosHandlerOut::ProtocolError { - protocol_id, - is_severe: false, - error: format!("Error in incoming substream: {}", err).into(), - }); - } - } - } - - shutdown_list(&mut self.shutdown); - None - } -} - -/// Event that can be received by a `CustomProtosHandler`. -#[derive(Debug)] -pub enum CustomProtosHandlerIn<TMessage> { - /// The node should start using custom protocols. Contains whether we are the dialer or the - /// listener of the connection. - Enable(Endpoint), - - /// The node should stop using custom protocols. - Disable, - - /// Sends a message through a custom protocol substream. - SendCustomMessage { - /// The protocol to use. - protocol: ProtocolId, - /// The message to send. - message: TMessage, - }, -} - -/// Event that can be emitted by a `CustomProtosHandler`. -#[derive(Debug)] -pub enum CustomProtosHandlerOut<TMessage> { - /// Opened a custom protocol with the remote. - CustomProtocolOpen { - /// Identifier of the protocol. - protocol_id: ProtocolId, - /// Version of the protocol that has been opened. - version: u8, - }, - - /// Closed a custom protocol with the remote. - CustomProtocolClosed { - /// Identifier of the protocol. - protocol_id: ProtocolId, - /// Reason why the substream closed. If `Ok`, then it's a graceful exit (EOF). - result: io::Result<()>, - }, - - /// Receives a message on a custom protocol substream. - CustomMessage { - /// Protocol which generated the message. - protocol_id: ProtocolId, - /// Message that has been received. - message: TMessage, - }, - - /// A substream to the remote is clogged. The send buffer is very large, and we should print - /// a diagnostic message and/or avoid sending more data. - Clogged { - /// Protocol which is clogged. - protocol_id: ProtocolId, - /// Copy of the messages that are within the buffer, for further diagnostic. - messages: Vec<TMessage>, - }, - - /// An error has happened on the protocol level with this node. - ProtocolError { - /// Protocol for which the error happened. - protocol_id: ProtocolId, - /// If true the error is severe, such as a protocol violation. - is_severe: bool, - /// The error that happened. - error: Box<dyn error::Error + Send + Sync>, - }, -} - -impl<TMessage, TSubstream> CustomProtosHandler<TMessage, TSubstream> -where - TSubstream: AsyncRead + AsyncWrite, - TMessage: CustomMessage, -{ - /// Builds a new `CustomProtosHandler`. - pub fn new(protocols: RegisteredProtocols<TMessage>) -> Self { - CustomProtosHandler { - protocols: protocols.0.into_iter().map(|protocol| { - PerProtocol { - protocol, - state: PerProtocolState::Init { - substreams: SmallVec::new(), - init_deadline: Delay::new(Instant::now() + Duration::from_secs(5)) - }, - } - }).collect(), - events_queue: SmallVec::new(), - warm_up_end: Instant::now() + Duration::from_secs(5), - } - } - - /// Enables the handler for all protocols. - fn enable(&mut self, endpoint: Endpoint) { - for protocol in &mut self.protocols { - if let Some(message) = protocol.enable(endpoint) { - self.events_queue.push(message); - } - } - } - - /// Disables the handler for all protocols. - fn disable(&mut self) { - for protocol in &mut self.protocols { - if protocol.disable() { - let event = CustomProtosHandlerOut::CustomProtocolClosed { - protocol_id: protocol.protocol.id(), - result: Ok(()) - }; - self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); - } - } - } /// Called by `inject_fully_negotiated_inbound` and `inject_fully_negotiated_outbound`. fn inject_fully_negotiated( &mut self, mut substream: RegisteredProtocolSubstream<TMessage, TSubstream> ) { - let state = match self.protocols.iter_mut().find(|p| p.protocol.id() == substream.protocol_id()) { - Some(p) => &mut p.state, - None => { - error!(target: "sub-libp2p", "Found unknown protocol ID {:?}", - substream.protocol_id()); - return - }, - }; - - *state = match mem::replace(state, PerProtocolState::Poisoned) { - PerProtocolState::Poisoned => { + self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) { + ProtocolState::Poisoned => { error!(target: "sub-libp2p", "Handler is in poisoned state"); - PerProtocolState::Poisoned + ProtocolState::Poisoned } - PerProtocolState::Init { mut substreams, init_deadline } => { + ProtocolState::Init { mut substreams, init_deadline } => { if substream.endpoint() == Endpoint::Dialer { error!(target: "sub-libp2p", "Opened dialing substream before initialization"); } substreams.push(substream); - PerProtocolState::Init { substreams, init_deadline } + ProtocolState::Init { substreams, init_deadline } } - PerProtocolState::Opening { .. } => { - let event = CustomProtosHandlerOut::CustomProtocolOpen { - protocol_id: substream.protocol_id(), + ProtocolState::Opening { .. } => { + let event = CustomProtoHandlerOut::CustomProtocolOpen { version: substream.protocol_version() }; self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); match (substream.endpoint(), substream.is_multiplex()) { (Endpoint::Dialer, true) => { - PerProtocolState::Normal(PerProtocolNormalState { + ProtocolState::Normal(PerProtocolNormalState { outgoing_substream: Some(substream), incoming_substreams: SmallVec::new(), pending_response: SmallVec::new(), @@ -676,7 +594,7 @@ where }) }, (Endpoint::Listener, true) => { - PerProtocolState::Normal(PerProtocolNormalState { + ProtocolState::Normal(PerProtocolNormalState { outgoing_substream: None, incoming_substreams: smallvec![substream], pending_response: SmallVec::new(), @@ -686,7 +604,7 @@ where }) }, (_, false) => { - PerProtocolState::BackCompat { + ProtocolState::BackCompat { substream, shutdown: SmallVec::new() } @@ -694,15 +612,15 @@ where } } - PerProtocolState::BackCompat { substream: existing, mut shutdown } => { + ProtocolState::BackCompat { substream: existing, mut shutdown } => { warn!(target: "sub-libp2p", "Received extra substream after having already one \ open in backwards-compatibility mode"); substream.shutdown(); shutdown.push(substream); - PerProtocolState::BackCompat { substream: existing, shutdown } + ProtocolState::BackCompat { substream: existing, shutdown } } - PerProtocolState::Normal(mut state) => { + ProtocolState::Normal(mut state) => { if substream.endpoint() == Endpoint::Listener { state.incoming_substreams.push(substream); } else if !state.pending_messages.is_empty() { @@ -720,49 +638,39 @@ where state.shutdown.push(substream); } - PerProtocolState::Normal(state) + ProtocolState::Normal(state) } - PerProtocolState::Disabled { mut shutdown, .. } => { + ProtocolState::Disabled { mut shutdown, .. } => { substream.shutdown(); shutdown.push(substream); - PerProtocolState::Disabled { shutdown, reenable: false } + ProtocolState::Disabled { shutdown, reenable: false } } }; } /// Sends a message to the remote. - fn send_message(&mut self, protocol: ProtocolId, message: TMessage) { - let (protocol, state) = match self.protocols.iter_mut().find(|p| p.protocol.id() == protocol) { - Some(p) => (&mut p.protocol, &mut p.state), - None => { - error!(target: "sub-libp2p", "Tried to send message over unknown protocol ID {:?}", - protocol); - return - }, - }; - - match *state { - PerProtocolState::BackCompat { ref mut substream, .. } => + fn send_message(&mut self, message: TMessage) { + match self.state { + ProtocolState::BackCompat { ref mut substream, .. } => substream.send_message(message), - PerProtocolState::Normal(ref mut state) => { + ProtocolState::Normal(ref mut state) => { if let CustomMessageId::Request(request_id) = message.request_id() { if let Some(mut outgoing_substream) = state.outgoing_substream.take() { outgoing_substream.send_message(message); state.pending_response.push((request_id, outgoing_substream)); } else { if state.pending_messages.len() >= 2048 { - let event = CustomProtosHandlerOut::Clogged { + let event = CustomProtoHandlerOut::Clogged { messages: Vec::new(), - protocol_id: protocol.id() }; self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); } state.pending_messages.push(message); self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest { - upgrade: protocol.clone(), - info: protocol.id() + upgrade: self.protocol.clone(), + info: () }); } } else if let CustomMessageId::Response(request_id) = message.request_id() { @@ -779,16 +687,15 @@ where state.shutdown.push(outgoing_substream); } else { if state.pending_messages.len() >= 2048 { - let event = CustomProtosHandlerOut::Clogged { + let event = CustomProtoHandlerOut::Clogged { messages: Vec::new(), - protocol_id: protocol.id() }; self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); } state.pending_messages.push(message); self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest { - upgrade: protocol.clone(), - info: protocol.id() + upgrade: self.protocol.clone(), + info: () }); } } @@ -798,19 +705,18 @@ where } } -impl<TMessage, TSubstream> ProtocolsHandler for CustomProtosHandler<TMessage, TSubstream> +impl<TMessage, TSubstream> ProtocolsHandler for CustomProtoHandler<TMessage, TSubstream> where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { - type InEvent = CustomProtosHandlerIn<TMessage>; - type OutEvent = CustomProtosHandlerOut<TMessage>; + type InEvent = CustomProtoHandlerIn<TMessage>; + type OutEvent = CustomProtoHandlerOut<TMessage>; type Substream = TSubstream; type Error = Void; - type InboundProtocol = RegisteredProtocols<TMessage>; + type InboundProtocol = RegisteredProtocol<TMessage>; type OutboundProtocol = RegisteredProtocol<TMessage>; - type OutboundOpenInfo = ProtocolId; + type OutboundOpenInfo = (); - #[inline] fn listen_protocol(&self) -> Self::InboundProtocol { - RegisteredProtocols(self.protocols.iter().map(|p| p.protocol.clone()).collect()) + self.protocol.clone() } fn inject_fully_negotiated_inbound( @@ -820,7 +726,6 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { self.inject_fully_negotiated(proto); } - #[inline] fn inject_fully_negotiated_outbound( &mut self, proto: <Self::OutboundProtocol as OutboundUpgrade<TSubstream>>::Output, @@ -829,24 +734,23 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { self.inject_fully_negotiated(proto); } - fn inject_event(&mut self, message: CustomProtosHandlerIn<TMessage>) { + fn inject_event(&mut self, message: CustomProtoHandlerIn<TMessage>) { match message { - CustomProtosHandlerIn::Disable => self.disable(), - CustomProtosHandlerIn::Enable(endpoint) => self.enable(endpoint), - CustomProtosHandlerIn::SendCustomMessage { protocol, message } => - self.send_message(protocol, message), + CustomProtoHandlerIn::Disable => self.disable(), + CustomProtoHandlerIn::Enable(endpoint) => self.enable(endpoint), + CustomProtoHandlerIn::SendCustomMessage { message } => + self.send_message(message), } } #[inline] - fn inject_dial_upgrade_error(&mut self, protocol_id: Self::OutboundOpenInfo, err: ProtocolsHandlerUpgrErr<io::Error>) { + fn inject_dial_upgrade_error(&mut self, _: (), err: ProtocolsHandlerUpgrErr<io::Error>) { let is_severe = match err { ProtocolsHandlerUpgrErr::Upgrade(_) => true, _ => false, }; - self.events_queue.push(ProtocolsHandlerEvent::Custom(CustomProtosHandlerOut::ProtocolError { - protocol_id, + self.events_queue.push(ProtocolsHandlerEvent::Custom(CustomProtoHandlerOut::ProtocolError { is_severe, error: Box::new(err), })); @@ -863,14 +767,11 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { let mut keep_forever = false; - for protocol in self.protocols.iter() { - match protocol.state { - PerProtocolState::Init { .. } | PerProtocolState::Opening { .. } => {} - PerProtocolState::BackCompat { .. } | PerProtocolState::Normal { .. } => - keep_forever = true, - PerProtocolState::Disabled { .. } | - PerProtocolState::Poisoned => return KeepAlive::Now, - } + match self.state { + ProtocolState::Init { .. } | ProtocolState::Opening { .. } => {} + ProtocolState::BackCompat { .. } | ProtocolState::Normal { .. } => + keep_forever = true, + ProtocolState::Disabled { .. } | ProtocolState::Poisoned => return KeepAlive::Now, } if keep_forever { @@ -893,22 +794,20 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { } // Process all the substreams. - for protocol in self.protocols.iter_mut() { - if let Some(event) = protocol.poll() { - return Ok(Async::Ready(event)) - } + if let Some(event) = self.poll_state() { + return Ok(Async::Ready(event)) } Ok(Async::NotReady) } } -impl<TMessage, TSubstream> fmt::Debug for CustomProtosHandler<TMessage, TSubstream> +impl<TMessage, TSubstream> fmt::Debug for CustomProtoHandler<TMessage, TSubstream> where TSubstream: AsyncRead + AsyncWrite, { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { - f.debug_struct("CustomProtosHandler") + f.debug_struct("CustomProtoHandler") .finish() } } diff --git a/substrate/core/network-libp2p/src/custom_proto/mod.rs b/substrate/core/network-libp2p/src/custom_proto/mod.rs index 073ce8360aeea9137ec356108fea24f6a6b654c1..279f9a23347829ed817f672fa531d4418dcd2a2a 100644 --- a/substrate/core/network-libp2p/src/custom_proto/mod.rs +++ b/substrate/core/network-libp2p/src/custom_proto/mod.rs @@ -14,8 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see <http://www.gnu.org/licenses/>. -pub use self::behaviour::{CustomProtos, CustomProtosOut}; -pub use self::upgrade::{CustomMessage, CustomMessageId, RegisteredProtocol, RegisteredProtocols}; +pub use self::behaviour::{CustomProto, CustomProtoOut}; +pub use self::upgrade::{CustomMessage, CustomMessageId, RegisteredProtocol}; mod behaviour; mod handler; diff --git a/substrate/core/network-libp2p/src/custom_proto/upgrade.rs b/substrate/core/network-libp2p/src/custom_proto/upgrade.rs index caab8ec9d09eeebbec5b3557380910bea5bb95c8..8f54aa18d0f838982ec419b586090ba3da81d99e 100644 --- a/substrate/core/network-libp2p/src/custom_proto/upgrade.rs +++ b/substrate/core/network-libp2p/src/custom_proto/upgrade.rs @@ -437,99 +437,3 @@ where TSubstream: AsyncRead + AsyncWrite, }) } } - -// Connection upgrade for all the protocols contained in it. -pub struct RegisteredProtocols<TMessage>(pub Vec<RegisteredProtocol<TMessage>>); - -impl<TMessage> RegisteredProtocols<TMessage> { - /// Returns the number of protocols. - #[inline] - pub fn len(&self) -> usize { - self.0.len() - } -} - -impl<TMessage> Default for RegisteredProtocols<TMessage> { - fn default() -> Self { - RegisteredProtocols(Vec::new()) - } -} - -impl<TMessage> UpgradeInfo for RegisteredProtocols<TMessage> { - type Info = RegisteredProtocolsName; - type InfoIter = VecIntoIter<Self::Info>; - - #[inline] - fn protocol_info(&self) -> Self::InfoIter { - // We concat the lists of `RegisteredProtocol::protocol_names` for - // each protocol. - self.0.iter().enumerate().flat_map(|(n, proto)| - UpgradeInfo::protocol_info(proto) - .map(move |inner| { - RegisteredProtocolsName { - inner, - index: n, - } - }) - ).collect::<Vec<_>>().into_iter() - } -} - -impl<TMessage> Clone for RegisteredProtocols<TMessage> { - fn clone(&self) -> Self { - RegisteredProtocols(self.0.clone()) - } -} - -/// Implementation of `ProtocolName` for several custom protocols. -#[derive(Debug, Clone)] -pub struct RegisteredProtocolsName { - /// Inner registered protocol. - inner: RegisteredProtocolName, - /// Index of the protocol in the list of registered custom protocols. - index: usize, -} - -impl ProtocolName for RegisteredProtocolsName { - fn protocol_name(&self) -> &[u8] { - self.inner.protocol_name() - } -} - -impl<TMessage, TSubstream> InboundUpgrade<TSubstream> for RegisteredProtocols<TMessage> -where TSubstream: AsyncRead + AsyncWrite, -{ - type Output = <RegisteredProtocol<TMessage> as InboundUpgrade<TSubstream>>::Output; - type Future = <RegisteredProtocol<TMessage> as InboundUpgrade<TSubstream>>::Future; - type Error = io::Error; - - #[inline] - fn upgrade_inbound( - self, - socket: TSubstream, - info: Self::Info, - ) -> Self::Future { - self.0.into_iter() - .nth(info.index) - .expect("invalid protocol index ; programmer logic error") - .upgrade_inbound(socket, info.inner) - } -} - -impl<TMessage, TSubstream> OutboundUpgrade<TSubstream> for RegisteredProtocols<TMessage> -where TSubstream: AsyncRead + AsyncWrite, -{ - type Output = <Self as InboundUpgrade<TSubstream>>::Output; - type Future = <Self as InboundUpgrade<TSubstream>>::Future; - type Error = <Self as InboundUpgrade<TSubstream>>::Error; - - #[inline] - fn upgrade_outbound( - self, - socket: TSubstream, - info: Self::Info, - ) -> Self::Future { - // Upgrades are symmetrical. - self.upgrade_inbound(socket, info) - } -} diff --git a/substrate/core/network-libp2p/src/lib.rs b/substrate/core/network-libp2p/src/lib.rs index 362a47c19d53ae717433100d45e99175a42e7e9c..aa5afb821bffb89d81b32e89bdfbd21ac972e9a9 100644 --- a/substrate/core/network-libp2p/src/lib.rs +++ b/substrate/core/network-libp2p/src/lib.rs @@ -134,8 +134,9 @@ pub struct NetworkStatePeer { /// If true, the peer is "enabled", which means that we try to open Substrate-related protocols /// with this peer. If false, we stick to Kademlia and/or other network-only protocols. pub enabled: bool, - /// List of protocols that we have open with the given peer. - pub open_protocols: HashSet<ProtocolId>, + /// If true, the peer is "open", which means that we have a Substrate-related protocol + /// with this peer. + pub open: bool, /// List of addresses known for this node, with its reputation score. pub known_addresses: HashMap<Multiaddr, u32>, } diff --git a/substrate/core/network-libp2p/src/service_task.rs b/substrate/core/network-libp2p/src/service_task.rs index 0c23fc41207677bdf8f3f59b311c3084fd8623d6..1ac8a6ed75bd62b2e67ba0542eb74b2939038ebc 100644 --- a/substrate/core/network-libp2p/src/service_task.rs +++ b/substrate/core/network-libp2p/src/service_task.rs @@ -18,8 +18,8 @@ use crate::{ behaviour::Behaviour, behaviour::BehaviourOut, transport, NetworkState, NetworkStatePeer, NetworkStateNotConnectedPeer }; -use crate::custom_proto::{CustomMessage, RegisteredProtocol, RegisteredProtocols}; -use crate::{NetworkConfiguration, NodeIndex, ProtocolId, parse_str_addr}; +use crate::custom_proto::{CustomMessage, RegisteredProtocol}; +use crate::{NetworkConfiguration, NodeIndex, parse_str_addr}; use fnv::FnvHashMap; use futures::{prelude::*, Stream}; use libp2p::{multiaddr::Protocol, Multiaddr, PeerId}; @@ -37,12 +37,11 @@ use tokio_timer::Interval; /// Starts the substrate libp2p service. /// /// Returns a stream that must be polled regularly in order for the networking to function. -pub fn start_service<TProtos, TMessage>( +pub fn start_service<TMessage>( config: NetworkConfiguration, - registered_custom: TProtos, + registered_custom: RegisteredProtocol<TMessage>, ) -> Result<Service<TMessage>, IoError> -where TProtos: IntoIterator<Item = RegisteredProtocol<TMessage>>, - TMessage: CustomMessage + Send + 'static { +where TMessage: CustomMessage + Send + 'static { if let Some(ref path) = config.net_config_path { fs::create_dir_all(Path::new(path))?; @@ -55,7 +54,6 @@ where TProtos: IntoIterator<Item = RegisteredProtocol<TMessage>>, // Build the swarm. let (mut swarm, bandwidth) = { - let registered_custom = RegisteredProtocols(registered_custom.into_iter().collect()); let behaviour = Behaviour::new(&config, local_public, registered_custom); let (transport, bandwidth) = transport::build_transport(local_identity); (Swarm::new(transport, behaviour, local_peer_id.clone()), bandwidth) @@ -118,8 +116,6 @@ pub enum ServiceEvent<TMessage> { peer_id: PeerId, /// Index of the node. node_index: NodeIndex, - /// Protocol that has been opened. - protocol: ProtocolId, /// Version of the protocol that was opened. version: u8, /// Node debug info @@ -130,20 +126,6 @@ pub enum ServiceEvent<TMessage> { ClosedCustomProtocol { /// Index of the node. node_index: NodeIndex, - /// Protocol that has been closed. - protocol: ProtocolId, - /// Node debug info - debug_info: String, - }, - - /// Sustom protocol substreams has been closed. - /// - /// Same as `ClosedCustomProtocol` but with multiple protocols. - ClosedCustomProtocols { - /// Index of the node. - node_index: NodeIndex, - /// Protocols that have been closed. - protocols: Vec<ProtocolId>, /// Node debug info debug_info: String, }, @@ -152,8 +134,6 @@ pub enum ServiceEvent<TMessage> { CustomMessage { /// Index of the node. node_index: NodeIndex, - /// Protocol which generated the message. - protocol_id: ProtocolId, /// Message that has been received. message: TMessage, }, @@ -162,8 +142,6 @@ pub enum ServiceEvent<TMessage> { Clogged { /// Index of the node. node_index: NodeIndex, - /// Protocol which generated the message. - protocol_id: ProtocolId, /// Copy of the messages that are within the buffer, for further diagnostic. messages: Vec<TMessage>, }, @@ -224,7 +202,7 @@ where TMessage: CustomMessage + Send + 'static { version_string: info.client_version.clone(), latest_ping_time: info.latest_ping, enabled: swarm.is_enabled(&info.peer_id), - open_protocols: swarm.open_protocols(&info.peer_id).collect(), + open: swarm.is_open(&info.peer_id), known_addresses, }) }).collect() @@ -340,11 +318,10 @@ where TMessage: CustomMessage + Send + 'static { pub fn send_custom_message( &mut self, node_index: NodeIndex, - protocol: ProtocolId, message: TMessage ) { if let Some(peer_id) = self.nodes_info.get(&node_index).map(|info| &info.peer_id) { - self.swarm.send_custom_message(peer_id, protocol, message); + self.swarm.send_custom_message(peer_id, message); } else { warn!(target: "sub-libp2p", "Tried to send message to unknown node: {:}", node_index); } @@ -416,39 +393,35 @@ where TMessage: CustomMessage + Send + 'static { fn poll_swarm(&mut self) -> Poll<Option<ServiceEvent<TMessage>>, IoError> { loop { match self.swarm.poll() { - Ok(Async::Ready(Some(BehaviourOut::CustomProtocolOpen { protocol_id, peer_id, version, endpoint }))) => { + Ok(Async::Ready(Some(BehaviourOut::CustomProtocolOpen { peer_id, version, endpoint }))) => { debug!(target: "sub-libp2p", "Opened custom protocol with {:?}", peer_id); let node_index = self.index_of_peer_or_assign(peer_id.clone(), endpoint); break Ok(Async::Ready(Some(ServiceEvent::OpenedCustomProtocol { peer_id, node_index, - protocol: protocol_id, version, debug_info: self.peer_debug_info(node_index), }))) } - Ok(Async::Ready(Some(BehaviourOut::CustomProtocolClosed { protocol_id, peer_id, result }))) => { + Ok(Async::Ready(Some(BehaviourOut::CustomProtocolClosed { peer_id, result }))) => { debug!(target: "sub-libp2p", "Custom protocol with {:?} closed: {:?}", peer_id, result); let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour"); break Ok(Async::Ready(Some(ServiceEvent::ClosedCustomProtocol { node_index, - protocol: protocol_id, debug_info: self.peer_debug_info(node_index), }))) } - Ok(Async::Ready(Some(BehaviourOut::CustomMessage { protocol_id, peer_id, message }))) => { + Ok(Async::Ready(Some(BehaviourOut::CustomMessage { peer_id, message }))) => { let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour"); break Ok(Async::Ready(Some(ServiceEvent::CustomMessage { node_index, - protocol_id, message, }))) } - Ok(Async::Ready(Some(BehaviourOut::Clogged { protocol_id, peer_id, messages }))) => { + Ok(Async::Ready(Some(BehaviourOut::Clogged { peer_id, messages }))) => { let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour"); break Ok(Async::Ready(Some(ServiceEvent::Clogged { node_index, - protocol_id, messages, }))) } diff --git a/substrate/core/network-libp2p/tests/test.rs b/substrate/core/network-libp2p/tests/test.rs index a514b0b9bb8d8c5067a6c44a50443552a79b2c71..f8f204c5b8921d4180e0be4f00a6c544bad2e146 100644 --- a/substrate/core/network-libp2p/tests/test.rs +++ b/substrate/core/network-libp2p/tests/test.rs @@ -16,8 +16,8 @@ use futures::{future, stream, prelude::*, try_ready}; use rand::seq::SliceRandom; -use std::{io, iter}; -use substrate_network_libp2p::{CustomMessage, ServiceEvent, multiaddr::Protocol, build_multiaddr}; +use std::io; +use substrate_network_libp2p::{CustomMessage, multiaddr::Protocol, ServiceEvent, build_multiaddr}; /// Builds two services. The second one and further have the first one as its bootstrap node. /// This is to be used only for testing, and a panic will happen if something goes wrong. @@ -41,7 +41,7 @@ fn build_nodes<TMsg>(num: usize) -> Vec<substrate_network_libp2p::Service<TMsg>> }; let proto = substrate_network_libp2p::RegisteredProtocol::new(*b"tst", &[1]); - result.push(substrate_network_libp2p::start_service(config, iter::once(proto)).unwrap()); + result.push(substrate_network_libp2p::start_service(config, proto).unwrap()); } result @@ -58,8 +58,7 @@ fn basic_two_nodes_connectivity() { let fut1 = future::poll_fn(move || -> io::Result<_> { match try_ready!(service1.poll()) { - Some(ServiceEvent::OpenedCustomProtocol { protocol, version, .. }) => { - assert_eq!(protocol, *b"tst"); + Some(ServiceEvent::OpenedCustomProtocol { version, .. }) => { assert_eq!(version, 1); Ok(Async::Ready(())) }, @@ -69,8 +68,7 @@ fn basic_two_nodes_connectivity() { let fut2 = future::poll_fn(move || -> io::Result<_> { match try_ready!(service2.poll()) { - Some(ServiceEvent::OpenedCustomProtocol { protocol, version, .. }) => { - assert_eq!(protocol, *b"tst"); + Some(ServiceEvent::OpenedCustomProtocol { version, .. }) => { assert_eq!(version, 1); Ok(Async::Ready(())) }, @@ -101,9 +99,9 @@ fn two_nodes_transfer_lots_of_packets() { let fut1 = future::poll_fn(move || -> io::Result<_> { loop { match try_ready!(service1.poll()) { - Some(ServiceEvent::OpenedCustomProtocol { node_index, protocol, .. }) => { + Some(ServiceEvent::OpenedCustomProtocol { node_index, .. }) => { for n in 0 .. NUM_PACKETS { - service1.send_custom_message(node_index, protocol, vec![(n % 256) as u8]); + service1.send_custom_message(node_index, vec![(n % 256) as u8]); } }, _ => panic!(), @@ -231,9 +229,9 @@ fn basic_two_nodes_requests_in_parallel() { let fut1 = future::poll_fn(move || -> io::Result<_> { loop { match try_ready!(service1.poll()) { - Some(ServiceEvent::OpenedCustomProtocol { node_index, protocol, .. }) => { + Some(ServiceEvent::OpenedCustomProtocol { node_index, .. }) => { for msg in to_send.drain(..) { - service1.send_custom_message(node_index, protocol, msg); + service1.send_custom_message(node_index, msg); } }, _ => panic!(), diff --git a/substrate/core/network/src/on_demand.rs b/substrate/core/network/src/on_demand.rs index d012a1ef4d839900b238d2f8e2fb431ae93a1736..f5505cd3aa7d6df6b1d405ff1ad3840883ddc885 100644 --- a/substrate/core/network/src/on_demand.rs +++ b/substrate/core/network/src/on_demand.rs @@ -532,7 +532,7 @@ pub mod tests { RemoteCallRequest, RemoteReadRequest, RemoteChangesRequest, ChangesProof}; use crate::config::Roles; use crate::message; - use network_libp2p::{NodeIndex, ProtocolId, Severity}; + use network_libp2p::{NodeIndex, Severity}; use crate::service::{network_channel, NetworkPort, NetworkMsg}; use super::{REQUEST_TIMEOUT, OnDemand, OnDemandService}; use test_client::runtime::{changes_trie_config, Block, Header}; @@ -644,7 +644,7 @@ pub mod tests { #[test] fn disconnects_from_timeouted_peer() { let (_x, on_demand) = dummy(true); - let (network_sender, network_port) = network_channel(ProtocolId::default()); + let (network_sender, network_port) = network_channel(); on_demand.set_network_sender(network_sender.clone()); on_demand.on_connect(0, Roles::FULL, 1000); on_demand.on_connect(1, Roles::FULL, 1000); @@ -671,7 +671,7 @@ pub mod tests { #[test] fn disconnects_from_peer_on_response_with_wrong_id() { let (_x, on_demand) = dummy(true); - let (network_sender, network_port) = network_channel(ProtocolId::default()); + let (network_sender, network_port) = network_channel(); on_demand.set_network_sender(network_sender.clone()); on_demand.on_connect(0, Roles::FULL, 1000); @@ -690,7 +690,7 @@ pub mod tests { #[test] fn disconnects_from_peer_on_incorrect_response() { let (_x, on_demand) = dummy(false); - let (network_sender, network_port) = network_channel(ProtocolId::default()); + let (network_sender, network_port) = network_channel(); on_demand.set_network_sender(network_sender.clone()); on_demand.remote_call(RemoteCallRequest { block: Default::default(), @@ -709,7 +709,7 @@ pub mod tests { #[test] fn disconnects_from_peer_on_unexpected_response() { let (_x, on_demand) = dummy(true); - let (network_sender, network_port) = network_channel(ProtocolId::default()); + let (network_sender, network_port) = network_channel(); on_demand.set_network_sender(network_sender.clone()); on_demand.on_connect(0, Roles::FULL, 1000); @@ -720,7 +720,7 @@ pub mod tests { #[test] fn disconnects_from_peer_on_wrong_response_type() { let (_x, on_demand) = dummy(false); - let (network_sender, network_port) = network_channel(ProtocolId::default()); + let (network_sender, network_port) = network_channel(); on_demand.set_network_sender(network_sender.clone()); on_demand.on_connect(0, Roles::FULL, 1000); @@ -746,7 +746,7 @@ pub mod tests { let retry_count = 2; let (_x, on_demand) = dummy(false); - let (network_sender, _network_port) = network_channel(ProtocolId::default()); + let (network_sender, _network_port) = network_channel(); on_demand.set_network_sender(network_sender.clone()); for i in 0..retry_count+1 { on_demand.on_connect(i, Roles::FULL, 1000); @@ -786,7 +786,7 @@ pub mod tests { #[test] fn receives_remote_call_response() { let (_x, on_demand) = dummy(true); - let (network_sender, _network_port) = network_channel(ProtocolId::default()); + let (network_sender, _network_port) = network_channel(); on_demand.set_network_sender(network_sender.clone()); on_demand.on_connect(0, Roles::FULL, 1000); @@ -809,7 +809,7 @@ pub mod tests { #[test] fn receives_remote_read_response() { let (_x, on_demand) = dummy(true); - let (network_sender, _network_port) = network_channel(ProtocolId::default()); + let (network_sender, _network_port) = network_channel(); on_demand.set_network_sender(network_sender.clone()); on_demand.on_connect(0, Roles::FULL, 1000); @@ -834,7 +834,7 @@ pub mod tests { #[test] fn receives_remote_header_response() { let (_x, on_demand) = dummy(true); - let (network_sender, _network_port) = network_channel(ProtocolId::default()); + let (network_sender, _network_port) = network_channel(); on_demand.set_network_sender(network_sender.clone()); on_demand.on_connect(0, Roles::FULL, 1000); @@ -869,7 +869,7 @@ pub mod tests { #[test] fn receives_remote_changes_response() { let (_x, on_demand) = dummy(true); - let (network_sender, _network_port) = network_channel(ProtocolId::default()); + let (network_sender, _network_port) = network_channel(); on_demand.set_network_sender(network_sender.clone()); on_demand.on_connect(0, Roles::FULL, 1000); @@ -900,7 +900,7 @@ pub mod tests { #[test] fn does_not_sends_request_to_peer_who_has_no_required_block() { let (_x, on_demand) = dummy(true); - let (network_sender, _network_port) = network_channel(ProtocolId::default()); + let (network_sender, _network_port) = network_channel(); on_demand.set_network_sender(network_sender.clone()); on_demand.on_connect(1, Roles::FULL, 100); @@ -952,7 +952,7 @@ pub mod tests { // loop forever after dispatching a request to the last peer, since the // last peer was not updated let (_x, on_demand) = dummy(true); - let (network_sender, _network_port) = network_channel(ProtocolId::default()); + let (network_sender, _network_port) = network_channel(); on_demand.set_network_sender(network_sender.clone()); on_demand.remote_header(RemoteHeaderRequest { @@ -977,7 +977,7 @@ pub mod tests { #[test] fn tries_to_send_all_pending_requests() { let (_x, on_demand) = dummy(true); - let (network_sender, _network_port) = network_channel(ProtocolId::default()); + let (network_sender, _network_port) = network_channel(); on_demand.set_network_sender(network_sender.clone()); on_demand.remote_header(RemoteHeaderRequest { diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 9485203a0dca09d2f817e8963c234abf8ad5ef82..6a6291af3f5bd5626cf5e104ccc5c7a0e0846529 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -153,7 +153,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> { protocol_id: ProtocolId, import_queue: Box<ImportQueue<B>>, ) -> Result<(Arc<Service<B, S>>, NetworkChan<B>), Error> { - let (network_chan, network_port) = network_channel(protocol_id); + let (network_chan, network_port) = network_channel(); let status_sinks = Arc::new(Mutex::new(Vec::new())); // Start in off-line mode, since we're not connected to any nodes yet. let is_offline = Arc::new(AtomicBool::new(true)); @@ -370,10 +370,10 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> ManageNetwork for Service /// Create a NetworkPort/Chan pair. -pub fn network_channel<B: BlockT + 'static>(protocol_id: ProtocolId) -> (NetworkChan<B>, NetworkPort<B>) { +pub fn network_channel<B: BlockT + 'static>() -> (NetworkChan<B>, NetworkPort<B>) { let (network_sender, network_receiver) = channel::unbounded(); let task_notify = Arc::new(AtomicTask::new()); - let network_port = NetworkPort::new(network_receiver, protocol_id, task_notify.clone()); + let network_port = NetworkPort::new(network_receiver, task_notify.clone()); let network_chan = NetworkChan::new(network_sender, task_notify); (network_chan, network_port) } @@ -413,26 +413,24 @@ impl<B: BlockT + 'static> Drop for NetworkChan<B> { /// A receiver of NetworkMsg that makes the protocol-id available with each message. pub struct NetworkPort<B: BlockT + 'static> { receiver: Receiver<NetworkMsg<B>>, - protocol_id: ProtocolId, task_notify: Arc<AtomicTask>, } impl<B: BlockT + 'static> NetworkPort<B> { /// Create a new network port for a given protocol-id. - pub fn new(receiver: Receiver<NetworkMsg<B>>, protocol_id: ProtocolId, task_notify: Arc<AtomicTask>) -> Self { + pub fn new(receiver: Receiver<NetworkMsg<B>>, task_notify: Arc<AtomicTask>) -> Self { Self { receiver, - protocol_id, task_notify, } } /// Receive a message, if any is currently-enqueued. /// Register the current tokio task for notification when a new message is available. - pub fn take_one_message(&self) -> Result<Option<(ProtocolId, NetworkMsg<B>)>, ()> { + pub fn take_one_message(&self) -> Result<Option<NetworkMsg<B>>, ()> { self.task_notify.register(); match self.receiver.try_recv() { - Ok(msg) => Ok(Some((self.protocol_id.clone(), msg))), + Ok(msg) => Ok(Some(msg)), Err(TryRecvError::Empty) => Ok(None), Err(TryRecvError::Disconnected) => Err(()), } @@ -463,10 +461,8 @@ fn start_thread<B: BlockT + 'static>( config: NetworkConfiguration, registered: RegisteredProtocol<Message<B>>, ) -> Result<((oneshot::Sender<()>, thread::JoinHandle<()>), Arc<Mutex<NetworkService<Message<B>>>>), Error> { - let protocol_id = registered.id(); - // Start the main service. - let service = match start_service(config, Some(registered)) { + let service = match start_service(config, registered) { Ok(service) => Arc::new(Mutex::new(service)), Err(err) => { warn!("Error starting network: {}", err); @@ -478,7 +474,7 @@ fn start_thread<B: BlockT + 'static>( let service_clone = service.clone(); let mut runtime = RuntimeBuilder::new().name_prefix("libp2p-").build()?; let thread = thread::Builder::new().name("network".to_string()).spawn(move || { - let fut = run_thread(protocol_sender, service_clone, network_port, protocol_id) + let fut = run_thread(protocol_sender, service_clone, network_port) .select(close_rx.then(|_| Ok(()))) .map(|(val, _)| val) .map_err(|(err,_ )| err); @@ -499,7 +495,6 @@ fn run_thread<B: BlockT + 'static>( protocol_sender: Sender<FromNetworkMsg<B>>, network_service: Arc<Mutex<NetworkService<Message<B>>>>, network_port: NetworkPort<B>, - protocol_id: ProtocolId, ) -> impl Future<Item = (), Error = io::Error> { let network_service_2 = network_service.clone(); @@ -511,7 +506,7 @@ fn run_thread<B: BlockT + 'static>( Ok(None) => Ok(Async::NotReady), Err(_) => Err(()) } - }).for_each(move |(protocol_id, msg)| { + }).for_each(move |msg| { // Handle message from Protocol. match msg { NetworkMsg::PeerIds(node_idxs, sender) => { @@ -523,7 +518,7 @@ fn run_thread<B: BlockT + 'static>( NetworkMsg::Outgoing(who, outgoing_message) => { network_service_2 .lock() - .send_custom_message(who, protocol_id, outgoing_message); + .send_custom_message(who, outgoing_message); }, NetworkMsg::ReportPeer(who, severity) => { match severity { @@ -555,13 +550,6 @@ fn run_thread<B: BlockT + 'static>( // The network service produces events about what happens on the network. Let's process them. let network = stream::poll_fn(move || network_service.lock().poll()).for_each(move |event| { match event { - NetworkServiceEvent::ClosedCustomProtocols { node_index, protocols, debug_info } => { - if !protocols.is_empty() { - debug_assert_eq!(protocols, &[protocol_id]); - let _ = protocol_sender.send( - FromNetworkMsg::PeerDisconnected(node_index, debug_info)); - } - } NetworkServiceEvent::OpenedCustomProtocol { peer_id, node_index, version, debug_info, .. } => { debug_assert_eq!(version, protocol::CURRENT_VERSION as u8); let _ = protocol_sender.send(FromNetworkMsg::PeerConnected(peer_id, node_index, debug_info)); diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 24e05daec097e640ec9a8e906b4fd2ccb853100e..5fa13b58a8a434d89394c7fdd2c6a5923178ed80 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -40,7 +40,7 @@ use crossbeam_channel::{self as channel, Sender, select}; use futures::Future; use futures::sync::{mpsc, oneshot}; use crate::message::{Message, ConsensusEngineId}; -use network_libp2p::{NodeIndex, ProtocolId, PeerId}; +use network_libp2p::{NodeIndex, PeerId}; use parity_codec::Encode; use parking_lot::{Mutex, RwLock}; use primitives::{H256, ed25519::Public as AuthorityId}; @@ -554,7 +554,7 @@ pub trait TestNetFactory: Sized { let tx_pool = Arc::new(EmptyTransactionPool); let verifier = self.make_verifier(client.clone(), config); let (block_import, justification_import, data) = self.make_block_import(client.clone()); - let (network_sender, network_port) = network_channel(ProtocolId::default()); + let (network_sender, network_port) = network_channel(); let import_queue = Box::new(BasicQueue::new(verifier, block_import, justification_import)); let status_sinks = Arc::new(Mutex::new(Vec::new()));