From f5ca403af9cea41230b27d74c98b205d9ba023c9 Mon Sep 17 00:00:00 2001
From: Dmitry Markin <dmitry@markin.tech>
Date: Tue, 12 Sep 2023 14:38:31 +0300
Subject: [PATCH] Report `tracing_unbounded` channel size to prometheus (#1489)

---
 substrate/client/utils/src/metrics.rs | 27 ++++++++++++++++++++++-----
 substrate/client/utils/src/mpsc.rs    | 27 ++++++++++++++++++++-------
 2 files changed, 42 insertions(+), 12 deletions(-)

diff --git a/substrate/client/utils/src/metrics.rs b/substrate/client/utils/src/metrics.rs
index 6bbdbe2e2e5..308e90cb253 100644
--- a/substrate/client/utils/src/metrics.rs
+++ b/substrate/client/utils/src/metrics.rs
@@ -24,7 +24,10 @@ use prometheus::{
 	Error as PrometheusError, Registry,
 };
 
-use prometheus::{core::GenericCounterVec, Opts};
+use prometheus::{
+	core::{GenericCounterVec, GenericGaugeVec},
+	Opts,
+};
 
 lazy_static! {
 	pub static ref TOKIO_THREADS_TOTAL: GenericCounter<AtomicU64> =
@@ -36,18 +39,32 @@ lazy_static! {
 }
 
 lazy_static! {
-	pub static ref UNBOUNDED_CHANNELS_COUNTER : GenericCounterVec<AtomicU64> = GenericCounterVec::new(
-		Opts::new("substrate_unbounded_channel_len", "Items in each mpsc::unbounded instance"),
-		&["entity", "action"] // 'name of channel, send|received|dropped
+	pub static ref UNBOUNDED_CHANNELS_COUNTER: GenericCounterVec<AtomicU64> = GenericCounterVec::new(
+		Opts::new(
+			"substrate_unbounded_channel_len",
+			"Items sent/received/dropped on each mpsc::unbounded instance"
+		),
+		&["entity", "action"], // name of channel, send|received|dropped
+	).expect("Creating of statics doesn't fail. qed");
+	pub static ref UNBOUNDED_CHANNELS_SIZE: GenericGaugeVec<AtomicU64> = GenericGaugeVec::new(
+		Opts::new(
+			"substrate_unbounded_channel_size",
+			"Size (number of messages to be processed) of each mpsc::unbounded instance",
+		),
+		&["entity"], // name of channel
 	).expect("Creating of statics doesn't fail. qed");
-
 }
 
+pub static SENT_LABEL: &'static str = "send";
+pub static RECEIVED_LABEL: &'static str = "received";
+pub static DROPPED_LABEL: &'static str = "dropped";
+
 /// Register the statics to report to registry
 pub fn register_globals(registry: &Registry) -> Result<(), PrometheusError> {
 	registry.register(Box::new(TOKIO_THREADS_ALIVE.clone()))?;
 	registry.register(Box::new(TOKIO_THREADS_TOTAL.clone()))?;
 	registry.register(Box::new(UNBOUNDED_CHANNELS_COUNTER.clone()))?;
+	registry.register(Box::new(UNBOUNDED_CHANNELS_SIZE.clone()))?;
 
 	Ok(())
 }
diff --git a/substrate/client/utils/src/mpsc.rs b/substrate/client/utils/src/mpsc.rs
index 039e03f9e61..c24a5bd8904 100644
--- a/substrate/client/utils/src/mpsc.rs
+++ b/substrate/client/utils/src/mpsc.rs
@@ -20,7 +20,9 @@
 
 pub use async_channel::{TryRecvError, TrySendError};
 
-use crate::metrics::UNBOUNDED_CHANNELS_COUNTER;
+use crate::metrics::{
+	DROPPED_LABEL, RECEIVED_LABEL, SENT_LABEL, UNBOUNDED_CHANNELS_COUNTER, UNBOUNDED_CHANNELS_SIZE,
+};
 use async_channel::{Receiver, Sender};
 use futures::{
 	stream::{FusedStream, Stream},
@@ -102,7 +104,10 @@ impl<T> TracingUnboundedSender<T> {
 	/// Proxy function to `async_channel::Sender::try_send`.
 	pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
 		self.inner.try_send(msg).map(|s| {
-			UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "send"]).inc();
+			UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, SENT_LABEL]).inc();
+			UNBOUNDED_CHANNELS_SIZE
+				.with_label_values(&[self.name])
+				.set(self.inner.len().saturated_into());
 
 			if self.inner.len() >= self.queue_size_warning &&
 				self.warning_fired
@@ -140,7 +145,10 @@ impl<T> TracingUnboundedReceiver<T> {
 	/// that discounts the messages taken out.
 	pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
 		self.inner.try_recv().map(|s| {
-			UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "received"]).inc();
+			UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, RECEIVED_LABEL]).inc();
+			UNBOUNDED_CHANNELS_SIZE
+				.with_label_values(&[self.name])
+				.set(self.inner.len().saturated_into());
 			s
 		})
 	}
@@ -155,14 +163,16 @@ impl<T> Drop for TracingUnboundedReceiver<T> {
 	fn drop(&mut self) {
 		// Close the channel to prevent any further messages to be sent into the channel
 		self.close();
-		// the number of messages about to be dropped
+		// The number of messages about to be dropped
 		let count = self.inner.len();
-		// discount the messages
+		// Discount the messages
 		if count > 0 {
 			UNBOUNDED_CHANNELS_COUNTER
-				.with_label_values(&[self.name, "dropped"])
+				.with_label_values(&[self.name, DROPPED_LABEL])
 				.inc_by(count.saturated_into());
 		}
+		// Reset the size metric to 0
+		UNBOUNDED_CHANNELS_SIZE.with_label_values(&[self.name]).set(0);
 		// Drain all the pending messages in the channel since they can never be accessed,
 		// this can be removed once https://github.com/smol-rs/async-channel/issues/23 is
 		// resolved
@@ -180,7 +190,10 @@ impl<T> Stream for TracingUnboundedReceiver<T> {
 		match Pin::new(&mut s.inner).poll_next(cx) {
 			Poll::Ready(msg) => {
 				if msg.is_some() {
-					UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.name, "received"]).inc();
+					UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.name, RECEIVED_LABEL]).inc();
+					UNBOUNDED_CHANNELS_SIZE
+						.with_label_values(&[s.name])
+						.set(s.inner.len().saturated_into());
 				}
 				Poll::Ready(msg)
 			},
-- 
GitLab