From e84b06a8c0338b1fabb5c5cb70c141bea6851fe7 Mon Sep 17 00:00:00 2001 From: Max Inden <mail@max-inden.de> Date: Thu, 23 Apr 2020 09:58:43 +0200 Subject: [PATCH] client/network-gossip/state_machine: Reduce indentation level (#5714) * client/network-gossip/src/state_machine: Add unit test for on_incoming Add two unit tests to ensure `on_incoming` is ingoring discarded messages and reports and ignores messages from unknown peers. * client/network-gossip/state_machine: Reduce indentation level * client/network-gossip/bridge: Remove unnecessary into_iter * client/network-gossip/state_machine: Report success after register check * client/network-gossip/state_machine: Error not report on unregistered `peers` contains all the peers we're connected to. If we receive a message from a peer not in this list, that means there's an internal problem within the local client. It's not the fault of the peer in question. With the above in mind instead of reducing the reputation of such peer, log an error. --- substrate/client/network-gossip/src/bridge.rs | 2 +- .../network-gossip/src/state_machine.rs | 184 +++++++++++------- 2 files changed, 119 insertions(+), 67 deletions(-) diff --git a/substrate/client/network-gossip/src/bridge.rs b/substrate/client/network-gossip/src/bridge.rs index a15195111e8..26e49fce8a8 100644 --- a/substrate/client/network-gossip/src/bridge.rs +++ b/substrate/client/network-gossip/src/bridge.rs @@ -182,7 +182,7 @@ impl<B: BlockT> Future for GossipEngine<B> { messages, ); - for (topic, notification) in to_forward.into_iter() { + for (topic, notification) in to_forward { if let Entry::Occupied(mut entry) = this.message_sinks.entry(topic) { trace!( target: "gossip", diff --git a/substrate/client/network-gossip/src/state_machine.rs b/substrate/client/network-gossip/src/state_machine.rs index 53b5b98245a..433457afe74 100644 --- a/substrate/client/network-gossip/src/state_machine.rs +++ b/substrate/client/network-gossip/src/state_machine.rs @@ -20,7 +20,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::iter; use std::time; -use log::trace; +use log::{error, trace}; use lru::LruCache; use libp2p::PeerId; use sp_runtime::traits::{Block as BlockT, Hash, HashFor}; @@ -41,8 +41,6 @@ mod rep { pub const GOSSIP_SUCCESS: Rep = Rep::new(1 << 4, "Successfull gossip"); /// Reputation change when a peer sends us a gossip message that we already knew about. pub const DUPLICATE_GOSSIP: Rep = Rep::new(-(1 << 2), "Duplicate gossip"); - /// Reputation change when a peer sends a message from a topic it isn't registered on. - pub const UNREGISTERED_TOPIC: Rep = Rep::new(-(1 << 10), "Unregistered gossip message topic"); } struct PeerConsensus<H> { @@ -308,36 +306,37 @@ impl<B: BlockT> ConsensusGossip<B> { validator.validate(&mut context, &who, &message) }; - let validation_result = match validation { - ValidationResult::ProcessAndKeep(topic) => Some((topic, true)), - ValidationResult::ProcessAndDiscard(topic) => Some((topic, false)), - ValidationResult::Discard => None, + let (topic, keep) = match validation { + ValidationResult::ProcessAndKeep(topic) => (topic, true), + ValidationResult::ProcessAndDiscard(topic) => (topic, false), + ValidationResult::Discard => { + trace!(target:"gossip", "Discard message from peer {}", who); + continue; + }, }; - if let Some((topic, keep)) = validation_result { - network.report_peer(who.clone(), rep::GOSSIP_SUCCESS); - if let Some(ref mut peer) = self.peers.get_mut(&who) { - peer.known_messages.insert(message_hash); - - to_forward.push((topic, TopicNotification { - message: message.clone(), - sender: Some(who.clone()) - })); - - if keep { - self.register_message_hashed( - message_hash, - topic, - message, - Some(who.clone()), - ); - } - } else { - trace!(target:"gossip", "Ignored statement from unregistered peer {}", who); - network.report_peer(who.clone(), rep::UNREGISTERED_TOPIC); + let peer = match self.peers.get_mut(&who) { + Some(peer) => peer, + None => { + error!(target:"gossip", "Got message from unregistered peer {}", who); + continue; } - } else { - trace!(target:"gossip", "Discard message from peer {}", who); + }; + + network.report_peer(who.clone(), rep::GOSSIP_SUCCESS); + peer.known_messages.insert(message_hash); + to_forward.push((topic, TopicNotification { + message: message.clone(), + sender: Some(who.clone()) + })); + + if keep { + self.register_message_hashed( + message_hash, + topic, + message, + Some(who.clone()), + ); } } @@ -416,9 +415,10 @@ impl<B: BlockT> ConsensusGossip<B> { #[cfg(test)] mod tests { - use std::sync::Arc; + use futures::prelude::*; + use sc_network::{Event, ReputationChange}; use sp_runtime::testing::{H256, Block as RawBlock, ExtrinsicWrapper}; - + use std::{borrow::Cow, pin::Pin, sync::{Arc, Mutex}}; use super::*; type Block = RawBlock<ExtrinsicWrapper<u64>>; @@ -448,6 +448,52 @@ mod tests { } } + struct DiscardAll; + impl Validator<Block> for DiscardAll{ + fn validate( + &self, + _context: &mut dyn ValidatorContext<Block>, + _sender: &PeerId, + _data: &[u8], + ) -> ValidationResult<H256> { + ValidationResult::Discard + } + } + + #[derive(Clone, Default)] + struct NoOpNetwork { + inner: Arc<Mutex<NoOpNetworkInner>>, + } + + #[derive(Clone, Default)] + struct NoOpNetworkInner { + peer_reports: Vec<(PeerId, ReputationChange)>, + } + + impl<B: BlockT> Network<B> for NoOpNetwork { + fn event_stream(&self) -> Pin<Box<dyn Stream<Item = Event> + Send>> { + unimplemented!(); + } + + fn report_peer(&self, peer_id: PeerId, reputation_change: ReputationChange) { + self.inner.lock().unwrap().peer_reports.push((peer_id, reputation_change)); + } + + fn disconnect_peer(&self, _: PeerId) { + unimplemented!(); + } + + fn write_notification(&self, _: PeerId, _: ConsensusEngineId, _: Vec<u8>) { + unimplemented!(); + } + + fn register_notifications_protocol(&self, _: ConsensusEngineId, _: Cow<'static, [u8]>) {} + + fn announce(&self, _: B::Hash, _: Vec<u8>) { + unimplemented!(); + } + } + #[test] fn collects_garbage() { struct AllowOne; @@ -528,42 +574,9 @@ mod tests { #[test] fn peer_is_removed_on_disconnect() { - struct TestNetwork; - impl Network<Block> for TestNetwork { - fn event_stream( - &self, - ) -> std::pin::Pin<Box<dyn futures::Stream<Item = crate::Event> + Send>> { - unimplemented!("Not required in tests") - } - - fn report_peer(&self, _: PeerId, _: crate::ReputationChange) { - unimplemented!("Not required in tests") - } - - fn disconnect_peer(&self, _: PeerId) { - unimplemented!("Not required in tests") - } - - fn write_notification(&self, _: PeerId, _: crate::ConsensusEngineId, _: Vec<u8>) { - unimplemented!("Not required in tests") - } - - fn register_notifications_protocol( - &self, - _: ConsensusEngineId, - _: std::borrow::Cow<'static, [u8]>, - ) { - unimplemented!("Not required in tests") - } - - fn announce(&self, _: H256, _: Vec<u8>) { - unimplemented!("Not required in tests") - } - } - let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), [0, 0, 0, 0]); - let mut network = TestNetwork; + let mut network = NoOpNetwork::default(); let peer_id = PeerId::random(); consensus.new_peer(&mut network, peer_id.clone(), ObservedRole::Full); @@ -572,4 +585,43 @@ mod tests { consensus.peer_disconnected(&mut network, peer_id.clone()); assert!(!consensus.peers.contains_key(&peer_id)); } + + #[test] + fn on_incoming_ignores_discarded_messages() { + let to_forward = ConsensusGossip::<Block>::new( + Arc::new(DiscardAll), + [0, 0, 0, 0], + ).on_incoming( + &mut NoOpNetwork::default(), + PeerId::random(), + vec![vec![1, 2, 3]], + ); + + assert!( + to_forward.is_empty(), + "Expected `on_incoming` to ignore discarded message but got {:?}", to_forward, + ); + } + + #[test] + fn on_incoming_ignores_unregistered_peer() { + let mut network = NoOpNetwork::default(); + let remote = PeerId::random(); + + let to_forward = ConsensusGossip::<Block>::new( + Arc::new(AllowAll), + [0, 0, 0, 0], + ).on_incoming( + &mut network, + // Unregistered peer. + remote.clone(), + vec![vec![1, 2, 3]], + ); + + assert!( + to_forward.is_empty(), + "Expected `on_incoming` to ignore message from unregistered peer but got {:?}", + to_forward, + ); + } } -- GitLab