...@@ -69,7 +69,7 @@ impl<B: BlockT> GossipEngine<B> { ...@@ -69,7 +69,7 @@ impl<B: BlockT> GossipEngine<B> {
pub fn new<N: Network<B> + Send + Clone + 'static>( pub fn new<N: Network<B> + Send + Clone + 'static>(
network: N, network: N,
engine_id: ConsensusEngineId, engine_id: ConsensusEngineId,
protocol_name: impl Into<Cow<'static, [u8]>>, protocol_name: impl Into<Cow<'static, str>>,
validator: Arc<dyn Validator<B>>, validator: Arc<dyn Validator<B>>,
) -> Self where B: 'static { ) -> Self where B: 'static {
// We grab the event stream before registering the notifications protocol, otherwise we // We grab the event stream before registering the notifications protocol, otherwise we
...@@ -333,7 +333,7 @@ mod tests { ...@@ -333,7 +333,7 @@ mod tests {
unimplemented!(); unimplemented!();
} }
fn register_notifications_protocol(&self, _: ConsensusEngineId, _: Cow<'static, [u8]>) {} fn register_notifications_protocol(&self, _: ConsensusEngineId, _: Cow<'static, str>) {}
fn announce(&self, _: B::Hash, _: Vec<u8>) { fn announce(&self, _: B::Hash, _: Vec<u8>) {
unimplemented!(); unimplemented!();
...@@ -362,7 +362,7 @@ mod tests { ...@@ -362,7 +362,7 @@ mod tests {
let mut gossip_engine = GossipEngine::<Block>::new( let mut gossip_engine = GossipEngine::<Block>::new(
network.clone(), network.clone(),
[1, 2, 3, 4], [1, 2, 3, 4],
"my_protocol".as_bytes(), "my_protocol",
Arc::new(AllowAll{}), Arc::new(AllowAll{}),
); );
...@@ -390,7 +390,7 @@ mod tests { ...@@ -390,7 +390,7 @@ mod tests {
let mut gossip_engine = GossipEngine::<Block>::new( let mut gossip_engine = GossipEngine::<Block>::new(
network.clone(), network.clone(),
engine_id.clone(), engine_id.clone(),
"my_protocol".as_bytes(), "my_protocol",
Arc::new(AllowAll{}), Arc::new(AllowAll{}),
); );
...@@ -525,7 +525,7 @@ mod tests { ...@@ -525,7 +525,7 @@ mod tests {
let mut gossip_engine = GossipEngine::<Block>::new( let mut gossip_engine = GossipEngine::<Block>::new(
network.clone(), network.clone(),
engine_id.clone(), engine_id.clone(),
"my_protocol".as_bytes(), "my_protocol",
Arc::new(TestValidator{}), Arc::new(TestValidator{}),
); );
......
...@@ -87,7 +87,7 @@ pub trait Network<B: BlockT> { ...@@ -87,7 +87,7 @@ pub trait Network<B: BlockT> {
fn register_notifications_protocol( fn register_notifications_protocol(
&self, &self,
engine_id: ConsensusEngineId, engine_id: ConsensusEngineId,
protocol_name: Cow<'static, [u8]>, protocol_name: Cow<'static, str>,
); );
/// Notify everyone we're connected to that we have the given block. /// Notify everyone we're connected to that we have the given block.
...@@ -117,7 +117,7 @@ impl<B: BlockT, H: ExHashT> Network<B> for Arc<NetworkService<B, H>> { ...@@ -117,7 +117,7 @@ impl<B: BlockT, H: ExHashT> Network<B> for Arc<NetworkService<B, H>> {
fn register_notifications_protocol( fn register_notifications_protocol(
&self, &self,
engine_id: ConsensusEngineId, engine_id: ConsensusEngineId,
protocol_name: Cow<'static, [u8]>, protocol_name: Cow<'static, str>,
) { ) {
NetworkService::register_notifications_protocol(self, engine_id, protocol_name) NetworkService::register_notifications_protocol(self, engine_id, protocol_name)
} }
......
...@@ -489,7 +489,7 @@ mod tests { ...@@ -489,7 +489,7 @@ mod tests {
unimplemented!(); unimplemented!();
} }
fn register_notifications_protocol(&self, _: ConsensusEngineId, _: Cow<'static, [u8]>) {} fn register_notifications_protocol(&self, _: ConsensusEngineId, _: Cow<'static, str>) {}
fn announce(&self, _: B::Hash, _: Vec<u8>) { fn announce(&self, _: B::Hash, _: Vec<u8>) {
unimplemented!(); unimplemented!();
......
...@@ -16,6 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"] ...@@ -16,6 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"]
prost-build = "0.6.1" prost-build = "0.6.1"
[dependencies] [dependencies]
async-trait = "0.1"
async-std = { version = "1.6.2", features = ["unstable"] } async-std = { version = "1.6.2", features = ["unstable"] }
bitflags = "1.2.0" bitflags = "1.2.0"
bs58 = "0.3.1" bs58 = "0.3.1"
...@@ -64,7 +65,7 @@ zeroize = "1.0.0" ...@@ -64,7 +65,7 @@ zeroize = "1.0.0"
[dependencies.libp2p] [dependencies.libp2p]
version = "0.24.0" version = "0.24.0"
default-features = false default-features = false
features = ["identify", "kad", "mdns-async-std", "mplex", "noise", "ping", "tcp-async-std", "websocket", "yamux"] features = ["identify", "kad", "mdns-async-std", "mplex", "noise", "ping", "request-response", "tcp-async-std", "websocket", "yamux"]
[dev-dependencies] [dev-dependencies]
assert_matches = "1.3" assert_matches = "1.3"
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
use crate::{ use crate::{
config::{ProtocolId, Role}, block_requests, light_client_handler, finality_requests, config::{ProtocolId, Role}, block_requests, light_client_handler, finality_requests,
peer_info, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut}, peer_info, request_responses, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
protocol::{message::{self, Roles}, CustomMessageOutcome, NotificationsSink, Protocol}, protocol::{message::{self, Roles}, CustomMessageOutcome, NotificationsSink, Protocol},
ObservedRole, DhtEvent, ExHashT, ObservedRole, DhtEvent, ExHashT,
}; };
...@@ -39,6 +39,10 @@ use std::{ ...@@ -39,6 +39,10 @@ use std::{
time::Duration, time::Duration,
}; };
pub use crate::request_responses::{
ResponseFailure, InboundFailure, RequestFailure, OutboundFailure, RequestId, SendRequestError
};
/// General behaviour of the network. Combines all protocols together. /// General behaviour of the network. Combines all protocols together.
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourOut<B>", poll_method = "poll")] #[behaviour(out_event = "BehaviourOut<B>", poll_method = "poll")]
...@@ -50,6 +54,8 @@ pub struct Behaviour<B: BlockT, H: ExHashT> { ...@@ -50,6 +54,8 @@ pub struct Behaviour<B: BlockT, H: ExHashT> {
peer_info: peer_info::PeerInfoBehaviour, peer_info: peer_info::PeerInfoBehaviour,
/// Discovers nodes of the network. /// Discovers nodes of the network.
discovery: DiscoveryBehaviour, discovery: DiscoveryBehaviour,
/// Generic request-reponse protocols.
request_responses: request_responses::RequestResponsesBehaviour,
/// Block request handling. /// Block request handling.
block_requests: block_requests::BlockRequests<B>, block_requests: block_requests::BlockRequests<B>,
/// Finality proof request handling. /// Finality proof request handling.
...@@ -76,22 +82,40 @@ pub enum BehaviourOut<B: BlockT> { ...@@ -76,22 +82,40 @@ pub enum BehaviourOut<B: BlockT> {
RandomKademliaStarted(ProtocolId), RandomKademliaStarted(ProtocolId),
/// We have received a request from a peer and answered it. /// We have received a request from a peer and answered it.
AnsweredRequest { ///
/// This event is generated for statistics purposes.
InboundRequest {
/// Peer which sent us a request. /// Peer which sent us a request.
peer: PeerId, peer: PeerId,
/// Protocol name of the request. /// Protocol name of the request.
protocol: String, protocol: Cow<'static, str>,
/// Time it took to build the response. /// If `Ok`, contains the time elapsed between when we received the request and when we
build_time: Duration, /// sent back the response. If `Err`, the error that happened.
result: Result<Duration, ResponseFailure>,
}, },
/// A request initiated using [`Behaviour::send_request`] has succeeded or failed.
RequestFinished {
/// Request that has succeeded.
request_id: RequestId,
/// Response sent by the remote or reason for failure.
result: Result<Vec<u8>, RequestFailure>,
},
/// Started a new request with the given node. /// Started a new request with the given node.
RequestStarted { ///
/// This event is for statistics purposes only. The request and response handling are entirely
/// internal to the behaviour.
OpaqueRequestStarted {
peer: PeerId, peer: PeerId,
/// Protocol name of the request. /// Protocol name of the request.
protocol: String, protocol: String,
}, },
/// Finished, successfully or not, a previously-started request. /// Finished, successfully or not, a previously-started request.
RequestFinished { ///
/// This event is for statistics purposes only. The request and response handling are entirely
/// internal to the behaviour.
OpaqueRequestFinished {
/// Who we were requesting. /// Who we were requesting.
peer: PeerId, peer: PeerId,
/// Protocol name of the request. /// Protocol name of the request.
...@@ -161,17 +185,20 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> { ...@@ -161,17 +185,20 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
finality_proof_requests: finality_requests::FinalityProofRequests<B>, finality_proof_requests: finality_requests::FinalityProofRequests<B>,
light_client_handler: light_client_handler::LightClientHandler<B>, light_client_handler: light_client_handler::LightClientHandler<B>,
disco_config: DiscoveryConfig, disco_config: DiscoveryConfig,
) -> Self { request_response_protocols: Vec<request_responses::ProtocolConfig>,
Behaviour { ) -> Result<Self, request_responses::RegisterError> {
Ok(Behaviour {
substrate, substrate,
peer_info: peer_info::PeerInfoBehaviour::new(user_agent, local_public_key), peer_info: peer_info::PeerInfoBehaviour::new(user_agent, local_public_key),
discovery: disco_config.finish(), discovery: disco_config.finish(),
request_responses:
request_responses::RequestResponsesBehaviour::new(request_response_protocols.into_iter())?,
block_requests, block_requests,
finality_proof_requests, finality_proof_requests,
light_client_handler, light_client_handler,
events: VecDeque::new(), events: VecDeque::new(),
role, role,
} })
} }
/// Returns the list of nodes that we know exist in the network. /// Returns the list of nodes that we know exist in the network.
...@@ -208,6 +235,16 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> { ...@@ -208,6 +235,16 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
self.peer_info.node(peer_id) self.peer_info.node(peer_id)
} }
/// Initiates sending a request.
///
/// An error is returned if we are not connected to the target peer of if the protocol doesn't
/// match one that has been registered.
pub fn send_request(&mut self, target: &PeerId, protocol: &str, request: Vec<u8>)
-> Result<RequestId, SendRequestError>
{
self.request_responses.send_request(target, protocol, request)
}
/// Registers a new notifications protocol. /// Registers a new notifications protocol.
/// ///
/// Please call `event_stream` before registering a protocol, otherwise you may miss events /// Please call `event_stream` before registering a protocol, otherwise you may miss events
...@@ -218,7 +255,7 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> { ...@@ -218,7 +255,7 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
pub fn register_notifications_protocol( pub fn register_notifications_protocol(
&mut self, &mut self,
engine_id: ConsensusEngineId, engine_id: ConsensusEngineId,
protocol_name: impl Into<Cow<'static, [u8]>>, protocol_name: impl Into<Cow<'static, str>>,
) { ) {
// This is the message that we will send to the remote as part of the initial handshake. // This is the message that we will send to the remote as part of the initial handshake.
// At the moment, we force this to be an encoded `Roles`. // At the moment, we force this to be an encoded `Roles`.
...@@ -298,18 +335,18 @@ Behaviour<B, H> { ...@@ -298,18 +335,18 @@ Behaviour<B, H> {
CustomMessageOutcome::BlockRequest { target, request } => { CustomMessageOutcome::BlockRequest { target, request } => {
match self.block_requests.send_request(&target, request) { match self.block_requests.send_request(&target, request) {
block_requests::SendRequestOutcome::Ok => { block_requests::SendRequestOutcome::Ok => {
self.events.push_back(BehaviourOut::RequestStarted { self.events.push_back(BehaviourOut::OpaqueRequestStarted {
peer: target, peer: target,
protocol: self.block_requests.protocol_name().to_owned(), protocol: self.block_requests.protocol_name().to_owned(),
}); });
}, },
block_requests::SendRequestOutcome::Replaced { request_duration, .. } => { block_requests::SendRequestOutcome::Replaced { request_duration, .. } => {
self.events.push_back(BehaviourOut::RequestFinished { self.events.push_back(BehaviourOut::OpaqueRequestFinished {
peer: target.clone(), peer: target.clone(),
protocol: self.block_requests.protocol_name().to_owned(), protocol: self.block_requests.protocol_name().to_owned(),
request_duration, request_duration,
}); });
self.events.push_back(BehaviourOut::RequestStarted { self.events.push_back(BehaviourOut::OpaqueRequestStarted {
peer: target, peer: target,
protocol: self.block_requests.protocol_name().to_owned(), protocol: self.block_requests.protocol_name().to_owned(),
}); });
...@@ -358,18 +395,39 @@ Behaviour<B, H> { ...@@ -358,18 +395,39 @@ Behaviour<B, H> {
} }
} }
impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<request_responses::Event> for Behaviour<B, H> {
fn inject_event(&mut self, event: request_responses::Event) {
match event {
request_responses::Event::InboundRequest { peer, protocol, result } => {
self.events.push_back(BehaviourOut::InboundRequest {
peer,
protocol,
result,
});
}
request_responses::Event::RequestFinished { request_id, result } => {
self.events.push_back(BehaviourOut::RequestFinished {
request_id,
result,
});
},
}
}
}
impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<block_requests::Event<B>> for Behaviour<B, H> { impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<block_requests::Event<B>> for Behaviour<B, H> {
fn inject_event(&mut self, event: block_requests::Event<B>) { fn inject_event(&mut self, event: block_requests::Event<B>) {
match event { match event {
block_requests::Event::AnsweredRequest { peer, total_handling_time } => { block_requests::Event::AnsweredRequest { peer, total_handling_time } => {
self.events.push_back(BehaviourOut::AnsweredRequest { self.events.push_back(BehaviourOut::InboundRequest {
peer, peer,
protocol: self.block_requests.protocol_name().to_owned(), protocol: self.block_requests.protocol_name().to_owned().into(),
build_time: total_handling_time, result: Ok(total_handling_time),
}); });
}, },
block_requests::Event::Response { peer, original_request: _, response, request_duration } => { block_requests::Event::Response { peer, original_request: _, response, request_duration } => {
self.events.push_back(BehaviourOut::RequestFinished { self.events.push_back(BehaviourOut::OpaqueRequestFinished {
peer: peer.clone(), peer: peer.clone(),
protocol: self.block_requests.protocol_name().to_owned(), protocol: self.block_requests.protocol_name().to_owned(),
request_duration, request_duration,
...@@ -381,7 +439,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<block_requests::Event<B ...@@ -381,7 +439,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<block_requests::Event<B
block_requests::Event::RequestTimeout { peer, request_duration, .. } => { block_requests::Event::RequestTimeout { peer, request_duration, .. } => {
// There doesn't exist any mechanism to report cancellations or timeouts yet, so // There doesn't exist any mechanism to report cancellations or timeouts yet, so
// we process them by disconnecting the node. // we process them by disconnecting the node.
self.events.push_back(BehaviourOut::RequestFinished { self.events.push_back(BehaviourOut::OpaqueRequestFinished {
peer: peer.clone(), peer: peer.clone(),
protocol: self.block_requests.protocol_name().to_owned(), protocol: self.block_requests.protocol_name().to_owned(),
request_duration, request_duration,
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
pub use crate::chain::{Client, FinalityProofProvider}; pub use crate::chain::{Client, FinalityProofProvider};
pub use crate::on_demand_layer::{AlwaysBadChecker, OnDemand}; pub use crate::on_demand_layer::{AlwaysBadChecker, OnDemand};
pub use crate::request_responses::{IncomingRequest, ProtocolConfig as RequestResponseConfig};
pub use libp2p::{identity, core::PublicKey, wasm_ext::ExtTransport, build_multiaddr}; pub use libp2p::{identity, core::PublicKey, wasm_ext::ExtTransport, build_multiaddr};
// Note: this re-export shouldn't be part of the public API of the crate and will be removed in // Note: this re-export shouldn't be part of the public API of the crate and will be removed in
...@@ -34,9 +35,10 @@ use crate::ExHashT; ...@@ -34,9 +35,10 @@ use crate::ExHashT;
use core::{fmt, iter}; use core::{fmt, iter};
use futures::future; use futures::future;
use libp2p::identity::{ed25519, Keypair}; use libp2p::{
use libp2p::wasm_ext; identity::{ed25519, Keypair},
use libp2p::{multiaddr, Multiaddr, PeerId}; multiaddr, wasm_ext, Multiaddr, PeerId,
};
use prometheus_endpoint::Registry; use prometheus_endpoint::Registry;
use sp_consensus::{block_validation::BlockAnnounceValidator, import_queue::ImportQueue}; use sp_consensus::{block_validation::BlockAnnounceValidator, import_queue::ImportQueue};
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
...@@ -413,7 +415,9 @@ pub struct NetworkConfiguration { ...@@ -413,7 +415,9 @@ pub struct NetworkConfiguration {
pub node_key: NodeKeyConfig, pub node_key: NodeKeyConfig,
/// List of notifications protocols that the node supports. Must also include a /// List of notifications protocols that the node supports. Must also include a
/// `ConsensusEngineId` for backwards-compatibility. /// `ConsensusEngineId` for backwards-compatibility.
pub notifications_protocols: Vec<(ConsensusEngineId, Cow<'static, [u8]>)>, pub notifications_protocols: Vec<(ConsensusEngineId, Cow<'static, str>)>,
/// List of request-response protocols that the node supports.
pub request_response_protocols: Vec<RequestResponseConfig>,
/// Maximum allowed number of incoming connections. /// Maximum allowed number of incoming connections.
pub in_peers: u32, pub in_peers: u32,
/// Number of outgoing connections we're trying to maintain. /// Number of outgoing connections we're trying to maintain.
...@@ -449,6 +453,7 @@ impl NetworkConfiguration { ...@@ -449,6 +453,7 @@ impl NetworkConfiguration {
boot_nodes: Vec::new(), boot_nodes: Vec::new(),
node_key, node_key,
notifications_protocols: Vec::new(), notifications_protocols: Vec::new(),
request_response_protocols: Vec::new(),
in_peers: 25, in_peers: 25,
out_peers: 75, out_peers: 75,
reserved_nodes: Vec::new(), reserved_nodes: Vec::new(),
...@@ -465,9 +470,7 @@ impl NetworkConfiguration { ...@@ -465,9 +470,7 @@ impl NetworkConfiguration {
allow_non_globals_in_dht: false, allow_non_globals_in_dht: false,
} }
} }
}
impl NetworkConfiguration {
/// Create new default configuration for localhost-only connection with random port (useful for testing) /// Create new default configuration for localhost-only connection with random port (useful for testing)
pub fn new_local() -> NetworkConfiguration { pub fn new_local() -> NetworkConfiguration {
let mut config = NetworkConfiguration::new( let mut config = NetworkConfiguration::new(
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
use crate::config::TransportConfig; use crate::config::TransportConfig;
use libp2p::{PeerId, Multiaddr}; use libp2p::{PeerId, Multiaddr};
use std::fmt; use std::{borrow::Cow, fmt};
/// Result type alias for the network. /// Result type alias for the network.
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
...@@ -61,6 +61,12 @@ pub enum Error { ...@@ -61,6 +61,12 @@ pub enum Error {
/// The invalid addresses. /// The invalid addresses.
addresses: Vec<Multiaddr>, addresses: Vec<Multiaddr>,
}, },
/// The same request-response protocol has been registered multiple times.
#[display(fmt = "Request-response protocol registered multiple times: {}", protocol)]
DuplicateRequestResponseProtocol {
/// Name of the protocol registered multiple times.
protocol: Cow<'static, str>,
},
} }
// Make `Debug` use the `Display` implementation. // Make `Debug` use the `Display` implementation.
...@@ -78,6 +84,7 @@ impl std::error::Error for Error { ...@@ -78,6 +84,7 @@ impl std::error::Error for Error {
Error::DuplicateBootnode { .. } => None, Error::DuplicateBootnode { .. } => None,
Error::Prometheus(ref err) => Some(err), Error::Prometheus(ref err) => Some(err),
Error::AddressesForAnotherTransport { .. } => None, Error::AddressesForAnotherTransport { .. } => None,
Error::DuplicateRequestResponseProtocol { .. } => None,
} }
} }
} }
...@@ -130,14 +130,14 @@ fn build_nodes_one_proto() ...@@ -130,14 +130,14 @@ fn build_nodes_one_proto()
let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())]; let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())];
let (node1, events_stream1) = build_test_full_node(config::NetworkConfiguration { let (node1, events_stream1) = build_test_full_node(config::NetworkConfiguration {
notifications_protocols: vec![(ENGINE_ID, From::from(&b"/foo"[..]))], notifications_protocols: vec![(ENGINE_ID, From::from("/foo"))],
listen_addresses: vec![listen_addr.clone()], listen_addresses: vec![listen_addr.clone()],
transport: config::TransportConfig::MemoryOnly, transport: config::TransportConfig::MemoryOnly,
.. config::NetworkConfiguration::new_local() .. config::NetworkConfiguration::new_local()
}); });
let (node2, events_stream2) = build_test_full_node(config::NetworkConfiguration { let (node2, events_stream2) = build_test_full_node(config::NetworkConfiguration {
notifications_protocols: vec![(ENGINE_ID, From::from(&b"/foo"[..]))], notifications_protocols: vec![(ENGINE_ID, From::from("/foo"))],
listen_addresses: vec![], listen_addresses: vec![],
reserved_nodes: vec![config::MultiaddrWithPeerId { reserved_nodes: vec![config::MultiaddrWithPeerId {
multiaddr: listen_addr, multiaddr: listen_addr,
......
...@@ -253,6 +253,7 @@ mod finality_requests; ...@@ -253,6 +253,7 @@ mod finality_requests;
mod light_client_handler; mod light_client_handler;
mod on_demand_layer; mod on_demand_layer;
mod protocol; mod protocol;
mod request_responses;
mod schema; mod schema;
mod service; mod service;
mod transport; mod transport;
...@@ -263,13 +264,10 @@ pub mod error; ...@@ -263,13 +264,10 @@ pub mod error;
pub mod gossip; pub mod gossip;
pub mod network_state; pub mod network_state;
pub use service::{NetworkService, NetworkWorker};
pub use protocol::PeerInfo;
pub use protocol::event::{Event, DhtEvent, ObservedRole};
pub use protocol::sync::SyncState;
pub use libp2p::{Multiaddr, PeerId};
#[doc(inline)] #[doc(inline)]
pub use libp2p::multiaddr; pub use libp2p::{multiaddr, Multiaddr, PeerId};
pub use protocol::{event::{DhtEvent, Event, ObservedRole}, sync::SyncState, PeerInfo};
pub use service::{NetworkService, NetworkWorker, RequestFailure, OutboundFailure};
pub use sc_peerset::ReputationChange; pub use sc_peerset::ReputationChange;
use sp_runtime::traits::{Block as BlockT, NumberFor}; use sp_runtime::traits::{Block as BlockT, NumberFor};
......
...@@ -245,13 +245,13 @@ pub struct Protocol<B: BlockT, H: ExHashT> { ...@@ -245,13 +245,13 @@ pub struct Protocol<B: BlockT, H: ExHashT> {
/// Handles opening the unique substream and sending and receiving raw messages. /// Handles opening the unique substream and sending and receiving raw messages.
behaviour: GenericProto, behaviour: GenericProto,
/// For each legacy gossiping engine ID, the corresponding new protocol name. /// For each legacy gossiping engine ID, the corresponding new protocol name.
protocol_name_by_engine: HashMap<ConsensusEngineId, Cow<'static, [u8]>>, protocol_name_by_engine: HashMap<ConsensusEngineId, Cow<'static, str>>,
/// For each protocol name, the legacy equivalent. /// For each protocol name, the legacy equivalent.
legacy_equiv_by_name: HashMap<Cow<'static, [u8]>, Fallback>, legacy_equiv_by_name: HashMap<Cow<'static, str>, Fallback>,
/// Name of the protocol used for transactions. /// Name of the protocol used for transactions.
transactions_protocol: Cow<'static, [u8]>, transactions_protocol: Cow<'static, str>,
/// Name of the protocol used for block announces. /// Name of the protocol used for block announces.
block_announces_protocol: Cow<'static, [u8]>, block_announces_protocol: Cow<'static, str>,
/// Prometheus metrics. /// Prometheus metrics.
metrics: Option<Metrics>, metrics: Option<Metrics>,
/// The `PeerId`'s of all boot nodes. /// The `PeerId`'s of all boot nodes.
...@@ -417,19 +417,21 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> { ...@@ -417,19 +417,21 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
let mut legacy_equiv_by_name = HashMap::new(); let mut legacy_equiv_by_name = HashMap::new();
let transactions_protocol: Cow<'static, [u8]> = Cow::from({ let transactions_protocol: Cow<'static, str> = Cow::from({
let mut proto = b"/".to_vec(); let mut proto = String::new();
proto.extend(protocol_id.as_ref().as_bytes()); proto.push_str("/");
proto.extend(b"/transactions/1"); proto.push_str(protocol_id.as_ref());
proto.push_str("/transactions/1");
proto proto
}); });
behaviour.register_notif_protocol(transactions_protocol.clone(), Vec::new()); behaviour.register_notif_protocol(transactions_protocol.clone(), Vec::new());
legacy_equiv_by_name.insert(transactions_protocol.clone(), Fallback::Transactions); legacy_equiv_by_name.insert(transactions_protocol.clone(), Fallback::Transactions);
let block_announces_protocol: Cow<'static, [u8]> = Cow::from({ let block_announces_protocol: Cow<'static, str> = Cow::from({
let mut proto = b"/".to_vec(); let mut proto = String::new();
proto.extend(protocol_id.as_ref().as_bytes()); proto.push_str("/");
proto.extend(b"/block-announces/1"); proto.push_str(protocol_id.as_ref());
proto.push_str("/block-announces/1");
proto proto
}); });
behaviour.register_notif_protocol( behaviour.register_notif_protocol(
...@@ -646,7 +648,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> { ...@@ -646,7 +648,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
messages: vec![(msg.engine_id, From::from(msg.data))], messages: vec![(msg.engine_id, From::from(msg.data))],
} }
} else { } else {
warn!(target: "sync", "Received message on non-registered protocol: {:?}", msg.engine_id); debug!(target: "sync", "Received message on non-registered protocol: {:?}", msg.engine_id);
CustomMessageOutcome::None CustomMessageOutcome::None
}, },
GenericMessage::ConsensusBatch(messages) => { GenericMessage::ConsensusBatch(messages) => {
...@@ -656,7 +658,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> { ...@@ -656,7 +658,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
if self.protocol_name_by_engine.contains_key(&msg.engine_id) { if self.protocol_name_by_engine.contains_key(&msg.engine_id) {
Some((msg.engine_id, From::from(msg.data))) Some((msg.engine_id, From::from(msg.data)))
} else { } else {
warn!(target: "sync", "Received message on non-registered protocol: {:?}", msg.engine_id); debug!(target: "sync", "Received message on non-registered protocol: {:?}", msg.engine_id);
None None
} }
}) })
...@@ -679,7 +681,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> { ...@@ -679,7 +681,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
fn send_message( fn send_message(
&mut self, &mut self,
who: &PeerId, who: &PeerId,
message: Option<(Cow<'static, [u8]>, Vec<u8>)>, message: Option<(Cow<'static, str>, Vec<u8>)>,
legacy: Message<B>, legacy: Message<B>,
) { ) {
send_message::<B>( send_message::<B>(
...@@ -1076,7 +1078,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> { ...@@ -1076,7 +1078,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
pub fn register_notifications_protocol<'a>( pub fn register_notifications_protocol<'a>(
&'a mut self, &'a mut self,
engine_id: ConsensusEngineId, engine_id: ConsensusEngineId,
protocol_name: impl Into<Cow<'static, [u8]>>, protocol_name: impl Into<Cow<'static, str>>,
handshake_message: Vec<u8>, handshake_message: Vec<u8>,
) -> impl Iterator<Item = (&'a PeerId, Roles, &'a NotificationsSink)> + 'a { ) -> impl Iterator<Item = (&'a PeerId, Roles, &'a NotificationsSink)> + 'a {
let protocol_name = protocol_name.into(); let protocol_name = protocol_name.into();
...@@ -1607,7 +1609,7 @@ fn send_message<B: BlockT>( ...@@ -1607,7 +1609,7 @@ fn send_message<B: BlockT>(
behaviour: &mut GenericProto, behaviour: &mut GenericProto,
stats: &mut HashMap<&'static str, PacketStats>, stats: &mut HashMap<&'static str, PacketStats>,
who: &PeerId, who: &PeerId,
message: Option<(Cow<'static, [u8]>, Vec<u8>)>, message: Option<(Cow<'static, str>, Vec<u8>)>,
legacy_message: Message<B>, legacy_message: Message<B>,
) { ) {
let encoded = legacy_message.encode(); let encoded = legacy_message.encode();
...@@ -1795,7 +1797,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> { ...@@ -1795,7 +1797,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
} }
} }
None => { None => {
error!(target: "sub-libp2p", "Received notification from unknown protocol {:?}", protocol_name); debug!(target: "sub-libp2p", "Received notification from unknown protocol {:?}", protocol_name);
CustomMessageOutcome::None CustomMessageOutcome::None
} }
} }
......
...@@ -120,7 +120,7 @@ pub struct GenericProto { ...@@ -120,7 +120,7 @@ pub struct GenericProto {
/// Notification protocols. Entries are only ever added and not removed. /// Notification protocols. Entries are only ever added and not removed.
/// Contains, for each protocol, the protocol name and the message to send as part of the /// Contains, for each protocol, the protocol name and the message to send as part of the
/// initial handshake. /// initial handshake.
notif_protocols: Vec<(Cow<'static, [u8]>, Arc<RwLock<Vec<u8>>>)>, notif_protocols: Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>)>,
/// Receiver for instructions about who to connect to or disconnect from. /// Receiver for instructions about who to connect to or disconnect from.
peerset: sc_peerset::Peerset, peerset: sc_peerset::Peerset,
...@@ -322,7 +322,7 @@ pub enum GenericProtoOut { ...@@ -322,7 +322,7 @@ pub enum GenericProtoOut {
/// Id of the peer the message came from. /// Id of the peer the message came from.
peer_id: PeerId, peer_id: PeerId,
/// Engine corresponding to the message. /// Engine corresponding to the message.
protocol_name: Cow<'static, [u8]>, protocol_name: Cow<'static, str>,
/// Message that has been received. /// Message that has been received.
message: BytesMut, message: BytesMut,
}, },
...@@ -360,7 +360,7 @@ impl GenericProto { ...@@ -360,7 +360,7 @@ impl GenericProto {
/// will retain the protocols that were registered then, and not any new one. /// will retain the protocols that were registered then, and not any new one.
pub fn register_notif_protocol( pub fn register_notif_protocol(
&mut self, &mut self,
protocol_name: impl Into<Cow<'static, [u8]>>, protocol_name: impl Into<Cow<'static, str>>,
handshake_msg: impl Into<Vec<u8>> handshake_msg: impl Into<Vec<u8>>
) { ) {
self.notif_protocols.push((protocol_name.into(), Arc::new(RwLock::new(handshake_msg.into())))); self.notif_protocols.push((protocol_name.into(), Arc::new(RwLock::new(handshake_msg.into()))));
...@@ -371,10 +371,10 @@ impl GenericProto { ...@@ -371,10 +371,10 @@ impl GenericProto {
/// Has no effect if the protocol is unknown. /// Has no effect if the protocol is unknown.
pub fn set_notif_protocol_handshake( pub fn set_notif_protocol_handshake(
&mut self, &mut self,
protocol_name: &[u8], protocol_name: &str,
handshake_message: impl Into<Vec<u8>> handshake_message: impl Into<Vec<u8>>
) { ) {
if let Some(protocol) = self.notif_protocols.iter_mut().find(|(name, _)| name == &protocol_name) { if let Some(protocol) = self.notif_protocols.iter_mut().find(|(name, _)| name == protocol_name) {
*protocol.1.write() = handshake_message.into(); *protocol.1.write() = handshake_message.into();
} }
} }
...@@ -551,7 +551,7 @@ impl GenericProto { ...@@ -551,7 +551,7 @@ impl GenericProto {
pub fn write_notification( pub fn write_notification(
&mut self, &mut self,
target: &PeerId, target: &PeerId,
protocol_name: Cow<'static, [u8]>, protocol_name: Cow<'static, str>,
message: impl Into<Vec<u8>>, message: impl Into<Vec<u8>>,
encoded_fallback_message: Vec<u8>, encoded_fallback_message: Vec<u8>,
) { ) {
...@@ -569,11 +569,11 @@ impl GenericProto { ...@@ -569,11 +569,11 @@ impl GenericProto {
target: "sub-libp2p", target: "sub-libp2p",
"External API => Notification({:?}, {:?})", "External API => Notification({:?}, {:?})",
target, target,
str::from_utf8(&protocol_name) protocol_name,
); );
trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target); trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target);
notifs_sink.send_sync_notification( notifs_sink.send_sync_notification(
&protocol_name, protocol_name,
encoded_fallback_message, encoded_fallback_message,
message message
); );
...@@ -1374,7 +1374,7 @@ impl NetworkBehaviour for GenericProto { ...@@ -1374,7 +1374,7 @@ impl NetworkBehaviour for GenericProto {
target: "sub-libp2p", target: "sub-libp2p",
"Handler({:?}) => Notification({:?})", "Handler({:?}) => Notification({:?})",
source, source,
str::from_utf8(&protocol_name) protocol_name,
); );
trace!(target: "sub-libp2p", "External API <= Message({:?}, {:?})", protocol_name, source); trace!(target: "sub-libp2p", "External API <= Message({:?}, {:?})", protocol_name, source);
let event = GenericProtoOut::Notification { let event = GenericProtoOut::Notification {
......
...@@ -224,7 +224,7 @@ pub enum NotifsHandlerOut { ...@@ -224,7 +224,7 @@ pub enum NotifsHandlerOut {
/// Received a message on a custom protocol substream. /// Received a message on a custom protocol substream.
Notification { Notification {
/// Name of the protocol of the message. /// Name of the protocol of the message.
protocol_name: Cow<'static, [u8]>, protocol_name: Cow<'static, str>,
/// Message that has been received. /// Message that has been received.
message: BytesMut, message: BytesMut,
...@@ -270,7 +270,7 @@ enum NotificationsSinkMessage { ...@@ -270,7 +270,7 @@ enum NotificationsSinkMessage {
/// Message emitted by [`NotificationsSink::reserve_notification`] and /// Message emitted by [`NotificationsSink::reserve_notification`] and
/// [`NotificationsSink::write_notification_now`]. /// [`NotificationsSink::write_notification_now`].
Notification { Notification {
protocol_name: Vec<u8>, protocol_name: Cow<'static, str>,
encoded_fallback_message: Vec<u8>, encoded_fallback_message: Vec<u8>,
message: Vec<u8>, message: Vec<u8>,
}, },
...@@ -311,13 +311,13 @@ impl NotificationsSink { ...@@ -311,13 +311,13 @@ impl NotificationsSink {
/// This method will be removed in a future version. /// This method will be removed in a future version.
pub fn send_sync_notification<'a>( pub fn send_sync_notification<'a>(
&'a self, &'a self,
protocol_name: &[u8], protocol_name: Cow<'static, str>,
encoded_fallback_message: impl Into<Vec<u8>>, encoded_fallback_message: impl Into<Vec<u8>>,
message: impl Into<Vec<u8>> message: impl Into<Vec<u8>>
) { ) {
let mut lock = self.inner.sync_channel.lock(); let mut lock = self.inner.sync_channel.lock();
let result = lock.try_send(NotificationsSinkMessage::Notification { let result = lock.try_send(NotificationsSinkMessage::Notification {
protocol_name: protocol_name.to_owned(), protocol_name: protocol_name,
encoded_fallback_message: encoded_fallback_message.into(), encoded_fallback_message: encoded_fallback_message.into(),
message: message.into() message: message.into()
}); });
...@@ -336,12 +336,12 @@ impl NotificationsSink { ...@@ -336,12 +336,12 @@ impl NotificationsSink {
/// ///
/// The protocol name is expected to be checked ahead of calling this method. It is a logic /// The protocol name is expected to be checked ahead of calling this method. It is a logic
/// error to send a notification using an unknown protocol. /// error to send a notification using an unknown protocol.
pub async fn reserve_notification<'a>(&'a self, protocol_name: &[u8]) -> Result<Ready<'a>, ()> { pub async fn reserve_notification<'a>(&'a self, protocol_name: Cow<'static, str>) -> Result<Ready<'a>, ()> {
let mut lock = self.inner.async_channel.lock().await; let mut lock = self.inner.async_channel.lock().await;
let poll_ready = future::poll_fn(|cx| lock.poll_ready(cx)).await; let poll_ready = future::poll_fn(|cx| lock.poll_ready(cx)).await;
if poll_ready.is_ok() { if poll_ready.is_ok() {
Ok(Ready { protocol_name: protocol_name.to_owned(), lock }) Ok(Ready { protocol_name: protocol_name, lock })
} else { } else {
Err(()) Err(())
} }
...@@ -355,7 +355,7 @@ pub struct Ready<'a> { ...@@ -355,7 +355,7 @@ pub struct Ready<'a> {
/// Guarded channel. The channel inside is guaranteed to not be full. /// Guarded channel. The channel inside is guaranteed to not be full.
lock: FuturesMutexGuard<'a, mpsc::Sender<NotificationsSinkMessage>>, lock: FuturesMutexGuard<'a, mpsc::Sender<NotificationsSinkMessage>>,
/// Name of the protocol. Should match one of the protocols passed at initialization. /// Name of the protocol. Should match one of the protocols passed at initialization.
protocol_name: Vec<u8>, protocol_name: Cow<'static, str>,
} }
impl<'a> Ready<'a> { impl<'a> Ready<'a> {
...@@ -392,7 +392,7 @@ impl NotifsHandlerProto { ...@@ -392,7 +392,7 @@ impl NotifsHandlerProto {
/// ourselves or respond to handshake from the remote. /// ourselves or respond to handshake from the remote.
pub fn new( pub fn new(
legacy: RegisteredProtocol, legacy: RegisteredProtocol,
list: impl Into<Vec<(Cow<'static, [u8]>, Arc<RwLock<Vec<u8>>>)>>, list: impl Into<Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>)>>,
) -> Self { ) -> Self {
let list = list.into(); let list = list.into();
...@@ -613,7 +613,7 @@ impl ProtocolsHandler for NotifsHandler { ...@@ -613,7 +613,7 @@ impl ProtocolsHandler for NotifsHandler {
message message
} => { } => {
for (handler, _) in &mut self.out_handlers { for (handler, _) in &mut self.out_handlers {
if handler.protocol_name() == &protocol_name[..] && handler.is_open() { if *handler.protocol_name() == protocol_name && handler.is_open() {
handler.send_or_discard(message); handler.send_or_discard(message);
continue 'poll_notifs_sink; continue 'poll_notifs_sink;
} }
...@@ -698,7 +698,7 @@ impl ProtocolsHandler for NotifsHandler { ...@@ -698,7 +698,7 @@ impl ProtocolsHandler for NotifsHandler {
if self.notifications_sink_rx.is_some() { if self.notifications_sink_rx.is_some() {
let msg = NotifsHandlerOut::Notification { let msg = NotifsHandlerOut::Notification {
message, message,
protocol_name: handler.protocol_name().to_owned().into(), protocol_name: handler.protocol_name().clone(),
}; };
return Poll::Ready(ProtocolsHandlerEvent::Custom(msg)); return Poll::Ready(ProtocolsHandlerEvent::Custom(msg));
} }
......
...@@ -109,7 +109,7 @@ pub enum NotifsInHandlerOut { ...@@ -109,7 +109,7 @@ pub enum NotifsInHandlerOut {
impl NotifsInHandlerProto { impl NotifsInHandlerProto {
/// Builds a new `NotifsInHandlerProto`. /// Builds a new `NotifsInHandlerProto`.
pub fn new( pub fn new(
protocol_name: impl Into<Cow<'static, [u8]>> protocol_name: impl Into<Cow<'static, str>>
) -> Self { ) -> Self {
NotifsInHandlerProto { NotifsInHandlerProto {
in_protocol: NotificationsIn::new(protocol_name), in_protocol: NotificationsIn::new(protocol_name),
...@@ -136,7 +136,7 @@ impl IntoProtocolsHandler for NotifsInHandlerProto { ...@@ -136,7 +136,7 @@ impl IntoProtocolsHandler for NotifsInHandlerProto {
impl NotifsInHandler { impl NotifsInHandler {
/// Returns the name of the protocol that we accept. /// Returns the name of the protocol that we accept.
pub fn protocol_name(&self) -> &[u8] { pub fn protocol_name(&self) -> &Cow<'static, str> {
self.in_protocol.protocol_name() self.in_protocol.protocol_name()
} }
} }
......
...@@ -57,13 +57,13 @@ const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5); ...@@ -57,13 +57,13 @@ const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
/// See the documentation of [`NotifsOutHandler`] for more information. /// See the documentation of [`NotifsOutHandler`] for more information.
pub struct NotifsOutHandlerProto { pub struct NotifsOutHandlerProto {
/// Name of the protocol to negotiate. /// Name of the protocol to negotiate.
protocol_name: Cow<'static, [u8]>, protocol_name: Cow<'static, str>,
} }
impl NotifsOutHandlerProto { impl NotifsOutHandlerProto {
/// Builds a new [`NotifsOutHandlerProto`]. Will use the given protocol name for the /// Builds a new [`NotifsOutHandlerProto`]. Will use the given protocol name for the
/// notifications substream. /// notifications substream.
pub fn new(protocol_name: impl Into<Cow<'static, [u8]>>) -> Self { pub fn new(protocol_name: impl Into<Cow<'static, str>>) -> Self {
NotifsOutHandlerProto { NotifsOutHandlerProto {
protocol_name: protocol_name.into(), protocol_name: protocol_name.into(),
} }
...@@ -97,7 +97,7 @@ impl IntoProtocolsHandler for NotifsOutHandlerProto { ...@@ -97,7 +97,7 @@ impl IntoProtocolsHandler for NotifsOutHandlerProto {
/// the remote for the purpose of sending notifications to it. /// the remote for the purpose of sending notifications to it.
pub struct NotifsOutHandler { pub struct NotifsOutHandler {
/// Name of the protocol to negotiate. /// Name of the protocol to negotiate.
protocol_name: Cow<'static, [u8]>, protocol_name: Cow<'static, str>,
/// Relationship with the node we're connected to. /// Relationship with the node we're connected to.
state: State, state: State,
...@@ -220,7 +220,7 @@ impl NotifsOutHandler { ...@@ -220,7 +220,7 @@ impl NotifsOutHandler {
} }
/// Returns the name of the protocol that we negotiate. /// Returns the name of the protocol that we negotiate.
pub fn protocol_name(&self) -> &[u8] { pub fn protocol_name(&self) -> &Cow<'static, str> {
&self.protocol_name &self.protocol_name
} }
......
...@@ -50,7 +50,7 @@ const MAX_HANDSHAKE_SIZE: usize = 1024; ...@@ -50,7 +50,7 @@ const MAX_HANDSHAKE_SIZE: usize = 1024;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct NotificationsIn { pub struct NotificationsIn {
/// Protocol name to use when negotiating the substream. /// Protocol name to use when negotiating the substream.
protocol_name: Cow<'static, [u8]>, protocol_name: Cow<'static, str>,
} }
/// Upgrade that opens a substream, waits for the remote to accept by sending back a status /// Upgrade that opens a substream, waits for the remote to accept by sending back a status
...@@ -58,7 +58,7 @@ pub struct NotificationsIn { ...@@ -58,7 +58,7 @@ pub struct NotificationsIn {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct NotificationsOut { pub struct NotificationsOut {
/// Protocol name to use when negotiating the substream. /// Protocol name to use when negotiating the substream.
protocol_name: Cow<'static, [u8]>, protocol_name: Cow<'static, str>,
/// Message to send when we start the handshake. /// Message to send when we start the handshake.
initial_message: Vec<u8>, initial_message: Vec<u8>,
} }
...@@ -100,14 +100,14 @@ pub struct NotificationsOutSubstream<TSubstream> { ...@@ -100,14 +100,14 @@ pub struct NotificationsOutSubstream<TSubstream> {
impl NotificationsIn { impl NotificationsIn {
/// Builds a new potential upgrade. /// Builds a new potential upgrade.
pub fn new(protocol_name: impl Into<Cow<'static, [u8]>>) -> Self { pub fn new(protocol_name: impl Into<Cow<'static, str>>) -> Self {
NotificationsIn { NotificationsIn {
protocol_name: protocol_name.into(), protocol_name: protocol_name.into(),
} }
} }
/// Returns the name of the protocol that we accept. /// Returns the name of the protocol that we accept.
pub fn protocol_name(&self) -> &[u8] { pub fn protocol_name(&self) -> &Cow<'static, str> {
&self.protocol_name &self.protocol_name
} }
} }
...@@ -117,7 +117,11 @@ impl UpgradeInfo for NotificationsIn { ...@@ -117,7 +117,11 @@ impl UpgradeInfo for NotificationsIn {
type InfoIter = iter::Once<Self::Info>; type InfoIter = iter::Once<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter { fn protocol_info(&self) -> Self::InfoIter {
iter::once(self.protocol_name.clone()) let bytes: Cow<'static, [u8]> = match &self.protocol_name {
Cow::Borrowed(s) => Cow::Borrowed(s.as_bytes()),
Cow::Owned(s) => Cow::Owned(s.as_bytes().to_vec())
};
iter::once(bytes)
} }
} }
...@@ -144,7 +148,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, ...@@ -144,7 +148,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
let mut initial_message = vec![0u8; initial_message_len]; let mut initial_message = vec![0u8; initial_message_len];
if !initial_message.is_empty() { if !initial_message.is_empty() {
socket.read(&mut initial_message).await?; socket.read_exact(&mut initial_message).await?;
} }
let substream = NotificationsInSubstream { let substream = NotificationsInSubstream {
...@@ -244,7 +248,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin, ...@@ -244,7 +248,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
impl NotificationsOut { impl NotificationsOut {
/// Builds a new potential upgrade. /// Builds a new potential upgrade.
pub fn new(protocol_name: impl Into<Cow<'static, [u8]>>, initial_message: impl Into<Vec<u8>>) -> Self { pub fn new(protocol_name: impl Into<Cow<'static, str>>, initial_message: impl Into<Vec<u8>>) -> Self {
let initial_message = initial_message.into(); let initial_message = initial_message.into();
if initial_message.len() > MAX_HANDSHAKE_SIZE { if initial_message.len() > MAX_HANDSHAKE_SIZE {
error!(target: "sub-libp2p", "Outbound networking handshake is above allowed protocol limit"); error!(target: "sub-libp2p", "Outbound networking handshake is above allowed protocol limit");
...@@ -262,7 +266,11 @@ impl UpgradeInfo for NotificationsOut { ...@@ -262,7 +266,11 @@ impl UpgradeInfo for NotificationsOut {
type InfoIter = iter::Once<Self::Info>; type InfoIter = iter::Once<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter { fn protocol_info(&self) -> Self::InfoIter {
iter::once(self.protocol_name.clone()) let bytes: Cow<'static, [u8]> = match &self.protocol_name {
Cow::Borrowed(s) => Cow::Borrowed(s.as_bytes()),
Cow::Owned(s) => Cow::Owned(s.as_bytes().to_vec())
};
iter::once(bytes)
} }
} }
...@@ -292,7 +300,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, ...@@ -292,7 +300,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
let mut handshake = vec![0u8; handshake_len]; let mut handshake = vec![0u8; handshake_len];
if !handshake.is_empty() { if !handshake.is_empty() {
socket.read(&mut handshake).await?; socket.read_exact(&mut handshake).await?;
} }
Ok((handshake, NotificationsOutSubstream { Ok((handshake, NotificationsOutSubstream {
...@@ -378,10 +386,11 @@ mod tests { ...@@ -378,10 +386,11 @@ mod tests {
use async_std::net::{TcpListener, TcpStream}; use async_std::net::{TcpListener, TcpStream};
use futures::{prelude::*, channel::oneshot}; use futures::{prelude::*, channel::oneshot};
use libp2p::core::upgrade; use libp2p::core::upgrade;
use std::borrow::Cow;
#[test] #[test]
fn basic_works() { fn basic_works() {
const PROTO_NAME: &'static [u8] = b"/test/proto/1"; const PROTO_NAME: Cow<'static, str> = Cow::Borrowed("/test/proto/1");
let (listener_addr_tx, listener_addr_rx) = oneshot::channel(); let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let client = async_std::task::spawn(async move { let client = async_std::task::spawn(async move {
...@@ -420,7 +429,7 @@ mod tests { ...@@ -420,7 +429,7 @@ mod tests {
fn empty_handshake() { fn empty_handshake() {
// Check that everything still works when the handshake messages are empty. // Check that everything still works when the handshake messages are empty.
const PROTO_NAME: &'static [u8] = b"/test/proto/1"; const PROTO_NAME: Cow<'static, str> = Cow::Borrowed("/test/proto/1");
let (listener_addr_tx, listener_addr_rx) = oneshot::channel(); let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let client = async_std::task::spawn(async move { let client = async_std::task::spawn(async move {
...@@ -457,7 +466,7 @@ mod tests { ...@@ -457,7 +466,7 @@ mod tests {
#[test] #[test]
fn refused() { fn refused() {
const PROTO_NAME: &'static [u8] = b"/test/proto/1"; const PROTO_NAME: Cow<'static, str> = Cow::Borrowed("/test/proto/1");
let (listener_addr_tx, listener_addr_rx) = oneshot::channel(); let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let client = async_std::task::spawn(async move { let client = async_std::task::spawn(async move {
...@@ -495,7 +504,7 @@ mod tests { ...@@ -495,7 +504,7 @@ mod tests {
#[test] #[test]
fn large_initial_message_refused() { fn large_initial_message_refused() {
const PROTO_NAME: &'static [u8] = b"/test/proto/1"; const PROTO_NAME: Cow<'static, str> = Cow::Borrowed("/test/proto/1");
let (listener_addr_tx, listener_addr_rx) = oneshot::channel(); let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let client = async_std::task::spawn(async move { let client = async_std::task::spawn(async move {
...@@ -526,7 +535,7 @@ mod tests { ...@@ -526,7 +535,7 @@ mod tests {
#[test] #[test]
fn large_handshake_refused() { fn large_handshake_refused() {
const PROTO_NAME: &'static [u8] = b"/test/proto/1"; const PROTO_NAME: Cow<'static, str> = Cow::Borrowed("/test/proto/1");
let (listener_addr_tx, listener_addr_rx) = oneshot::channel(); let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let client = async_std::task::spawn(async move { let client = async_std::task::spawn(async move {
......
This diff is collapsed.
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
use crate::{ use crate::{
ExHashT, NetworkStateInfo, ExHashT, NetworkStateInfo,
behaviour::{Behaviour, BehaviourOut}, behaviour::{self, Behaviour, BehaviourOut},
config::{parse_str_addr, NonReservedPeerMode, Params, Role, TransportConfig}, config::{parse_str_addr, NonReservedPeerMode, Params, Role, TransportConfig},
DhtEvent, DhtEvent,
discovery::DiscoveryConfig, discovery::DiscoveryConfig,
...@@ -42,7 +42,7 @@ use crate::{ ...@@ -42,7 +42,7 @@ use crate::{
protocol::{self, event::Event, NotifsHandlerError, LegacyConnectionKillError, NotificationsSink, Ready, sync::SyncState, PeerInfo, Protocol}, protocol::{self, event::Event, NotifsHandlerError, LegacyConnectionKillError, NotificationsSink, Ready, sync::SyncState, PeerInfo, Protocol},
transport, ReputationChange, transport, ReputationChange,
}; };
use futures::prelude::*; use futures::{channel::oneshot, prelude::*};
use libp2p::{PeerId, multiaddr, Multiaddr}; use libp2p::{PeerId, multiaddr, Multiaddr};
use libp2p::core::{ConnectedPoint, Executor, connection::{ConnectionError, PendingConnectionError}, either::EitherError}; use libp2p::core::{ConnectedPoint, Executor, connection::{ConnectionError, PendingConnectionError}, either::EitherError};
use libp2p::kad::record; use libp2p::kad::record;
...@@ -76,6 +76,9 @@ use std::{ ...@@ -76,6 +76,9 @@ use std::{
}, },
task::Poll, task::Poll,
}; };
use wasm_timer::Instant;
pub use behaviour::{ResponseFailure, InboundFailure, RequestFailure, OutboundFailure};
mod out_events; mod out_events;
#[cfg(test)] #[cfg(test)]
...@@ -102,7 +105,7 @@ pub struct NetworkService<B: BlockT + 'static, H: ExHashT> { ...@@ -102,7 +105,7 @@ pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
/// that peer. Updated by the [`NetworkWorker`]. /// that peer. Updated by the [`NetworkWorker`].
peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, ConsensusEngineId), NotificationsSink>>>, peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, ConsensusEngineId), NotificationsSink>>>,
/// For each legacy gossiping engine ID, the corresponding new protocol name. /// For each legacy gossiping engine ID, the corresponding new protocol name.
protocol_name_by_engine: Mutex<HashMap<ConsensusEngineId, Cow<'static, [u8]>>>, protocol_name_by_engine: Mutex<HashMap<ConsensusEngineId, Cow<'static, str>>>,
/// Field extracted from the [`Metrics`] struct and necessary to report the /// Field extracted from the [`Metrics`] struct and necessary to report the
/// notifications-related metrics. /// notifications-related metrics.
notifications_sizes_metric: Option<HistogramVec>, notifications_sizes_metric: Option<HistogramVec>,
...@@ -309,16 +312,28 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> { ...@@ -309,16 +312,28 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
config config
}; };
let mut behaviour = Behaviour::new( let mut behaviour = {
protocol, let result = Behaviour::new(
params.role, protocol,
user_agent, params.role,
local_public, user_agent,
block_requests, local_public,
finality_proof_requests, block_requests,
light_client_handler, finality_proof_requests,
discovery_config light_client_handler,
); discovery_config,
params.network_config.request_response_protocols,
);
match result {
Ok(b) => b,
Err(crate::request_responses::RegisterError::DuplicateProtocol(proto)) => {
return Err(Error::DuplicateRequestResponseProtocol {
protocol: proto,
})
},
}
};
for (engine_id, protocol_name) in &params.network_config.notifications_protocols { for (engine_id, protocol_name) in &params.network_config.notifications_protocols {
behaviour.register_notifications_protocol(*engine_id, protocol_name.clone()); behaviour.register_notifications_protocol(*engine_id, protocol_name.clone());
...@@ -404,6 +419,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> { ...@@ -404,6 +419,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
peers_notifications_sinks, peers_notifications_sinks,
metrics, metrics,
boot_node_ids, boot_node_ids,
pending_requests: HashMap::with_capacity(128),
}) })
} }
...@@ -630,7 +646,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> { ...@@ -630,7 +646,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
}) })
}); });
sink.send_sync_notification(&protocol_name, fallback, message); sink.send_sync_notification(protocol_name, fallback, message);
} else { } else {
return; return;
} }
...@@ -752,12 +768,50 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> { ...@@ -752,12 +768,50 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
/// parameter is a `&'static str`, and not a `String`, in order to avoid accidentally having /// parameter is a `&'static str`, and not a `String`, in order to avoid accidentally having
/// an unbounded set of Prometheus metrics, which would be quite bad in terms of memory /// an unbounded set of Prometheus metrics, which would be quite bad in terms of memory
pub fn event_stream(&self, name: &'static str) -> impl Stream<Item = Event> { pub fn event_stream(&self, name: &'static str) -> impl Stream<Item = Event> {
// Note: when transitioning to stable futures, remove the `Error` entirely
let (tx, rx) = out_events::channel(name); let (tx, rx) = out_events::channel(name);
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx)); let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx));
rx rx
} }
/// Sends a single targeted request to a specific peer. On success, returns the response of
/// the peer.
///
/// Request-response protocols are a way to complement notifications protocols, but
/// notifications should remain the default ways of communicating information. For example, a
/// peer can announce something through a notification, after which the recipient can obtain
/// more information by performing a request.
/// As such, this function is meant to be called only with peers we are already connected to.
/// Calling this method with a `target` we are not connected to will *not* attempt to connect
/// to said peer.
///
/// No limit or throttling of concurrent outbound requests per peer and protocol are enforced.
/// Such restrictions, if desired, need to be enforced at the call site(s).
///
/// The protocol must have been registered through
/// [`NetworkConfiguration::request_response_protocols`].
pub async fn request(
&self,
target: PeerId,
protocol: impl Into<Cow<'static, str>>,
request: Vec<u8>
) -> Result<Vec<u8>, RequestFailure> {
let (tx, rx) = oneshot::channel();
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request {
target,
protocol: protocol.into(),
request,
pending_response: tx
});
match rx.await {
Ok(v) => v,
// The channel can only be closed if the network worker no longer exists. If the
// network worker no longer exists, then all connections to `target` are necessarily
// closed, and we legitimately report this situation as a "ConnectionClosed".
Err(_) => Err(RequestFailure::Network(OutboundFailure::ConnectionClosed)),
}
}
/// Registers a new notifications protocol. /// Registers a new notifications protocol.
/// ///
/// After a protocol has been registered, you can call `write_notifications`. /// After a protocol has been registered, you can call `write_notifications`.
...@@ -774,7 +828,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> { ...@@ -774,7 +828,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
pub fn register_notifications_protocol( pub fn register_notifications_protocol(
&self, &self,
engine_id: ConsensusEngineId, engine_id: ConsensusEngineId,
protocol_name: impl Into<Cow<'static, [u8]>>, protocol_name: impl Into<Cow<'static, str>>,
) { ) {
let protocol_name = protocol_name.into(); let protocol_name = protocol_name.into();
self.protocol_name_by_engine.lock().insert(engine_id, protocol_name.clone()); self.protocol_name_by_engine.lock().insert(engine_id, protocol_name.clone());
...@@ -1008,7 +1062,7 @@ pub struct NotificationSender { ...@@ -1008,7 +1062,7 @@ pub struct NotificationSender {
sink: NotificationsSink, sink: NotificationsSink,
/// Name of the protocol on the wire. /// Name of the protocol on the wire.
protocol_name: Cow<'static, [u8]>, protocol_name: Cow<'static, str>,
/// Engine ID used for the fallback message. /// Engine ID used for the fallback message.
engine_id: ConsensusEngineId, engine_id: ConsensusEngineId,
...@@ -1022,7 +1076,7 @@ impl NotificationSender { ...@@ -1022,7 +1076,7 @@ impl NotificationSender {
/// Returns a future that resolves when the `NotificationSender` is ready to send a notification. /// Returns a future that resolves when the `NotificationSender` is ready to send a notification.
pub async fn ready<'a>(&'a self) -> Result<NotificationSenderReady<'a>, NotificationSenderError> { pub async fn ready<'a>(&'a self) -> Result<NotificationSenderReady<'a>, NotificationSenderError> {
Ok(NotificationSenderReady { Ok(NotificationSenderReady {
ready: match self.sink.reserve_notification(&self.protocol_name).await { ready: match self.sink.reserve_notification(self.protocol_name.clone()).await {
Ok(r) => r, Ok(r) => r,
Err(()) => return Err(NotificationSenderError::Closed), Err(()) => return Err(NotificationSenderError::Closed),
}, },
...@@ -1096,9 +1150,15 @@ enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> { ...@@ -1096,9 +1150,15 @@ enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> {
AddKnownAddress(PeerId, Multiaddr), AddKnownAddress(PeerId, Multiaddr),
SyncFork(Vec<PeerId>, B::Hash, NumberFor<B>), SyncFork(Vec<PeerId>, B::Hash, NumberFor<B>),
EventStream(out_events::Sender), EventStream(out_events::Sender),
Request {
target: PeerId,
protocol: Cow<'static, str>,
request: Vec<u8>,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},
RegisterNotifProtocol { RegisterNotifProtocol {
engine_id: ConsensusEngineId, engine_id: ConsensusEngineId,
protocol_name: Cow<'static, [u8]>, protocol_name: Cow<'static, str>,
}, },
DisconnectPeer(PeerId), DisconnectPeer(PeerId),
UpdateChain, UpdateChain,
...@@ -1132,6 +1192,13 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> { ...@@ -1132,6 +1192,13 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
metrics: Option<Metrics>, metrics: Option<Metrics>,
/// The `PeerId`'s of all boot nodes. /// The `PeerId`'s of all boot nodes.
boot_node_ids: Arc<HashSet<PeerId>>, boot_node_ids: Arc<HashSet<PeerId>>,
/// Requests started using [`NetworkService::request`]. Includes the channel to send back the
/// response, when the request has started, and the name of the protocol for diagnostic
/// purposes.
pending_requests: HashMap<
behaviour::RequestId,
(oneshot::Sender<Result<Vec<u8>, RequestFailure>>, Instant, String)
>,
/// For each peer and protocol combination, an object that allows sending notifications to /// For each peer and protocol combination, an object that allows sending notifications to
/// that peer. Shared with the [`NetworkService`]. /// that peer. Shared with the [`NetworkService`].
peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, ConsensusEngineId), NotificationsSink>>>, peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, ConsensusEngineId), NotificationsSink>>>,
...@@ -1165,8 +1232,10 @@ struct Metrics { ...@@ -1165,8 +1232,10 @@ struct Metrics {
peerset_num_requested: Gauge<U64>, peerset_num_requested: Gauge<U64>,
pending_connections: Gauge<U64>, pending_connections: Gauge<U64>,
pending_connections_errors_total: CounterVec<U64>, pending_connections_errors_total: CounterVec<U64>,
requests_in_total: HistogramVec, requests_in_failure_total: CounterVec<U64>,
requests_out_finished: HistogramVec, requests_in_success_total: HistogramVec,
requests_out_failure_total: CounterVec<U64>,
requests_out_success_total: HistogramVec,
requests_out_started_total: CounterVec<U64>, requests_out_started_total: CounterVec<U64>,
} }
...@@ -1347,10 +1416,17 @@ impl Metrics { ...@@ -1347,10 +1416,17 @@ impl Metrics {
), ),
&["reason"] &["reason"]
)?, registry)?, )?, registry)?,
requests_in_total: register(HistogramVec::new( requests_in_failure_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_requests_in_failure_total",
"Total number of incoming requests that the node has failed to answer"
),
&["protocol", "reason"]
)?, registry)?,
requests_in_success_total: register(HistogramVec::new(
HistogramOpts { HistogramOpts {
common_opts: Opts::new( common_opts: Opts::new(
"sub_libp2p_requests_in_total", "sub_libp2p_requests_in_success_total",
"Total number of requests received and answered" "Total number of requests received and answered"
), ),
buckets: prometheus_endpoint::exponential_buckets(0.001, 2.0, 16) buckets: prometheus_endpoint::exponential_buckets(0.001, 2.0, 16)
...@@ -1358,11 +1434,18 @@ impl Metrics { ...@@ -1358,11 +1434,18 @@ impl Metrics {
}, },
&["protocol"] &["protocol"]
)?, registry)?, )?, registry)?,
requests_out_finished: register(HistogramVec::new( requests_out_failure_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_requests_out_failure_total",
"Total number of requests that have failed"
),
&["protocol", "reason"]
)?, registry)?,
requests_out_success_total: register(HistogramVec::new(
HistogramOpts { HistogramOpts {
common_opts: Opts::new( common_opts: Opts::new(
"sub_libp2p_requests_out_finished", "sub_libp2p_requests_out_success_total",
"Time between a request's start and finish (successful or not)" "For successful requests, time between a request's start and finish"
), ),
buckets: prometheus_endpoint::exponential_buckets(0.001, 2.0, 16) buckets: prometheus_endpoint::exponential_buckets(0.001, 2.0, 16)
.expect("parameters are always valid values; qed"), .expect("parameters are always valid values; qed"),
...@@ -1446,6 +1529,31 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> { ...@@ -1446,6 +1529,31 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
this.network_service.user_protocol_mut().set_sync_fork_request(peer_ids, &hash, number), this.network_service.user_protocol_mut().set_sync_fork_request(peer_ids, &hash, number),
ServiceToWorkerMsg::EventStream(sender) => ServiceToWorkerMsg::EventStream(sender) =>
this.event_streams.push(sender), this.event_streams.push(sender),
ServiceToWorkerMsg::Request { target, protocol, request, pending_response } => {
// Calling `send_request` can fail immediately in some circumstances.
// This is handled by sending back an error on the channel.
match this.network_service.send_request(&target, &protocol, request) {
Ok(request_id) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.requests_out_started_total
.with_label_values(&[&protocol])
.inc();
}
this.pending_requests.insert(
request_id,
(pending_response, Instant::now(), protocol.to_string())
);
},
Err(behaviour::SendRequestError::NotConnected) => {
let err = RequestFailure::Network(OutboundFailure::ConnectionClosed);
let _ = pending_response.send(Err(err));
},
Err(behaviour::SendRequestError::UnknownProtocol) => {
let err = RequestFailure::Network(OutboundFailure::UnsupportedProtocols);
let _ = pending_response.send(Err(err));
},
}
},
ServiceToWorkerMsg::RegisterNotifProtocol { engine_id, protocol_name } => { ServiceToWorkerMsg::RegisterNotifProtocol { engine_id, protocol_name } => {
this.network_service this.network_service
.register_notifications_protocol(engine_id, protocol_name); .register_notifications_protocol(engine_id, protocol_name);
...@@ -1494,23 +1602,72 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> { ...@@ -1494,23 +1602,72 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
} }
this.import_queue.import_finality_proof(origin, hash, nb, proof); this.import_queue.import_finality_proof(origin, hash, nb, proof);
}, },
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::AnsweredRequest { protocol, build_time, .. })) => { Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, .. })) => {
if let Some(metrics) = this.metrics.as_ref() { if let Some(metrics) = this.metrics.as_ref() {
metrics.requests_in_total match result {
.with_label_values(&[&protocol]) Ok(serve_time) => {
.observe(build_time.as_secs_f64()); metrics.requests_in_success_total
.with_label_values(&[&protocol])
.observe(serve_time.as_secs_f64());
}
Err(err) => {
let reason = match err {
ResponseFailure::Busy => "busy",
ResponseFailure::Network(InboundFailure::Timeout) => "timeout",
ResponseFailure::Network(InboundFailure::UnsupportedProtocols) =>
"unsupported",
};
metrics.requests_in_failure_total
.with_label_values(&[&protocol, reason])
.inc();
}
}
}
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestFinished { request_id, result })) => {
if let Some((send_back, started, protocol)) = this.pending_requests.remove(&request_id) {
if let Some(metrics) = this.metrics.as_ref() {
match &result {
Ok(_) => {
metrics.requests_out_success_total
.with_label_values(&[&protocol])
.observe(started.elapsed().as_secs_f64());
}
Err(err) => {
let reason = match err {
RequestFailure::Refused => "refused",
RequestFailure::Network(OutboundFailure::DialFailure) =>
"dial-failure",
RequestFailure::Network(OutboundFailure::Timeout) =>
"timeout",
RequestFailure::Network(OutboundFailure::ConnectionClosed) =>
"connection-closed",
RequestFailure::Network(OutboundFailure::UnsupportedProtocols) =>
"unsupported",
};
metrics.requests_out_failure_total
.with_label_values(&[&protocol, reason])
.inc();
}
}
}
let _ = send_back.send(result);
} else {
error!("Request not in pending_requests");
} }
}, },
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestStarted { protocol, .. })) => { Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::OpaqueRequestStarted { protocol, .. })) => {
if let Some(metrics) = this.metrics.as_ref() { if let Some(metrics) = this.metrics.as_ref() {
metrics.requests_out_started_total metrics.requests_out_started_total
.with_label_values(&[&protocol]) .with_label_values(&[&protocol])
.inc(); .inc();
} }
}, },
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestFinished { protocol, request_duration, .. })) => { Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::OpaqueRequestFinished { protocol, request_duration, .. })) => {
if let Some(metrics) = this.metrics.as_ref() { if let Some(metrics) = this.metrics.as_ref() {
metrics.requests_out_finished metrics.requests_out_success_total
.with_label_values(&[&protocol]) .with_label_values(&[&protocol])
.observe(request_duration.as_secs_f64()); .observe(request_duration.as_secs_f64());
} }
...@@ -1635,14 +1792,14 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> { ...@@ -1635,14 +1792,14 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
let reason = match cause { let reason = match cause {
Some(ConnectionError::IO(_)) => "transport-error", Some(ConnectionError::IO(_)) => "transport-error",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A( Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::A(EitherError::B( EitherError::A(EitherError::A(EitherError::A(EitherError::B(
EitherError::A(PingFailure::Timeout))))))))) => "ping-timeout", EitherError::A(PingFailure::Timeout)))))))))) => "ping-timeout",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A( Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::A(EitherError::A( EitherError::A(EitherError::A(EitherError::A(EitherError::A(
NotifsHandlerError::Legacy(LegacyConnectionKillError))))))))) => "force-closed", NotifsHandlerError::Legacy(LegacyConnectionKillError)))))))))) => "force-closed",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A( Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::A(EitherError::A( EitherError::A(EitherError::A(EitherError::A(EitherError::A(
NotifsHandlerError::SyncNotificationsClogged)))))))) => "sync-notifications-clogged", NotifsHandlerError::SyncNotificationsClogged))))))))) => "sync-notifications-clogged",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(_))) => "protocol-error", Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(_))) => "protocol-error",
Some(ConnectionError::Handler(NodeHandlerWrapperError::KeepAliveTimeout)) => "keep-alive-timeout", Some(ConnectionError::Handler(NodeHandlerWrapperError::KeepAliveTimeout)) => "keep-alive-timeout",
None => "actively-closed", None => "actively-closed",
...@@ -1800,7 +1957,7 @@ impl<B: BlockT + 'static, H: ExHashT> Unpin for NetworkWorker<B, H> { ...@@ -1800,7 +1957,7 @@ impl<B: BlockT + 'static, H: ExHashT> Unpin for NetworkWorker<B, H> {
/// Turns bytes that are potentially UTF-8 into a reasonable representable string. /// Turns bytes that are potentially UTF-8 into a reasonable representable string.
/// ///
/// Meant to be used only for debugging or metrics-reporting purposes. /// Meant to be used only for debugging or metrics-reporting purposes.
fn maybe_utf8_bytes_to_string(id: &[u8]) -> Cow<str> { pub(crate) fn maybe_utf8_bytes_to_string(id: &[u8]) -> Cow<str> {
if let Ok(s) = std::str::from_utf8(&id[..]) { if let Ok(s) = std::str::from_utf8(&id[..]) {
Cow::Borrowed(s) Cow::Borrowed(s)
} else { } else {
......
...@@ -131,14 +131,14 @@ fn build_nodes_one_proto() ...@@ -131,14 +131,14 @@ fn build_nodes_one_proto()
let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())]; let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())];
let (node1, events_stream1) = build_test_full_node(config::NetworkConfiguration { let (node1, events_stream1) = build_test_full_node(config::NetworkConfiguration {
notifications_protocols: vec![(ENGINE_ID, From::from(&b"/foo"[..]))], notifications_protocols: vec![(ENGINE_ID, From::from("/foo"))],
listen_addresses: vec![listen_addr.clone()], listen_addresses: vec![listen_addr.clone()],
transport: config::TransportConfig::MemoryOnly, transport: config::TransportConfig::MemoryOnly,
.. config::NetworkConfiguration::new_local() .. config::NetworkConfiguration::new_local()
}); });
let (node2, events_stream2) = build_test_full_node(config::NetworkConfiguration { let (node2, events_stream2) = build_test_full_node(config::NetworkConfiguration {
notifications_protocols: vec![(ENGINE_ID, From::from(&b"/foo"[..]))], notifications_protocols: vec![(ENGINE_ID, From::from("/foo"))],
listen_addresses: vec![], listen_addresses: vec![],
reserved_nodes: vec![config::MultiaddrWithPeerId { reserved_nodes: vec![config::MultiaddrWithPeerId {
multiaddr: listen_addr, multiaddr: listen_addr,
...@@ -281,7 +281,7 @@ fn lots_of_incoming_peers_works() { ...@@ -281,7 +281,7 @@ fn lots_of_incoming_peers_works() {
let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())]; let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())];
let (main_node, _) = build_test_full_node(config::NetworkConfiguration { let (main_node, _) = build_test_full_node(config::NetworkConfiguration {
notifications_protocols: vec![(ENGINE_ID, From::from(&b"/foo"[..]))], notifications_protocols: vec![(ENGINE_ID, From::from("/foo"))],
listen_addresses: vec![listen_addr.clone()], listen_addresses: vec![listen_addr.clone()],
in_peers: u32::max_value(), in_peers: u32::max_value(),
transport: config::TransportConfig::MemoryOnly, transport: config::TransportConfig::MemoryOnly,
...@@ -298,7 +298,7 @@ fn lots_of_incoming_peers_works() { ...@@ -298,7 +298,7 @@ fn lots_of_incoming_peers_works() {
let main_node_peer_id = main_node_peer_id.clone(); let main_node_peer_id = main_node_peer_id.clone();
let (_dialing_node, event_stream) = build_test_full_node(config::NetworkConfiguration { let (_dialing_node, event_stream) = build_test_full_node(config::NetworkConfiguration {
notifications_protocols: vec![(ENGINE_ID, From::from(&b"/foo"[..]))], notifications_protocols: vec![(ENGINE_ID, From::from("/foo"))],
listen_addresses: vec![], listen_addresses: vec![],
reserved_nodes: vec![config::MultiaddrWithPeerId { reserved_nodes: vec![config::MultiaddrWithPeerId {
multiaddr: listen_addr.clone(), multiaddr: listen_addr.clone(),
......
...@@ -18,7 +18,7 @@ tokio = "0.1.22" ...@@ -18,7 +18,7 @@ tokio = "0.1.22"
futures01 = { package = "futures", version = "0.1.29" } futures01 = { package = "futures", version = "0.1.29" }
log = "0.4.8" log = "0.4.8"
env_logger = "0.7.0" env_logger = "0.7.0"
fdlimit = "0.1.4" fdlimit = "0.2.0"
parking_lot = "0.10.0" parking_lot = "0.10.0"
sc-light = { version = "2.0.0-rc6", path = "../../light" } sc-light = { version = "2.0.0-rc6", path = "../../light" }
sp-blockchain = { version = "2.0.0-rc6", path = "../../../primitives/blockchain" } sp-blockchain = { version = "2.0.0-rc6", path = "../../../primitives/blockchain" }
......
...@@ -255,7 +255,7 @@ decl_module! { ...@@ -255,7 +255,7 @@ decl_module! {
/// the equivocation proof and validate the given key ownership proof /// the equivocation proof and validate the given key ownership proof
/// against the extracted offender. If both are valid, the offence will /// against the extracted offender. If both are valid, the offence will
/// be reported. /// be reported.
#[weight = weight::weight_for_report_equivocation::<T>()] #[weight = weight_for::report_equivocation::<T>(key_owner_proof.validator_count())]
fn report_equivocation( fn report_equivocation(
origin, origin,
equivocation_proof: EquivocationProof<T::Header>, equivocation_proof: EquivocationProof<T::Header>,
...@@ -278,7 +278,7 @@ decl_module! { ...@@ -278,7 +278,7 @@ decl_module! {
/// block authors will call it (validated in `ValidateUnsigned`), as such /// block authors will call it (validated in `ValidateUnsigned`), as such
/// if the block author is defined it will be defined as the equivocation /// if the block author is defined it will be defined as the equivocation
/// reporter. /// reporter.
#[weight = weight::weight_for_report_equivocation::<T>()] #[weight = weight_for::report_equivocation::<T>(key_owner_proof.validator_count())]
fn report_equivocation_unsigned( fn report_equivocation_unsigned(
origin, origin,
equivocation_proof: EquivocationProof<T::Header>, equivocation_proof: EquivocationProof<T::Header>,
...@@ -295,24 +295,35 @@ decl_module! { ...@@ -295,24 +295,35 @@ decl_module! {
} }
} }
mod weight { mod weight_for {
use frame_support::{ use frame_support::{
traits::Get, traits::Get,
weights::{constants::WEIGHT_PER_MICROS, Weight}, weights::{
constants::{WEIGHT_PER_MICROS, WEIGHT_PER_NANOS},
Weight,
},
}; };
pub fn weight_for_report_equivocation<T: super::Trait>() -> Weight { pub fn report_equivocation<T: super::Trait>(validator_count: u32) -> Weight {
// we take the validator set count from the membership proof to
// calculate the weight but we set a floor of 100 validators.
let validator_count = validator_count.max(100) as u64;
// worst case we are considering is that the given offender
// is backed by 200 nominators
const MAX_NOMINATORS: u64 = 200;
// checking membership proof // checking membership proof
(35 * WEIGHT_PER_MICROS) (35 * WEIGHT_PER_MICROS)
.saturating_add((175 * WEIGHT_PER_NANOS).saturating_mul(validator_count))
.saturating_add(T::DbWeight::get().reads(5)) .saturating_add(T::DbWeight::get().reads(5))
// check equivocation proof // check equivocation proof
.saturating_add(110 * WEIGHT_PER_MICROS) .saturating_add(110 * WEIGHT_PER_MICROS)
// report offence // report offence
.saturating_add(110 * WEIGHT_PER_MICROS) .saturating_add(110 * WEIGHT_PER_MICROS)
// worst case we are considering is that the given offender .saturating_add(25 * WEIGHT_PER_MICROS * MAX_NOMINATORS)
// is backed by 200 nominators .saturating_add(T::DbWeight::get().reads(14 + 3 * MAX_NOMINATORS))
.saturating_add(T::DbWeight::get().reads(14 + 3 * 200)) .saturating_add(T::DbWeight::get().writes(10 + 3 * MAX_NOMINATORS))
.saturating_add(T::DbWeight::get().writes(10 + 3 * 200))
} }
} }
......