diff --git a/substrate/core/cli/src/lib.rs b/substrate/core/cli/src/lib.rs index e445addf5ddd762b8d8ba34309f08b89ca1f72b6..f24ff6eafa295968a97a8fd771ccb4df476b7276 100644 --- a/substrate/core/cli/src/lib.rs +++ b/substrate/core/cli/src/lib.rs @@ -628,6 +628,8 @@ fn fill_network_configuration( wasm_external_transport: None, }; + config.max_parallel_downloads = cli.max_parallel_downloads; + Ok(()) } diff --git a/substrate/core/cli/src/params.rs b/substrate/core/cli/src/params.rs index d0a235241667c827a025904f940f08b7555bba1b..7a296620da49277fc81d87c250bc684658a61a0e 100644 --- a/substrate/core/cli/src/params.rs +++ b/substrate/core/cli/src/params.rs @@ -146,11 +146,11 @@ pub struct NetworkConfigurationParams { pub port: Option<u16>, /// Specify the number of outgoing connections we're trying to maintain. - #[structopt(long = "out-peers", value_name = "OUT_PEERS", default_value = "25")] + #[structopt(long = "out-peers", value_name = "COUNT", default_value = "25")] pub out_peers: u32, /// Specify the maximum number of incoming connections we're accepting. - #[structopt(long = "in-peers", value_name = "IN_PEERS", default_value = "25")] + #[structopt(long = "in-peers", value_name = "COUNT", default_value = "25")] pub in_peers: u32, /// Disable mDNS discovery. @@ -160,6 +160,13 @@ pub struct NetworkConfigurationParams { #[structopt(long = "no-mdns")] pub no_mdns: bool, + /// Maximum number of peers to ask the same blocks in parallel. + /// + /// This allows downlading announced blocks from multiple peers. Decrease to save + /// traffic and risk increased latency. + #[structopt(long = "max-parallel-downloads", value_name = "COUNT", default_value = "5")] + pub max_parallel_downloads: u32, + #[allow(missing_docs)] #[structopt(flatten)] pub node_key_params: NodeKeyParams diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index efc2d3700bff9ada6bb026f2698676941aeea55f..bdb032df3decfc0d7ccd3781b24dc0e9ef94b0ed 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -94,9 +94,9 @@ impl TestNetFactory for GrandpaTestNet { fn default_config() -> ProtocolConfig { // the authority role ensures gossip hits all nodes here. - ProtocolConfig { - roles: Roles::AUTHORITY, - } + let mut config = ProtocolConfig::default(); + config.roles = Roles::AUTHORITY; + config } fn make_verifier( diff --git a/substrate/core/network/src/config.rs b/substrate/core/network/src/config.rs index be01b90c363636183f195f8dceb58b7505d7cd5d..b22e7d2790d458250aac9193184a895eeb0aa594 100644 --- a/substrate/core/network/src/config.rs +++ b/substrate/core/network/src/config.rs @@ -81,7 +81,7 @@ pub struct Params<B: BlockT, S, H: ExHashT> { pub specialization: S, /// Type to check incoming block announcements. - pub block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send> + pub block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>, } bitflags! { @@ -261,6 +261,8 @@ pub struct NetworkConfiguration { pub node_name: String, /// Configuration for the transport layer. pub transport: TransportConfig, + /// Maximum number of peers to ask the same blocks in parallel. + pub max_parallel_downloads: u32, } impl Default for NetworkConfiguration { @@ -282,6 +284,7 @@ impl Default for NetworkConfiguration { enable_mdns: false, wasm_external_transport: None, }, + max_parallel_downloads: 5, } } } diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 35de679489e2512c894689d6fa46b6c465734410..12a759c437ec3e16a7c87ff7e143f13c63eb18aa 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -362,12 +362,15 @@ struct ContextData<B: BlockT, H: ExHashT> { pub struct ProtocolConfig { /// Assigned roles. pub roles: Roles, + /// Maximum number of peers to ask the same blocks in parallel. + pub max_parallel_downloads: u32, } impl Default for ProtocolConfig { fn default() -> ProtocolConfig { ProtocolConfig { roles: Roles::FULL, + max_parallel_downloads: 5, } } } @@ -393,6 +396,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { &info, finality_proof_request_builder, block_announce_validator, + config.max_parallel_downloads, ); let (peerset, peerset_handle) = peerset::Peerset::from_config(peerset_config); let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::<Vec<u8>>(); diff --git a/substrate/core/network/src/protocol/sync.rs b/substrate/core/network/src/protocol/sync.rs index 70fe0d942b854220ddb2164ae9111715a1f81e38..d4ecd0e1dcb0347772e295ab39cf0fa1de8c3720 100644 --- a/substrate/core/network/src/protocol/sync.rs +++ b/substrate/core/network/src/protocol/sync.rs @@ -127,7 +127,9 @@ pub struct ChainSync<B: BlockT> { /// A flag that caches idle state with no pending requests. is_idle: bool, /// A type to check incoming block announcements. - block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send> + block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>, + /// Maximum number of peers to ask the same blocks in parallel. + max_parallel_downloads: u32, } /// All the data we have about a Peer that we are trying to sync with @@ -282,7 +284,8 @@ impl<B: BlockT> ChainSync<B> { client: Arc<dyn crate::chain::Client<B>>, info: &ClientInfo<B>, request_builder: Option<BoxFinalityProofRequestBuilder<B>>, - block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send> + block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>, + max_parallel_downloads: u32, ) -> Self { let mut required_block_attributes = BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION; @@ -306,6 +309,7 @@ impl<B: BlockT> ChainSync<B> { fork_targets: Default::default(), is_idle: false, block_announce_validator, + max_parallel_downloads, } } @@ -571,6 +575,7 @@ impl<B: BlockT> ChainSync<B> { let best_queued = self.best_queued_number; let client = &self.client; let queue = &self.queue_blocks; + let max_parallel = if major_sync { 1 } else { self.max_parallel_downloads }; let iter = self.peers.iter_mut().filter_map(move |(id, peer)| { if !peer.state.is_available() { trace!(target: "sync", "Peer {} is busy", id); @@ -592,13 +597,19 @@ impl<B: BlockT> ChainSync<B> { peer.state = PeerSyncState::DownloadingStale(hash); have_requests = true; Some((id.clone(), req)) - } else if let Some((range, req)) = peer_block_request(id, peer, blocks, attrs, major_sync) { + } else if let Some((range, req)) = peer_block_request(id, peer, blocks, attrs, max_parallel) { peer.state = PeerSyncState::DownloadingNew(range.start); - trace!(target: "sync", "New block request for {}", id); + trace!( + target: "sync", + "New block request for {}, (best:{}, common:{}) {:?}", + id, + peer.best_number, + peer.common_number, + req, + ); have_requests = true; Some((id.clone(), req)) } else { - trace!(target: "sync", "No new block request for {}", id); None } }); @@ -1006,7 +1017,7 @@ impl<B: BlockT> ChainSync<B> { { let header = &announce.header; let number = *header.number(); - debug!(target: "sync", "Received block announcement with number {:?}", number); + debug!(target: "sync", "Received block announcement {:?} with number {:?} from {}", hash, number, who); if number.is_zero() { warn!(target: "sync", "Ignored genesis block (#0) announcement from {}: {}", who, hash); return OnBlockAnnounce::Nothing @@ -1226,15 +1237,14 @@ fn peer_block_request<B: BlockT>( peer: &PeerSync<B>, blocks: &mut BlockCollection<B>, attrs: &message::BlockAttributes, - major_sync: bool, + max_parallel_downloads: u32, ) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> { - let max_parallel = if major_sync { 1 } else { 3 }; if let Some(range) = blocks.needed_blocks( id.clone(), MAX_BLOCKS_TO_REQUEST, peer.best_number, peer.common_number, - max_parallel, + max_parallel_downloads, ) { let request = message::generic::BlockRequest { id: 0, diff --git a/substrate/core/network/src/protocol/sync/blocks.rs b/substrate/core/network/src/protocol/sync/blocks.rs index a972caf9519ec49dae6a2e42747902cee8a69af8..c799a52c37d8a4d1da1c8fdcc9395c1bfe061ed2 100644 --- a/substrate/core/network/src/protocol/sync/blocks.rs +++ b/substrate/core/network/src/protocol/sync/blocks.rs @@ -105,6 +105,10 @@ impl<B: BlockT> BlockCollection<B> { max_parallel: u32, ) -> Option<Range<NumberFor<B>>> { + if peer_best <= common { + // Bail out early + return None; + } // First block number that we need to download let first_different = common + <NumberFor<B>>::one(); let count = (count as u32).into(); diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index e73bff9b1cd2e760a7fadb000a779c992d89a0bd..7ddc27995c523071af632a12cdae41c964bc1a1a 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -194,7 +194,10 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker let num_connected = Arc::new(AtomicUsize::new(0)); let is_major_syncing = Arc::new(AtomicBool::new(false)); let (protocol, peerset_handle) = Protocol::new( - protocol::ProtocolConfig { roles: params.roles }, + protocol::ProtocolConfig { + roles: params.roles, + max_parallel_downloads: params.network_config.max_parallel_downloads, + }, params.chain, params.on_demand.as_ref().map(|od| od.checker().clone()) .unwrap_or(Arc::new(AlwaysBadChecker)), diff --git a/substrate/core/service/test/src/lib.rs b/substrate/core/service/test/src/lib.rs index c147f8051065ea2c4435f8132ca9eb3896f5e256..af02c7c3aae52fc7ba8875456b1ee5d77eebaa2d 100644 --- a/substrate/core/service/test/src/lib.rs +++ b/substrate/core/service/test/src/lib.rs @@ -160,6 +160,7 @@ fn node_config<G, E: Clone> ( enable_mdns: false, wasm_external_transport: None, }, + max_parallel_downloads: NetworkConfiguration::default().max_parallel_downloads, }; Configuration {