From 0fd5643e84b83d5814b6597caef84c7be71ad8d2 Mon Sep 17 00:00:00 2001
From: Pierre Krieger <pierre.krieger1708@gmail.com>
Date: Fri, 17 Apr 2020 09:11:47 +0200
Subject: [PATCH] Adjustments to Kademlia-related metrics (#5660)

* Turn kbuckets_num_nodes into a GaugeVec

* random_kademlia_queries -> kademlia_random_queries

* kademalia_random_queries_total now a CounterVec

* Add metrics about records store
---
 substrate/client/network/src/behaviour.rs     | 22 ++++--
 substrate/client/network/src/discovery.rs     | 33 +++++++--
 substrate/client/network/src/service.rs       | 73 +++++++++++++++----
 .../client/network/src/service/out_events.rs  |  6 +-
 4 files changed, 103 insertions(+), 31 deletions(-)

diff --git a/substrate/client/network/src/behaviour.rs b/substrate/client/network/src/behaviour.rs
index d8f70f7ae0e..cb9c5521157 100644
--- a/substrate/client/network/src/behaviour.rs
+++ b/substrate/client/network/src/behaviour.rs
@@ -15,7 +15,7 @@
 // along with Substrate.  If not, see <http://www.gnu.org/licenses/>.
 
 use crate::{
-	config::Role,
+	config::{ProtocolId, Role},
 	debug_info, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
 	Event, ObservedRole, DhtEvent, ExHashT,
 };
@@ -61,7 +61,7 @@ pub enum BehaviourOut<B: BlockT> {
 	JustificationImport(Origin, B::Hash, NumberFor<B>, Justification),
 	FinalityProofImport(Origin, B::Hash, NumberFor<B>, Vec<u8>),
 	/// Started a random Kademlia discovery query.
-	RandomKademliaStarted,
+	RandomKademliaStarted(ProtocolId),
 	Event(Event),
 }
 
@@ -98,10 +98,20 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
 	}
 
 	/// Returns the number of nodes that are in the Kademlia k-buckets.
-	pub fn num_kbuckets_entries(&mut self) -> usize {
+	pub fn num_kbuckets_entries(&mut self) -> impl ExactSizeIterator<Item = (&ProtocolId, usize)> {
 		self.discovery.num_kbuckets_entries()
 	}
 
+	/// Returns the number of records in the Kademlia record stores.
+	pub fn num_kademlia_records(&mut self) -> impl ExactSizeIterator<Item = (&ProtocolId, usize)> {
+		self.discovery.num_kademlia_records()
+	}
+
+	/// Returns the total size in bytes of all the records in the Kademlia record stores.
+	pub fn kademlia_records_total_size(&mut self) -> impl ExactSizeIterator<Item = (&ProtocolId, usize)> {
+		self.discovery.kademlia_records_total_size()
+	}
+
 	/// Borrows `self` and returns a struct giving access to the information about a node.
 	///
 	/// Returns `None` if we don't know anything about this node. Always returns `Some` for nodes
@@ -268,8 +278,10 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<DiscoveryOut>
 			DiscoveryOut::ValuePutFailed(key) => {
 				self.events.push(BehaviourOut::Event(Event::Dht(DhtEvent::ValuePutFailed(key))));
 			}
-			DiscoveryOut::RandomKademliaStarted => {
-				self.events.push(BehaviourOut::RandomKademliaStarted);
+			DiscoveryOut::RandomKademliaStarted(protocols) => {
+				for protocol in protocols {
+					self.events.push(BehaviourOut::RandomKademliaStarted(protocol));
+				}
 			}
 		}
 	}
diff --git a/substrate/client/network/src/discovery.rs b/substrate/client/network/src/discovery.rs
index 86a5b84a9e3..fc78e9b3e32 100644
--- a/substrate/client/network/src/discovery.rs
+++ b/substrate/client/network/src/discovery.rs
@@ -55,7 +55,7 @@ use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, Quorum, Record};
 use libp2p::kad::GetClosestPeersError;
 use libp2p::kad::handler::KademliaHandler;
 use libp2p::kad::QueryId;
-use libp2p::kad::record::{self, store::MemoryStore};
+use libp2p::kad::record::{self, store::{MemoryStore, RecordStore}};
 #[cfg(not(target_os = "unknown"))]
 use libp2p::swarm::toggle::Toggle;
 #[cfg(not(target_os = "unknown"))]
@@ -77,7 +77,7 @@ pub struct DiscoveryConfig {
 }
 
 impl DiscoveryConfig {
-	/// Crate a default configuration with the given public key.
+	/// Create a default configuration with the given public key.
 	pub fn new(local_public_key: PublicKey) -> Self {
 		let mut this = DiscoveryConfig {
 			local_peer_id: local_public_key.into_peer_id(),
@@ -276,8 +276,27 @@ impl DiscoveryBehaviour {
 	}
 
 	/// Returns the number of nodes that are in the Kademlia k-buckets.
-	pub fn num_kbuckets_entries(&mut self) -> usize {
-		self.known_peers().count()
+	pub fn num_kbuckets_entries(&mut self) -> impl ExactSizeIterator<Item = (&ProtocolId, usize)> {
+		self.kademlias.iter_mut().map(|(id, kad)| (id, kad.kbuckets_entries().count()))
+	}
+
+	/// Returns the number of records in the Kademlia record stores.
+	pub fn num_kademlia_records(&mut self) -> impl ExactSizeIterator<Item = (&ProtocolId, usize)> {
+		// Note that this code is ok only because we use a `MemoryStore`.
+		self.kademlias.iter_mut().map(|(id, kad)| {
+			let num = kad.store_mut().records().count();
+			(id, num)
+		})
+	}
+
+	/// Returns the total size in bytes of all the records in the Kademlia record stores.
+	pub fn kademlia_records_total_size(&mut self) -> impl ExactSizeIterator<Item = (&ProtocolId, usize)> {
+		// Note that this code is ok only because we use a `MemoryStore`. If the records were
+		// for example stored on disk, this would load every single one of them every single time.
+		self.kademlias.iter_mut().map(|(id, kad)| {
+			let size = kad.store_mut().records().fold(0, |tot, rec| tot + rec.value.len());
+			(id, size)
+		})
 	}
 }
 
@@ -307,8 +326,8 @@ pub enum DiscoveryOut {
 	/// Inserting a value into the DHT failed.
 	ValuePutFailed(record::Key),
 
-	/// Started a random Kademlia query.
-	RandomKademliaStarted,
+	/// Started a random Kademlia query for each DHT identified by the given `ProtocolId`s.
+	RandomKademliaStarted(Vec<ProtocolId>),
 }
 
 impl NetworkBehaviour for DiscoveryBehaviour {
@@ -515,7 +534,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
 				Duration::from_secs(60));
 
 			if actually_started {
-				let ev = DiscoveryOut::RandomKademliaStarted;
+				let ev = DiscoveryOut::RandomKademliaStarted(self.kademlias.keys().cloned().collect());
 				return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
 			}
 		}
diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs
index d9e970a61a0..da488d2a873 100644
--- a/substrate/client/network/src/service.rs
+++ b/substrate/client/network/src/service.rs
@@ -880,7 +880,10 @@ struct Metrics {
 	incoming_connections_total: Counter<U64>,
 	is_major_syncing: Gauge<U64>,
 	issued_light_requests: Counter<U64>,
-	kbuckets_num_nodes: Gauge<U64>,
+	kademlia_random_queries_total: CounterVec<U64>,
+	kademlia_records_count: GaugeVec<U64>,
+	kademlia_records_sizes_total: GaugeVec<U64>,
+	kbuckets_num_nodes: GaugeVec<U64>,
 	listeners_local_addresses: Gauge<U64>,
 	listeners_errors_total: Counter<U64>,
 	network_per_sec_bytes: GaugeVec<U64>,
@@ -893,7 +896,6 @@ struct Metrics {
 	peerset_num_requested: Gauge<U64>,
 	pending_connections: Gauge<U64>,
 	pending_connections_errors_total: CounterVec<U64>,
-	random_kademalia_queries_total: Counter<U64>,
 }
 
 impl Metrics {
@@ -945,8 +947,33 @@ impl Metrics {
 				"issued_light_requests",
 				"Number of light client requests that our node has issued.",
 			)?, registry)?,
-			kbuckets_num_nodes: register(Gauge::new(
-				"sub_libp2p_kbuckets_num_nodes", "Number of nodes in the Kademlia k-buckets"
+			kademlia_random_queries_total: register(CounterVec::new(
+				Opts::new(
+					"sub_libp2p_kademlia_random_queries_total",
+					"Number of random Kademlia queries started"
+				),
+				&["protocol"]
+			)?, registry)?,
+			kademlia_records_count: register(GaugeVec::new(
+				Opts::new(
+					"sub_libp2p_kademlia_records_count",
+					"Number of records in the Kademlia records store"
+				),
+				&["protocol"]
+			)?, registry)?,
+			kademlia_records_sizes_total: register(GaugeVec::new(
+				Opts::new(
+					"sub_libp2p_kademlia_records_sizes_total",
+					"Total size of all the records in the Kademlia records store"
+				),
+				&["protocol"]
+			)?, registry)?,
+			kbuckets_num_nodes: register(GaugeVec::new(
+				Opts::new(
+					"sub_libp2p_kbuckets_num_nodes",
+					"Number of nodes in the Kademlia k-buckets"
+				),
+				&["protocol"]
 			)?, registry)?,
 			listeners_local_addresses: register(Gauge::new(
 				"sub_libp2p_listeners_local_addresses", "Number of local addresses we're listening on"
@@ -1017,24 +1044,23 @@ impl Metrics {
 				),
 				&["reason"]
 			)?, registry)?,
-			random_kademalia_queries_total: register(Counter::new(
-				"sub_libp2p_random_kademalia_queries_total", "Number of random Kademlia queries started",
-			)?, registry)?,
 		})
 	}
 
 	fn update_with_network_event(&self, event: &Event) {
 		match event {
 			Event::NotificationStreamOpened { engine_id, .. } => {
-				self.notifications_streams_opened_total.with_label_values(&[&engine_id_to_string(&engine_id)]).inc();
+				self.notifications_streams_opened_total
+					.with_label_values(&[&maybe_utf8_bytes_to_string(engine_id)]).inc();
 			},
 			Event::NotificationStreamClosed { engine_id, .. } => {
-				self.notifications_streams_closed_total.with_label_values(&[&engine_id_to_string(&engine_id)]).inc();
+				self.notifications_streams_closed_total
+					.with_label_values(&[&maybe_utf8_bytes_to_string(engine_id)]).inc();
 			},
 			Event::NotificationsReceived { messages, .. } => {
 				for (engine_id, message) in messages {
 					self.notifications_sizes
-						.with_label_values(&["in", &engine_id_to_string(&engine_id)])
+						.with_label_values(&["in", &maybe_utf8_bytes_to_string(engine_id)])
 						.observe(message.len() as f64);
 				}
 			},
@@ -1097,7 +1123,7 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
 				ServiceToWorkerMsg::WriteNotification { message, engine_id, target } => {
 					if let Some(metrics) = this.metrics.as_ref() {
 						metrics.notifications_sizes
-							.with_label_values(&["out", &engine_id_to_string(&engine_id)])
+							.with_label_values(&["out", &maybe_utf8_bytes_to_string(&engine_id)])
 							.observe(message.len() as f64);
 					}
 					this.network_service.user_protocol_mut().write_notification(target, engine_id, message)
@@ -1137,9 +1163,11 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
 					}
 					this.import_queue.import_finality_proof(origin, hash, nb, proof);
 				},
-				Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted)) => {
+				Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted(protocol))) => {
 					if let Some(metrics) = this.metrics.as_ref() {
-						metrics.random_kademalia_queries_total.inc();
+						metrics.kademlia_random_queries_total
+							.with_label_values(&[&maybe_utf8_bytes_to_string(protocol.as_bytes())])
+							.inc();
 					}
 				},
 				Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Event(ev))) => {
@@ -1292,7 +1320,18 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
 			metrics.network_per_sec_bytes.with_label_values(&["in"]).set(this.service.bandwidth.average_download_per_sec());
 			metrics.network_per_sec_bytes.with_label_values(&["out"]).set(this.service.bandwidth.average_upload_per_sec());
 			metrics.is_major_syncing.set(is_major_syncing as u64);
-			metrics.kbuckets_num_nodes.set(this.network_service.num_kbuckets_entries() as u64);
+			for (proto, num_entries) in this.network_service.num_kbuckets_entries() {
+				let proto = maybe_utf8_bytes_to_string(proto.as_bytes());
+				metrics.kbuckets_num_nodes.with_label_values(&[&proto]).set(num_entries as u64);
+			}
+			for (proto, num_entries) in this.network_service.num_kademlia_records() {
+				let proto = maybe_utf8_bytes_to_string(proto.as_bytes());
+				metrics.kademlia_records_count.with_label_values(&[&proto]).set(num_entries as u64);
+			}
+			for (proto, num_entries) in this.network_service.kademlia_records_total_size() {
+				let proto = maybe_utf8_bytes_to_string(proto.as_bytes());
+				metrics.kademlia_records_sizes_total.with_label_values(&[&proto]).set(num_entries as u64);
+			}
 			metrics.peers_count.set(num_connected_peers as u64);
 			metrics.peerset_num_discovered.set(this.network_service.user_protocol().num_discovered_peers() as u64);
 			metrics.peerset_num_requested.set(this.network_service.user_protocol().requested_peers().count() as u64);
@@ -1306,8 +1345,10 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
 impl<B: BlockT + 'static, H: ExHashT> Unpin for NetworkWorker<B, H> {
 }
 
