diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs index 8222767e1a1e6973d0a95caee06002934264f1f6..82f0d775eccc23425f74eeddaf6b89578e6a82c1 100644 --- a/substrate/client/network/src/protocol.rs +++ b/substrate/client/network/src/protocol.rs @@ -1965,7 +1965,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> { target: id, request: r, }; - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + self.pending_messages.push_back(event); } else { send_request( &mut self.behaviour, @@ -1982,7 +1982,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> { target: id, request: r, }; - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + self.pending_messages.push_back(event); } else { send_request( &mut self.behaviour, @@ -2000,7 +2000,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> { block_hash: r.block, request: r.request, }; - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + self.pending_messages.push_back(event); } else { send_request( &mut self.behaviour, @@ -2010,6 +2010,9 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> { GenericMessage::FinalityProofRequest(r)) } } + if let Some(message) = self.pending_messages.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message)); + } let event = match self.behaviour.poll(cx, params) { Poll::Pending => return Poll::Pending, diff --git a/substrate/client/network/src/protocol/sync.rs b/substrate/client/network/src/protocol/sync.rs index 98fbd4ae4f7b87b393e95388c153f71e2b023b94..1aba1bb66f66f5f8c3468bfaa5ebd51442740bc5 100644 --- a/substrate/client/network/src/protocol/sync.rs +++ b/substrate/client/network/src/protocol/sync.rs @@ -106,6 +106,50 @@ mod rep { pub const UNKNOWN_ANCESTOR:Rep = Rep::new(-(1 << 16), "DB Error"); } +enum PendingRequests { + Some(HashSet<PeerId>), + All, +} + +impl PendingRequests { + fn add(&mut self, id: &PeerId) { + match self { + PendingRequests::Some(set) => { + set.insert(id.clone()); + } + PendingRequests::All => {}, + } + } + + fn take(&mut self) -> PendingRequests { + std::mem::replace(self, Default::default()) + } + + fn set_all(&mut self) { + *self = PendingRequests::All; + } + + fn contains(&self, id: &PeerId) -> bool { + match self { + PendingRequests::Some(set) => set.contains(id), + PendingRequests::All => true, + } + } + + fn is_empty(&self) -> bool { + match self { + PendingRequests::Some(set) => set.is_empty(), + PendingRequests::All => false, + } + } +} + +impl Default for PendingRequests { + fn default() -> Self { + PendingRequests::Some(HashSet::default()) + } +} + /// The main data structure which contains all the state for a chains /// active syncing strategy. pub struct ChainSync<B: BlockT> { @@ -138,8 +182,8 @@ pub struct ChainSync<B: BlockT> { request_builder: Option<BoxFinalityProofRequestBuilder<B>>, /// Fork sync targets. fork_targets: HashMap<B::Hash, ForkTarget<B>>, - /// A flag that caches idle state with no pending requests. - is_idle: bool, + /// A set of peers for which there might be potential block requests + pending_requests: PendingRequests, /// A type to check incoming block announcements. block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>, /// Maximum number of peers to ask the same blocks in parallel. @@ -327,7 +371,7 @@ impl<B: BlockT> ChainSync<B> { queue_blocks: Default::default(), request_builder, fork_targets: Default::default(), - is_idle: false, + pending_requests: Default::default(), block_announce_validator, max_parallel_downloads, processed_blocks: 0, @@ -426,7 +470,7 @@ impl<B: BlockT> ChainSync<B> { state: PeerSyncState::Available, recently_announced: Default::default(), }); - self.is_idle = false; + self.pending_requests.add(&who); return Ok(None) } @@ -438,6 +482,7 @@ impl<B: BlockT> ChainSync<B> { best_number ); + self.pending_requests.add(&who); self.peers.insert(who, PeerSync { common_number: Zero::zero(), best_hash, @@ -449,7 +494,6 @@ impl<B: BlockT> ChainSync<B> { }, recently_announced: Default::default() }); - self.is_idle = false; Ok(Some(ancestry_request::<B>(common_best))) } @@ -462,7 +506,7 @@ impl<B: BlockT> ChainSync<B> { state: PeerSyncState::Available, recently_announced: Default::default(), }); - self.is_idle = false; + self.pending_requests.add(&who); Ok(None) } } @@ -516,7 +560,6 @@ impl<B: BlockT> ChainSync<B> { } trace!(target: "sync", "Downloading requested old fork {:?}", hash); - self.is_idle = false; for peer_id in &peers { if let Some(peer) = self.peers.get_mut(peer_id) { if let PeerSyncState::AncestorSearch {..} = peer.state { @@ -527,6 +570,7 @@ impl<B: BlockT> ChainSync<B> { peer.best_number = number; peer.best_hash = hash.clone(); } + self.pending_requests.add(peer_id); } } @@ -590,7 +634,7 @@ impl<B: BlockT> ChainSync<B> { /// Get an iterator over all block requests of all peers. pub fn block_requests(&mut self) -> impl Iterator<Item = (PeerId, BlockRequest<B>)> + '_ { - if self.is_idle { + if self.pending_requests.is_empty() { return Either::Left(std::iter::empty()) } if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS { @@ -606,10 +650,13 @@ impl<B: BlockT> ChainSync<B> { let best_queued = self.best_queued_number; let client = &self.client; let queue = &self.queue_blocks; + let pending_requests = self.pending_requests.take(); let max_parallel = if major_sync { 1 } else { self.max_parallel_downloads }; let iter = self.peers.iter_mut().filter_map(move |(id, peer)| { if !peer.state.is_available() { - trace!(target: "sync", "Peer {} is busy", id); + return None + } + if !pending_requests.contains(id) { return None } if let Some((range, req)) = peer_block_request( @@ -652,9 +699,6 @@ impl<B: BlockT> ChainSync<B> { None } }); - if !have_requests { - self.is_idle = true; - } Either::Right(iter) } @@ -675,7 +719,7 @@ impl<B: BlockT> ChainSync<B> { trace!(target: "sync", "Reversing incoming block list"); blocks.reverse() } - self.is_idle = false; + self.pending_requests.add(&who); if request.is_some() { match &mut peer.state { PeerSyncState::DownloadingNew(start_block) => { @@ -859,7 +903,7 @@ impl<B: BlockT> ChainSync<B> { return Ok(OnBlockJustification::Nothing) }; - self.is_idle = false; + self.pending_requests.add(&who); if let PeerSyncState::DownloadingJustification(hash) = peer.state { peer.state = PeerSyncState::Available; @@ -906,7 +950,7 @@ impl<B: BlockT> ChainSync<B> { return Ok(OnBlockFinalityProof::Nothing) }; - self.is_idle = false; + self.pending_requests.add(&who); if let PeerSyncState::DownloadingFinalityProof(hash) = peer.state { peer.state = PeerSyncState::Available; @@ -1029,7 +1073,7 @@ impl<B: BlockT> ChainSync<B> { }; } - self.is_idle = false; + self.pending_requests.set_all(); output.into_iter() } @@ -1038,12 +1082,12 @@ impl<B: BlockT> ChainSync<B> { pub fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) { let finalization_result = if success { Ok((hash, number)) } else { Err(()) }; self.extra_justifications.try_finalize_root((hash, number), finalization_result, true); - self.is_idle = false; + self.pending_requests.set_all(); } pub fn on_finality_proof_import(&mut self, req: (B::Hash, NumberFor<B>), res: Result<(B::Hash, NumberFor<B>), ()>) { self.extra_finality_proofs.try_finalize_root(req, res, true); - self.is_idle = false; + self.pending_requests.set_all(); } /// Notify about finalization of the given block. @@ -1101,7 +1145,7 @@ impl<B: BlockT> ChainSync<B> { peer.common_number = new_common_number; } } - self.is_idle = false; + self.pending_requests.set_all(); } /// Call when a node announces a new block. @@ -1154,7 +1198,7 @@ impl<B: BlockT> ChainSync<B> { peer.common_number = number - One::one(); } } - self.is_idle = false; + self.pending_requests.add(&who); // known block case if known || self.is_already_downloading(&hash) { @@ -1214,7 +1258,7 @@ impl<B: BlockT> ChainSync<B> { self.peers.remove(&who); self.extra_justifications.peer_disconnected(&who); self.extra_finality_proofs.peer_disconnected(&who); - self.is_idle = false; + self.pending_requests.set_all(); } /// Restart the sync process. @@ -1224,7 +1268,7 @@ impl<B: BlockT> ChainSync<B> { let info = self.client.info(); self.best_queued_hash = info.best_hash; self.best_queued_number = std::cmp::max(info.best_number, self.best_imported_number); - self.is_idle = false; + self.pending_requests.set_all(); debug!(target:"sync", "Restarted with {} ({})", self.best_queued_number, self.best_queued_hash); let old_peers = std::mem::replace(&mut self.peers, HashMap::new()); old_peers.into_iter().filter_map(move |(id, p)| {