diff --git a/substrate/client/network/src/config.rs b/substrate/client/network/src/config.rs index 58349fe973330cb3cb818c45eb63d4f6a3009d61..2622762da5fc9565cef134382482b7c42f95f00b 100644 --- a/substrate/client/network/src/config.rs +++ b/substrate/client/network/src/config.rs @@ -96,14 +96,8 @@ where /// valid. pub import_queue: Box<dyn ImportQueue<B>>, - /// Factory function that creates a new instance of chain sync. - pub create_chain_sync: Box< - dyn FnOnce( - sc_network_common::sync::SyncMode, - Arc<Client>, - Option<Arc<dyn WarpSyncProvider<B>>>, - ) -> crate::error::Result<Box<dyn ChainSync<B>>>, - >, + /// Instance of chain sync implementation. + pub chain_sync: Box<dyn ChainSync<B>>, /// Registry for recording prometheus metrics to. pub metrics_registry: Option<Registry>, @@ -138,8 +132,8 @@ where /// both outgoing and incoming requests. pub state_request_protocol_config: RequestResponseConfig, - /// Optional warp sync protocol support. Include protocol config and sync provider. - pub warp_sync: Option<(Arc<dyn WarpSyncProvider<B>>, RequestResponseConfig)>, + /// Optional warp sync protocol config. + pub warp_sync_protocol_config: Option<RequestResponseConfig>, } /// Role of the local node. @@ -352,7 +346,7 @@ impl From<multiaddr::Error> for ParseErr { } /// Sync operation mode. -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Copy, Clone, Debug, Eq, PartialEq)] pub enum SyncMode { /// Full block download and verification. Full, diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index aeb5e25497bb36f253e547b1331485a52992225c..409ed88c75c004f9f38662b8c0d952ee850ae188 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -30,7 +30,7 @@ use crate::{ behaviour::{self, Behaviour, BehaviourOut}, bitswap::Bitswap, - config::{self, parse_str_addr, Params, TransportConfig}, + config::{parse_str_addr, Params, TransportConfig}, discovery::DiscoveryConfig, error::Error, network_state::{ @@ -60,7 +60,7 @@ use metrics::{Histogram, HistogramVec, MetricSources, Metrics}; use parking_lot::Mutex; use sc_client_api::{BlockBackend, ProofProvider}; use sc_consensus::{BlockImportError, BlockImportStatus, ImportQueue, Link}; -use sc_network_common::sync::{SyncMode, SyncState, SyncStatus}; +use sc_network_common::sync::{SyncState, SyncStatus}; use sc_peerset::PeersetHandle; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_blockchain::{HeaderBackend, HeaderMetadata}; @@ -239,21 +239,6 @@ where let default_notif_handshake_message = Roles::from(¶ms.role).encode(); - let (warp_sync_provider, warp_sync_protocol_config) = match params.warp_sync { - Some((p, c)) => (Some(p), Some(c)), - None => (None, None), - }; - - let chain_sync = (params.create_chain_sync)( - match params.network_config.sync_mode { - config::SyncMode::Full => SyncMode::Full, - config::SyncMode::Fast { skip_proofs, storage_chain_mode } => - SyncMode::LightState { skip_proofs, storage_chain_mode }, - config::SyncMode::Warp => SyncMode::Warp, - }, - params.chain.clone(), - warp_sync_provider, - )?; let (protocol, peerset_handle, mut known_addresses) = Protocol::new( From::from(¶ms.role), params.chain.clone(), @@ -266,7 +251,7 @@ where ) .collect(), params.metrics_registry.as_ref(), - chain_sync, + params.chain_sync, )?; // List of multiaddresses that we know in the network. @@ -303,7 +288,6 @@ where let is_major_syncing = Arc::new(AtomicBool::new(false)); // Build the swarm. - let client = params.chain.clone(); let (mut swarm, bandwidth): (Swarm<Behaviour<B, Client>>, _) = { let user_agent = format!( "{} ({})", @@ -389,7 +373,7 @@ where }; let behaviour = { - let bitswap = params.network_config.ipfs_server.then(|| Bitswap::new(client)); + let bitswap = params.network_config.ipfs_server.then(|| Bitswap::new(params.chain)); let result = Behaviour::new( protocol, user_agent, @@ -397,7 +381,7 @@ where discovery_config, params.block_request_protocol_config, params.state_request_protocol_config, - warp_sync_protocol_config, + params.warp_sync_protocol_config, bitswap, params.light_client_request_protocol_config, params.network_config.request_response_protocols, diff --git a/substrate/client/network/src/service/tests.rs b/substrate/client/network/src/service/tests.rs index 181d58130aa6b2587aff67a67992ea3c2268e814..de474ee8fe4d00b83c77a8103366e9281f163882 100644 --- a/substrate/client/network/src/service/tests.rs +++ b/substrate/client/network/src/service/tests.rs @@ -42,7 +42,7 @@ type TestNetworkService = NetworkService< /// > **Note**: We return the events stream in order to not possibly lose events between the /// > construction of the service and the moment the events stream is grabbed. fn build_test_full_node( - config: config::NetworkConfiguration, + network_config: config::NetworkConfiguration, ) -> (Arc<TestNetworkService>, impl Stream<Item = Event>) { let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0); @@ -111,35 +111,36 @@ fn build_test_full_node( protocol_config }; - let max_parallel_downloads = config.max_parallel_downloads; + let chain_sync = ChainSync::new( + match network_config.sync_mode { + config::SyncMode::Full => sc_network_common::sync::SyncMode::Full, + config::SyncMode::Fast { skip_proofs, storage_chain_mode } => + sc_network_common::sync::SyncMode::LightState { skip_proofs, storage_chain_mode }, + config::SyncMode::Warp => sc_network_common::sync::SyncMode::Warp, + }, + client.clone(), + Box::new(DefaultBlockAnnounceValidator), + network_config.max_parallel_downloads, + None, + ) + .unwrap(); let worker = NetworkWorker::new(config::Params { role: config::Role::Full, executor: None, transactions_handler_executor: Box::new(|task| { async_std::task::spawn(task); }), - network_config: config, + network_config, chain: client.clone(), - transaction_pool: Arc::new(crate::config::EmptyTransactionPool), + transaction_pool: Arc::new(config::EmptyTransactionPool), protocol_id, import_queue, - create_chain_sync: Box::new( - move |sync_mode, chain, warp_sync_provider| match ChainSync::new( - sync_mode, - chain, - Box::new(DefaultBlockAnnounceValidator), - max_parallel_downloads, - warp_sync_provider, - ) { - Ok(chain_sync) => Ok(Box::new(chain_sync)), - Err(error) => Err(Box::new(error).into()), - }, - ), + chain_sync: Box::new(chain_sync), metrics_registry: None, block_request_protocol_config, state_request_protocol_config, light_client_request_protocol_config, - warp_sync: None, + warp_sync_protocol_config: None, }) .unwrap(); diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index c0e1e9f0e944f1c851966a2b8b4f3250ff68b22a..4659684987f7752f2af7c59940a238d4f3fda34b 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -838,10 +838,25 @@ where protocol_config }; - let max_parallel_downloads = network_config.max_parallel_downloads; let block_announce_validator = config .block_announce_validator .unwrap_or_else(|| Box::new(DefaultBlockAnnounceValidator)); + let chain_sync = ChainSync::new( + match network_config.sync_mode { + SyncMode::Full => sc_network_common::sync::SyncMode::Full, + SyncMode::Fast { skip_proofs, storage_chain_mode } => + sc_network_common::sync::SyncMode::LightState { + skip_proofs, + storage_chain_mode, + }, + SyncMode::Warp => sc_network_common::sync::SyncMode::Warp, + }, + client.clone(), + block_announce_validator, + network_config.max_parallel_downloads, + Some(warp_sync), + ) + .unwrap(); let network = NetworkWorker::new(sc_network::config::Params { role: if config.is_authority { Role::Authority } else { Role::Full }, executor: None, @@ -853,23 +868,12 @@ where transaction_pool: Arc::new(EmptyTransactionPool), protocol_id, import_queue, - create_chain_sync: Box::new(move |sync_mode, chain, warp_sync_provider| { - match ChainSync::new( - sync_mode, - chain, - block_announce_validator, - max_parallel_downloads, - warp_sync_provider, - ) { - Ok(chain_sync) => Ok(Box::new(chain_sync)), - Err(error) => Err(Box::new(error).into()), - } - }), + chain_sync: Box::new(chain_sync), metrics_registry: None, block_request_protocol_config, state_request_protocol_config, light_client_request_protocol_config, - warp_sync: Some((warp_sync, warp_protocol_config)), + warp_sync_protocol_config: Some(warp_protocol_config), }) .unwrap(); diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 0a18943f450068c0a42a6ad05e02c07dfd24647a..ec537a33b72d5ef80a65329fa6e4d074ff85fbb5 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -760,13 +760,15 @@ where protocol_config }; - let warp_sync_params = warp_sync.map(|provider| { - // Allow both outgoing and incoming requests. - let (handler, protocol_config) = - WarpSyncRequestHandler::new(protocol_id.clone(), provider.clone()); - spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run()); - (provider, protocol_config) - }); + let (warp_sync_provider, warp_sync_protocol_config) = warp_sync + .map(|provider| { + // Allow both outgoing and incoming requests. + let (handler, protocol_config) = + WarpSyncRequestHandler::new(protocol_id.clone(), provider.clone()); + spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run()); + (Some(provider), Some(protocol_config)) + }) + .unwrap_or_default(); let light_client_request_protocol_config = { // Allow both outgoing and incoming requests. @@ -776,7 +778,18 @@ where protocol_config }; - let max_parallel_downloads = config.network.max_parallel_downloads; + let chain_sync = ChainSync::new( + match config.network.sync_mode { + SyncMode::Full => sc_network_common::sync::SyncMode::Full, + SyncMode::Fast { skip_proofs, storage_chain_mode } => + sc_network_common::sync::SyncMode::LightState { skip_proofs, storage_chain_mode }, + SyncMode::Warp => sc_network_common::sync::SyncMode::Warp, + }, + client.clone(), + block_announce_validator, + config.network.max_parallel_downloads, + warp_sync_provider, + )?; let network_params = sc_network::config::Params { role: config.role.clone(), executor: { @@ -796,22 +809,11 @@ where transaction_pool: transaction_pool_adapter as _, protocol_id, import_queue: Box::new(import_queue), - create_chain_sync: Box::new( - move |sync_mode, chain, warp_sync_provider| match ChainSync::new( - sync_mode, - chain, - block_announce_validator, - max_parallel_downloads, - warp_sync_provider, - ) { - Ok(chain_sync) => Ok(Box::new(chain_sync)), - Err(error) => Err(Box::new(error).into()), - }, - ), + chain_sync: Box::new(chain_sync), metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()), block_request_protocol_config, state_request_protocol_config, - warp_sync: warp_sync_params, + warp_sync_protocol_config, light_client_request_protocol_config, };