From 20ffada06152b8926cbccfa19334fe7bbb116857 Mon Sep 17 00:00:00 2001
From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com>
Date: Fri, 14 Feb 2025 13:52:17 +0200
Subject: [PATCH] network/tests: Add conformance testing for litep2p and libp2p
 (#7361)

This PR implements conformance tests between our network backends
(litep2p and libp2p).

The PR creates a setup for extending testing in the future, while
implementing the following tests:
- connectivity check: Connect litep2p -> libp2p and libp2p -> litep2p
- request response check: Send 32 requests from one backend to the other
- notification check: Send 128 ping pong notifications and 128 from one
backend to the other

cc @paritytech/networking

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
---
 Cargo.lock                                    |   1 +
 substrate/client/network/test/Cargo.toml      |   1 +
 .../test/src/conformance/high_level.rs        | 236 ++++++++++++++++++
 .../network/test/src/conformance/mod.rs       |  20 ++
 .../network/test/src/conformance/setup.rs     | 225 +++++++++++++++++
 substrate/client/network/test/src/lib.rs      |   2 +
 6 files changed, 485 insertions(+)
 create mode 100644 substrate/client/network/test/src/conformance/high_level.rs
 create mode 100644 substrate/client/network/test/src/conformance/mod.rs
 create mode 100644 substrate/client/network/test/src/conformance/setup.rs

diff --git a/Cargo.lock b/Cargo.lock
index 493494c423b..4882caf032b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -23302,6 +23302,7 @@ dependencies = [
 name = "sc-network-test"
 version = "0.8.0"
 dependencies = [
+ "async-channel 1.9.0",
  "async-trait",
  "futures",
  "futures-timer",
diff --git a/substrate/client/network/test/Cargo.toml b/substrate/client/network/test/Cargo.toml
index 783d47f21fa..c077fb78e24 100644
--- a/substrate/client/network/test/Cargo.toml
+++ b/substrate/client/network/test/Cargo.toml
@@ -16,6 +16,7 @@ workspace = true
 targets = ["x86_64-unknown-linux-gnu"]
 
 [dependencies]
+async-channel = { workspace = true }
 async-trait = { workspace = true }
 futures = { workspace = true }
 futures-timer = { workspace = true }
diff --git a/substrate/client/network/test/src/conformance/high_level.rs b/substrate/client/network/test/src/conformance/high_level.rs
new file mode 100644
index 00000000000..90ab78e5c07
--- /dev/null
+++ b/substrate/client/network/test/src/conformance/high_level.rs
@@ -0,0 +1,236 @@
+// This file is part of Substrate.
+
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use crate::conformance::setup::{
+	connect_backends, connect_notifications, create_network_backend, NetworkBackendClient,
+};
+
+use sc_network::{
+	request_responses::OutgoingResponse, service::traits::NotificationEvent, IfDisconnected,
+	Litep2pNetworkBackend, NetworkWorker,
+};
+
+#[tokio::test]
+async fn check_connectivity() {
+	// Libp2p dials litep2p.
+	connect_backends(
+		&create_network_backend::<NetworkWorker<_, _>>(),
+		&create_network_backend::<Litep2pNetworkBackend>(),
+	)
+	.await;
+
+	// Litep2p dials libp2p.
+	connect_backends(
+		&create_network_backend::<Litep2pNetworkBackend>(),
+		&create_network_backend::<NetworkWorker<_, _>>(),
+	)
+	.await;
+}
+
+#[tokio::test]
+async fn check_request_response() {
+	async fn inner_check_request_response(left: NetworkBackendClient, right: NetworkBackendClient) {
+		connect_backends(&left, &right).await;
+
+		let rx = right.receiver.clone();
+		tokio::spawn(async move {
+			while let Ok(request) = rx.recv().await {
+				request
+					.pending_response
+					.send(OutgoingResponse {
+						result: Ok(request.payload),
+						reputation_changes: vec![],
+						sent_feedback: None,
+					})
+					.expect("Valid response; qed");
+			}
+		});
+
+		let channels = (0..32)
+			.map(|i| {
+				let (tx, rx) = futures::channel::oneshot::channel();
+				left.network_service.start_request(
+					right.network_service.local_peer_id().into(),
+					"/request-response/1".into(),
+					vec![1, 2, 3, i],
+					None,
+					tx,
+					IfDisconnected::ImmediateError,
+				);
+
+				(i, rx)
+			})
+			.collect::<Vec<_>>();
+
+		for (id, channel) in channels {
+			let response = channel
+				.await
+				.expect("Channel should not be closed")
+				.expect(format!("Channel {} should have a response", id).as_str());
+			assert_eq!(response.0, vec![1, 2, 3, id]);
+		}
+	}
+
+	inner_check_request_response(
+		create_network_backend::<NetworkWorker<_, _>>(),
+		create_network_backend::<Litep2pNetworkBackend>(),
+	)
+	.await;
+
+	inner_check_request_response(
+		create_network_backend::<Litep2pNetworkBackend>(),
+		create_network_backend::<NetworkWorker<_, _>>(),
+	)
+	.await;
+}
+
+#[tokio::test]
+async fn check_notifications() {
+	async fn inner_check_notifications(left: NetworkBackendClient, right: NetworkBackendClient) {
+		const MAX_NOTIFICATIONS: usize = 128;
+		connect_notifications(&left, &right).await;
+
+		let right_peer = right.network_service.local_peer_id();
+		let (tx, rx) = async_channel::bounded(1);
+
+		tokio::spawn(async move {
+			let mut notifications_left = left.notification_service.lock().await;
+			for _ in 0..MAX_NOTIFICATIONS {
+				notifications_left
+					.send_async_notification(&right_peer, vec![1, 2, 3])
+					.await
+					.expect("qed; cannot fail");
+			}
+			let _ = rx.recv().await;
+		});
+
+		let mut notifications_right = right.notification_service.lock().await;
+		let mut notification_index = 0;
+		while let Some(event) = notifications_right.next_event().await {
+			match event {
+				NotificationEvent::NotificationReceived { notification, .. } => {
+					notification_index += 1;
+
+					if notification_index >= MAX_NOTIFICATIONS {
+						let _ = tx.send(()).await;
+						break;
+					}
+
+					assert_eq!(notification, vec![1, 2, 3]);
+				},
+				_ => {},
+			}
+		}
+	}
+
+	// Check libp2p -> litep2p.
+	inner_check_notifications(
+		create_network_backend::<NetworkWorker<_, _>>(),
+		create_network_backend::<Litep2pNetworkBackend>(),
+	)
+	.await;
+
+	// Check litep2p -> libp2p.
+	inner_check_notifications(
+		create_network_backend::<Litep2pNetworkBackend>(),
+		create_network_backend::<NetworkWorker<_, _>>(),
+	)
+	.await;
+}
+
+#[tokio::test]
+async fn check_notifications_ping_pong() {
+	async fn inner_check_notifications_ping_pong(
+		left: NetworkBackendClient,
+		right: NetworkBackendClient,
+	) {
+		const MAX_NOTIFICATIONS: usize = 128;
+		connect_notifications(&left, &right).await;
+
+		let left_peer = left.network_service.local_peer_id();
+		let right_peer = right.network_service.local_peer_id();
+
+		let mut notification_index = 0;
+		tokio::spawn(async move {
+			let mut notifications_left = left.notification_service.lock().await;
+
+			notifications_left
+				.send_async_notification(&right_peer, vec![1, 2, 3])
+				.await
+				.expect("qed; cannot fail");
+
+			while let Some(event) = notifications_left.next_event().await {
+				match event {
+					NotificationEvent::NotificationReceived { notification, .. } => {
+						assert_eq!(notification, vec![1, 2, 3, 4, 5]);
+
+						notification_index += 1;
+
+						if notification_index >= MAX_NOTIFICATIONS {
+							break;
+						}
+
+						notifications_left
+							.send_async_notification(&right_peer, vec![1, 2, 3])
+							.await
+							.expect("qed; cannot fail");
+					},
+					_ => {},
+				}
+			}
+
+			for _ in 0..MAX_NOTIFICATIONS {}
+		});
+
+		let mut notifications_right = right.notification_service.lock().await;
+		let mut notification_index = 0;
+		while let Some(event) = notifications_right.next_event().await {
+			match event {
+				NotificationEvent::NotificationReceived { notification, .. } => {
+					assert_eq!(notification, vec![1, 2, 3]);
+
+					notification_index += 1;
+
+					if notification_index >= MAX_NOTIFICATIONS {
+						break;
+					}
+
+					notifications_right
+						.send_async_notification(&left_peer, vec![1, 2, 3, 4, 5])
+						.await
+						.expect("qed; cannot fail");
+				},
+				_ => {},
+			}
+		}
+	}
+
+	// Check libp2p -> litep2p.
+	inner_check_notifications_ping_pong(
+		create_network_backend::<NetworkWorker<_, _>>(),
+		create_network_backend::<Litep2pNetworkBackend>(),
+	)
+	.await;
+
+	// Check litep2p -> libp2p.
+	inner_check_notifications_ping_pong(
+		create_network_backend::<Litep2pNetworkBackend>(),
+		create_network_backend::<NetworkWorker<_, _>>(),
+	)
+	.await;
+}
diff --git a/substrate/client/network/test/src/conformance/mod.rs b/substrate/client/network/test/src/conformance/mod.rs
new file mode 100644
index 00000000000..e24a7e1cc44
--- /dev/null
+++ b/substrate/client/network/test/src/conformance/mod.rs
@@ -0,0 +1,20 @@
+// This file is part of Substrate.
+
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+mod high_level;
+mod setup;
diff --git a/substrate/client/network/test/src/conformance/setup.rs b/substrate/client/network/test/src/conformance/setup.rs
new file mode 100644
index 00000000000..de7651a418a
--- /dev/null
+++ b/substrate/client/network/test/src/conformance/setup.rs
@@ -0,0 +1,225 @@
+// This file is part of Substrate.
+
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use sc_network::{
+	config::{
+		FullNetworkConfiguration, IncomingRequest, MultiaddrWithPeerId, NetworkConfiguration,
+		NonReservedPeerMode, NotificationHandshake, OutgoingResponse, Params, ProtocolId, Role,
+		SetConfig,
+	},
+	service::traits::{NetworkService, NotificationEvent},
+	IfDisconnected, NetworkBackend, NetworkRequest, NotificationMetrics, NotificationService,
+	Roles,
+};
+
+use sc_network_common::sync::message::BlockAnnouncesHandshake;
+use sp_runtime::traits::Zero;
+use std::{sync::Arc, time::Duration};
+use substrate_test_runtime_client::runtime;
+use tokio::sync::Mutex;
+
+/// High level network backend (litep2p or libp2p) test client.
+pub struct NetworkBackendClient {
+	pub network_service: Arc<dyn NetworkService>,
+	pub notification_service: Arc<Mutex<Box<dyn NotificationService>>>,
+	pub receiver: async_channel::Receiver<IncomingRequest>,
+}
+
+/// Configure the network backend client for tests based on the given service.
+///
+/// This will setup:
+/// - `/request-response/1` request response protocol with bounded channel of 32 requests
+/// - `/block-announces/1` notification protocol
+pub fn create_network_backend<N>() -> NetworkBackendClient
+where
+	N: NetworkBackend<runtime::Block, runtime::Hash>,
+{
+	let (tx, rx) = async_channel::bounded(32);
+	let request_response_config = N::request_response_config(
+		"/request-response/1".into(),
+		vec![],
+		1024,
+		1024,
+		Duration::from_secs(2),
+		Some(tx),
+	);
+
+	let role = Role::Full;
+	let net_conf = NetworkConfiguration::new_local();
+	let mut network_config = FullNetworkConfiguration::new(&net_conf, None);
+	network_config.add_request_response_protocol(request_response_config);
+	let genesis_hash = runtime::Hash::zero();
+	let (block_announce_config, notification_service) = N::notification_config(
+		"/block-announces/1".into(),
+		vec![],
+		1024,
+		Some(NotificationHandshake::new(BlockAnnouncesHandshake::<runtime::Block>::build(
+			Roles::from(&Role::Full),
+			Zero::zero(),
+			genesis_hash,
+			genesis_hash,
+		))),
+		SetConfig {
+			in_peers: 1,
+			out_peers: 1,
+			reserved_nodes: vec![],
+			non_reserved_mode: NonReservedPeerMode::Accept,
+		},
+		NotificationMetrics::new(None),
+		network_config.peer_store_handle(),
+	);
+	let worker = N::new(Params::<runtime::Block, runtime::Hash, N> {
+		block_announce_config,
+		role,
+		executor: Box::new(|f| {
+			tokio::spawn(f);
+		}),
+		genesis_hash: runtime::Hash::zero(),
+		network_config,
+		protocol_id: ProtocolId::from("test"),
+		fork_id: None,
+		metrics_registry: None,
+		bitswap_config: None,
+		notification_metrics: NotificationMetrics::new(None),
+	})
+	.unwrap();
+	let network_service = worker.network_service();
+
+	// Run the worker in the backend.
+	tokio::spawn(worker.run());
+
+	NetworkBackendClient {
+		network_service,
+		notification_service: Arc::new(Mutex::new(notification_service)),
+		receiver: rx,
+	}
+}
+
+/// Connect two backends together and submit one request with `IfDisconnected::TryConnect` option
+/// expecting the left backend to dial the right one.
+pub async fn connect_backends(left: &NetworkBackendClient, right: &NetworkBackendClient) {
+	let right_peer_id = right.network_service.local_peer_id();
+
+	// Ensure the right backend responds to a first request
+	let rx = right.receiver.clone();
+	tokio::spawn(async move {
+		let request = rx.recv().await.expect("Left backend should receive a request");
+		assert_eq!(request.payload, vec![1, 2, 3]);
+		request
+			.pending_response
+			.send(OutgoingResponse {
+				result: Ok(vec![4, 5, 6]),
+				reputation_changes: vec![],
+				sent_feedback: None,
+			})
+			.expect("Left backend should send a response");
+	});
+
+	// Connect the two backends
+	while left.network_service.listen_addresses().is_empty() {
+		tokio::time::sleep(Duration::from_millis(10)).await;
+	}
+	while right.network_service.listen_addresses().is_empty() {
+		tokio::time::sleep(Duration::from_millis(10)).await;
+	}
+	let right_listen_address = right
+		.network_service
+		.listen_addresses()
+		.first()
+		.expect("qed; non empty")
+		.clone();
+
+	left.network_service
+		.add_known_address(right_peer_id, right_listen_address.clone().into());
+
+	let result = left
+		.network_service
+		.request(
+			right_peer_id,
+			"/request-response/1".into(),
+			vec![1, 2, 3],
+			None,
+			IfDisconnected::TryConnect,
+		)
+		.await
+		.expect("Left backend should send a request");
+	assert_eq!(result.0, vec![4, 5, 6]);
+	assert_eq!(result.1, "/request-response/1".into());
+}
+
+/// Ensure connectivity on the notification protocol level.
+pub async fn connect_notifications(left: &NetworkBackendClient, right: &NetworkBackendClient) {
+	let right_peer_id = right.network_service.local_peer_id();
+
+	while left.network_service.listen_addresses().is_empty() {
+		tokio::time::sleep(Duration::from_millis(10)).await;
+	}
+	while right.network_service.listen_addresses().is_empty() {
+		tokio::time::sleep(Duration::from_millis(10)).await;
+	}
+
+	let right_listen_address = right
+		.network_service
+		.listen_addresses()
+		.first()
+		.expect("qed; non empty")
+		.clone();
+
+	left.network_service
+		.add_reserved_peer(MultiaddrWithPeerId {
+			multiaddr: right_listen_address.into(),
+			peer_id: right_peer_id,
+		})
+		.unwrap();
+
+	let mut notifications_left = left.notification_service.lock().await;
+	let mut notifications_right = right.notification_service.lock().await;
+	let mut opened = 0;
+	loop {
+		tokio::select! {
+			Some(event) = notifications_left.next_event() => {
+				match event {
+					NotificationEvent::NotificationStreamOpened { .. } => {
+						opened += 1;
+						if opened >= 2 {
+							break;
+						}
+					},
+					NotificationEvent::ValidateInboundSubstream { result_tx, .. } => {
+						result_tx.send(sc_network::service::traits::ValidationResult::Accept).unwrap();
+					},
+					_ => {},
+				};
+			},
+			Some(event) = notifications_right.next_event() => {
+				match event {
+					NotificationEvent::ValidateInboundSubstream { result_tx, .. } => {
+						result_tx.send(sc_network::service::traits::ValidationResult::Accept).unwrap();
+					},
+					NotificationEvent::NotificationStreamOpened { .. } => {
+						opened += 1;
+						if opened >= 2 {
+							break;
+						}
+					},
+					_ => {}
+				}
+			},
+		}
+	}
+}
diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs
index 3cdf211e07f..1a810f66494 100644
--- a/substrate/client/network/test/src/lib.rs
+++ b/substrate/client/network/test/src/lib.rs
@@ -20,6 +20,8 @@
 #[cfg(test)]
 mod block_import;
 #[cfg(test)]
+mod conformance;
+#[cfg(test)]
 mod fuzz;
 #[cfg(test)]
 mod service;
-- 
GitLab