Skip to content
Snippets Groups Projects
Commit d3244b72 authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Make sure we remove a peer on disconnect in gossip (#5104)

* Make sure we remove peers on disconnect in gossip state machine

* Clear up the code

* Add a comment
parent 9ab9134e
Branches
No related merge requests found
......@@ -258,6 +258,7 @@ impl<B: BlockT> ConsensusGossip<B> {
let mut context = NetworkContext { gossip: self, network, engine_id: engine_id.clone() };
v.peer_disconnected(&mut context, &who);
}
self.peers.remove(&who);
}
/// Perform periodic maintenance
......@@ -644,4 +645,52 @@ mod tests {
let _ = consensus.live_message_sinks.remove(&([0, 0, 0, 0], topic));
assert_eq!(stream.next(), None);
}
#[test]
fn peer_is_removed_on_disconnect() {
struct TestNetwork;
impl Network<Block> for TestNetwork {
fn event_stream(
&self,
) -> std::pin::Pin<Box<dyn futures::Stream<Item = crate::Event> + Send>> {
unimplemented!("Not required in tests")
}
fn report_peer(&self, _: PeerId, _: crate::ReputationChange) {
unimplemented!("Not required in tests")
}
fn disconnect_peer(&self, _: PeerId) {
unimplemented!("Not required in tests")
}
fn write_notification(&self, _: PeerId, _: crate::ConsensusEngineId, _: Vec<u8>) {
unimplemented!("Not required in tests")
}
fn register_notifications_protocol(
&self,
_: ConsensusEngineId,
_: std::borrow::Cow<'static, [u8]>,
) {
unimplemented!("Not required in tests")
}
fn announce(&self, _: H256, _: Vec<u8>) {
unimplemented!("Not required in tests")
}
}
let mut consensus = ConsensusGossip::<Block>::new();
consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll));
let mut network = TestNetwork;
let peer_id = PeerId::random();
consensus.new_peer(&mut network, peer_id.clone(), Roles::FULL);
assert!(consensus.peers.contains_key(&peer_id));
consensus.peer_disconnected(&mut network, peer_id.clone());
assert!(!consensus.peers.contains_key(&peer_id));
}
}
......@@ -1167,8 +1167,7 @@ impl<B: BlockT> ChainSync<B> {
}
/// Restart the sync process.
fn restart<'a>(&'a mut self) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> + 'a
{
fn restart<'a>(&'a mut self) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> + 'a {
self.queue_blocks.clear();
self.blocks.clear();
let info = self.client.info();
......
......@@ -104,8 +104,7 @@ impl<B: BlockT> BlockCollection<B> {
common: NumberFor<B>,
max_parallel: u32,
max_ahead: u32,
) -> Option<Range<NumberFor<B>>>
{
) -> Option<Range<NumberFor<B>>> {
if peer_best <= common {
// Bail out early
return None;
......@@ -165,20 +164,20 @@ impl<B: BlockT> BlockCollection<B> {
pub fn drain(&mut self, from: NumberFor<B>) -> Vec<BlockData<B>> {
let mut drained = Vec::new();
let mut ranges = Vec::new();
{
let mut prev = from;
for (start, range_data) in &mut self.blocks {
match range_data {
&mut BlockRangeState::Complete(ref mut blocks) if *start <= prev => {
prev = *start + (blocks.len() as u32).into();
let mut blocks = mem::replace(blocks, Vec::new());
drained.append(&mut blocks);
ranges.push(*start);
},
_ => break,
}
let mut prev = from;
for (start, range_data) in &mut self.blocks {
match range_data {
&mut BlockRangeState::Complete(ref mut blocks) if *start <= prev => {
prev = *start + (blocks.len() as u32).into();
// Remove all elements from `blocks` and add them to `drained`
drained.append(blocks);
ranges.push(*start);
},
_ => break,
}
}
for r in ranges {
self.blocks.remove(&r);
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment