diff --git a/substrate/client/network/src/service/tests/chain_sync.rs b/substrate/client/network/src/service/tests/chain_sync.rs index 7ff8c589d855032c937600eafa0cb3ca015679fa..21149459413f4a8fd526d045e49676f0d4641653 100644 --- a/substrate/client/network/src/service/tests/chain_sync.rs +++ b/substrate/client/network/src/service/tests/chain_sync.rs @@ -16,7 +16,10 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. -use crate::service::tests::TestNetworkBuilder; +use crate::{ + config, + service::tests::{TestNetworkBuilder, BLOCK_ANNOUNCE_PROTO_NAME}, +}; use futures::prelude::*; use libp2p::PeerId; @@ -24,16 +27,23 @@ use sc_block_builder::BlockBuilderProvider; use sc_client_api::HeaderBackend; use sc_consensus::JustificationSyncLink; use sc_network_common::{ + config::{MultiaddrWithPeerId, SetConfig}, + protocol::event::Event, service::NetworkSyncForkRequest, sync::{SyncState, SyncStatus}, }; -use sc_network_sync::{mock::MockChainSync, service::mock::MockChainSyncInterface}; +use sc_network_sync::{mock::MockChainSync, service::mock::MockChainSyncInterface, ChainSync}; use sp_core::H256; use sp_runtime::{ generic::BlockId, traits::{Block as BlockT, Header as _}, }; -use std::{iter, sync::Arc, task::Poll}; +use std::{ + iter, + sync::{Arc, RwLock}, + task::Poll, + time::Duration, +}; use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _}; fn set_default_expecations_no_peers( @@ -224,3 +234,169 @@ async fn on_block_finalized() { }) .await; } + +// report from mock import queue that importing a justification was not successful +// and verify that connection to the peer is closed +#[async_std::test] +async fn invalid_justification_imported() { + struct DummyImportQueue( + Arc< + RwLock< + Option<( + PeerId, + substrate_test_runtime_client::runtime::Hash, + sp_runtime::traits::NumberFor<substrate_test_runtime_client::runtime::Block>, + )>, + >, + >, + ); + + impl sc_consensus::ImportQueue<substrate_test_runtime_client::runtime::Block> for DummyImportQueue { + fn import_blocks( + &mut self, + _origin: sp_consensus::BlockOrigin, + _blocks: Vec< + sc_consensus::IncomingBlock<substrate_test_runtime_client::runtime::Block>, + >, + ) { + } + + fn import_justifications( + &mut self, + _who: sc_consensus::import_queue::RuntimeOrigin, + _hash: substrate_test_runtime_client::runtime::Hash, + _number: sp_runtime::traits::NumberFor<substrate_test_runtime_client::runtime::Block>, + _justifications: sp_runtime::Justifications, + ) { + } + + fn poll_actions( + &mut self, + _cx: &mut futures::task::Context, + link: &mut dyn sc_consensus::Link<substrate_test_runtime_client::runtime::Block>, + ) { + if let Some((peer, hash, number)) = *self.0.read().unwrap() { + link.justification_imported(peer, &hash, number, false); + } + } + } + + let justification_info = Arc::new(RwLock::new(None)); + let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())]; + + let (service1, mut event_stream1) = TestNetworkBuilder::new() + .with_import_queue(Box::new(DummyImportQueue(justification_info.clone()))) + .with_listen_addresses(vec![listen_addr.clone()]) + .build() + .start_network(); + + let (service2, mut event_stream2) = TestNetworkBuilder::new() + .with_set_config(SetConfig { + reserved_nodes: vec![MultiaddrWithPeerId { + multiaddr: listen_addr, + peer_id: service1.local_peer_id, + }], + ..Default::default() + }) + .build() + .start_network(); + + async fn wait_for_events(stream: &mut (impl Stream<Item = Event> + std::marker::Unpin)) { + let mut notif_received = false; + let mut sync_received = false; + while !notif_received || !sync_received { + match stream.next().await.unwrap() { + Event::NotificationStreamOpened { .. } => notif_received = true, + Event::SyncConnected { .. } => sync_received = true, + _ => {}, + }; + } + } + + wait_for_events(&mut event_stream1).await; + wait_for_events(&mut event_stream2).await; + + { + let mut info = justification_info.write().unwrap(); + *info = Some((service2.local_peer_id, H256::random(), 1337u64)); + } + + let wait_disconnection = async { + while !std::matches!(event_stream1.next().await, Some(Event::SyncDisconnected { .. })) {} + }; + + if async_std::future::timeout(Duration::from_secs(5), wait_disconnection) + .await + .is_err() + { + panic!("did not receive disconnection event in time"); + } +} + +#[async_std::test] +async fn disconnect_peer_using_chain_sync_handle() { + let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0); + let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())]; + + let (chain_sync_network_provider, chain_sync_network_handle) = + sc_network_sync::service::network::NetworkServiceProvider::new(); + let handle_clone = chain_sync_network_handle.clone(); + + let (chain_sync, chain_sync_service) = ChainSync::new( + sc_network_common::sync::SyncMode::Full, + client.clone(), + Box::new(sp_consensus::block_validation::DefaultBlockAnnounceValidator), + 1u32, + None, + chain_sync_network_handle.clone(), + ) + .unwrap(); + + let (node1, mut event_stream1) = TestNetworkBuilder::new() + .with_listen_addresses(vec![listen_addr.clone()]) + .with_chain_sync((Box::new(chain_sync), chain_sync_service)) + .with_chain_sync_network((chain_sync_network_provider, chain_sync_network_handle)) + .with_client(client.clone()) + .build() + .start_network(); + + let (node2, mut event_stream2) = TestNetworkBuilder::new() + .with_set_config(SetConfig { + reserved_nodes: vec![MultiaddrWithPeerId { + multiaddr: listen_addr, + peer_id: node1.local_peer_id, + }], + ..Default::default() + }) + .with_client(client.clone()) + .build() + .start_network(); + + async fn wait_for_events(stream: &mut (impl Stream<Item = Event> + std::marker::Unpin)) { + let mut notif_received = false; + let mut sync_received = false; + while !notif_received || !sync_received { + match stream.next().await.unwrap() { + Event::NotificationStreamOpened { .. } => notif_received = true, + Event::SyncConnected { .. } => sync_received = true, + _ => {}, + }; + } + } + + wait_for_events(&mut event_stream1).await; + wait_for_events(&mut event_stream2).await; + + handle_clone.disconnect_peer(node2.local_peer_id, BLOCK_ANNOUNCE_PROTO_NAME.into()); + + let wait_disconnection = async { + while !std::matches!(event_stream1.next().await, Some(Event::SyncDisconnected { .. })) {} + }; + + if async_std::future::timeout(Duration::from_secs(5), wait_disconnection) + .await + .is_err() + { + panic!("did not receive disconnection event in time"); + } +} diff --git a/substrate/client/network/src/service/tests/mod.rs b/substrate/client/network/src/service/tests/mod.rs index f829d9d43090fd587907f2092e90738899e5198c..ef25616a07b0dc5e2471e51550e8097eb0c6321c 100644 --- a/substrate/client/network/src/service/tests/mod.rs +++ b/substrate/client/network/src/service/tests/mod.rs @@ -33,7 +33,9 @@ use sc_network_common::{ }; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ - block_request_handler::BlockRequestHandler, state_request_handler::StateRequestHandler, + block_request_handler::BlockRequestHandler, + service::network::{NetworkServiceHandle, NetworkServiceProvider}, + state_request_handler::StateRequestHandler, ChainSync, }; use sp_runtime::traits::{Block as BlockT, Header as _, Zero}; @@ -93,6 +95,7 @@ struct TestNetworkBuilder { listen_addresses: Vec<Multiaddr>, set_config: Option<SetConfig>, chain_sync: Option<(Box<dyn ChainSyncT<TestBlock>>, Box<dyn ChainSyncInterface<TestBlock>>)>, + chain_sync_network: Option<(NetworkServiceProvider, NetworkServiceHandle)>, config: Option<config::NetworkConfiguration>, } @@ -104,6 +107,7 @@ impl TestNetworkBuilder { listen_addresses: Vec::new(), set_config: None, chain_sync: None, + chain_sync_network: None, config: None, } } @@ -136,6 +140,19 @@ impl TestNetworkBuilder { self } + pub fn with_chain_sync_network( + mut self, + chain_sync_network: (NetworkServiceProvider, NetworkServiceHandle), + ) -> Self { + self.chain_sync_network = Some(chain_sync_network); + self + } + + pub fn with_import_queue(mut self, import_queue: Box<dyn ImportQueue<TestBlock>>) -> Self { + self.import_queue = Some(import_queue); + self + } + pub fn build(mut self) -> TestNetwork { let client = self.client.as_mut().map_or( Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0), @@ -199,6 +216,9 @@ impl TestNetworkBuilder { None, ))); + let (chain_sync_network_provider, chain_sync_network_handle) = + self.chain_sync_network.unwrap_or(NetworkServiceProvider::new()); + let (chain_sync, chain_sync_service) = self.chain_sync.unwrap_or({ let (chain_sync, chain_sync_service) = ChainSync::new( match network_config.sync_mode { @@ -214,6 +234,7 @@ impl TestNetworkBuilder { Box::new(sp_consensus::block_validation::DefaultBlockAnnounceValidator), network_config.max_parallel_downloads, None, + chain_sync_network_handle, ) .unwrap(); @@ -292,6 +313,11 @@ impl TestNetworkBuilder { }) .unwrap(); + let service = worker.service().clone(); + async_std::task::spawn(async move { + let _ = chain_sync_network_provider.run(service).await; + }); + TestNetwork::new(worker) } } diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index 63f63f7188cfebf94152be6e3982e99c9cc8d32c..f369bdb47e1c6c282b2484209db12d2fdc133a55 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -271,6 +271,8 @@ pub struct ChainSync<B: BlockT, Client> { gap_sync: Option<GapSync<B>>, /// Channel for receiving service commands service_rx: TracingUnboundedReceiver<ToServiceCommand<B>>, + /// Handle for communicating with `NetworkService` + _network_service: service::network::NetworkServiceHandle, } /// All the data we have about a Peer that we are trying to sync with @@ -1775,6 +1777,7 @@ where block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>, max_parallel_downloads: u32, warp_sync_provider: Option<Arc<dyn WarpSyncProvider<B>>>, + _network_service: service::network::NetworkServiceHandle, ) -> Result<(Self, Box<ChainSyncInterfaceHandle<B>>), ClientError> { let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync"); @@ -1800,6 +1803,7 @@ where import_existing: false, gap_sync: None, service_rx, + _network_service, }; sync.reset_sync_start_point()?; Ok((sync, Box::new(ChainSyncInterfaceHandle::new(tx)))) @@ -2670,6 +2674,7 @@ fn validate_blocks<Block: BlockT>( #[cfg(test)] mod test { use super::*; + use crate::service::network::NetworkServiceProvider; use futures::{executor::block_on, future::poll_fn}; use sc_block_builder::BlockBuilderProvider; use sc_network_common::sync::message::{BlockData, BlockState, FromBlock}; @@ -2691,9 +2696,17 @@ mod test { let block_announce_validator = Box::new(DefaultBlockAnnounceValidator); let peer_id = PeerId::random(); - let (mut sync, _) = - ChainSync::new(SyncMode::Full, client.clone(), block_announce_validator, 1, None) - .unwrap(); + let (_chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); + let (mut sync, _) = ChainSync::new( + SyncMode::Full, + client.clone(), + block_announce_validator, + 1, + None, + chain_sync_network_handle, + ) + .unwrap(); let (a1_hash, a1_number) = { let a1 = client.new_block(Default::default()).unwrap().build().unwrap().block; @@ -2739,12 +2752,16 @@ mod test { #[test] fn restart_doesnt_affect_peers_downloading_finality_data() { let mut client = Arc::new(TestClientBuilder::new().build()); + let (_chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); + let (mut sync, _) = ChainSync::new( SyncMode::Full, client.clone(), Box::new(DefaultBlockAnnounceValidator), 1, None, + chain_sync_network_handle, ) .unwrap(); @@ -2905,6 +2922,8 @@ mod test { sp_tracing::try_init_simple(); let mut client = Arc::new(TestClientBuilder::new().build()); + let (_chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); let (mut sync, _) = ChainSync::new( SyncMode::Full, @@ -2912,6 +2931,7 @@ mod test { Box::new(DefaultBlockAnnounceValidator), 5, None, + chain_sync_network_handle, ) .unwrap(); @@ -3019,6 +3039,8 @@ mod test { }; let mut client = Arc::new(TestClientBuilder::new().build()); + let (_chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); let info = client.info(); let (mut sync, _) = ChainSync::new( @@ -3027,6 +3049,7 @@ mod test { Box::new(DefaultBlockAnnounceValidator), 5, None, + chain_sync_network_handle, ) .unwrap(); @@ -3140,6 +3163,8 @@ mod test { fn can_sync_huge_fork() { sp_tracing::try_init_simple(); + let (_chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); let mut client = Arc::new(TestClientBuilder::new().build()); let blocks = (0..MAX_BLOCKS_TO_LOOK_BACKWARDS * 4) .map(|_| build_block(&mut client, None, false)) @@ -3170,6 +3195,7 @@ mod test { Box::new(DefaultBlockAnnounceValidator), 5, None, + chain_sync_network_handle, ) .unwrap(); @@ -3269,6 +3295,8 @@ mod test { fn syncs_fork_without_duplicate_requests() { sp_tracing::try_init_simple(); + let (_chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); let mut client = Arc::new(TestClientBuilder::new().build()); let blocks = (0..MAX_BLOCKS_TO_LOOK_BACKWARDS * 4) .map(|_| build_block(&mut client, None, false)) @@ -3299,6 +3327,7 @@ mod test { Box::new(DefaultBlockAnnounceValidator), 5, None, + chain_sync_network_handle, ) .unwrap(); @@ -3419,6 +3448,8 @@ mod test { #[test] fn removes_target_fork_on_disconnect() { sp_tracing::try_init_simple(); + let (_chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); let mut client = Arc::new(TestClientBuilder::new().build()); let blocks = (0..3).map(|_| build_block(&mut client, None, false)).collect::<Vec<_>>(); @@ -3428,6 +3459,7 @@ mod test { Box::new(DefaultBlockAnnounceValidator), 1, None, + chain_sync_network_handle, ) .unwrap(); @@ -3450,6 +3482,8 @@ mod test { #[test] fn can_import_response_with_missing_blocks() { sp_tracing::try_init_simple(); + let (_chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); let mut client2 = Arc::new(TestClientBuilder::new().build()); let blocks = (0..4).map(|_| build_block(&mut client2, None, false)).collect::<Vec<_>>(); @@ -3461,6 +3495,7 @@ mod test { Box::new(DefaultBlockAnnounceValidator), 1, None, + chain_sync_network_handle, ) .unwrap(); diff --git a/substrate/client/network/sync/src/service/mock.rs b/substrate/client/network/sync/src/service/mock.rs index e283907b392d12f3dd64e441c9eaa390dda4e652..c146e1ec07b489fa6ddb10bf17d6cb1ac62df719 100644 --- a/substrate/client/network/sync/src/service/mock.rs +++ b/substrate/client/network/sync/src/service/mock.rs @@ -16,10 +16,15 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. -use libp2p::PeerId; -use sc_network_common::service::NetworkSyncForkRequest; +use sc_network_common::service::{NetworkPeers, NetworkSyncForkRequest}; use sp_runtime::traits::{Block as BlockT, NumberFor}; +pub use libp2p::{identity::error::SigningError, kad::record::Key as KademliaKey}; +use libp2p::{Multiaddr, PeerId}; +use sc_network_common::{config::MultiaddrWithPeerId, protocol::ProtocolName}; +use sc_peerset::ReputationChange; +use std::collections::HashSet; + mockall::mock! { pub ChainSyncInterface<B: BlockT> {} @@ -29,3 +34,42 @@ mockall::mock! { fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: B::Hash, number: NumberFor<B>); } } + +mockall::mock! { + pub NetworkServiceHandle {} +} + +// Mocked `Network` for `ChainSync`-related tests +mockall::mock! { + pub Network {} + + impl NetworkPeers for Network { + fn set_authorized_peers(&self, peers: HashSet<PeerId>); + fn set_authorized_only(&self, reserved_only: bool); + fn add_known_address(&self, peer_id: PeerId, addr: Multiaddr); + fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange); + fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName); + fn accept_unreserved_peers(&self); + fn deny_unreserved_peers(&self); + fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String>; + fn remove_reserved_peer(&self, peer_id: PeerId); + fn set_reserved_peers( + &self, + protocol: ProtocolName, + peers: HashSet<Multiaddr>, + ) -> Result<(), String>; + fn add_peers_to_reserved_set( + &self, + protocol: ProtocolName, + peers: HashSet<Multiaddr>, + ) -> Result<(), String>; + fn remove_peers_from_reserved_set(&self, protocol: ProtocolName, peers: Vec<PeerId>); + fn add_to_peers_set( + &self, + protocol: ProtocolName, + peers: HashSet<Multiaddr>, + ) -> Result<(), String>; + fn remove_from_peers_set(&self, protocol: ProtocolName, peers: Vec<PeerId>); + fn sync_num_connected(&self) -> usize; + } +} diff --git a/substrate/client/network/sync/src/service/mod.rs b/substrate/client/network/sync/src/service/mod.rs index d64d9bbd1b01f6aef3393825a91f57404d5d97e8..692aa269854583361f857498cecf193471fc3fc5 100644 --- a/substrate/client/network/sync/src/service/mod.rs +++ b/substrate/client/network/sync/src/service/mod.rs @@ -20,3 +20,4 @@ pub mod chain_sync; pub mod mock; +pub mod network; diff --git a/substrate/client/network/sync/src/service/network.rs b/substrate/client/network/sync/src/service/network.rs new file mode 100644 index 0000000000000000000000000000000000000000..44ed177661264fcdc3ffd1505ec77c01064f38b6 --- /dev/null +++ b/substrate/client/network/sync/src/service/network.rs @@ -0,0 +1,128 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use futures::StreamExt; +use libp2p::PeerId; +use sc_network_common::{protocol::ProtocolName, service::NetworkPeers}; +use sc_peerset::ReputationChange; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; +use std::sync::Arc; + +/// Network-related services required by `sc-network-sync` +pub trait Network: NetworkPeers {} + +impl<T> Network for T where T: NetworkPeers {} + +/// Network service provider for `ChainSync` +/// +/// It runs as an asynchronous task and listens to commands coming from `ChainSync` and +/// calls the `NetworkService` on its behalf. +pub struct NetworkServiceProvider { + rx: TracingUnboundedReceiver<ToServiceCommand>, +} + +/// Commands that `ChainSync` wishes to send to `NetworkService` +pub enum ToServiceCommand { + /// Call `NetworkPeers::disconnect_peer()` + DisconnectPeer(PeerId, ProtocolName), + + /// Call `NetworkPeers::report_peer()` + ReportPeer(PeerId, ReputationChange), +} + +/// Handle that is (temporarily) passed to `ChainSync` so it can +/// communicate with `NetworkService` through `SyncingEngine` +#[derive(Clone)] +pub struct NetworkServiceHandle { + tx: TracingUnboundedSender<ToServiceCommand>, +} + +impl NetworkServiceHandle { + /// Create new service handle + pub fn new(tx: TracingUnboundedSender<ToServiceCommand>) -> NetworkServiceHandle { + Self { tx } + } + + /// Report peer + pub fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) { + let _ = self.tx.unbounded_send(ToServiceCommand::ReportPeer(who, cost_benefit)); + } + + /// Disconnect peer + pub fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName) { + let _ = self.tx.unbounded_send(ToServiceCommand::DisconnectPeer(who, protocol)); + } +} + +impl NetworkServiceProvider { + /// Create new `NetworkServiceProvider` + pub fn new() -> (Self, NetworkServiceHandle) { + let (tx, rx) = tracing_unbounded("mpsc_network_service_provider"); + + (Self { rx }, NetworkServiceHandle::new(tx)) + } + + /// Run the `NetworkServiceProvider` + pub async fn run(mut self, service: Arc<dyn Network + Send + Sync>) { + while let Some(inner) = self.rx.next().await { + match inner { + ToServiceCommand::DisconnectPeer(peer, protocol_name) => + service.disconnect_peer(peer, protocol_name), + ToServiceCommand::ReportPeer(peer, reputation_change) => + service.report_peer(peer, reputation_change), + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::service::mock::MockNetwork; + + // typical pattern in `Protocol` code where peer is disconnected + // and then reported + #[async_std::test] + async fn disconnect_and_report_peer() { + let (provider, handle) = NetworkServiceProvider::new(); + + let peer = PeerId::random(); + let proto = ProtocolName::from("test-protocol"); + let proto_clone = proto.clone(); + let change = sc_peerset::ReputationChange::new_fatal("test-change"); + + let mut mock_network = MockNetwork::new(); + mock_network + .expect_disconnect_peer() + .withf(move |in_peer, in_proto| &peer == in_peer && &proto == in_proto) + .once() + .returning(|_, _| ()); + mock_network + .expect_report_peer() + .withf(move |in_peer, in_change| &peer == in_peer && &change == in_change) + .once() + .returning(|_, _| ()); + + async_std::task::spawn(async move { + provider.run(Arc::new(mock_network)).await; + }); + + handle.disconnect_peer(peer, proto_clone); + handle.report_peer(peer, change); + } +} diff --git a/substrate/client/network/sync/src/tests.rs b/substrate/client/network/sync/src/tests.rs index 47483c4ac440d0f17701f3877e77037ca9f2aa12..479c78bfdea9735334da5388fd86156637319b3d 100644 --- a/substrate/client/network/sync/src/tests.rs +++ b/substrate/client/network/sync/src/tests.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. -use crate::{ChainSync, ForkTarget}; +use crate::{service::network::NetworkServiceProvider, ChainSync, ForkTarget}; use libp2p::PeerId; use sc_network_common::{service::NetworkSyncForkRequest, sync::ChainSync as ChainSyncT}; @@ -29,12 +29,14 @@ use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _ // poll `ChainSync` and verify that a new sync fork request has been registered #[async_std::test] async fn delegate_to_chainsync() { + let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let (mut chain_sync, chain_sync_service) = ChainSync::new( sc_network_common::sync::SyncMode::Full, Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0), Box::new(DefaultBlockAnnounceValidator), 1u32, None, + chain_sync_network_handle, ) .unwrap(); diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index 5460cc7d524613cc9ad96878b2b0468343a96edb..c9b99fbc6af10771715cefa654c8c8743abe86c2 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -61,8 +61,8 @@ use sc_network_common::{ }; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ - block_request_handler::BlockRequestHandler, state_request_handler::StateRequestHandler, - warp_request_handler, ChainSync, + block_request_handler::BlockRequestHandler, service::network::NetworkServiceProvider, + state_request_handler::StateRequestHandler, warp_request_handler, ChainSync, }; use sc_service::client::Client; use sp_blockchain::{ @@ -864,6 +864,8 @@ where let block_announce_validator = config .block_announce_validator .unwrap_or_else(|| Box::new(DefaultBlockAnnounceValidator)); + let (chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); let (chain_sync, chain_sync_service) = ChainSync::new( match network_config.sync_mode { SyncMode::Full => sc_network_common::sync::SyncMode::Full, @@ -878,6 +880,7 @@ where block_announce_validator, network_config.max_parallel_downloads, Some(warp_sync), + chain_sync_network_handle, ) .unwrap(); let block_announce_config = chain_sync.get_block_announce_proto_config( @@ -915,6 +918,11 @@ where trace!(target: "test_network", "Peer identifier: {}", network.service().local_peer_id()); + let service = network.service().clone(); + async_std::task::spawn(async move { + chain_sync_network_provider.run(service).await; + }); + self.mut_peers(move |peers| { for peer in peers.iter_mut() { peer.network diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 1a162688390549cb494f2f19678662f988579c67..3cb064ec814c5563cec9028082222feabf85b665 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -46,7 +46,8 @@ use sc_network_common::{ }; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ - block_request_handler::BlockRequestHandler, state_request_handler::StateRequestHandler, + block_request_handler::BlockRequestHandler, service::network::NetworkServiceProvider, + state_request_handler::StateRequestHandler, warp_request_handler::RequestHandler as WarpSyncRequestHandler, ChainSync, }; use sc_rpc::{ @@ -844,6 +845,7 @@ where protocol_config }; + let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let (chain_sync, chain_sync_service) = ChainSync::new( match config.network.sync_mode { SyncMode::Full => sc_network_common::sync::SyncMode::Full, @@ -855,7 +857,9 @@ where block_announce_validator, config.network.max_parallel_downloads, warp_sync_provider, + chain_sync_network_handle, )?; + let block_announce_config = chain_sync.get_block_announce_proto_config( protocol_id.clone(), &config.chain_spec.fork_id().map(ToOwned::to_owned), @@ -926,7 +930,13 @@ where Arc::new(TransactionPoolAdapter { pool: transaction_pool, client: client.clone() }), config.prometheus_config.as_ref().map(|config| &config.registry), )?; + spawn_handle.spawn("network-transactions-handler", Some("networking"), tx_handler.run()); + spawn_handle.spawn( + "chain-sync-network-service-provider", + Some("networking"), + chain_sync_network_provider.run(network.clone()), + ); let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc");