diff --git a/substrate/client/network-gossip/src/state_machine.rs b/substrate/client/network-gossip/src/state_machine.rs index 26433e63ec3ea7e09a6185628f4a5a3ec894e484..db5ea3603dcacea771f7bb1d458eac7b3c9bd9cc 100644 --- a/substrate/client/network-gossip/src/state_machine.rs +++ b/substrate/client/network-gossip/src/state_machine.rs @@ -258,6 +258,7 @@ impl<B: BlockT> ConsensusGossip<B> { let mut context = NetworkContext { gossip: self, network, engine_id: engine_id.clone() }; v.peer_disconnected(&mut context, &who); } + self.peers.remove(&who); } /// Perform periodic maintenance @@ -644,4 +645,52 @@ mod tests { let _ = consensus.live_message_sinks.remove(&([0, 0, 0, 0], topic)); assert_eq!(stream.next(), None); } + + #[test] + fn peer_is_removed_on_disconnect() { + struct TestNetwork; + impl Network<Block> for TestNetwork { + fn event_stream( + &self, + ) -> std::pin::Pin<Box<dyn futures::Stream<Item = crate::Event> + Send>> { + unimplemented!("Not required in tests") + } + + fn report_peer(&self, _: PeerId, _: crate::ReputationChange) { + unimplemented!("Not required in tests") + } + + fn disconnect_peer(&self, _: PeerId) { + unimplemented!("Not required in tests") + } + + fn write_notification(&self, _: PeerId, _: crate::ConsensusEngineId, _: Vec<u8>) { + unimplemented!("Not required in tests") + } + + fn register_notifications_protocol( + &self, + _: ConsensusEngineId, + _: std::borrow::Cow<'static, [u8]>, + ) { + unimplemented!("Not required in tests") + } + + fn announce(&self, _: H256, _: Vec<u8>) { + unimplemented!("Not required in tests") + } + } + + let mut consensus = ConsensusGossip::<Block>::new(); + consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll)); + + let mut network = TestNetwork; + + let peer_id = PeerId::random(); + consensus.new_peer(&mut network, peer_id.clone(), Roles::FULL); + assert!(consensus.peers.contains_key(&peer_id)); + + consensus.peer_disconnected(&mut network, peer_id.clone()); + assert!(!consensus.peers.contains_key(&peer_id)); + } } diff --git a/substrate/client/network/src/protocol/sync.rs b/substrate/client/network/src/protocol/sync.rs index b1cd89155effed3647c9ce445914ad96fc88b380..d0427e61a818c6ad2596c1bdd6de5ef9a8ba3b8b 100644 --- a/substrate/client/network/src/protocol/sync.rs +++ b/substrate/client/network/src/protocol/sync.rs @@ -1167,8 +1167,7 @@ impl<B: BlockT> ChainSync<B> { } /// Restart the sync process. - fn restart<'a>(&'a mut self) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> + 'a - { + fn restart<'a>(&'a mut self) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> + 'a { self.queue_blocks.clear(); self.blocks.clear(); let info = self.client.info(); diff --git a/substrate/client/network/src/protocol/sync/blocks.rs b/substrate/client/network/src/protocol/sync/blocks.rs index d4e4581c670bd1f2f1abf2a9c00302aed68b44d1..279150a225506d0ea48309cd32fcdffad8852d7a 100644 --- a/substrate/client/network/src/protocol/sync/blocks.rs +++ b/substrate/client/network/src/protocol/sync/blocks.rs @@ -104,8 +104,7 @@ impl<B: BlockT> BlockCollection<B> { common: NumberFor<B>, max_parallel: u32, max_ahead: u32, - ) -> Option<Range<NumberFor<B>>> - { + ) -> Option<Range<NumberFor<B>>> { if peer_best <= common { // Bail out early return None; @@ -165,20 +164,20 @@ impl<B: BlockT> BlockCollection<B> { pub fn drain(&mut self, from: NumberFor<B>) -> Vec<BlockData<B>> { let mut drained = Vec::new(); let mut ranges = Vec::new(); - { - let mut prev = from; - for (start, range_data) in &mut self.blocks { - match range_data { - &mut BlockRangeState::Complete(ref mut blocks) if *start <= prev => { - prev = *start + (blocks.len() as u32).into(); - let mut blocks = mem::replace(blocks, Vec::new()); - drained.append(&mut blocks); - ranges.push(*start); - }, - _ => break, - } + + let mut prev = from; + for (start, range_data) in &mut self.blocks { + match range_data { + &mut BlockRangeState::Complete(ref mut blocks) if *start <= prev => { + prev = *start + (blocks.len() as u32).into(); + // Remove all elements from `blocks` and add them to `drained` + drained.append(blocks); + ranges.push(*start); + }, + _ => break, } } + for r in ranges { self.blocks.remove(&r); }