Skip to content
Snippets Groups Projects
Commit c21d7436 authored by Pierre Krieger's avatar Pierre Krieger Committed by Gav Wood
Browse files

Add an RPC request for the state of the network (#1884)

* Add an RPC request for the state of the network

* Fix concerns

* Fix tests

* Replace comment with TODO

* Rename the RPC
parent 72b9df82
No related merge requests found
......@@ -98,6 +98,11 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
self.custom_protocols.cleanup();
}
/// Returns the list of reserved nodes.
pub fn reserved_peers(&self) -> impl Iterator<Item = &PeerId> {
self.custom_protocols.reserved_peers()
}
/// Try to add a reserved peer.
pub fn add_reserved_peer(&mut self, peer_id: PeerId, addr: Multiaddr) {
self.custom_protocols.add_reserved_peer(peer_id, addr)
......@@ -111,6 +116,11 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
self.custom_protocols.remove_reserved_peer(peer_id)
}
/// Returns true if we only accept reserved nodes.
pub fn is_reserved_only(&self) -> bool {
self.custom_protocols.is_reserved_only()
}
/// Start accepting all peers again if we weren't.
pub fn accept_unreserved_peers(&mut self) {
self.custom_protocols.accept_unreserved_peers()
......@@ -129,6 +139,21 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
self.custom_protocols.ban_peer(peer_id)
}
/// Returns a list of all the peers that are banned, and until when.
pub fn banned_nodes(&self) -> impl Iterator<Item = (&PeerId, Instant)> {
self.custom_protocols.banned_peers()
}
/// Returns true if we try to open protocols with the given peer.
pub fn is_enabled(&self, peer_id: &PeerId) -> bool {
self.custom_protocols.is_enabled(peer_id)
}
/// Returns the list of protocols we have open with the given peer.
pub fn open_protocols<'a>(&'a self, peer_id: &'a PeerId) -> impl Iterator<Item = ProtocolId> + 'a {
self.custom_protocols.open_protocols(peer_id)
}
/// Disconnects the custom protocols from a peer.
///
/// The peer will still be able to use Kademlia or other protocols, but will get disconnected
......@@ -142,6 +167,16 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
pub fn drop_node(&mut self, peer_id: &PeerId) {
self.custom_protocols.disconnect_peer(peer_id)
}
/// Returns the list of peers in the topology.
pub fn known_peers(&self) -> impl Iterator<Item = &PeerId> {
self.custom_protocols.known_peers()
}
/// Returns a list of addresses known for this peer, and their reputation score.
pub fn known_addresses(&mut self, peer_id: &PeerId) -> impl Iterator<Item = (&Multiaddr, u32)> {
self.custom_protocols.known_addresses(peer_id)
}
}
/// Event that can be emitted by the behaviour.
......@@ -196,6 +231,14 @@ pub enum BehaviourOut<TMessage> {
/// Information about the peer.
info: IdentifyInfo,
},
/// We have successfully pinged a peer.
PingSuccess {
/// Id of the peer that has been pinged.
peer_id: PeerId,
/// Time it took for the ping to come back.
ping_time: Duration,
},
}
impl<TMessage> From<CustomProtosOut<TMessage>> for BehaviourOut<TMessage> {
......@@ -290,6 +333,7 @@ impl<TMessage, TSubstream> NetworkBehaviourEventProcess<PingEvent> for Behaviour
match event {
PingEvent::PingSuccess { peer, time } => {
trace!(target: "sub-libp2p", "Ping time with {:?}: {:?}", peer, time);
self.events.push(BehaviourOut::PingSuccess { peer_id: peer, ping_time: time });
}
}
}
......
......@@ -177,6 +177,11 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
}
}
/// Returns the list of reserved nodes.
pub fn reserved_peers(&self) -> impl Iterator<Item = &PeerId> {
self.reserved_peers.iter()
}
/// Adds a reserved peer.
pub fn add_reserved_peer(&mut self, peer_id: PeerId, addr: Multiaddr) {
self.topology.add_bootstrap_addr(&peer_id, addr);
......@@ -194,6 +199,11 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
self.reserved_peers.remove(&peer_id);
}
/// Returns true if we only accept reserved nodes.
pub fn is_reserved_only(&self) -> bool {
self.reserved_only
}
/// Start accepting all peers again if we weren't.
pub fn accept_unreserved_peers(&mut self) {
if !self.reserved_only {
......@@ -258,6 +268,24 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
}
}
/// Returns a list of all the peers that are banned, and until when.
pub fn banned_peers(&self) -> impl Iterator<Item = (&PeerId, Instant)> {
self.banned_peers.iter().map(|&(ref id, until)| (id, until))
}
/// Returns true if we try to open protocols with the given peer.
pub fn is_enabled(&self, peer_id: &PeerId) -> bool {
self.enabled_peers.contains_key(peer_id)
}
/// Returns the list of protocols we have open with the given peer.
pub fn open_protocols<'a>(&'a self, peer_id: &'a PeerId) -> impl Iterator<Item = ProtocolId> + 'a {
self.open_protocols
.iter()
.filter(move |(p, _)| p == peer_id)
.map(|(_, proto)| *proto)
}
/// Sends a message to a peer using the given custom protocol.
///
/// Has no effect if the custom protocol is not open with the given peer.
......@@ -303,6 +331,16 @@ impl<TMessage, TSubstream> CustomProtos<TMessage, TSubstream> {
self.topology.cleanup();
}
/// Returns the list of peers in the topology.
pub fn known_peers(&self) -> impl Iterator<Item = &PeerId> {
self.topology.known_peers()
}
/// Returns a list of addresses known for this peer, and their reputation score.
pub fn known_addresses(&mut self, peer_id: &PeerId) -> impl Iterator<Item = (&Multiaddr, u32)> {
self.topology.addresses_of_peer(peer_id, true)
}
/// Updates the attempted connections to nodes.
///
/// Also updates `next_connect_to_nodes` with the earliest known moment when we need to
......@@ -381,7 +419,7 @@ where
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
self.topology.addresses_of_peer(peer_id)
self.topology.addresses_of_peer(peer_id, false).map(|(a, _)| a.clone()).collect()
}
fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) {
......
......@@ -307,21 +307,25 @@ impl NetTopology {
anything_changed
}
/// Returns the addresses stored for a specific peer.
/// Returns the list of peers that are stored in the topology.
#[inline]
pub fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
let peer = if let Some(peer) = self.store.get_mut(peer) {
peer
} else {
return Vec::new()
};
pub fn known_peers(&self) -> impl Iterator<Item = &PeerId> {
self.store.keys()
}
/// Returns the addresses stored for a specific peer, and their reputation score.
///
/// If `include_expired` is true, includes expired addresses that shouldn't be taken into
/// account when dialing.
#[inline]
pub fn addresses_of_peer(&mut self, peer: &PeerId, include_expired: bool)
-> impl Iterator<Item = (&Multiaddr, u32)> {
let now_st = SystemTime::now();
let now_is = Instant::now();
let mut list = peer.addrs.iter_mut().filter_map(move |addr| {
let mut list = self.store.get_mut(peer).into_iter().flat_map(|p| p.addrs.iter_mut()).filter_map(move |addr| {
let (score, connected) = addr.score_and_is_connected();
if (addr.expires >= now_st && score > 0 && addr.back_off_until < now_is) || connected {
if include_expired || (addr.expires >= now_st && score > 0 && addr.back_off_until < now_is) || connected {
Some((score, &addr.addr))
} else {
None
......@@ -329,7 +333,7 @@ impl NetTopology {
}).collect::<Vec<_>>();
list.sort_by(|a, b| a.0.cmp(&b.0));
// TODO: meh, optimize
list.into_iter().map(|(_, addr)| addr.clone()).collect::<Vec<_>>()
list.into_iter().map(|(score, addr)| (addr, score))
}
/// Marks the given peer as connected through the given endpoint.
......
......@@ -32,13 +32,9 @@ pub use crate::traits::{NetworkConfiguration, NodeIndex, NodeId, NonReservedPeer
pub use crate::traits::{ProtocolId, Secret, Severity};
pub use libp2p::{Multiaddr, multiaddr::Protocol, build_multiaddr, PeerId, core::PublicKey};
/// Check if node url is valid
pub fn validate_node_url(url: &str) -> Result<(), Error> {
match url.parse::<Multiaddr>() {
Ok(_) => Ok(()),
Err(_) => Err(ErrorKind::InvalidNodeId.into()),
}
}
use libp2p::core::nodes::ConnectedPoint;
use serde_derive::Serialize;
use std::{collections::{HashMap, HashSet}, time::Duration};
/// Parses a string address and returns the component, if valid.
pub fn parse_str_addr(addr_str: &str) -> Result<(PeerId, Multiaddr), Error> {
......@@ -50,3 +46,85 @@ pub fn parse_str_addr(addr_str: &str) -> Result<(PeerId, Multiaddr), Error> {
};
Ok((who, addr))
}
/// Returns general information about the networking.
///
/// Meant for general diagnostic purposes.
///
/// **Warning**: This API is not stable.
#[derive(Debug, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct NetworkState {
/// PeerId of the local node.
pub peer_id: String,
/// List of addresses the node is currently listening on.
pub listened_addresses: HashSet<Multiaddr>,
// TODO (https://github.com/libp2p/rust-libp2p/issues/978): external_addresses: Vec<Multiaddr>,
/// If true, we only accept reserved peers.
pub is_reserved_only: bool,
/// PeerIds of the nodes that are marked as reserved.
pub reserved_peers: HashSet<String>,
/// PeerIds of the nodes that are banned, and how long in the seconds the ban remains.
pub banned_peers: HashMap<String, u64>,
/// List of node we're connected to.
pub connected_peers: HashMap<String, NetworkStatePeer>,
/// List of node that we know of but that we're not connected to.
pub not_connected_peers: HashMap<String, NetworkStateNotConnectedPeer>,
/// Downloaded bytes per second averaged over the past few seconds.
pub average_download_per_sec: u64,
/// Uploaded bytes per second averaged over the past few seconds.
pub average_upload_per_sec: u64,
}
#[derive(Debug, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct NetworkStatePeer {
/// How we are connected to the node.
pub endpoint: NetworkStatePeerEndpoint,
/// Node information, as provided by the node itself. Can be empty if not known yet.
pub version_string: Option<String>,
/// Latest ping duration with this node.
pub latest_ping_time: Option<Duration>,
/// If true, the peer is "enabled", which means that we try to open Substrate-related protocols
/// with this peer. If false, we stick to Kademlia and/or other network-only protocols.
pub enabled: bool,
/// List of protocols that we have open with the given peer.
pub open_protocols: HashSet<ProtocolId>,
/// List of addresses known for this node, with its reputation score.
pub known_addresses: HashMap<Multiaddr, u32>,
}
#[derive(Debug, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct NetworkStateNotConnectedPeer {
/// List of addresses known for this node, with its reputation score.
pub known_addresses: HashMap<Multiaddr, u32>,
}
#[derive(Debug, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum NetworkStatePeerEndpoint {
/// We are dialing the given address.
Dialing(Multiaddr),
/// We are listening.
Listening {
/// Address we're listening on that received the connection.
listen_addr: Multiaddr,
/// Address data is sent back to.
send_back_addr: Multiaddr,
},
}
impl From<ConnectedPoint> for NetworkStatePeerEndpoint {
fn from(endpoint: ConnectedPoint) -> Self {
match endpoint {
ConnectedPoint::Dialer { ref address } =>
NetworkStatePeerEndpoint::Dialing(address.clone()),
ConnectedPoint::Listener { ref listen_addr, ref send_back_addr } =>
NetworkStatePeerEndpoint::Listening {
listen_addr: listen_addr.clone(),
send_back_addr: send_back_addr.clone()
}
}
}
}
......@@ -16,7 +16,7 @@
use crate::{
behaviour::Behaviour, behaviour::BehaviourOut, secret::obtain_private_key_from_config,
transport
transport, NetworkState, NetworkStatePeer, NetworkStateNotConnectedPeer
};
use crate::custom_proto::{CustomMessage, RegisteredProtocol, RegisteredProtocols};
use crate::{Error, NetworkConfiguration, NodeIndex, ProtocolId, parse_str_addr};
......@@ -32,7 +32,7 @@ use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio_timer::Interval;
/// Starts the substrate libp2p service.
......@@ -223,10 +223,63 @@ struct NodeInfo {
endpoint: ConnectedPoint,
/// Version reported by the remote, or `None` if unknown.
client_version: Option<String>,
/// Latest ping time with this node.
latest_ping: Option<Duration>,
}
impl<TMessage> Service<TMessage>
where TMessage: CustomMessage + Send + 'static {
/// Returns a struct containing tons of useful information about the network.
pub fn state(&mut self) -> NetworkState {
let now = Instant::now();
let connected_peers = {
let swarm = &mut self.swarm;
self.nodes_info.values().map(move |info| {
let known_addresses = swarm.known_addresses(&info.peer_id)
.map(|(a, s)| (a.clone(), s)).collect();
(info.peer_id.to_base58(), NetworkStatePeer {
endpoint: info.endpoint.clone().into(),
version_string: info.client_version.clone(),
latest_ping_time: info.latest_ping,
enabled: swarm.is_enabled(&info.peer_id),
open_protocols: swarm.open_protocols(&info.peer_id).collect(),
known_addresses,
})
}).collect()
};
let not_connected_peers = {
let swarm = &mut self.swarm;
let index_by_id = &self.index_by_id;
let list = swarm.known_peers().filter(|p| !index_by_id.contains_key(p))
.cloned().collect::<Vec<_>>();
list.into_iter().map(move |peer_id| {
let known_addresses = swarm.known_addresses(&peer_id)
.map(|(a, s)| (a.clone(), s)).collect();
(peer_id.to_base58(), NetworkStateNotConnectedPeer {
known_addresses,
})
}).collect()
};
NetworkState {
peer_id: Swarm::local_peer_id(&self.swarm).to_base58(),
listened_addresses: Swarm::listeners(&self.swarm).cloned().collect(),
reserved_peers: self.swarm.reserved_peers().map(|p| p.to_base58()).collect(),
banned_peers: self.swarm.banned_nodes().map(|(p, until)| {
let dur = if until > now { until - now } else { Duration::new(0, 0) };
(p.to_base58(), dur.as_secs())
}).collect(),
is_reserved_only: self.swarm.is_reserved_only(),
average_download_per_sec: self.bandwidth.average_download_per_sec(),
average_upload_per_sec: self.bandwidth.average_upload_per_sec(),
connected_peers,
not_connected_peers,
}
}
/// Returns an iterator that produces the list of addresses we're listening on.
#[inline]
pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
......@@ -360,6 +413,7 @@ where TMessage: CustomMessage + Send + 'static {
peer_id: entry.key().clone(),
endpoint,
client_version: None,
latest_ping: None,
});
id
},
......@@ -370,6 +424,7 @@ where TMessage: CustomMessage + Send + 'static {
peer_id: entry.key().clone(),
endpoint,
client_version: None,
latest_ping: None,
});
entry.insert(id);
id
......@@ -427,6 +482,16 @@ where TMessage: CustomMessage + Send + 'static {
.client_version = Some(info.agent_version);
}
}
Ok(Async::Ready(Some(BehaviourOut::PingSuccess { peer_id, ping_time }))) => {
// Contrary to the other events, this one can happen even on nodes which don't
// have any open custom protocol slot. Therefore it is not necessarily in the
// list.
if let Some(id) = self.index_by_id.get(&peer_id) {
self.nodes_info.get_mut(id)
.expect("index_by_id and nodes_info are always kept in sync; QED")
.latest_ping = Some(ping_time);
}
}
Ok(Async::NotReady) => break Ok(Async::NotReady),
Ok(Async::Ready(None)) => unreachable!("The Swarm stream never ends"),
Err(_) => unreachable!("The Swarm never errors"),
......
......@@ -43,6 +43,7 @@ pub use protocol::{ProtocolStatus, PeerInfo, Context};
pub use sync::{Status as SyncStatus, SyncState};
pub use network_libp2p::{
NodeIndex, ProtocolId, Severity, Protocol, Multiaddr,
NetworkState, NetworkStatePeer, NetworkStateNotConnectedPeer, NetworkStatePeerEndpoint,
obtain_private_key, build_multiaddr, PeerId, PublicKey
};
pub use message::{generic as generic_message, RequestId, Status as StatusMessage, ConsensusEngineId};
......
......@@ -23,7 +23,7 @@ use futures::{Async, Future, Stream, stream, sync::oneshot};
use parking_lot::{Mutex, RwLock};
use network_libp2p::{ProtocolId, NetworkConfiguration, NodeIndex, ErrorKind, Severity};
use network_libp2p::{start_service, parse_str_addr, Service as NetworkService, ServiceEvent as NetworkServiceEvent};
use network_libp2p::{Protocol as Libp2pProtocol, RegisteredProtocol};
use network_libp2p::{Protocol as Libp2pProtocol, RegisteredProtocol, NetworkState};
use consensus::import_queue::{ImportQueue, Link};
use crate::consensus_gossip::ConsensusGossip;
use crate::message::{Message, ConsensusEngineId};
......@@ -46,6 +46,8 @@ pub type FetchFuture = oneshot::Receiver<Vec<u8>>;
pub trait SyncProvider<B: BlockT>: Send + Sync {
/// Get sync status
fn status(&self) -> ProtocolStatus<B>;
/// Get network state.
fn network_state(&self) -> NetworkState;
/// Get currently connected peers
fn peers(&self) -> Vec<(NodeIndex, PeerInfo<B>)>;
}
......@@ -290,6 +292,10 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> SyncProvider<B> for Servi
2 Service keeps a sender to protocol, and the ProtocolMsg::Stop is never sent.")
}
fn network_state(&self) -> NetworkState {
self.network.lock().state()
}
fn peers(&self) -> Vec<(NodeIndex, PeerInfo<B>)> {
let peers = (*self.peers.read()).clone();
peers.into_iter().map(|(idx, connected)| (idx, connected.peer_info)).collect()
......
......@@ -60,6 +60,13 @@ pub trait SystemApi<Hash, Number> {
/// Returns currently connected peers
#[rpc(name = "system_peers")]
fn system_peers(&self) -> Result<Vec<PeerInfo<Hash, Number>>>;
/// Returns current state of the network.
///
/// **Warning**: This API is not stable.
// TODO: make this stable and move structs https://github.com/paritytech/substrate/issues/1890
#[rpc(name = "system_networkState")]
fn system_network_state(&self) -> Result<network::NetworkState>;
}
/// System API implementation
......@@ -120,4 +127,8 @@ impl<B: traits::Block> SystemApi<B::Hash, <B::Header as HeaderT>::Number> for Sy
best_number: p.best_number,
}).collect())
}
fn system_network_state(&self) -> Result<network::NetworkState> {
Ok(self.sync.network_state())
}
}
......@@ -52,6 +52,20 @@ impl network::SyncProvider<Block> for Status {
}
}
fn network_state(&self) -> network::NetworkState {
network::NetworkState {
peer_id: String::new(),
listened_addresses: Default::default(),
is_reserved_only: false,
reserved_peers: Default::default(),
banned_peers: Default::default(),
connected_peers: Default::default(),
not_connected_peers: Default::default(),
average_download_per_sec: 0,
average_upload_per_sec: 0,
}
}
fn peers(&self) -> Vec<(NodeIndex, NetworkPeerInfo<Block>)> {
vec![(1, NetworkPeerInfo {
peer_id: self.peer_id.clone(),
......@@ -181,3 +195,21 @@ fn system_peers() {
}]
);
}
#[test]
fn system_network_state() {
assert_eq!(
api(None).system_network_state().unwrap(),
network::NetworkState {
peer_id: String::new(),
listened_addresses: Default::default(),
is_reserved_only: false,
reserved_peers: Default::default(),
banned_peers: Default::default(),
connected_peers: Default::default(),
not_connected_peers: Default::default(),
average_download_per_sec: 0,
average_upload_per_sec: 0,
}
);
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment