diff --git a/substrate/core/network/src/legacy_proto/behaviour.rs b/substrate/core/network/src/legacy_proto/behaviour.rs index 1c83329ba0e100195db9adb480b14ba58e775ea4..d1d378174a21b0ec7b35923b0eee7f1c841d31bc 100644 --- a/substrate/core/network/src/legacy_proto/behaviour.rs +++ b/substrate/core/network/src/legacy_proto/behaviour.rs @@ -17,7 +17,7 @@ use crate::{DiscoveryNetBehaviour, config::ProtocolId}; use crate::legacy_proto::handler::{CustomProtoHandlerProto, CustomProtoHandlerOut, CustomProtoHandlerIn}; use crate::legacy_proto::upgrade::RegisteredProtocol; -use crate::protocol::message::Message; +use bytes::BytesMut; use fnv::FnvHashMap; use futures::prelude::*; use futures03::{compat::Compat, TryFutureExt as _, StreamExt as _, TryStreamExt as _}; @@ -25,7 +25,6 @@ use libp2p::core::{ConnectedPoint, Multiaddr, PeerId}; use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use log::{debug, error, trace, warn}; use rand::distributions::{Distribution as _, Uniform}; -use sr_primitives::traits::Block as BlockT; use smallvec::SmallVec; use std::{borrow::Cow, collections::hash_map::Entry, cmp, error, marker::PhantomData, mem, pin::Pin}; use std::time::{Duration, Instant}; @@ -61,9 +60,9 @@ use tokio_io::{AsyncRead, AsyncWrite}; /// Note that this "banning" system is not an actual ban. If a "banned" node tries to connect to /// us, we accept the connection. The "banning" system is only about delaying dialing attempts. /// -pub struct LegacyProto<B: BlockT, TSubstream> { +pub struct LegacyProto< TSubstream> { /// List of protocols to open with peers. Never modified. - protocol: RegisteredProtocol<B>, + protocol: RegisteredProtocol, /// Receiver for instructions about who to connect to or disconnect from. peerset: peerset::Peerset, @@ -80,7 +79,7 @@ pub struct LegacyProto<B: BlockT, TSubstream> { next_incoming_index: peerset::IncomingIndex, /// Events to produce from `poll()`. - events: SmallVec<[NetworkBehaviourAction<CustomProtoHandlerIn<B>, LegacyProtoOut<B>>; 4]>, + events: SmallVec<[NetworkBehaviourAction<CustomProtoHandlerIn, LegacyProtoOut>; 4]>, /// Marker to pin the generics. marker: PhantomData<TSubstream>, @@ -189,7 +188,7 @@ struct IncomingPeer { /// Event that can be emitted by the `LegacyProto`. #[derive(Debug)] -pub enum LegacyProtoOut<B: BlockT> { +pub enum LegacyProtoOut { /// Opened a custom protocol with the remote. CustomProtocolOpen { /// Version of the protocol that has been opened. @@ -213,7 +212,7 @@ pub enum LegacyProtoOut<B: BlockT> { /// Id of the peer the message came from. peer_id: PeerId, /// Message that has been received. - message: Message<B>, + message: BytesMut, }, /// The substream used by the protocol is pretty large. We should print avoid sending more @@ -222,11 +221,11 @@ pub enum LegacyProtoOut<B: BlockT> { /// Id of the peer which is clogged. peer_id: PeerId, /// Copy of the messages that are within the buffer, for further diagnostic. - messages: Vec<Message<B>>, + messages: Vec<Vec<u8>>, }, } -impl<B: BlockT, TSubstream> LegacyProto<B, TSubstream> { +impl<TSubstream> LegacyProto<TSubstream> { /// Creates a `CustomProtos`. pub fn new( protocol: impl Into<ProtocolId>, @@ -350,8 +349,7 @@ impl<B: BlockT, TSubstream> LegacyProto<B, TSubstream> { /// /// Also note that even we have a valid open substream, it may in fact be already closed /// without us knowing, in which case the packet will not be received. - pub fn send_packet(&mut self, target: &PeerId, message: Message<B>) - where B: BlockT { + pub fn send_packet(&mut self, target: &PeerId, message: Vec<u8>) { if !self.is_open(target) { return; } @@ -607,7 +605,7 @@ impl<B: BlockT, TSubstream> LegacyProto<B, TSubstream> { } } -impl<B: BlockT, TSubstream> DiscoveryNetBehaviour for LegacyProto<B, TSubstream> { +impl<TSubstream> DiscoveryNetBehaviour for LegacyProto<TSubstream> { fn add_discovered_nodes(&mut self, peer_ids: impl Iterator<Item = PeerId>) { self.peerset.discovered(peer_ids.into_iter().map(|peer_id| { debug!(target: "sub-libp2p", "PSM <= Discovered({:?})", peer_id); @@ -616,13 +614,12 @@ impl<B: BlockT, TSubstream> DiscoveryNetBehaviour for LegacyProto<B, TSubstream> } } -impl<B, TSubstream> NetworkBehaviour for LegacyProto<B, TSubstream> +impl<TSubstream> NetworkBehaviour for LegacyProto<TSubstream> where TSubstream: AsyncRead + AsyncWrite, - B: BlockT, { - type ProtocolsHandler = CustomProtoHandlerProto<B, TSubstream>; - type OutEvent = LegacyProtoOut<B>; + type ProtocolsHandler = CustomProtoHandlerProto<TSubstream>; + type OutEvent = LegacyProtoOut; fn new_handler(&mut self) -> Self::ProtocolsHandler { CustomProtoHandlerProto::new(self.protocol.clone()) @@ -825,7 +822,7 @@ where fn inject_node_event( &mut self, source: PeerId, - event: CustomProtoHandlerOut<B>, + event: CustomProtoHandlerOut, ) { match event { CustomProtoHandlerOut::CustomProtocolClosed { reason } => { @@ -954,7 +951,7 @@ where _params: &mut impl PollParameters, ) -> Async< NetworkBehaviourAction< - CustomProtoHandlerIn<B>, + CustomProtoHandlerIn, Self::OutEvent, >, > { diff --git a/substrate/core/network/src/legacy_proto/handler.rs b/substrate/core/network/src/legacy_proto/handler.rs index 3fe88d3cfd410a83392cb875e8edb813854f70e3..7bdbe4a31ff7cdab0d74ade7d1a3812494010aa1 100644 --- a/substrate/core/network/src/legacy_proto/handler.rs +++ b/substrate/core/network/src/legacy_proto/handler.rs @@ -15,7 +15,7 @@ // along with Substrate. If not, see <http://www.gnu.org/licenses/>. use crate::legacy_proto::upgrade::{RegisteredProtocol, RegisteredProtocolEvent, RegisteredProtocolSubstream}; -use crate::protocol::message::Message; +use bytes::BytesMut; use futures::prelude::*; use futures03::{compat::Compat, TryFutureExt as _}; use futures_timer::Delay; @@ -29,7 +29,6 @@ use libp2p::swarm::{ SubstreamProtocol, }; use log::{debug, error}; -use sr_primitives::traits::Block as BlockT; use smallvec::{smallvec, SmallVec}; use std::{borrow::Cow, error, fmt, io, marker::PhantomData, mem, time::Duration}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -88,21 +87,20 @@ use tokio_io::{AsyncRead, AsyncWrite}; /// We consider that we are now "closed" if the remote closes all the existing substreams. /// Re-opening it can then be performed by closing all active substream and re-opening one. /// -pub struct CustomProtoHandlerProto<B, TSubstream> { +pub struct CustomProtoHandlerProto<TSubstream> { /// Configuration for the protocol upgrade to negotiate. - protocol: RegisteredProtocol<B>, + protocol: RegisteredProtocol, /// Marker to pin the generic type. marker: PhantomData<TSubstream>, } -impl<B, TSubstream> CustomProtoHandlerProto<B, TSubstream> +impl<TSubstream> CustomProtoHandlerProto<TSubstream> where TSubstream: AsyncRead + AsyncWrite, - B: BlockT, { /// Builds a new `CustomProtoHandlerProto`. - pub fn new(protocol: RegisteredProtocol<B>) -> Self { + pub fn new(protocol: RegisteredProtocol) -> Self { CustomProtoHandlerProto { protocol, marker: PhantomData, @@ -110,14 +108,13 @@ where } } -impl<B, TSubstream> IntoProtocolsHandler for CustomProtoHandlerProto<B, TSubstream> +impl<TSubstream> IntoProtocolsHandler for CustomProtoHandlerProto<TSubstream> where TSubstream: AsyncRead + AsyncWrite, - B: BlockT, { - type Handler = CustomProtoHandler<B, TSubstream>; + type Handler = CustomProtoHandler<TSubstream>; - fn inbound_protocol(&self) -> RegisteredProtocol<B> { + fn inbound_protocol(&self) -> RegisteredProtocol { self.protocol.clone() } @@ -136,12 +133,12 @@ where } /// The actual handler once the connection has been established. -pub struct CustomProtoHandler<B: BlockT, TSubstream> { +pub struct CustomProtoHandler<TSubstream> { /// Configuration for the protocol upgrade to negotiate. - protocol: RegisteredProtocol<B>, + protocol: RegisteredProtocol, /// State of the communications with the remote. - state: ProtocolState<B, TSubstream>, + state: ProtocolState<TSubstream>, /// Identifier of the node we're talking to. Used only for logging purposes and shouldn't have /// any influence on the behaviour. @@ -155,15 +152,15 @@ pub struct CustomProtoHandler<B: BlockT, TSubstream> { /// /// This queue must only ever be modified to insert elements at the back, or remove the first /// element. - events_queue: SmallVec<[ProtocolsHandlerEvent<RegisteredProtocol<B>, (), CustomProtoHandlerOut<B>>; 16]>, + events_queue: SmallVec<[ProtocolsHandlerEvent<RegisteredProtocol, (), CustomProtoHandlerOut>; 16]>, } /// State of the handler. -enum ProtocolState<B, TSubstream> { +enum ProtocolState<TSubstream> { /// Waiting for the behaviour to tell the handler whether it is enabled or disabled. Init { /// List of substreams opened by the remote but that haven't been processed yet. - substreams: SmallVec<[RegisteredProtocolSubstream<B, TSubstream>; 6]>, + substreams: SmallVec<[RegisteredProtocolSubstream<TSubstream>; 6]>, /// Deadline after which the initialization is abnormally long. init_deadline: Compat<Delay>, }, @@ -179,9 +176,9 @@ enum ProtocolState<B, TSubstream> { /// If we are in this state, we have sent a `CustomProtocolOpen` message to the outside. Normal { /// The substreams where bidirectional communications happen. - substreams: SmallVec<[RegisteredProtocolSubstream<B, TSubstream>; 4]>, + substreams: SmallVec<[RegisteredProtocolSubstream<TSubstream>; 4]>, /// Contains substreams which are being shut down. - shutdown: SmallVec<[RegisteredProtocolSubstream<B, TSubstream>; 4]>, + shutdown: SmallVec<[RegisteredProtocolSubstream<TSubstream>; 4]>, }, /// We are disabled. Contains substreams that are being closed. @@ -189,7 +186,7 @@ enum ProtocolState<B, TSubstream> { /// outside or we have never sent any `CustomProtocolOpen` in the first place. Disabled { /// List of substreams to shut down. - shutdown: SmallVec<[RegisteredProtocolSubstream<B, TSubstream>; 6]>, + shutdown: SmallVec<[RegisteredProtocolSubstream<TSubstream>; 6]>, /// If true, we should reactivate the handler after all the substreams in `shutdown` have /// been closed. @@ -210,7 +207,7 @@ enum ProtocolState<B, TSubstream> { /// Event that can be received by a `CustomProtoHandler`. #[derive(Debug)] -pub enum CustomProtoHandlerIn<B: BlockT> { +pub enum CustomProtoHandlerIn { /// The node should start using custom protocols. Enable, @@ -220,13 +217,13 @@ pub enum CustomProtoHandlerIn<B: BlockT> { /// Sends a message through a custom protocol substream. SendCustomMessage { /// The message to send. - message: Message<B>, + message: Vec<u8>, }, } /// Event that can be emitted by a `CustomProtoHandler`. #[derive(Debug)] -pub enum CustomProtoHandlerOut<B: BlockT> { +pub enum CustomProtoHandlerOut { /// Opened a custom protocol with the remote. CustomProtocolOpen { /// Version of the protocol that has been opened. @@ -242,14 +239,14 @@ pub enum CustomProtoHandlerOut<B: BlockT> { /// Receives a message on a custom protocol substream. CustomMessage { /// Message that has been received. - message: Message<B>, + message: BytesMut, }, /// A substream to the remote is clogged. The send buffer is very large, and we should print /// a diagnostic message and/or avoid sending more data. Clogged { /// Copy of the messages that are within the buffer, for further diagnostic. - messages: Vec<Message<B>>, + messages: Vec<Vec<u8>>, }, /// An error has happened on the protocol level with this node. @@ -261,10 +258,9 @@ pub enum CustomProtoHandlerOut<B: BlockT> { }, } -impl<B, TSubstream> CustomProtoHandler<B, TSubstream> +impl<TSubstream> CustomProtoHandler<TSubstream> where TSubstream: AsyncRead + AsyncWrite, - B: BlockT, { /// Enables the handler. fn enable(&mut self) { @@ -342,7 +338,7 @@ where /// Polls the state for events. Optionally returns an event to produce. #[must_use] fn poll_state(&mut self) - -> Option<ProtocolsHandlerEvent<RegisteredProtocol<B>, (), CustomProtoHandlerOut<B>>> { + -> Option<ProtocolsHandlerEvent<RegisteredProtocol, (), CustomProtoHandlerOut>> { match mem::replace(&mut self.state, ProtocolState::Poisoned) { ProtocolState::Poisoned => { error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state", @@ -471,7 +467,7 @@ where /// Called by `inject_fully_negotiated_inbound` and `inject_fully_negotiated_outbound`. fn inject_fully_negotiated( &mut self, - mut substream: RegisteredProtocolSubstream<B, TSubstream> + mut substream: RegisteredProtocolSubstream<TSubstream> ) { self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) { ProtocolState::Poisoned => { @@ -516,7 +512,7 @@ where } /// Sends a message to the remote. - fn send_message(&mut self, message: Message<B>) { + fn send_message(&mut self, message: Vec<u8>) { match self.state { ProtocolState::Normal { ref mut substreams, .. } => substreams[0].send_message(message), @@ -527,14 +523,14 @@ where } } -impl<B, TSubstream> ProtocolsHandler for CustomProtoHandler<B, TSubstream> -where TSubstream: AsyncRead + AsyncWrite, B: BlockT { - type InEvent = CustomProtoHandlerIn<B>; - type OutEvent = CustomProtoHandlerOut<B>; +impl<TSubstream> ProtocolsHandler for CustomProtoHandler<TSubstream> +where TSubstream: AsyncRead + AsyncWrite { + type InEvent = CustomProtoHandlerIn; + type OutEvent = CustomProtoHandlerOut; type Substream = TSubstream; type Error = ConnectionKillError; - type InboundProtocol = RegisteredProtocol<B>; - type OutboundProtocol = RegisteredProtocol<B>; + type InboundProtocol = RegisteredProtocol; + type OutboundProtocol = RegisteredProtocol; type OutboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> { @@ -556,7 +552,7 @@ where TSubstream: AsyncRead + AsyncWrite, B: BlockT { self.inject_fully_negotiated(proto); } - fn inject_event(&mut self, message: CustomProtoHandlerIn<B>) { + fn inject_event(&mut self, message: CustomProtoHandlerIn) { match message { CustomProtoHandlerIn::Disable => self.disable(), CustomProtoHandlerIn::Enable => self.enable(), @@ -613,7 +609,7 @@ where TSubstream: AsyncRead + AsyncWrite, B: BlockT { } } -impl<B: BlockT, TSubstream> fmt::Debug for CustomProtoHandler<B, TSubstream> +impl<TSubstream> fmt::Debug for CustomProtoHandler<TSubstream> where TSubstream: AsyncRead + AsyncWrite, { @@ -625,9 +621,9 @@ where /// Given a list of substreams, tries to shut them down. The substreams that have been successfully /// shut down are removed from the list. -fn shutdown_list<B, TSubstream> - (list: &mut SmallVec<impl smallvec::Array<Item = RegisteredProtocolSubstream<B, TSubstream>>>) -where TSubstream: AsyncRead + AsyncWrite, B: BlockT { +fn shutdown_list<TSubstream> + (list: &mut SmallVec<impl smallvec::Array<Item = RegisteredProtocolSubstream<TSubstream>>>) +where TSubstream: AsyncRead + AsyncWrite { 'outer: for n in (0..list.len()).rev() { let mut substream = list.swap_remove(n); loop { diff --git a/substrate/core/network/src/legacy_proto/tests.rs b/substrate/core/network/src/legacy_proto/tests.rs index 8fd47843df2e56f07d3a2a2c77a50159400b1bbd..49ab38e3b7e0668f2906be689f83712793ffe4c4 100644 --- a/substrate/core/network/src/legacy_proto/tests.rs +++ b/substrate/core/network/src/legacy_proto/tests.rs @@ -17,6 +17,7 @@ #![cfg(test)] use futures::{future, prelude::*, try_ready}; +use codec::{Encode, Decode}; use libp2p::core::nodes::Substream; use libp2p::core::{ConnectedPoint, transport::boxed::Boxed, muxing::StreamMuxerBox}; use libp2p::swarm::{Swarm, ProtocolsHandler, IntoProtocolsHandler}; @@ -24,9 +25,9 @@ use libp2p::swarm::{PollParameters, NetworkBehaviour, NetworkBehaviourAction}; use libp2p::{PeerId, Multiaddr, Transport}; use rand::seq::SliceRandom; use std::{io, time::Duration, time::Instant}; -use test_client::runtime::Block; -use crate::message::generic::Message; +use crate::message::Message; use crate::legacy_proto::{LegacyProto, LegacyProtoOut}; +use test_client::runtime::Block; /// Builds two nodes that have each other as bootstrap nodes. /// This is to be used only for testing, and a panic will happen if something goes wrong. @@ -101,12 +102,12 @@ fn build_nodes() /// Wraps around the `CustomBehaviour` network behaviour, and adds hardcoded node addresses to it. struct CustomProtoWithAddr { - inner: LegacyProto<Block, Substream<StreamMuxerBox>>, + inner: LegacyProto<Substream<StreamMuxerBox>>, addrs: Vec<(PeerId, Multiaddr)>, } impl std::ops::Deref for CustomProtoWithAddr { - type Target = LegacyProto<Block, Substream<StreamMuxerBox>>; + type Target = LegacyProto<Substream<StreamMuxerBox>>; fn deref(&self) -> &Self::Target { &self.inner @@ -121,8 +122,8 @@ impl std::ops::DerefMut for CustomProtoWithAddr { impl NetworkBehaviour for CustomProtoWithAddr { type ProtocolsHandler = - <LegacyProto<Block, Substream<StreamMuxerBox>> as NetworkBehaviour>::ProtocolsHandler; - type OutEvent = <LegacyProto<Block, Substream<StreamMuxerBox>> as NetworkBehaviour>::OutEvent; + <LegacyProto<Substream<StreamMuxerBox>> as NetworkBehaviour>::ProtocolsHandler; + type OutEvent = <LegacyProto<Substream<StreamMuxerBox>> as NetworkBehaviour>::OutEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { self.inner.new_handler() @@ -209,7 +210,7 @@ fn two_nodes_transfer_lots_of_packets() { for n in 0 .. NUM_PACKETS { service1.send_packet( &peer_id, - Message::ChainSpecific(vec![(n % 256) as u8]) + Message::<Block>::ChainSpecific(vec![(n % 256) as u8]).encode() ); } }, @@ -223,11 +224,16 @@ fn two_nodes_transfer_lots_of_packets() { loop { match try_ready!(service2.poll()) { Some(LegacyProtoOut::CustomProtocolOpen { .. }) => {}, - Some(LegacyProtoOut::CustomMessage { message: Message::ChainSpecific(message), .. }) => { - assert_eq!(message.len(), 1); - packet_counter += 1; - if packet_counter == NUM_PACKETS { - return Ok(Async::Ready(())) + Some(LegacyProtoOut::CustomMessage { message, .. }) => { + match Message::<Block>::decode(&mut &message[..]).unwrap() { + Message::<Block>::ChainSpecific(message) => { + assert_eq!(message.len(), 1); + packet_counter += 1; + if packet_counter == NUM_PACKETS { + return Ok(Async::Ready(())) + } + }, + _ => panic!(), } } _ => panic!(), @@ -248,7 +254,7 @@ fn basic_two_nodes_requests_in_parallel() { let mut to_send = Vec::new(); for _ in 0..200 { // Note: don't make that number too high or the CPU usage will explode. let msg = (0..10).map(|_| rand::random::<u8>()).collect::<Vec<_>>(); - to_send.push(Message::ChainSpecific(msg)); + to_send.push(Message::<Block>::ChainSpecific(msg)); } to_send }; @@ -263,7 +269,7 @@ fn basic_two_nodes_requests_in_parallel() { match try_ready!(service1.poll()) { Some(LegacyProtoOut::CustomProtocolOpen { peer_id, .. }) => { for msg in to_send.drain(..) { - service1.send_packet(&peer_id, msg); + service1.send_packet(&peer_id, msg.encode()); } }, _ => panic!(), @@ -276,7 +282,7 @@ fn basic_two_nodes_requests_in_parallel() { match try_ready!(service2.poll()) { Some(LegacyProtoOut::CustomProtocolOpen { .. }) => {}, Some(LegacyProtoOut::CustomMessage { message, .. }) => { - let pos = to_receive.iter().position(|m| *m == message).unwrap(); + let pos = to_receive.iter().position(|m| m.encode() == message).unwrap(); to_receive.remove(pos); if to_receive.is_empty() { return Ok(Async::Ready(())) diff --git a/substrate/core/network/src/legacy_proto/upgrade.rs b/substrate/core/network/src/legacy_proto/upgrade.rs index 8831d16f91636ae0a94c1a20fd843b4d781c0a57..fdf23ec351c8a1cfcffdbe31fe6786b04065060e 100644 --- a/substrate/core/network/src/legacy_proto/upgrade.rs +++ b/substrate/core/network/src/legacy_proto/upgrade.rs @@ -15,15 +15,11 @@ // along with Substrate. If not, see <http://www.gnu.org/licenses/>. use crate::config::ProtocolId; -use crate::protocol::message::Message; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use libp2p::core::{Negotiated, Endpoint, UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName}; use libp2p::tokio_codec::Framed; -use log::debug; -use std::{collections::VecDeque, io, marker::PhantomData, vec::IntoIter as VecIntoIter}; +use std::{collections::VecDeque, io, vec::IntoIter as VecIntoIter}; use futures::{prelude::*, future, stream}; -use codec::{Decode, Encode}; -use sr_primitives::traits::Block as BlockT; use tokio_io::{AsyncRead, AsyncWrite}; use unsigned_varint::codec::UviBytes; @@ -31,7 +27,7 @@ use unsigned_varint::codec::UviBytes; /// /// Note that "a single protocol" here refers to `par` for example. However /// each protocol can have multiple different versions for networking purposes. -pub struct RegisteredProtocol<B> { +pub struct RegisteredProtocol { /// Id of the protocol for API purposes. id: ProtocolId, /// Base name of the protocol as advertised on the network. @@ -40,11 +36,9 @@ pub struct RegisteredProtocol<B> { /// List of protocol versions that we support. /// Ordered in descending order so that the best comes first. supported_versions: Vec<u8>, - /// Marker to pin the generic. - marker: PhantomData<B>, } -impl<B> RegisteredProtocol<B> { +impl RegisteredProtocol { /// Creates a new `RegisteredProtocol`. The `custom_data` parameter will be /// passed inside the `RegisteredProtocolOutput`. pub fn new(protocol: impl Into<ProtocolId>, versions: &[u8]) @@ -62,24 +56,22 @@ impl<B> RegisteredProtocol<B> { tmp.sort_unstable_by(|a, b| b.cmp(&a)); tmp }, - marker: PhantomData, } } } -impl<B> Clone for RegisteredProtocol<B> { +impl Clone for RegisteredProtocol { fn clone(&self) -> Self { RegisteredProtocol { id: self.id.clone(), base_name: self.base_name.clone(), supported_versions: self.supported_versions.clone(), - marker: PhantomData, } } } /// Output of a `RegisteredProtocol` upgrade. -pub struct RegisteredProtocolSubstream<B, TSubstream> { +pub struct RegisteredProtocolSubstream<TSubstream> { /// If true, we are in the process of closing the sink. is_closing: bool, /// Whether the local node opened this substream (dialer), or we received this substream from @@ -96,11 +88,9 @@ pub struct RegisteredProtocolSubstream<B, TSubstream> { /// If true, we have sent a "remote is clogged" event recently and shouldn't send another one /// unless the buffer empties then fills itself again. clogged_fuse: bool, - /// Marker to pin the generic. - marker: PhantomData<B>, } -impl<B, TSubstream> RegisteredProtocolSubstream<B, TSubstream> { +impl<TSubstream> RegisteredProtocolSubstream<TSubstream> { /// Returns the version of the protocol that was negotiated. pub fn protocol_version(&self) -> u8 { self.protocol_version @@ -124,33 +114,32 @@ impl<B, TSubstream> RegisteredProtocolSubstream<B, TSubstream> { } /// Sends a message to the substream. - pub fn send_message(&mut self, data: Message<B>) - where B: BlockT { + pub fn send_message(&mut self, data: Vec<u8>) { if self.is_closing { return } - self.send_queue.push_back(data.encode()); + self.send_queue.push_back(data); } } /// Event produced by the `RegisteredProtocolSubstream`. #[derive(Debug, Clone)] -pub enum RegisteredProtocolEvent<B: BlockT> { +pub enum RegisteredProtocolEvent { /// Received a message from the remote. - Message(Message<B>), + Message(BytesMut), /// Diagnostic event indicating that the connection is clogged and we should avoid sending too /// many messages to it. Clogged { /// Copy of the messages that are within the buffer, for further diagnostic. - messages: Vec<Message<B>>, + messages: Vec<Vec<u8>>, }, } -impl<B, TSubstream> Stream for RegisteredProtocolSubstream<B, TSubstream> -where TSubstream: AsyncRead + AsyncWrite, B: BlockT { - type Item = RegisteredProtocolEvent<B>; +impl<TSubstream> Stream for RegisteredProtocolSubstream<TSubstream> +where TSubstream: AsyncRead + AsyncWrite { + type Item = RegisteredProtocolEvent; type Error = io::Error; fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { @@ -179,8 +168,7 @@ where TSubstream: AsyncRead + AsyncWrite, B: BlockT { self.clogged_fuse = true; return Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { messages: self.send_queue.iter() - .map(|m| Decode::decode(&mut &m[..])) - .filter_map(Result::ok) + .map(|m| m.clone()) .collect(), }))) } @@ -199,15 +187,7 @@ where TSubstream: AsyncRead + AsyncWrite, B: BlockT { // Note that `inner` is wrapped in a `Fuse`, therefore we can poll it forever. match self.inner.poll()? { Async::Ready(Some(data)) => { - let message = <Message<B> as Decode>::decode(&mut &data[..]) - .map_err(|err| { - debug!( - target: "sub-libp2p", - "Couldn't decode packet sent by the remote: {:?}: {}", data, err.what(), - ); - io::ErrorKind::InvalidData - })?; - Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) + Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(data)))) } Async::Ready(None) => if !self.requires_poll_complete && self.send_queue.is_empty() { @@ -220,7 +200,7 @@ where TSubstream: AsyncRead + AsyncWrite, B: BlockT { } } -impl<B> UpgradeInfo for RegisteredProtocol<B> { +impl UpgradeInfo for RegisteredProtocol { type Info = RegisteredProtocolName; type InfoIter = VecIntoIter<Self::Info>; @@ -255,10 +235,10 @@ impl ProtocolName for RegisteredProtocolName { } } -impl<B, TSubstream> InboundUpgrade<TSubstream> for RegisteredProtocol<B> +impl<TSubstream> InboundUpgrade<TSubstream> for RegisteredProtocol where TSubstream: AsyncRead + AsyncWrite, { - type Output = RegisteredProtocolSubstream<B, TSubstream>; + type Output = RegisteredProtocolSubstream<TSubstream>; type Future = future::FutureResult<Self::Output, io::Error>; type Error = io::Error; @@ -281,12 +261,11 @@ where TSubstream: AsyncRead + AsyncWrite, inner: framed.fuse(), protocol_version: info.version, clogged_fuse: false, - marker: PhantomData, }) } } -impl<B, TSubstream> OutboundUpgrade<TSubstream> for RegisteredProtocol<B> +impl<TSubstream> OutboundUpgrade<TSubstream> for RegisteredProtocol where TSubstream: AsyncRead + AsyncWrite, { type Output = <Self as InboundUpgrade<TSubstream>>::Output; @@ -308,7 +287,6 @@ where TSubstream: AsyncRead + AsyncWrite, inner: framed.fuse(), protocol_version: info.version, clogged_fuse: false, - marker: PhantomData, }) } } diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 6afb4c8b116ae56087330ee22cc28392dddc000e..8b3b5a49e40ec89a8841ead4399429898b6a7356 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -16,6 +16,7 @@ use crate::{DiscoveryNetBehaviour, config::ProtocolId}; use crate::legacy_proto::{LegacyProto, LegacyProtoOut}; +use bytes::BytesMut; use futures::prelude::*; use futures03::{StreamExt as _, TryStreamExt as _}; use libp2p::{Multiaddr, PeerId}; @@ -28,6 +29,7 @@ use consensus::{ block_validation::BlockAnnounceValidator, import_queue::{BlockImportResult, BlockImportError, IncomingBlock, Origin} }; +use codec::{Decode, Encode}; use sr_primitives::{generic::BlockId, ConsensusEngineId, Justification}; use sr_primitives::traits::{ Block as BlockT, Header as HeaderT, NumberFor, One, Zero, @@ -44,6 +46,7 @@ use crate::config::{BoxFinalityProofRequestBuilder, Roles}; use rustc_hex::ToHex; use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; +use std::fmt::Write; use std::{cmp, num::NonZeroUsize, time}; use log::{trace, debug, warn, error}; use crate::chain::{Client, FinalityProofProvider}; @@ -90,6 +93,8 @@ const PEER_BEHIND_US_LIGHT_REPUTATION_CHANGE: i32 = -(1 << 8); const NEW_EXTRINSIC_REPUTATION_CHANGE: i32 = 1 << 7; /// We sent an RPC query to the given node, but it failed. const RPC_FAILED_REPUTATION_CHANGE: i32 = -(1 << 12); +/// We received a message that failed to decode. +const BAD_MESSAGE_REPUTATION_CHANGE: i32 = -(1 << 12); // Lock must always be taken in order declared here. pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> { @@ -113,7 +118,15 @@ pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> { /// When asked for a proof of finality, we use this struct to build one. finality_proof_provider: Option<Arc<dyn FinalityProofProvider<B>>>, /// Handles opening the unique substream and sending and receiving raw messages. - behaviour: LegacyProto<B, Substream<StreamMuxerBox>>, + behaviour: LegacyProto<Substream<StreamMuxerBox>>, +} + +#[derive(Default)] +struct PacketStats { + bytes_in: u64, + bytes_out: u64, + count_in: u64, + count_out: u64, } /// A peer that we are connected to @@ -151,12 +164,12 @@ pub struct PeerInfo<B: BlockT> { pub best_number: <B::Header as HeaderT>::Number, } -struct LightDispatchIn<'a, B: BlockT> { - behaviour: &'a mut LegacyProto<B, Substream<StreamMuxerBox>>, +struct LightDispatchIn<'a> { + behaviour: &'a mut LegacyProto<Substream<StreamMuxerBox>>, peerset: peerset::PeersetHandle, } -impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a, B> { +impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a> { fn report_peer(&mut self, who: &PeerId, reputation: i32) { self.peerset.report_peer(who.clone(), reputation) } @@ -166,12 +179,12 @@ impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a, B> { } fn send_header_request(&mut self, who: &PeerId, id: RequestId, block: <<B as BlockT>::Header as HeaderT>::Number) { - let message = message::generic::Message::RemoteHeaderRequest(message::RemoteHeaderRequest { + let message: Message<B> = message::generic::Message::RemoteHeaderRequest(message::RemoteHeaderRequest { id, block, }); - self.behaviour.send_packet(who, message) + self.behaviour.send_packet(who, message.encode()) } fn send_read_request( @@ -181,13 +194,13 @@ impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a, B> { block: <B as BlockT>::Hash, keys: Vec<Vec<u8>>, ) { - let message = message::generic::Message::RemoteReadRequest(message::RemoteReadRequest { + let message: Message<B> = message::generic::Message::RemoteReadRequest(message::RemoteReadRequest { id, block, keys, }); - self.behaviour.send_packet(who, message) + self.behaviour.send_packet(who, message.encode()) } fn send_read_child_request( @@ -198,14 +211,14 @@ impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a, B> { storage_key: Vec<u8>, keys: Vec<Vec<u8>>, ) { - let message = message::generic::Message::RemoteReadChildRequest(message::RemoteReadChildRequest { + let message: Message<B> = message::generic::Message::RemoteReadChildRequest(message::RemoteReadChildRequest { id, block, storage_key, keys, }); - self.behaviour.send_packet(who, message) + self.behaviour.send_packet(who, message.encode()) } fn send_call_request( @@ -216,14 +229,14 @@ impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a, B> { method: String, data: Vec<u8> ) { - let message = message::generic::Message::RemoteCallRequest(message::RemoteCallRequest { + let message: Message<B> = message::generic::Message::RemoteCallRequest(message::RemoteCallRequest { id, block, method, data, }); - self.behaviour.send_packet(who, message) + self.behaviour.send_packet(who, message.encode()) } fn send_changes_request( @@ -237,7 +250,7 @@ impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a, B> { storage_key: Option<Vec<u8>>, key: Vec<u8>, ) { - let message = message::generic::Message::RemoteChangesRequest(message::RemoteChangesRequest { + let message: Message<B> = message::generic::Message::RemoteChangesRequest(message::RemoteChangesRequest { id, first, last, @@ -247,7 +260,7 @@ impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a, B> { key, }); - self.behaviour.send_packet(who, message) + self.behaviour.send_packet(who, message.encode()) } fn send_body_request( @@ -260,7 +273,7 @@ impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a, B> { direction: Direction, max: Option<u32> ) { - let message = message::generic::Message::BlockRequest(message::BlockRequest::<B> { + let message: Message<B> = message::generic::Message::BlockRequest(message::BlockRequest::<B> { id, fields, from, @@ -269,7 +282,7 @@ impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a, B> { max, }); - self.behaviour.send_packet(who, message) + self.behaviour.send_packet(who, message.encode()) } } @@ -291,7 +304,7 @@ pub trait Context<B: BlockT> { /// Protocol context. struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> { - behaviour: &'a mut LegacyProto<B, Substream<StreamMuxerBox>>, + behaviour: &'a mut LegacyProto<Substream<StreamMuxerBox>>, context_data: &'a mut ContextData<B, H>, peerset_handle: &'a peerset::PeersetHandle, } @@ -299,7 +312,7 @@ struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> { impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> { fn new( context_data: &'a mut ContextData<B, H>, - behaviour: &'a mut LegacyProto<B, Substream<StreamMuxerBox>>, + behaviour: &'a mut LegacyProto<Substream<StreamMuxerBox>>, peerset_handle: &'a peerset::PeersetHandle, ) -> Self { ProtocolContext { context_data, peerset_handle, behaviour } @@ -316,19 +329,19 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B, } fn send_consensus(&mut self, who: PeerId, consensus: ConsensusMessage) { - send_message( + send_message::<B> ( self.behaviour, - &mut self.context_data.peers, - who, + &mut self.context_data.stats, + &who, GenericMessage::Consensus(consensus) ) } fn send_chain_specific(&mut self, who: PeerId, message: Vec<u8>) { - send_message( + send_message::<B> ( self.behaviour, - &mut self.context_data.peers, - who, + &mut self.context_data.stats, + &who, GenericMessage::ChainSpecific(message) ) } @@ -338,6 +351,7 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B, struct ContextData<B: BlockT, H: ExHashT> { // All connected peers peers: HashMap<PeerId, Peer<B, H>>, + stats: HashMap<&'static str, PacketStats>, pub chain: Arc<dyn Client<B>>, } @@ -388,6 +402,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { config, context_data: ContextData { peers: HashMap::new(), + stats: HashMap::new(), chain, }, light_dispatch: LightDispatch::new(checker), @@ -517,8 +532,22 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { pub fn on_custom_message( &mut self, who: PeerId, - message: Message<B>, + data: BytesMut, ) -> CustomMessageOutcome<B> { + + let message = match <Message<B> as Decode>::decode(&mut &data[..]) { + Ok(message) => message, + Err(err) => { + debug!(target: "sync", "Couldn't decode packet sent by {}: {:?}: {}", who, data, err.what()); + self.peerset_handle.report_peer(who.clone(), BAD_MESSAGE_REPUTATION_CHANGE); + return CustomMessageOutcome::None; + } + }; + + let mut stats = self.context_data.stats.entry(message.id()).or_default(); + stats.bytes_in += data.len() as u64; + stats.count_in += 1; + match message { GenericMessage::Status(s) => self.on_status_message(who, s), GenericMessage::BlockRequest(r) => self.on_block_request(who, r), @@ -581,15 +610,25 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { CustomMessageOutcome::None } - fn send_message(&mut self, who: PeerId, message: Message<B>) { - send_message::<B, H>( + fn send_request(&mut self, who: &PeerId, message: Message<B>) { + send_request::<B, H>( &mut self.behaviour, + &mut self.context_data.stats, &mut self.context_data.peers, who, message, ); } + fn send_message(&mut self, who: &PeerId, message: Message<B>) { + send_message::<B>( + &mut self.behaviour, + &mut self.context_data.stats, + who, + message, + ); + } + /// Locks `self` and returns a context plus the `ConsensusGossip` struct. pub fn consensus_gossip_lock<'a>( &'a mut self, @@ -622,7 +661,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { GossipMessageRecipient::BroadcastNew => self.consensus_gossip.multicast(&mut context, topic, message, false), GossipMessageRecipient::Peer(who) => - self.send_message(who, GenericMessage::Consensus(message)), + self.send_message(&who, GenericMessage::Consensus(message)), } } @@ -745,7 +784,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { blocks: blocks, }; trace!(target: "sync", "Sending BlockResponse with {} blocks", response.blocks.len()); - self.send_message(peer, GenericMessage::BlockResponse(response)) + self.send_message(&peer, GenericMessage::BlockResponse(response)) } /// Adjusts the reputation of a node. @@ -792,7 +831,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { Ok(sync::OnBlockData::Import(origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks), Ok(sync::OnBlockData::Request(peer, req)) => { - self.send_message(peer, GenericMessage::BlockRequest(req)); + self.send_request(&peer, GenericMessage::BlockRequest(req)); CustomMessageOutcome::None } Err(sync::BadPeer(id, repu)) => { @@ -939,7 +978,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { }, who.clone(), status.roles, status.best_number); match self.sync.new_peer(who.clone(), info) { Ok(None) => (), - Ok(Some(req)) => self.send_message(who.clone(), GenericMessage::BlockRequest(req)), + Ok(Some(req)) => self.send_request(&who, GenericMessage::BlockRequest(req)), Err(sync::BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id); self.peerset_handle.report_peer(id, repu) @@ -1020,7 +1059,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { .push(who.to_base58()); } trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who); - self.behaviour.send_packet(who, GenericMessage::Transactions(to_send)) + send_message::<B> ( + &mut self.behaviour, + &mut self.context_data.stats, + &who, + GenericMessage::Transactions(to_send) + ) } } @@ -1061,7 +1105,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { trace!(target: "sync", "Announcing block {:?} to {}", hash, who); let inserted = peer.known_blocks.insert(hash); if inserted || force { - let message = GenericMessage::BlockAnnounce(message::BlockAnnounce { + let message: Message<B> = GenericMessage::BlockAnnounce(message::BlockAnnounce { header: header.clone(), state: if peer.info.protocol_version >= 4 { if is_best { @@ -1079,7 +1123,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { }, }); - self.behaviour.send_packet(who, message) + send_message::<B> ( + &mut self.behaviour, + &mut self.context_data.stats, + &who, + message, + ) } } } @@ -1097,7 +1146,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { chain_status: self.specialization.status(), }; - self.send_message(who, GenericMessage::Status(status)) + self.send_message(&who, GenericMessage::Status(status)) } fn on_block_announce(&mut self, who: PeerId, announce: BlockAnnounce<B::Header>) -> CustomMessageOutcome<B> { @@ -1157,7 +1206,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { match blocks_to_import { Ok(sync::OnBlockData::Import(origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks), Ok(sync::OnBlockData::Request(peer, req)) => { - self.send_message(peer, GenericMessage::BlockRequest(req)); + self.send_request(&peer, GenericMessage::BlockRequest(req)); CustomMessageOutcome::None } Err(sync::BadPeer(id, repu)) => { @@ -1226,7 +1275,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { }; self.send_message( - who, + &who, GenericMessage::RemoteCallResponse(message::RemoteCallResponse { id: request.id, proof, @@ -1269,7 +1318,13 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { match result { Ok((id, req)) => { let msg = GenericMessage::BlockRequest(req); - send_message(&mut self.behaviour, &mut self.context_data.peers, id, msg) + send_request( + &mut self.behaviour, + &mut self.context_data.stats, + &mut self.context_data.peers, + &id, + msg + ) } Err(sync::BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id); @@ -1342,7 +1397,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { } }; self.send_message( - who, + &who, GenericMessage::RemoteReadResponse(message::RemoteReadResponse { id: request.id, proof, @@ -1385,7 +1440,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { } }; self.send_message( - who, + &who, GenericMessage::RemoteReadResponse(message::RemoteReadResponse { id: request.id, proof, @@ -1425,7 +1480,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { } }; self.send_message( - who, + &who, GenericMessage::RemoteHeaderResponse(message::RemoteHeaderResponse { id: request.id, header, @@ -1495,7 +1550,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { } }; self.send_message( - who, + &who, GenericMessage::RemoteChangesResponse(message::RemoteChangesResponse { id: request.id, max: proof.max_block, @@ -1545,7 +1600,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { }, }; self.send_message( - who, + &who, GenericMessage::FinalityProofResponse(message::FinalityProofResponse { id: 0, block: request.block, @@ -1582,6 +1637,22 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { peerset: self.peerset_handle.clone(), }, peer, response); } + + fn format_stats(&self) -> String { + let mut out = String::new(); + for (id, stats) in &self.context_data.stats { + let _ = writeln!( + &mut out, + "{}: In: {} bytes ({}), Out: {} bytes ({})", + id, + stats.bytes_in, + stats.count_in, + stats.bytes_out, + stats.count_out, + ); + } + out + } } /// Outcome of an incoming custom message. @@ -1593,14 +1664,15 @@ pub enum CustomMessageOutcome<B: BlockT> { None, } -fn send_message<B: BlockT, H: ExHashT>( - behaviour: &mut LegacyProto<B, Substream<StreamMuxerBox>>, +fn send_request<B: BlockT, H: ExHashT>( + behaviour: &mut LegacyProto<Substream<StreamMuxerBox>>, + stats: &mut HashMap<&'static str, PacketStats>, peers: &mut HashMap<PeerId, Peer<B, H>>, - who: PeerId, + who: &PeerId, mut message: Message<B>, ) { if let GenericMessage::BlockRequest(ref mut r) = message { - if let Some(ref mut peer) = peers.get_mut(&who) { + if let Some(ref mut peer) = peers.get_mut(who) { r.id = peer.next_request_id; peer.next_request_id = peer.next_request_id + 1; if let Some((timestamp, request)) = peer.block_request.take() { @@ -1610,12 +1682,25 @@ fn send_message<B: BlockT, H: ExHashT>( peer.block_request = Some((time::Instant::now(), r.clone())); } } - behaviour.send_packet(&who, message); + send_message::<B>(behaviour, stats, who, message) +} + +fn send_message<B: BlockT>( + behaviour: &mut LegacyProto<Substream<StreamMuxerBox>>, + stats: &mut HashMap<&'static str, PacketStats>, + who: &PeerId, + message: Message<B>, +) { + let encoded = message.encode(); + let mut stats = stats.entry(message.id()).or_default(); + stats.bytes_out += encoded.len() as u64; + stats.count_out += 1; + behaviour.send_packet(who, encoded); } impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviour for Protocol<B, S, H> { - type ProtocolsHandler = <LegacyProto<B, Substream<StreamMuxerBox>> as NetworkBehaviour>::ProtocolsHandler; + type ProtocolsHandler = <LegacyProto<Substream<StreamMuxerBox>> as NetworkBehaviour>::ProtocolsHandler; type OutEvent = CustomMessageOutcome<B>; fn new_handler(&mut self) -> Self::ProtocolsHandler { @@ -1660,13 +1745,30 @@ Protocol<B, S, H> { } for (id, r) in self.sync.block_requests() { - send_message(&mut self.behaviour, &mut self.context_data.peers, id, GenericMessage::BlockRequest(r)) + send_request( + &mut self.behaviour, + &mut self.context_data.stats, + &mut self.context_data.peers, + &id, + GenericMessage::BlockRequest(r) + ) } for (id, r) in self.sync.justification_requests() { - send_message(&mut self.behaviour, &mut self.context_data.peers, id, GenericMessage::BlockRequest(r)) + send_request( + &mut self.behaviour, + &mut self.context_data.stats, + &mut self.context_data.peers, + &id, + GenericMessage::BlockRequest(r) + ) } for (id, r) in self.sync.finality_proof_requests() { - send_message(&mut self.behaviour, &mut self.context_data.peers, id, GenericMessage::FinalityProofRequest(r)) + send_request( + &mut self.behaviour, + &mut self.context_data.stats, + &mut self.context_data.peers, + &id, + GenericMessage::FinalityProofRequest(r)) } let event = match self.behaviour.poll(params) { @@ -1700,8 +1802,9 @@ Protocol<B, S, H> { LegacyProtoOut::Clogged { peer_id, messages } => { debug!(target: "sync", "{} clogging messages:", messages.len()); for msg in messages.into_iter().take(5) { - debug!(target: "sync", "{:?}", msg); - self.on_clogged_peer(peer_id.clone(), Some(msg)); + let message: Option<Message<B>> = Decode::decode(&mut &msg[..]).ok(); + debug!(target: "sync", "{:?}", message); + self.on_clogged_peer(peer_id.clone(), message); } CustomMessageOutcome::None } @@ -1749,3 +1852,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> DiscoveryNetBehaviour f self.behaviour.add_discovered_nodes(peer_ids) } } + +impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Drop for Protocol<B, S, H> { + fn drop(&mut self) { + debug!(target: "sync", "Network stats:\n{}", self.format_stats()); + } +} diff --git a/substrate/core/network/src/protocol/message.rs b/substrate/core/network/src/protocol/message.rs index ef5e4dbb1db0f4ad90e844b17d72851187528008..22fb35806da86ffef39d48ba28ab8dc8322c9aae 100644 --- a/substrate/core/network/src/protocol/message.rs +++ b/substrate/core/network/src/protocol/message.rs @@ -222,6 +222,32 @@ pub mod generic { ChainSpecific(Vec<u8>), } + impl<Header, Hash, Number, Extrinsic> Message<Header, Hash, Number, Extrinsic> { + /// Message id useful for logging. + pub fn id(&self) -> &'static str { + match self { + Message::Status(_) => "Status", + Message::BlockRequest(_) => "BlockRequest", + Message::BlockResponse(_) => "BlockResponse", + Message::BlockAnnounce(_) => "BlockAnnounce", + Message::Transactions(_) => "Transactions", + Message::Consensus(_) => "Consensus", + Message::RemoteCallRequest(_) => "RemoteCallRequest", + Message::RemoteCallResponse(_) => "RemoteCallResponse", + Message::RemoteReadRequest(_) => "RemoteReadRequest", + Message::RemoteReadResponse(_) => "RemoteReadResponse", + Message::RemoteHeaderRequest(_) => "RemoteHeaderRequest", + Message::RemoteHeaderResponse(_) => "RemoteHeaderResponse", + Message::RemoteChangesRequest(_) => "RemoteChangesRequest", + Message::RemoteChangesResponse(_) => "RemoteChangesResponse", + Message::RemoteReadChildRequest(_) => "RemoteReadChildRequest", + Message::FinalityProofRequest(_) => "FinalityProofRequest", + Message::FinalityProofResponse(_) => "FinalityProofResponse", + Message::ChainSpecific(_) => "ChainSpecific", + } + } + } + /// Status sent on connection. #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] pub struct Status<Hash, Number> {