From 1e3b8e1639c1cf784eabf0a9afcab1f3987e0ca4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bastian=20K=C3=B6cher?= <git@kchr.de>
Date: Sun, 24 Nov 2024 08:36:16 +0000
Subject: [PATCH] Notify telemetry only every second about the tx pool status
 (#6605)

Before this was done for every imported transaction. When a lot of
transactions got imported, the import notification channel was filled.
The underlying problem was that the `status` call is read locking the
`validated_pool` which will be write locked by the internal submitting
logic. Thus, the submitting and status reading was interferring which
each other.

---------

Co-authored-by: GitHub Action <action@github.com>
---
 Cargo.lock                              |  1 +
 cumulus/client/service/Cargo.toml       |  1 +
 prdoc/pr_6605.prdoc                     | 10 +++++
 substrate/client/service/src/builder.rs | 58 +++++++++++++++++--------
 4 files changed, 53 insertions(+), 17 deletions(-)
 create mode 100644 prdoc/pr_6605.prdoc

diff --git a/Cargo.lock b/Cargo.lock
index 330c2563d97..c79adee6f38 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4680,6 +4680,7 @@ dependencies = [
  "cumulus-relay-chain-interface",
  "cumulus-relay-chain-minimal-node",
  "futures",
+ "futures-timer",
  "polkadot-primitives 7.0.0",
  "sc-client-api",
  "sc-consensus",
diff --git a/cumulus/client/service/Cargo.toml b/cumulus/client/service/Cargo.toml
index 8e9e41ca89d..0a77b465d96 100644
--- a/cumulus/client/service/Cargo.toml
+++ b/cumulus/client/service/Cargo.toml
@@ -11,6 +11,7 @@ workspace = true
 
 [dependencies]
 futures = { workspace = true }
+futures-timer = { workspace = true }
 
 # Substrate
 sc-client-api = { workspace = true, default-features = true }
diff --git a/prdoc/pr_6605.prdoc b/prdoc/pr_6605.prdoc
new file mode 100644
index 00000000000..2adb1d8aee3
--- /dev/null
+++ b/prdoc/pr_6605.prdoc
@@ -0,0 +1,10 @@
+title: Notify telemetry only every second about the tx pool status
+doc:
+- audience: Node Operator
+  description: |-
+    Before this was done for every imported transaction. When a lot of transactions got imported, the import notification channel was filled. The underlying problem was that the `status` call is read locking the `validated_pool` which will be write locked by the internal submitting logic. Thus, the submitting and status reading was interferring which each other.
+crates:
+- name: cumulus-client-service
+  bump: patch
+- name: sc-service
+  bump: patch
diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs
index 68ac94539df..ac9371a8941 100644
--- a/substrate/client/service/src/builder.rs
+++ b/substrate/client/service/src/builder.rs
@@ -25,7 +25,7 @@ use crate::{
 	start_rpc_servers, BuildGenesisBlock, GenesisBlockBuilder, RpcHandlers, SpawnTaskHandle,
 	TaskManager, TransactionPoolAdapter,
 };
-use futures::{future::ready, FutureExt, StreamExt};
+use futures::{select, FutureExt, StreamExt};
 use jsonrpsee::RpcModule;
 use log::info;
 use prometheus_endpoint::Registry;
@@ -90,7 +90,11 @@ use sp_consensus::block_validation::{
 use sp_core::traits::{CodeExecutor, SpawnNamed};
 use sp_keystore::KeystorePtr;
 use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero};
-use std::{str::FromStr, sync::Arc, time::SystemTime};
+use std::{
+	str::FromStr,
+	sync::Arc,
+	time::{Duration, SystemTime},
+};
 
 /// Full client type.
 pub type TFullClient<TBl, TRtApi, TExec> =
@@ -577,22 +581,42 @@ pub async fn propagate_transaction_notifications<Block, ExPool>(
 	Block: BlockT,
 	ExPool: MaintainedTransactionPool<Block = Block, Hash = <Block as BlockT>::Hash>,
 {
+	const TELEMETRY_INTERVAL: Duration = Duration::from_secs(1);
+
 	// transaction notifications
-	transaction_pool
-		.import_notification_stream()
-		.for_each(move |hash| {
-			tx_handler_controller.propagate_transaction(hash);
-			let status = transaction_pool.status();
-			telemetry!(
-				telemetry;
-				SUBSTRATE_INFO;
-				"txpool.import";
-				"ready" => status.ready,
-				"future" => status.future,
-			);
-			ready(())
-		})
-		.await;
+	let mut notifications = transaction_pool.import_notification_stream().fuse();
+	let mut timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();
+	let mut tx_imported = false;
+
+	loop {
+		select! {
+			notification = notifications.next() => {
+				let Some(hash) = notification else { return };
+
+				tx_handler_controller.propagate_transaction(hash);
+
+				tx_imported = true;
+			},
+			_ = timer => {
+				timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();
+
+				if !tx_imported {
+					continue;
+				}
+
+				tx_imported = false;
+				let status = transaction_pool.status();
+
+				telemetry!(
+					telemetry;
+					SUBSTRATE_INFO;
+					"txpool.import";
+					"ready" => status.ready,
+					"future" => status.future,
+				);
+			}
+		}
+	}
 }
 
 /// Initialize telemetry with provided configuration and return telemetry handle
-- 
GitLab