From 239d0998ea2d1cc874b69454a04617d3d9d7d8a5 Mon Sep 17 00:00:00 2001
From: Pierre Krieger <pierre.krieger1708@gmail.com>
Date: Thu, 16 Apr 2020 15:18:35 +0200
Subject: [PATCH] Several tweaks to networking Prometheus metrics (#5636)

---
 substrate/client/network/src/service.rs       | 86 +++++++++++--------
 .../client/network/src/service/out_events.rs  | 20 +++--
 2 files changed, 64 insertions(+), 42 deletions(-)

diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs
index 642f67d14aa..89ddf6fafd4 100644
--- a/substrate/client/network/src/service.rs
+++ b/substrate/client/network/src/service.rs
@@ -859,12 +859,12 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
 
 struct Metrics {
 	// This list is ordered alphabetically
-	connections: GaugeVec<U64>,
 	connections_closed_total: CounterVec<U64>,
+	connections_opened_total: CounterVec<U64>,
 	import_queue_blocks_submitted: Counter<U64>,
 	import_queue_finality_proofs_submitted: Counter<U64>,
 	import_queue_justifications_submitted: Counter<U64>,
-	incoming_connections_errors_total: Counter<U64>,
+	incoming_connections_errors_total: CounterVec<U64>,
 	incoming_connections_total: Counter<U64>,
 	is_major_syncing: Gauge<U64>,
 	issued_light_requests: Counter<U64>,
@@ -874,7 +874,8 @@ struct Metrics {
 	network_per_sec_bytes: GaugeVec<U64>,
 	notifications_queues_size: HistogramVec,
 	notifications_sizes: HistogramVec,
-	opened_notification_streams: GaugeVec<U64>,
+	notifications_streams_closed_total: CounterVec<U64>,
+	notifications_streams_opened_total: CounterVec<U64>,
 	peers_count: Gauge<U64>,
 	peerset_num_discovered: Gauge<U64>,
 	peerset_num_requested: Gauge<U64>,
@@ -887,19 +888,19 @@ impl Metrics {
 	fn register(registry: &Registry) -> Result<Self, PrometheusError> {
 		Ok(Self {
 			// This list is ordered alphabetically
-			connections: register(GaugeVec::new(
+			connections_closed_total: register(CounterVec::new(
 				Opts::new(
-					"sub_libp2p_connections",
-					"Number of established libp2p connections"
+					"sub_libp2p_connections_closed_total",
+					"Total number of connections closed, by reason and direction"
 				),
-				&["direction"]
+				&["direction", "reason"]
 			)?, registry)?,
-			connections_closed_total: register(CounterVec::new(
+			connections_opened_total: register(CounterVec::new(
 				Opts::new(
-					"sub_libp2p_connections_closed_total",
-					"Total number of connections closed, by reason"
+					"sub_libp2p_connections_opened_total",
+					"Total number of connections opened"
 				),
-				&["reason"]
+				&["direction"]
 			)?, registry)?,
 			import_queue_blocks_submitted: register(Counter::new(
 				"import_queue_blocks_submitted",
@@ -913,9 +914,13 @@ impl Metrics {
 				"import_queue_justifications_submitted",
 				"Number of justifications submitted to the import queue.",
 			)?, registry)?,
-			incoming_connections_errors_total: register(Counter::new(
-				"sub_libp2p_incoming_connections_handshake_errors_total",
-				"Total number of incoming connections that have failed during the initial handshake"
+			incoming_connections_errors_total: register(CounterVec::new(
+				Opts::new(
+					"sub_libp2p_incoming_connections_handshake_errors_total",
+					"Total number of incoming connections that have failed during the \
+					initial handshake"
+				),
+				&["reason"]
 			)?, registry)?,
 			incoming_connections_total: register(Counter::new(
 				"sub_libp2p_incoming_connections_total",
@@ -966,10 +971,17 @@ impl Metrics {
 				},
 				&["direction", "protocol"]
 			)?, registry)?,
-			opened_notification_streams: register(GaugeVec::new(
+			notifications_streams_closed_total: register(CounterVec::new(
 				Opts::new(
-					"sub_libp2p_opened_notification_streams",
-					"Number of open notification substreams"
+					"sub_libp2p_notifications_streams_closed_total",
+					"Total number of notification substreams that have been closed"
+				),
+				&["protocol"]
+			)?, registry)?,
+			notifications_streams_opened_total: register(CounterVec::new(
+				Opts::new(
+					"sub_libp2p_notifications_streams_opened_total",
+					"Total number of notification substreams that have been opened"
 				),
 				&["protocol"]
 			)?, registry)?,
@@ -1002,10 +1014,10 @@ impl Metrics {
 	fn update_with_network_event(&self, event: &Event) {
 		match event {
 			Event::NotificationStreamOpened { engine_id, .. } => {
-				self.opened_notification_streams.with_label_values(&[&engine_id_to_string(&engine_id)]).inc();
+				self.notifications_streams_opened_total.with_label_values(&[&engine_id_to_string(&engine_id)]).inc();
 			},
 			Event::NotificationStreamClosed { engine_id, .. } => {
-				self.opened_notification_streams.with_label_values(&[&engine_id_to_string(&engine_id)]).dec();
+				self.notifications_streams_closed_total.with_label_values(&[&engine_id_to_string(&engine_id)]).inc();
 			},
 			Event::NotificationsReceived { messages, .. } => {
 				for (engine_id, message) in messages {
@@ -1129,34 +1141,33 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
 					if let Some(metrics) = this.metrics.as_ref() {
 						match endpoint {
 							ConnectedPoint::Dialer { .. } =>
-								metrics.connections.with_label_values(&["out"]).inc(),
+								metrics.connections_opened_total.with_label_values(&["out"]).inc(),
 							ConnectedPoint::Listener { .. } =>
-								metrics.connections.with_label_values(&["in"]).inc(),
+								metrics.connections_opened_total.with_label_values(&["in"]).inc(),
 						}
 					}
 				},
 				Poll::Ready(SwarmEvent::ConnectionClosed { peer_id, cause, endpoint, .. }) => {
 					trace!(target: "sub-libp2p", "Libp2p => Disconnected({:?}, {:?})", peer_id, cause);
 					if let Some(metrics) = this.metrics.as_ref() {
-						match endpoint {
-							ConnectedPoint::Dialer { .. } =>
-								metrics.connections.with_label_values(&["out"]).dec(),
-							ConnectedPoint::Listener { .. } =>
-								metrics.connections.with_label_values(&["in"]).dec(),
-						}
+						let dir = match endpoint {
+							ConnectedPoint::Dialer { .. } => "out",
+							ConnectedPoint::Listener { .. } => "in",
+						};
+
 						match cause {
 							ConnectionError::IO(_) =>
-								metrics.connections_closed_total.with_label_values(&["transport-error"]).inc(),
+								metrics.connections_closed_total.with_label_values(&[dir, "transport-error"]).inc(),
 							ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
 								EitherError::A(EitherError::B(EitherError::A(PingFailure::Timeout))))))) =>
-								metrics.connections_closed_total.with_label_values(&["ping-timeout"]).inc(),
+								metrics.connections_closed_total.with_label_values(&[dir, "ping-timeout"]).inc(),
 							ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
 								EitherError::A(EitherError::A(EitherError::B(LegacyConnectionKillError))))))) =>
-								metrics.connections_closed_total.with_label_values(&["force-closed"]).inc(),
+								metrics.connections_closed_total.with_label_values(&[dir, "force-closed"]).inc(),
 							ConnectionError::Handler(NodeHandlerWrapperError::Handler(_)) =>
-								metrics.connections_closed_total.with_label_values(&["protocol-error"]).inc(),
+								metrics.connections_closed_total.with_label_values(&[dir, "protocol-error"]).inc(),
 							ConnectionError::Handler(NodeHandlerWrapperError::KeepAliveTimeout) =>
-								metrics.connections_closed_total.with_label_values(&["keep-alive-timeout"]).inc(),
+								metrics.connections_closed_total.with_label_values(&[dir, "keep-alive-timeout"]).inc(),
 						}
 					}
 				},
@@ -1214,14 +1225,21 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
 					trace!(target: "sub-libp2p", "Libp2p => IncomingConnectionError({},{}): {}",
 						local_addr, send_back_addr, error);
 					if let Some(metrics) = this.metrics.as_ref() {
-						metrics.incoming_connections_errors_total.inc();
+						let reason = match error {
+							PendingConnectionError::ConnectionLimit(_) => "limit-reached",
+							PendingConnectionError::InvalidPeerId => "invalid-peer-id",
+							PendingConnectionError::Transport(_) |
+							PendingConnectionError::IO(_) => "transport-error",
+						};
+
+						metrics.incoming_connections_errors_total.with_label_values(&[reason]).inc();
 					}
 				},
 				Poll::Ready(SwarmEvent::BannedPeer { peer_id, endpoint }) => {
 					trace!(target: "sub-libp2p", "Libp2p => BannedPeer({}). Connected via {:?}.",
 						peer_id, endpoint);
 					if let Some(metrics) = this.metrics.as_ref() {
-						metrics.incoming_connections_errors_total.inc();
+						metrics.incoming_connections_errors_total.with_label_values(&["banned"]).inc();
 					}
 				},
 				Poll::Ready(SwarmEvent::UnknownPeerUnreachableAddr { address, error }) =>
diff --git a/substrate/client/network/src/service/out_events.rs b/substrate/client/network/src/service/out_events.rs
index b279be3c22d..cda53246de8 100644
--- a/substrate/client/network/src/service/out_events.rs
+++ b/substrate/client/network/src/service/out_events.rs
@@ -35,7 +35,7 @@ use super::engine_id_to_string;
 
 use futures::{prelude::*, channel::mpsc, ready};
 use parking_lot::Mutex;
-use prometheus_endpoint::{register, CounterVec, Gauge, Opts, PrometheusError, Registry, U64};
+use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64};
 use std::{
 	convert::TryFrom as _,
 	fmt, pin::Pin, sync::Arc,
@@ -77,7 +77,7 @@ impl Drop for Sender {
 	fn drop(&mut self) {
 		let metrics = self.metrics.lock();
 		if let Some(Some(metrics)) = metrics.as_ref().map(|m| &**m) {
-			metrics.num_channels.dec();
+			metrics.num_channels.with_label_values(&[self.name]).dec(); // gauge is labelled by channel name; pairs with the labelled inc() done when the sender is added to OutChannels
 		}
 	}
 }
@@ -151,11 +151,12 @@ impl OutChannels {
 		debug_assert!(metrics.is_none());
 		*metrics = Some(self.metrics.clone());
 		drop(metrics);
-		self.event_streams.push(sender);
 
 		if let Some(metrics) = &*self.metrics {
-			metrics.num_channels.inc();
+			metrics.num_channels.with_label_values(&[sender.name]).inc();
 		}
+
+		self.event_streams.push(sender);
 	}
 
 	/// Sends an event.
@@ -184,7 +185,7 @@ struct Metrics {
 	// This list is ordered alphabetically
 	events_total: CounterVec<U64>,
 	notifications_sizes: CounterVec<U64>,
-	num_channels: Gauge<U64>,
+	num_channels: GaugeVec<U64>,
 }
 
 impl Metrics {
@@ -206,9 +207,12 @@ impl Metrics {
 				),
 				&["protocol", "action", "name"]
 			)?, registry)?,
-			num_channels: register(Gauge::new(
-				"sub_libp2p_out_events_num_channels",
-				"Number of internal active channels that broadcast network events",
+			num_channels: register(GaugeVec::new(
+				Opts::new(
+					"sub_libp2p_out_events_num_channels",
+					"Number of internal active channels that broadcast network events",
+				),
+				&["name"]
 			)?, registry)?,
 		})
 	}
-- 
GitLab