From 6bfe4523acf597ef47dfdcefd11b0eee396bc5c5 Mon Sep 17 00:00:00 2001
From: Andrei Eres <eresav@me.com>
Date: Thu, 9 Jan 2025 19:20:07 +0100
Subject: [PATCH] networking-bench: Update benchmarks payload (#7056)

# Description

- Used 10 notifications and requests within the benchmarks. After moving
the network workers' initialization out of the benchmarks, it is
acceptable to use this small number without losing precision.
- Removed the 128MB payload that consumed most of the execution time.
---
 .github/workflows/benchmarks-networking.yml   |   2 +
 .../network/benches/notifications_protocol.rs |  99 ++++++++---------
 .../benches/request_response_protocol.rs      | 102 ++++++++++--------
 3 files changed, 103 insertions(+), 100 deletions(-)

diff --git a/.github/workflows/benchmarks-networking.yml b/.github/workflows/benchmarks-networking.yml
index 79494b9a015..8f4246c7954 100644
--- a/.github/workflows/benchmarks-networking.yml
+++ b/.github/workflows/benchmarks-networking.yml
@@ -92,6 +92,7 @@ jobs:
         uses: benchmark-action/github-action-benchmark@v1
         with:
           tool: "cargo"
+          name: ${{ env.BENCH }}
           output-file-path: ./charts/${{ env.BENCH }}.txt
           benchmark-data-dir-path: ./bench/${{ env.BENCH }}
           github-token: ${{ steps.app-token.outputs.token }}
@@ -103,6 +104,7 @@ jobs:
         uses: benchmark-action/github-action-benchmark@v1
         with:
           tool: "cargo"
+          name: ${{ env.BENCH }}
           output-file-path: ./charts/${{ env.BENCH }}.txt
           benchmark-data-dir-path: ./bench/${{ env.BENCH }}
           github-token: ${{ steps.app-token.outputs.token }}
diff --git a/substrate/client/network/benches/notifications_protocol.rs b/substrate/client/network/benches/notifications_protocol.rs
index 40a810d616b..a406e328d5a 100644
--- a/substrate/client/network/benches/notifications_protocol.rs
+++ b/substrate/client/network/benches/notifications_protocol.rs
@@ -36,19 +36,16 @@ use std::{sync::Arc, time::Duration};
 use substrate_test_runtime_client::runtime;
 use tokio::{sync::Mutex, task::JoinHandle};
 
-const SMALL_PAYLOAD: &[(u32, usize, &'static str)] = &[
-	// (Exponent of size, number of notifications, label)
-	(6, 100, "64B"),
-	(9, 100, "512B"),
-	(12, 100, "4KB"),
-	(15, 100, "64KB"),
-];
-const LARGE_PAYLOAD: &[(u32, usize, &'static str)] = &[
-	// (Exponent of size, number of notifications, label)
-	(18, 10, "256KB"),
-	(21, 10, "2MB"),
-	(24, 10, "16MB"),
-	(27, 10, "128MB"),
+const NUMBER_OF_NOTIFICATIONS: usize = 10;
+const PAYLOAD: &[(u32, &'static str)] = &[
+	// (Exponent of size, label)
+	(6, "64B"),
+	(9, "512B"),
+	(12, "4KB"),
+	(15, "64KB"),
+	(18, "256KB"),
+	(21, "2MB"),
+	(24, "16MB"),
 ];
 const MAX_SIZE: u64 = 2u64.pow(30);
 
@@ -156,12 +153,19 @@ where
 				tokio::select! {
 					Some(event) = notification_service1.next_event() => {
 						if let NotificationEvent::NotificationStreamOpened { .. } = event {
-							break;
+							// Send a 32MB notification to preheat the network
+							notification_service1.send_async_notification(&peer_id2, vec![0; 2usize.pow(25)]).await.unwrap();
 						}
 					},
 					Some(event) = notification_service2.next_event() => {
-						if let NotificationEvent::ValidateInboundSubstream { result_tx, .. } = event {
-							result_tx.send(sc_network::service::traits::ValidationResult::Accept).unwrap();
+						match event {
+							NotificationEvent::ValidateInboundSubstream { result_tx, .. } => {
+								result_tx.send(sc_network::service::traits::ValidationResult::Accept).unwrap();
+							},
+							NotificationEvent::NotificationReceived { .. } => {
+								break;
+							}
+							_ => {}
 						}
 					},
 				}
@@ -255,64 +259,53 @@ async fn run_with_backpressure(setup: Arc<BenchSetup>, size: usize, limit: usize
 	let _ = tokio::join!(network1, network2);
 }
 
-fn run_benchmark(c: &mut Criterion, payload: &[(u32, usize, &'static str)], group: &str) {
+fn run_benchmark(c: &mut Criterion) {
 	let rt = tokio::runtime::Runtime::new().unwrap();
 	let plot_config = PlotConfiguration::default().summary_scale(AxisScale::Logarithmic);
-	let mut group = c.benchmark_group(group);
+	let mut group = c.benchmark_group("notifications_protocol");
 	group.plot_config(plot_config);
+	group.sample_size(10);
 
 	let libp2p_setup = setup_workers::<runtime::Block, runtime::Hash, NetworkWorker<_, _>>(&rt);
-	for &(exponent, limit, label) in payload.iter() {
+	for &(exponent, label) in PAYLOAD.iter() {
 		let size = 2usize.pow(exponent);
-		group.throughput(Throughput::Bytes(limit as u64 * size as u64));
-		group.bench_with_input(
-			BenchmarkId::new("libp2p/serially", label),
-			&(size, limit),
-			|b, &(size, limit)| {
-				b.to_async(&rt).iter(|| run_serially(Arc::clone(&libp2p_setup), size, limit));
-			},
-		);
+		group.throughput(Throughput::Bytes(NUMBER_OF_NOTIFICATIONS as u64 * size as u64));
+		group.bench_with_input(BenchmarkId::new("libp2p/serially", label), &size, |b, &size| {
+			b.to_async(&rt)
+				.iter(|| run_serially(Arc::clone(&libp2p_setup), size, NUMBER_OF_NOTIFICATIONS));
+		});
 		group.bench_with_input(
 			BenchmarkId::new("libp2p/with_backpressure", label),
-			&(size, limit),
-			|b, &(size, limit)| {
-				b.to_async(&rt)
-					.iter(|| run_with_backpressure(Arc::clone(&libp2p_setup), size, limit));
+			&size,
+			|b, &size| {
+				b.to_async(&rt).iter(|| {
+					run_with_backpressure(Arc::clone(&libp2p_setup), size, NUMBER_OF_NOTIFICATIONS)
+				});
 			},
 		);
 	}
 	drop(libp2p_setup);
 
 	let litep2p_setup = setup_workers::<runtime::Block, runtime::Hash, Litep2pNetworkBackend>(&rt);
-	for &(exponent, limit, label) in payload.iter() {
+	for &(exponent, label) in PAYLOAD.iter() {
 		let size = 2usize.pow(exponent);
-		group.throughput(Throughput::Bytes(limit as u64 * size as u64));
-		group.bench_with_input(
-			BenchmarkId::new("litep2p/serially", label),
-			&(size, limit),
-			|b, &(size, limit)| {
-				b.to_async(&rt).iter(|| run_serially(Arc::clone(&litep2p_setup), size, limit));
-			},
-		);
+		group.throughput(Throughput::Bytes(NUMBER_OF_NOTIFICATIONS as u64 * size as u64));
+		group.bench_with_input(BenchmarkId::new("litep2p/serially", label), &size, |b, &size| {
+			b.to_async(&rt)
+				.iter(|| run_serially(Arc::clone(&litep2p_setup), size, NUMBER_OF_NOTIFICATIONS));
+		});
 		group.bench_with_input(
 			BenchmarkId::new("litep2p/with_backpressure", label),
-			&(size, limit),
-			|b, &(size, limit)| {
-				b.to_async(&rt)
-					.iter(|| run_with_backpressure(Arc::clone(&litep2p_setup), size, limit));
+			&size,
+			|b, &size| {
+				b.to_async(&rt).iter(|| {
+					run_with_backpressure(Arc::clone(&litep2p_setup), size, NUMBER_OF_NOTIFICATIONS)
+				});
 			},
 		);
 	}
 	drop(litep2p_setup);
 }
 
-fn run_benchmark_with_small_payload(c: &mut Criterion) {
-	run_benchmark(c, SMALL_PAYLOAD, "notifications_protocol/small_payload");
-}
-
-fn run_benchmark_with_large_payload(c: &mut Criterion) {
-	run_benchmark(c, LARGE_PAYLOAD, "notifications_protocol/large_payload");
-}
-
-criterion_group!(benches, run_benchmark_with_small_payload, run_benchmark_with_large_payload);
+criterion_group!(benches, run_benchmark);
 criterion_main!(benches);
diff --git a/substrate/client/network/benches/request_response_protocol.rs b/substrate/client/network/benches/request_response_protocol.rs
index 85381112b75..97c6d72ddf1 100644
--- a/substrate/client/network/benches/request_response_protocol.rs
+++ b/substrate/client/network/benches/request_response_protocol.rs
@@ -37,19 +37,16 @@ use substrate_test_runtime_client::runtime;
 use tokio::{sync::Mutex, task::JoinHandle};
 
 const MAX_SIZE: u64 = 2u64.pow(30);
-const SMALL_PAYLOAD: &[(u32, usize, &'static str)] = &[
-	// (Exponent of size, number of requests, label)
-	(6, 100, "64B"),
-	(9, 100, "512B"),
-	(12, 100, "4KB"),
-	(15, 100, "64KB"),
-];
-const LARGE_PAYLOAD: &[(u32, usize, &'static str)] = &[
-	// (Exponent of size, number of requests, label)
-	(18, 10, "256KB"),
-	(21, 10, "2MB"),
-	(24, 10, "16MB"),
-	(27, 10, "128MB"),
+const NUMBER_OF_REQUESTS: usize = 10;
+const PAYLOAD: &[(u32, &'static str)] = &[
+	// (Exponent of size, label)
+	(6, "64B"),
+	(9, "512B"),
+	(12, "4KB"),
+	(15, "64KB"),
+	(18, "256KB"),
+	(21, "2MB"),
+	(24, "16MB"),
 ];
 
 pub fn create_network_worker<B, H, N>() -> (
@@ -154,6 +151,21 @@ where
 	let handle1 = tokio::spawn(worker1.run());
 	let handle2 = tokio::spawn(worker2.run());
 
+	let _ = tokio::spawn({
+		let rx2 = rx2.clone();
+
+		async move {
+			let req = rx2.recv().await.unwrap();
+			req.pending_response
+				.send(OutgoingResponse {
+					result: Ok(vec![0; 2usize.pow(25)]),
+					reputation_changes: vec![],
+					sent_feedback: None,
+				})
+				.unwrap();
+		}
+	});
+
 	let ready = tokio::spawn({
 		let network_service1 = Arc::clone(&network_service1);
 
@@ -165,6 +177,16 @@ where
 				network_service2.listen_addresses()[0].clone()
 			};
 			network_service1.add_known_address(peer_id2, listen_address2.into());
+			let _ = network_service1
+				.request(
+					peer_id2.into(),
+					"/request-response/1".into(),
+					vec![0; 2],
+					None,
+					IfDisconnected::TryConnect,
+				)
+				.await
+				.unwrap();
 		}
 	});
 
@@ -210,8 +232,8 @@ async fn run_serially(setup: Arc<BenchSetup>, size: usize, limit: usize) {
 		async move {
 			loop {
 				tokio::select! {
-					res = rx2.recv() => {
-						let IncomingRequest { pending_response, .. } = res.unwrap();
+					req = rx2.recv() => {
+						let IncomingRequest { pending_response, .. } = req.unwrap();
 						pending_response.send(OutgoingResponse {
 							result: Ok(vec![0; size]),
 							reputation_changes: vec![],
@@ -269,49 +291,35 @@ async fn run_with_backpressure(setup: Arc<BenchSetup>, size: usize, limit: usize
 	let _ = tokio::join!(network1, network2);
 }
 
-fn run_benchmark(c: &mut Criterion, payload: &[(u32, usize, &'static str)], group: &str) {
+fn run_benchmark(c: &mut Criterion) {
 	let rt = tokio::runtime::Runtime::new().unwrap();
 	let plot_config = PlotConfiguration::default().summary_scale(AxisScale::Logarithmic);
-	let mut group = c.benchmark_group(group);
+	let mut group = c.benchmark_group("request_response_protocol");
 	group.plot_config(plot_config);
+	group.sample_size(10);
 
 	let libp2p_setup = setup_workers::<runtime::Block, runtime::Hash, NetworkWorker<_, _>>(&rt);
-	for &(exponent, limit, label) in payload.iter() {
+	for &(exponent, label) in PAYLOAD.iter() {
 		let size = 2usize.pow(exponent);
-		group.throughput(Throughput::Bytes(limit as u64 * size as u64));
-		group.bench_with_input(
-			BenchmarkId::new("libp2p/serially", label),
-			&(size, limit),
-			|b, &(size, limit)| {
-				b.to_async(&rt).iter(|| run_serially(Arc::clone(&libp2p_setup), size, limit));
-			},
-		);
+		group.throughput(Throughput::Bytes(NUMBER_OF_REQUESTS as u64 * size as u64));
+		group.bench_with_input(BenchmarkId::new("libp2p/serially", label), &size, |b, &size| {
+			b.to_async(&rt)
+				.iter(|| run_serially(Arc::clone(&libp2p_setup), size, NUMBER_OF_REQUESTS));
+		});
 	}
 	drop(libp2p_setup);
 
-	// TODO: NetworkRequest::request should be implemented for Litep2pNetworkService
 	let litep2p_setup = setup_workers::<runtime::Block, runtime::Hash, Litep2pNetworkBackend>(&rt);
-	// for &(exponent, limit, label) in payload.iter() {
-	// 	let size = 2usize.pow(exponent);
-	// 	group.throughput(Throughput::Bytes(limit as u64 * size as u64));
-	// 	group.bench_with_input(
-	// 		BenchmarkId::new("litep2p/serially", label),
-	// 		&(size, limit),
-	// 		|b, &(size, limit)| {
-	// 			b.to_async(&rt).iter(|| run_serially(Arc::clone(&litep2p_setup), size, limit));
-	// 		},
-	// 	);
-	// }
+	for &(exponent, label) in PAYLOAD.iter() {
+		let size = 2usize.pow(exponent);
+		group.throughput(Throughput::Bytes(NUMBER_OF_REQUESTS as u64 * size as u64));
+		group.bench_with_input(BenchmarkId::new("litep2p/serially", label), &size, |b, &size| {
+			b.to_async(&rt)
+				.iter(|| run_serially(Arc::clone(&litep2p_setup), size, NUMBER_OF_REQUESTS));
+		});
+	}
 	drop(litep2p_setup);
 }
 
-fn run_benchmark_with_small_payload(c: &mut Criterion) {
-	run_benchmark(c, SMALL_PAYLOAD, "request_response_benchmark/small_payload");
-}
-
-fn run_benchmark_with_large_payload(c: &mut Criterion) {
-	run_benchmark(c, LARGE_PAYLOAD, "request_response_benchmark/large_payload");
-}
-
-criterion_group!(benches, run_benchmark_with_small_payload, run_benchmark_with_large_payload);
+criterion_group!(benches, run_benchmark);
 criterion_main!(benches);
-- 
GitLab