diff --git a/Cargo.lock b/Cargo.lock index 493494c423b2725cdedd323c66aaa404b908f765..4882caf032bf9aa0c784ae458cf932ef5cd4ee77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23302,6 +23302,7 @@ dependencies = [ name = "sc-network-test" version = "0.8.0" dependencies = [ + "async-channel 1.9.0", "async-trait", "futures", "futures-timer", diff --git a/substrate/client/network/test/Cargo.toml b/substrate/client/network/test/Cargo.toml index 783d47f21fa76eee2431796c2cf3d5e0e92bd2db..c077fb78e24dd328704968da571ac4f84e13645f 100644 --- a/substrate/client/network/test/Cargo.toml +++ b/substrate/client/network/test/Cargo.toml @@ -16,6 +16,7 @@ workspace = true targets = ["x86_64-unknown-linux-gnu"] [dependencies] +async-channel = { workspace = true } async-trait = { workspace = true } futures = { workspace = true } futures-timer = { workspace = true } diff --git a/substrate/client/network/test/src/conformance/high_level.rs b/substrate/client/network/test/src/conformance/high_level.rs new file mode 100644 index 0000000000000000000000000000000000000000..90ab78e5c076ec8e1f12b09c933990ecb636c5c4 --- /dev/null +++ b/substrate/client/network/test/src/conformance/high_level.rs @@ -0,0 +1,236 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use crate::conformance::setup::{ + connect_backends, connect_notifications, create_network_backend, NetworkBackendClient, +}; + +use sc_network::{ + request_responses::OutgoingResponse, service::traits::NotificationEvent, IfDisconnected, + Litep2pNetworkBackend, NetworkWorker, +}; + +#[tokio::test] +async fn check_connectivity() { + // Libp2p dials litep2p. + connect_backends( + &create_network_backend::<NetworkWorker<_, _>>(), + &create_network_backend::<Litep2pNetworkBackend>(), + ) + .await; + + // Litep2p dials libp2p. + connect_backends( + &create_network_backend::<Litep2pNetworkBackend>(), + &create_network_backend::<NetworkWorker<_, _>>(), + ) + .await; +} + +#[tokio::test] +async fn check_request_response() { + async fn inner_check_request_response(left: NetworkBackendClient, right: NetworkBackendClient) { + connect_backends(&left, &right).await; + + let rx = right.receiver.clone(); + tokio::spawn(async move { + while let Ok(request) = rx.recv().await { + request + .pending_response + .send(OutgoingResponse { + result: Ok(request.payload), + reputation_changes: vec![], + sent_feedback: None, + }) + .expect("Valid response; qed"); + } + }); + + let channels = (0..32) + .map(|i| { + let (tx, rx) = futures::channel::oneshot::channel(); + left.network_service.start_request( + right.network_service.local_peer_id().into(), + "/request-response/1".into(), + vec![1, 2, 3, i], + None, + tx, + IfDisconnected::ImmediateError, + ); + + (i, rx) + }) + .collect::<Vec<_>>(); + + for (id, channel) in channels { + let response = channel + .await + .expect("Channel should not be closed") + .expect(format!("Channel {} should have a response", id).as_str()); + assert_eq!(response.0, vec![1, 2, 3, id]); + } + } + + inner_check_request_response( + create_network_backend::<NetworkWorker<_, _>>(), + create_network_backend::<Litep2pNetworkBackend>(), + ) + .await; + + inner_check_request_response( + create_network_backend::<Litep2pNetworkBackend>(), + create_network_backend::<NetworkWorker<_, _>>(), + ) + .await; +} + +#[tokio::test] +async fn check_notifications() { + async fn inner_check_notifications(left: NetworkBackendClient, right: NetworkBackendClient) { + const MAX_NOTIFICATIONS: usize = 128; + connect_notifications(&left, &right).await; + + let right_peer = right.network_service.local_peer_id(); + let (tx, rx) = async_channel::bounded(1); + + tokio::spawn(async move { + let mut notifications_left = left.notification_service.lock().await; + for _ in 0..MAX_NOTIFICATIONS { + notifications_left + .send_async_notification(&right_peer, vec![1, 2, 3]) + .await + .expect("qed; cannot fail"); + } + let _ = rx.recv().await; + }); + + let mut notifications_right = right.notification_service.lock().await; + let mut notification_index = 0; + while let Some(event) = notifications_right.next_event().await { + match event { + NotificationEvent::NotificationReceived { notification, .. } => { + notification_index += 1; + + if notification_index >= MAX_NOTIFICATIONS { + let _ = tx.send(()).await; + break; + } + + assert_eq!(notification, vec![1, 2, 3]); + }, + _ => {}, + } + } + } + + // Check libp2p -> litep2p. + inner_check_notifications( + create_network_backend::<NetworkWorker<_, _>>(), + create_network_backend::<Litep2pNetworkBackend>(), + ) + .await; + + // Check litep2p -> libp2p. + inner_check_notifications( + create_network_backend::<Litep2pNetworkBackend>(), + create_network_backend::<NetworkWorker<_, _>>(), + ) + .await; +} + +#[tokio::test] +async fn check_notifications_ping_pong() { + async fn inner_check_notifications_ping_pong( + left: NetworkBackendClient, + right: NetworkBackendClient, + ) { + const MAX_NOTIFICATIONS: usize = 128; + connect_notifications(&left, &right).await; + + let left_peer = left.network_service.local_peer_id(); + let right_peer = right.network_service.local_peer_id(); + + let mut notification_index = 0; + tokio::spawn(async move { + let mut notifications_left = left.notification_service.lock().await; + + notifications_left + .send_async_notification(&right_peer, vec![1, 2, 3]) + .await + .expect("qed; cannot fail"); + + while let Some(event) = notifications_left.next_event().await { + match event { + NotificationEvent::NotificationReceived { notification, .. } => { + assert_eq!(notification, vec![1, 2, 3, 4, 5]); + + notification_index += 1; + + if notification_index >= MAX_NOTIFICATIONS { + break; + } + + notifications_left + .send_async_notification(&right_peer, vec![1, 2, 3]) + .await + .expect("qed; cannot fail"); + }, + _ => {}, + } + } + + for _ in 0..MAX_NOTIFICATIONS {} + }); + + let mut notifications_right = right.notification_service.lock().await; + let mut notification_index = 0; + while let Some(event) = notifications_right.next_event().await { + match event { + NotificationEvent::NotificationReceived { notification, .. } => { + assert_eq!(notification, vec![1, 2, 3]); + + notification_index += 1; + + if notification_index >= MAX_NOTIFICATIONS { + break; + } + + notifications_right + .send_async_notification(&left_peer, vec![1, 2, 3, 4, 5]) + .await + .expect("qed; cannot fail"); + }, + _ => {}, + } + } + } + + // Check libp2p -> litep2p. + inner_check_notifications_ping_pong( + create_network_backend::<NetworkWorker<_, _>>(), + create_network_backend::<Litep2pNetworkBackend>(), + ) + .await; + + // Check litep2p -> libp2p. + inner_check_notifications_ping_pong( + create_network_backend::<Litep2pNetworkBackend>(), + create_network_backend::<NetworkWorker<_, _>>(), + ) + .await; +} diff --git a/substrate/client/network/test/src/conformance/mod.rs b/substrate/client/network/test/src/conformance/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..e24a7e1cc442cea2d9df2a6dbe7fd2e238e929ed --- /dev/null +++ b/substrate/client/network/test/src/conformance/mod.rs @@ -0,0 +1,20 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +mod high_level; +mod setup; diff --git a/substrate/client/network/test/src/conformance/setup.rs b/substrate/client/network/test/src/conformance/setup.rs new file mode 100644 index 0000000000000000000000000000000000000000..de7651a418a92621314ab51c8ae1219f9b8f3b77 --- /dev/null +++ b/substrate/client/network/test/src/conformance/setup.rs @@ -0,0 +1,225 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use sc_network::{ + config::{ + FullNetworkConfiguration, IncomingRequest, MultiaddrWithPeerId, NetworkConfiguration, + NonReservedPeerMode, NotificationHandshake, OutgoingResponse, Params, ProtocolId, Role, + SetConfig, + }, + service::traits::{NetworkService, NotificationEvent}, + IfDisconnected, NetworkBackend, NetworkRequest, NotificationMetrics, NotificationService, + Roles, +}; + +use sc_network_common::sync::message::BlockAnnouncesHandshake; +use sp_runtime::traits::Zero; +use std::{sync::Arc, time::Duration}; +use substrate_test_runtime_client::runtime; +use tokio::sync::Mutex; + +/// High level network backend (litep2p or libp2p) test client. +pub struct NetworkBackendClient { + pub network_service: Arc<dyn NetworkService>, + pub notification_service: Arc<Mutex<Box<dyn NotificationService>>>, + pub receiver: async_channel::Receiver<IncomingRequest>, +} + +/// Configure the network backend client for tests based on the given service. +/// +/// This will setup: +/// - `/request-response/1` request response protocol with bounded channel of 32 requests +/// - `/block-announces/1` notification protocol +pub fn create_network_backend<N>() -> NetworkBackendClient +where + N: NetworkBackend<runtime::Block, runtime::Hash>, +{ + let (tx, rx) = async_channel::bounded(32); + let request_response_config = N::request_response_config( + "/request-response/1".into(), + vec![], + 1024, + 1024, + Duration::from_secs(2), + Some(tx), + ); + + let role = Role::Full; + let net_conf = NetworkConfiguration::new_local(); + let mut network_config = FullNetworkConfiguration::new(&net_conf, None); + network_config.add_request_response_protocol(request_response_config); + let genesis_hash = runtime::Hash::zero(); + let (block_announce_config, notification_service) = N::notification_config( + "/block-announces/1".into(), + vec![], + 1024, + Some(NotificationHandshake::new(BlockAnnouncesHandshake::<runtime::Block>::build( + Roles::from(&Role::Full), + Zero::zero(), + genesis_hash, + genesis_hash, + ))), + SetConfig { + in_peers: 1, + out_peers: 1, + reserved_nodes: vec![], + non_reserved_mode: NonReservedPeerMode::Accept, + }, + NotificationMetrics::new(None), + network_config.peer_store_handle(), + ); + let worker = N::new(Params::<runtime::Block, runtime::Hash, N> { + block_announce_config, + role, + executor: Box::new(|f| { + tokio::spawn(f); + }), + genesis_hash: runtime::Hash::zero(), + network_config, + protocol_id: ProtocolId::from("test"), + fork_id: None, + metrics_registry: None, + bitswap_config: None, + notification_metrics: NotificationMetrics::new(None), + }) + .unwrap(); + let network_service = worker.network_service(); + + // Run the worker in the backend. + tokio::spawn(worker.run()); + + NetworkBackendClient { + network_service, + notification_service: Arc::new(Mutex::new(notification_service)), + receiver: rx, + } +} + +/// Connect two backends together and submit one request with `IfDisconnected::TryConnect` option +/// expecting the left backend to dial the right one. +pub async fn connect_backends(left: &NetworkBackendClient, right: &NetworkBackendClient) { + let right_peer_id = right.network_service.local_peer_id(); + + // Ensure the right backend responds to a first request + let rx = right.receiver.clone(); + tokio::spawn(async move { + let request = rx.recv().await.expect("Left backend should receive a request"); + assert_eq!(request.payload, vec![1, 2, 3]); + request + .pending_response + .send(OutgoingResponse { + result: Ok(vec![4, 5, 6]), + reputation_changes: vec![], + sent_feedback: None, + }) + .expect("Left backend should send a response"); + }); + + // Connect the two backends + while left.network_service.listen_addresses().is_empty() { + tokio::time::sleep(Duration::from_millis(10)).await; + } + while right.network_service.listen_addresses().is_empty() { + tokio::time::sleep(Duration::from_millis(10)).await; + } + let right_listen_address = right + .network_service + .listen_addresses() + .first() + .expect("qed; non empty") + .clone(); + + left.network_service + .add_known_address(right_peer_id, right_listen_address.clone().into()); + + let result = left + .network_service + .request( + right_peer_id, + "/request-response/1".into(), + vec![1, 2, 3], + None, + IfDisconnected::TryConnect, + ) + .await + .expect("Left backend should send a request"); + assert_eq!(result.0, vec![4, 5, 6]); + assert_eq!(result.1, "/request-response/1".into()); +} + +/// Ensure connectivity on the notification protocol level. +pub async fn connect_notifications(left: &NetworkBackendClient, right: &NetworkBackendClient) { + let right_peer_id = right.network_service.local_peer_id(); + + while left.network_service.listen_addresses().is_empty() { + tokio::time::sleep(Duration::from_millis(10)).await; + } + while right.network_service.listen_addresses().is_empty() { + tokio::time::sleep(Duration::from_millis(10)).await; + } + + let right_listen_address = right + .network_service + .listen_addresses() + .first() + .expect("qed; non empty") + .clone(); + + left.network_service + .add_reserved_peer(MultiaddrWithPeerId { + multiaddr: right_listen_address.into(), + peer_id: right_peer_id, + }) + .unwrap(); + + let mut notifications_left = left.notification_service.lock().await; + let mut notifications_right = right.notification_service.lock().await; + let mut opened = 0; + loop { + tokio::select! { + Some(event) = notifications_left.next_event() => { + match event { + NotificationEvent::NotificationStreamOpened { .. } => { + opened += 1; + if opened >= 2 { + break; + } + }, + NotificationEvent::ValidateInboundSubstream { result_tx, .. } => { + result_tx.send(sc_network::service::traits::ValidationResult::Accept).unwrap(); + }, + _ => {}, + }; + }, + Some(event) = notifications_right.next_event() => { + match event { + NotificationEvent::ValidateInboundSubstream { result_tx, .. } => { + result_tx.send(sc_network::service::traits::ValidationResult::Accept).unwrap(); + }, + NotificationEvent::NotificationStreamOpened { .. } => { + opened += 1; + if opened >= 2 { + break; + } + }, + _ => {} + } + }, + } + } +} diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index 3cdf211e07f6898d044ec6fffdcdfa5a49ea47e0..1a810f66494d47a308c62bde4c414951ed5642a1 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -20,6 +20,8 @@ #[cfg(test)] mod block_import; #[cfg(test)] +mod conformance; +#[cfg(test)] mod fuzz; #[cfg(test)] mod service;