diff --git a/substrate/client/network-gossip/src/bridge.rs b/substrate/client/network-gossip/src/bridge.rs index a15195111e800d5b42ac91a4521e2a9437eed8e3..26e49fce8a820e4d5525154fce74d81cac1887d4 100644 --- a/substrate/client/network-gossip/src/bridge.rs +++ b/substrate/client/network-gossip/src/bridge.rs @@ -182,7 +182,7 @@ impl<B: BlockT> Future for GossipEngine<B> { messages, ); - for (topic, notification) in to_forward.into_iter() { + for (topic, notification) in to_forward { if let Entry::Occupied(mut entry) = this.message_sinks.entry(topic) { trace!( target: "gossip", diff --git a/substrate/client/network-gossip/src/state_machine.rs b/substrate/client/network-gossip/src/state_machine.rs index 53b5b98245a4181aa331f2d2fd749939b2b0b4fc..433457afe748e422c78c0a2d5bde7c2af54c173c 100644 --- a/substrate/client/network-gossip/src/state_machine.rs +++ b/substrate/client/network-gossip/src/state_machine.rs @@ -20,7 +20,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::iter; use std::time; -use log::trace; +use log::{error, trace}; use lru::LruCache; use libp2p::PeerId; use sp_runtime::traits::{Block as BlockT, Hash, HashFor}; @@ -41,8 +41,6 @@ mod rep { pub const GOSSIP_SUCCESS: Rep = Rep::new(1 << 4, "Successfull gossip"); /// Reputation change when a peer sends us a gossip message that we already knew about. pub const DUPLICATE_GOSSIP: Rep = Rep::new(-(1 << 2), "Duplicate gossip"); - /// Reputation change when a peer sends a message from a topic it isn't registered on. - pub const UNREGISTERED_TOPIC: Rep = Rep::new(-(1 << 10), "Unregistered gossip message topic"); } struct PeerConsensus<H> { @@ -308,36 +306,37 @@ impl<B: BlockT> ConsensusGossip<B> { validator.validate(&mut context, &who, &message) }; - let validation_result = match validation { - ValidationResult::ProcessAndKeep(topic) => Some((topic, true)), - ValidationResult::ProcessAndDiscard(topic) => Some((topic, false)), - ValidationResult::Discard => None, + let (topic, keep) = match validation { + ValidationResult::ProcessAndKeep(topic) => (topic, true), + ValidationResult::ProcessAndDiscard(topic) => (topic, false), + ValidationResult::Discard => { + trace!(target:"gossip", "Discard message from peer {}", who); + continue; + }, }; - if let Some((topic, keep)) = validation_result { - network.report_peer(who.clone(), rep::GOSSIP_SUCCESS); - if let Some(ref mut peer) = self.peers.get_mut(&who) { - peer.known_messages.insert(message_hash); - - to_forward.push((topic, TopicNotification { - message: message.clone(), - sender: Some(who.clone()) - })); - - if keep { - self.register_message_hashed( - message_hash, - topic, - message, - Some(who.clone()), - ); - } - } else { - trace!(target:"gossip", "Ignored statement from unregistered peer {}", who); - network.report_peer(who.clone(), rep::UNREGISTERED_TOPIC); + let peer = match self.peers.get_mut(&who) { + Some(peer) => peer, + None => { + error!(target:"gossip", "Got message from unregistered peer {}", who); + continue; } - } else { - trace!(target:"gossip", "Discard message from peer {}", who); + }; + + network.report_peer(who.clone(), rep::GOSSIP_SUCCESS); + peer.known_messages.insert(message_hash); + to_forward.push((topic, TopicNotification { + message: message.clone(), + sender: Some(who.clone()) + })); + + if keep { + self.register_message_hashed( + message_hash, + topic, + message, + Some(who.clone()), + ); } } @@ -416,9 +415,10 @@ impl<B: BlockT> ConsensusGossip<B> { #[cfg(test)] mod tests { - use std::sync::Arc; + use futures::prelude::*; + use sc_network::{Event, ReputationChange}; use sp_runtime::testing::{H256, Block as RawBlock, ExtrinsicWrapper}; - + use std::{borrow::Cow, pin::Pin, sync::{Arc, Mutex}}; use super::*; type Block = RawBlock<ExtrinsicWrapper<u64>>; @@ -448,6 +448,52 @@ mod tests { } } + struct DiscardAll; + impl Validator<Block> for DiscardAll{ + fn validate( + &self, + _context: &mut dyn ValidatorContext<Block>, + _sender: &PeerId, + _data: &[u8], + ) -> ValidationResult<H256> { + ValidationResult::Discard + } + } + + #[derive(Clone, Default)] + struct NoOpNetwork { + inner: Arc<Mutex<NoOpNetworkInner>>, + } + + #[derive(Clone, Default)] + struct NoOpNetworkInner { + peer_reports: Vec<(PeerId, ReputationChange)>, + } + + impl<B: BlockT> Network<B> for NoOpNetwork { + fn event_stream(&self) -> Pin<Box<dyn Stream<Item = Event> + Send>> { + unimplemented!(); + } + + fn report_peer(&self, peer_id: PeerId, reputation_change: ReputationChange) { + self.inner.lock().unwrap().peer_reports.push((peer_id, reputation_change)); + } + + fn disconnect_peer(&self, _: PeerId) { + unimplemented!(); + } + + fn write_notification(&self, _: PeerId, _: ConsensusEngineId, _: Vec<u8>) { + unimplemented!(); + } + + fn register_notifications_protocol(&self, _: ConsensusEngineId, _: Cow<'static, [u8]>) {} + + fn announce(&self, _: B::Hash, _: Vec<u8>) { + unimplemented!(); + } + } + #[test] fn collects_garbage() { struct AllowOne; @@ -528,42 +574,9 @@ mod tests { #[test] fn peer_is_removed_on_disconnect() { - struct TestNetwork; - impl Network<Block> for TestNetwork { - fn event_stream( - &self, - ) -> std::pin::Pin<Box<dyn futures::Stream<Item = crate::Event> + Send>> { - unimplemented!("Not required in tests") - } - - fn report_peer(&self, _: PeerId, _: crate::ReputationChange) { - unimplemented!("Not required in tests") - } - - fn disconnect_peer(&self, _: PeerId) { - unimplemented!("Not required in tests") - } - - fn write_notification(&self, _: PeerId, _: crate::ConsensusEngineId, _: Vec<u8>) { - unimplemented!("Not required in tests") - } - - fn register_notifications_protocol( - &self, - _: ConsensusEngineId, - _: std::borrow::Cow<'static, [u8]>, - ) { - unimplemented!("Not required in tests") - } - - fn announce(&self, _: H256, _: Vec<u8>) { - unimplemented!("Not required in tests") - } - } - let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), [0, 0, 0, 0]); - let mut network = TestNetwork; + let mut network = NoOpNetwork::default(); let peer_id = PeerId::random(); consensus.new_peer(&mut network, peer_id.clone(), ObservedRole::Full); @@ -572,4 +585,43 @@ mod tests { consensus.peer_disconnected(&mut network, peer_id.clone()); assert!(!consensus.peers.contains_key(&peer_id)); } + + #[test] + fn on_incoming_ignores_discarded_messages() { + let to_forward = ConsensusGossip::<Block>::new( + Arc::new(DiscardAll), + [0, 0, 0, 0], + ).on_incoming( + &mut NoOpNetwork::default(), + PeerId::random(), + vec![vec![1, 2, 3]], + ); + + assert!( + to_forward.is_empty(), + "Expected `on_incoming` to ignore discarded message but got {:?}", to_forward, + ); + } + + #[test] + fn on_incoming_ignores_unregistered_peer() { + let mut network = NoOpNetwork::default(); + let remote = PeerId::random(); + + let to_forward = ConsensusGossip::<Block>::new( + Arc::new(AllowAll), + [0, 0, 0, 0], + ).on_incoming( + &mut network, + // Unregistered peer. + remote.clone(), + vec![vec![1, 2, 3]], + ); + + assert!( + to_forward.is_empty(), + "Expected `on_incoming` to ignore message from unregistered peer but got {:?}", + to_forward, + ); + } }