-/// Turns a `ConsensusEngineId` into a representable string.
-fn engine_id_to_string(id: &ConsensusEngineId) -> Cow<str> {
+/// Turns bytes that are potentially UTF-8 into a reasonable representable string.
+///
+/// Meant to be used only for debugging or metrics-reporting purposes.
+fn maybe_utf8_bytes_to_string(id: &[u8]) -> Cow<str> {
 	if let Ok(s) = std::str::from_utf8(&id[..]) {
 		Cow::Borrowed(s)
 	} else {
diff --git a/substrate/client/network/src/service/out_events.rs b/substrate/client/network/src/service/out_events.rs
index cda53246de8..8f9c138095f 100644
--- a/substrate/client/network/src/service/out_events.rs
+++ b/substrate/client/network/src/service/out_events.rs
@@ -31,7 +31,7 @@
 //!
 
 use crate::Event;
-use super::engine_id_to_string;
+use super::maybe_utf8_bytes_to_string;
 
 use futures::{prelude::*, channel::mpsc, ready};
 use parking_lot::Mutex;
@@ -240,7 +240,7 @@ impl Metrics {
 						.with_label_values(&[&format!("notif-{:?}", engine_id), "sent", name])
 						.inc_by(num);
 					self.notifications_sizes
-						.with_label_values(&[&engine_id_to_string(engine_id), "sent", name])
+						.with_label_values(&[&maybe_utf8_bytes_to_string(engine_id), "sent", name])
 						.inc_by(num.saturating_mul(u64::try_from(message.len()).unwrap_or(u64::max_value())));
 				}
 			},
@@ -270,7 +270,7 @@ impl Metrics {
 						.with_label_values(&[&format!("notif-{:?}", engine_id), "received", name])
 						.inc();
 					self.notifications_sizes
-						.with_label_values(&[&engine_id_to_string(engine_id), "received", name])
+						.with_label_values(&[&maybe_utf8_bytes_to_string(engine_id), "received", name])
 						.inc_by(u64::try_from(message.len()).unwrap_or(u64::max_value()));
 				}
 			},
-- 
GitLab