From a8ce80b72dff43d3bf0c2f26599c6eed5cb706f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= <andre.beat@gmail.com> Date: Fri, 8 Nov 2019 20:08:14 +0100 Subject: [PATCH] grandpa: progressively increase target gossip peers (#4050) * grandpa: stricter gossip message filtering * gossip: remove filtered message on send_message * gossip: add test for tracking of broadcast attempts * grandpa: only restrict gossip if we're connected to more than 5 authorities * grandpa: add test for progressive gossip * grandpa: add test for gossip filtering on local non-authority node * grandpa: fix doc * gossip, grandpa: tabify * grandpa: relax filtering logic for global messages --- .../src/communication/gossip.rs | 304 +++++++++++++++++- .../network/src/protocol/consensus_gossip.rs | 183 ++++++++++- 2 files changed, 469 insertions(+), 18 deletions(-) diff --git a/substrate/core/finality-grandpa/src/communication/gossip.rs b/substrate/core/finality-grandpa/src/communication/gossip.rs index efcd1d48c67..7758de6afa7 100644 --- a/substrate/core/finality-grandpa/src/communication/gossip.rs +++ b/substrate/core/finality-grandpa/src/communication/gossip.rs @@ -92,6 +92,7 @@ use substrate_telemetry::{telemetry, CONSENSUS_DEBUG}; use log::{trace, debug, warn}; use futures::prelude::*; use futures::sync::mpsc; +use rand::Rng; use crate::{environment, CatchUp, CompactCommit, SignedMessage}; use super::{cost, benefit, Round, SetId}; @@ -483,6 +484,14 @@ impl<N: Ord> Peers<N> { fn peer<'a>(&'a self, who: &PeerId) -> Option<&'a PeerInfo<N>> { self.inner.get(who) } + + fn authorities(&self) -> usize { + self.inner.iter().filter(|(_, info)| info.roles.is_authority()).count() + } + + fn non_authorities(&self) -> usize { + self.inner.iter().filter(|(_, info)| !info.roles.is_authority()).count() + } } #[derive(Debug, PartialEq)] @@ -980,6 +989,122 @@ impl<Block: BlockT> Inner<Block> { (true, report) } + + /// The initial logic for filtering round messages follows the given state + /// transitions: + /// + /// - State 0: not allowed to anyone (only if our local node is not an authority) + /// - State 1: allowed to random `sqrt(authorities)` + /// - State 2: allowed to all authorities + /// - State 3: allowed to random `sqrt(non-authorities)` + /// - State 4: allowed to all non-authorities + /// + /// Transitions will be triggered on repropagation attempts by the + /// underlying gossip layer, which should happen every 30 seconds. + fn round_message_allowed<N>(&self, peer: &PeerInfo<N>, mut previous_attempts: usize) -> bool { + const MIN_AUTHORITIES: usize = 5; + + if !self.config.is_authority && previous_attempts == 0 { + // non-authority nodes don't gossip any messages right away. we + // assume that authorities (and sentries) are strongly connected, so + // it should be unnecessary for non-authorities to gossip all + // messages right away. + return false; + } + + if !self.config.is_authority { + // since the node is not an authority we skipped the initial attempt + // to gossip the message, therefore we decrement `previous_attempts` + // so that the state machine below works the same way it does for + // authority nodes. + previous_attempts -= 1; + } + + if peer.roles.is_authority() { + let authorities = self.peers.authorities(); + + // the target node is an authority, on the first attempt we start by + // sending the message to only `sqrt(authorities)` (if we're + // connected to at least `MIN_AUTHORITIES`). + if previous_attempts == 0 && authorities > MIN_AUTHORITIES { + let authorities = authorities as f64; + let p = (authorities.sqrt()).max(MIN_AUTHORITIES as f64) / authorities; + rand::thread_rng().gen_bool(p) + } else { + // otherwise we already went through the step above, so + // we won't filter the message and send it to all + // authorities for whom it is polite to do so + true + } + } else { + // the node is not an authority so we apply stricter filters + if previous_attempts >= 3 { + // if we previously tried to send this message 3 (or more) + // times, then it is allowed to be sent to all peers. + true + } else if previous_attempts == 2 { + // otherwise we only send it to `sqrt(non-authorities)`. + let non_authorities = self.peers.non_authorities() as f64; + let p = non_authorities.sqrt() / non_authorities; + rand::thread_rng().gen_bool(p) + } else { + false + } + } + } + + /// The initial logic for filtering global messages follows the given state + /// transitions: + /// + /// - State 0: send to `sqrt(authorities)` ++ `sqrt(non-authorities)`. + /// - State 1: send to all authorities + /// - State 2: send to all non-authorities + /// + /// We are more lenient with global messages since there should be a lot + /// less global messages than round messages (just commits), and we want + /// these to propagate to non-authorities fast enough so that they can + /// observe finality. + /// + /// Transitions will be triggered on repropagation attempts by the + /// underlying gossip layer, which should happen every 30 seconds. + fn global_message_allowed<N>(&self, peer: &PeerInfo<N>, previous_attempts: usize) -> bool { + const MIN_PEERS: usize = 5; + + if peer.roles.is_authority() { + let authorities = self.peers.authorities(); + + // the target node is an authority, on the first attempt we start by + // sending the message to only `sqrt(authorities)` (if we're + // connected to at least `MIN_PEERS`). + if previous_attempts == 0 && authorities > MIN_PEERS { + let authorities = authorities as f64; + let p = (authorities.sqrt()).max(MIN_PEERS as f64) / authorities; + rand::thread_rng().gen_bool(p) + } else { + // otherwise we already went through the step above, so + // we won't filter the message and send it to all + // authorities for whom it is polite to do so + true + } + } else { + let non_authorities = self.peers.non_authorities(); + + // the target node is not an authority, on the first and second + // attempt we start by sending the message to only + // `sqrt(non_authorities)` (if we're connected to at least + // `MIN_PEERS`). + if previous_attempts <= 1 && non_authorities > MIN_PEERS { + let non_authorities = non_authorities as f64; + let p = (non_authorities.sqrt()).max(MIN_PEERS as f64) / non_authorities ; + rand::thread_rng().gen_bool(p) + } else { + // otherwise we already went through the step above, so + // we won't filter the message and send it to all + // non-authorities for whom it is polite to do so + true + } + } + } } /// A validator for GRANDPA gossip messages. @@ -1190,6 +1315,20 @@ impl<Block: BlockT> network_gossip::Validator<Block> for GossipValidator<Block> Some(x) => x, }; + if let MessageIntent::Broadcast { previous_attempts } = intent { + if maybe_round.is_some() { + if !inner.round_message_allowed(peer, previous_attempts) { + // early return if the vote message isn't allowed at this stage. + return false; + } + } else { + if !inner.global_message_allowed(peer, previous_attempts) { + // early return if the global message isn't allowed at this stage. + return false; + } + } + } + // if the topic is not something the peer accepts, discard. if let Some(round) = maybe_round { return peer.view.consider_vote(round, set_id) == Consider::Accept @@ -1209,8 +1348,8 @@ impl<Block: BlockT> network_gossip::Validator<Block> for GossipValidator<Block> Ok(GossipMessage::Commit(full)) => { // we only broadcast our best commit and only if it's // better than last received by peer. - Some(full.message.target_number) == our_best_commit - && Some(full.message.target_number) > peer_best_commit + Some(full.message.target_number) == our_best_commit && + Some(full.message.target_number) > peer_best_commit } Ok(GossipMessage::Neighbor(_)) => false, Ok(GossipMessage::CatchUpRequest(_)) => false, @@ -1311,7 +1450,7 @@ mod tests { use super::environment::SharedVoterSetState; use network_gossip::Validator as GossipValidatorT; use network::test::Block; - use primitives::crypto::Public; + use primitives::{crypto::Public, H256}; // some random config (not really needed) fn config() -> crate::Config { @@ -1329,7 +1468,6 @@ mod tests { fn voter_set_state() -> SharedVoterSetState<Block> { use crate::authorities::AuthoritySet; use crate::environment::VoterSetState; - use primitives::H256; let base = (H256::zero(), 0); let voters = AuthoritySet::genesis(Vec::new()); @@ -1991,4 +2129,162 @@ mod tests { ) } } + + #[test] + fn progressively_gossips_to_more_peers() { + let (val, _) = GossipValidator::<Block>::new( + config(), + voter_set_state(), + ); + + // the validator start at set id 0 + val.note_set(SetId(0), Vec::new(), |_, _| {}); + + // add 60 peers, 30 authorities and 30 full nodes + let mut authorities = Vec::new(); + authorities.resize_with(30, || PeerId::random()); + + let mut full_nodes = Vec::new(); + full_nodes.resize_with(30, || PeerId::random()); + + for i in 0..30 { + val.inner.write().peers.new_peer(authorities[i].clone(), Roles::AUTHORITY); + val.inner.write().peers.new_peer(full_nodes[i].clone(), Roles::FULL); + } + + let test = |previous_attempts, peers| { + let mut message_allowed = val.message_allowed(); + + move || { + let mut allowed = 0; + for peer in peers { + if message_allowed( + peer, + MessageIntent::Broadcast { previous_attempts }, + &crate::communication::round_topic::<Block>(1, 0), + &[], + ) { + allowed += 1; + } + } + allowed + } + }; + + fn trial<F: FnMut() -> usize>(mut test: F) -> usize { + let mut results = Vec::new(); + let n = 1000; + + for _ in 0..n { + results.push(test()); + } + + let n = results.len(); + let sum: usize = results.iter().sum(); + + sum / n + } + + // on the first attempt we will only gossip to `sqrt(authorities)`, + // which should average out to 5 peers after a couple of trials + assert_eq!(trial(test(0, &authorities)), 5); + + // on the second (and subsequent attempts) we should gossip to all + // authorities we're connected to. + assert_eq!(trial(test(1, &authorities)), 30); + assert_eq!(trial(test(2, &authorities)), 30); + + // we should only gossip to non-authorities after the third attempt + assert_eq!(trial(test(0, &full_nodes)), 0); + assert_eq!(trial(test(1, &full_nodes)), 0); + + // and only to `sqrt(non-authorities)` + assert_eq!(trial(test(2, &full_nodes)), 5); + + // only on the fourth attempt should we gossip to all non-authorities + assert_eq!(trial(test(3, &full_nodes)), 30); + } + + #[test] + fn only_restricts_gossip_to_authorities_after_a_minimum_threshold() { + let (val, _) = GossipValidator::<Block>::new( + config(), + voter_set_state(), + ); + + // the validator start at set id 0 + val.note_set(SetId(0), Vec::new(), |_, _| {}); + + let mut authorities = Vec::new(); + for _ in 0..5 { + let peer_id = PeerId::random(); + val.inner.write().peers.new_peer(peer_id.clone(), Roles::AUTHORITY); + authorities.push(peer_id); + } + + let mut message_allowed = val.message_allowed(); + + // since we're only connected to 5 authorities, we should never restrict + // sending of gossip messages, and instead just allow them to all + // non-authorities on the first attempt. + for authority in &authorities { + assert!( + message_allowed( + authority, + MessageIntent::Broadcast { previous_attempts: 0 }, + &crate::communication::round_topic::<Block>(1, 0), + &[], + ) + ); + } + } + + #[test] + fn non_authorities_never_gossip_messages_on_first_attempt() { + let mut config = config(); + config.is_authority = false; + + let (val, _) = GossipValidator::<Block>::new( + config, + voter_set_state(), + ); + + // the validator start at set id 0 + val.note_set(SetId(0), Vec::new(), |_, _| {}); + + let mut authorities = Vec::new(); + for _ in 0..100 { + let peer_id = PeerId::random(); + val.inner.write().peers.new_peer(peer_id.clone(), Roles::AUTHORITY); + authorities.push(peer_id); + } + + let mut message_allowed = val.message_allowed(); + + // since our node is not an authority we should **never** gossip any + // messages on the first attempt. + for authority in &authorities { + assert!( + !message_allowed( + authority, + MessageIntent::Broadcast { previous_attempts: 0 }, + &crate::communication::round_topic::<Block>(1, 0), + &[], + ) + ); + } + + // on the third attempt we should allow messages to authorities + // (on the second attempt we would do `sqrt(authorities)`) + for authority in &authorities { + assert!( + message_allowed( + authority, + MessageIntent::Broadcast { previous_attempts: 2 }, + &crate::communication::round_topic::<Block>(1, 0), + &[], + ) + ); + } + } } diff --git a/substrate/core/network/src/protocol/consensus_gossip.rs b/substrate/core/network/src/protocol/consensus_gossip.rs index f3d4e536a78..67e8364abbc 100644 --- a/substrate/core/network/src/protocol/consensus_gossip.rs +++ b/substrate/core/network/src/protocol/consensus_gossip.rs @@ -73,6 +73,7 @@ const UNREGISTERED_TOPIC_REPUTATION_CHANGE: i32 = -(1 << 10); struct PeerConsensus<H> { known_messages: HashSet<H>, + filtered_messages: HashMap<H, usize>, roles: Roles, } @@ -104,9 +105,14 @@ pub enum MessageRecipient { /// The reason for sending out the message. #[derive(Eq, PartialEq, Copy, Clone)] +#[cfg_attr(test, derive(Debug))] pub enum MessageIntent { - /// Requested broadcast - Broadcast, + /// Requested broadcast. + Broadcast { + /// How many times this message was previously filtered by the gossip + /// validator when trying to propagate to a given peer. + previous_attempts: usize + }, /// Requested broadcast to all peers. ForcedBroadcast, /// Periodic rebroadcast of all messages to all peers. @@ -123,6 +129,12 @@ pub enum ValidationResult<H> { Discard, } +impl MessageIntent { + fn broadcast() -> MessageIntent { + MessageIntent::Broadcast { previous_attempts: 0 } + } +} + /// Validation context. Allows reacting to incoming messages by sending out further messages. pub trait ValidatorContext<B: BlockT> { /// Broadcast all messages with given topic to peers that do not have it yet. @@ -196,12 +208,17 @@ fn propagate<'a, B: BlockT, I>( for (message_hash, topic, message) in messages { for (id, ref mut peer) in peers.iter_mut() { + let previous_attempts = peer.filtered_messages + .get(&message_hash) + .cloned() + .unwrap_or(0); + let intent = match intent { - MessageIntent::Broadcast => + MessageIntent::Broadcast { .. } => if peer.known_messages.contains(&message_hash) { - continue + continue; } else { - MessageIntent::Broadcast + MessageIntent::Broadcast { previous_attempts } }, MessageIntent::PeriodicRebroadcast => if peer.known_messages.contains(&message_hash) { @@ -209,15 +226,24 @@ fn propagate<'a, B: BlockT, I>( } else { // peer doesn't know message, so the logic should treat it as an // initial broadcast. - MessageIntent::Broadcast + MessageIntent::Broadcast { previous_attempts } }, other => other, }; if !message_allowed(id, intent, &topic, &message) { - continue + let count = peer.filtered_messages + .entry(message_hash.clone()) + .or_insert(0); + + *count += 1; + + continue; } + + peer.filtered_messages.remove(message_hash); peer.known_messages.insert(message_hash.clone()); + trace!(target: "gossip", "Propagating to {}: {:?}", id, message); protocol.send_consensus(id.clone(), message.clone()); } @@ -310,6 +336,7 @@ impl<B: BlockT> ConsensusGossip<B> { trace!(target:"gossip", "Registering {:?} {}", roles, who); self.peers.insert(who.clone(), PeerConsensus { known_messages: HashSet::new(), + filtered_messages: HashMap::new(), roles, }); for (engine_id, v) in self.validators.clone() { @@ -379,7 +406,7 @@ impl<B: BlockT> ConsensusGossip<B> { .filter_map(|entry| if entry.topic == topic { Some((&entry.message_hash, &entry.topic, &entry.message)) } else { None } ); - let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast }; + let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::broadcast() }; propagate(protocol, messages, intent, &mut self.peers, &self.validators); } @@ -527,17 +554,36 @@ impl<B: BlockT> ConsensusGossip<B> { Some(validator) => validator.message_allowed(), }; - let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast }; - if let Some(ref mut peer) = self.peers.get_mut(who) { for entry in self.messages.iter().filter(|m| m.topic == topic && m.message.engine_id == engine_id) { + let intent = if force { + MessageIntent::ForcedBroadcast + } else { + let previous_attempts = peer.filtered_messages + .get(&entry.message_hash) + .cloned() + .unwrap_or(0); + + MessageIntent::Broadcast { previous_attempts } + }; + if !force && peer.known_messages.contains(&entry.message_hash) { - continue + continue; } + if !message_allowed(who, intent, &entry.topic, &entry.message.data) { - continue + let count = peer.filtered_messages + .entry(entry.message_hash) + .or_insert(0); + + *count += 1; + + continue; } + + peer.filtered_messages.remove(&entry.message_hash); peer.known_messages.insert(entry.message_hash.clone()); + trace!(target: "gossip", "Sending topic message to {}: {:?}", who, entry.message); protocol.send_consensus(who.clone(), ConsensusMessage { engine_id: engine_id.clone(), @@ -557,7 +603,7 @@ impl<B: BlockT> ConsensusGossip<B> { ) { let message_hash = HashFor::<B>::hash(&message.data); self.register_message_hashed(message_hash, topic, message.clone(), None); - let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast }; + let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::broadcast() }; propagate(protocol, iter::once((&message_hash, &topic, &message)), intent, &mut self.peers, &self.validators); } @@ -578,7 +624,9 @@ impl<B: BlockT> ConsensusGossip<B> { trace!(target: "gossip", "Sending direct to {}: {:?}", who, message); + peer.filtered_messages.remove(&message_hash); peer.known_messages.insert(message_hash); + protocol.send_consensus(who.clone(), message.clone()); } } @@ -607,6 +655,8 @@ impl<B: BlockT> Validator<B> for DiscardAll { #[cfg(test)] mod tests { + use std::sync::{Arc, atomic::{AtomicBool, Ordering}}; + use parking_lot::Mutex; use sr_primitives::testing::{H256, Block as RawBlock, ExtrinsicWrapper}; use futures03::executor::block_on_stream; @@ -657,7 +707,7 @@ mod tests { } fn message_expired<'a>(&'a self) -> Box<dyn FnMut(H256, &[u8]) -> bool + 'a> { - Box::new(move |_topic, data| data[0] != 1 ) + Box::new(move |_topic, data| data[0] != 1) } } @@ -755,4 +805,109 @@ mod tests { let _ = consensus.live_message_sinks.remove(&([0, 0, 0, 0], topic)); assert_eq!(stream.next(), None); } + + #[test] + fn keeps_track_of_broadcast_attempts() { + struct DummyNetworkContext; + impl<B: BlockT> Context<B> for DummyNetworkContext { + fn report_peer(&mut self, _who: PeerId, _reputation: i32) {} + fn disconnect_peer(&mut self, _who: PeerId) {} + fn send_consensus(&mut self, _who: PeerId, _consensus: ConsensusMessage) {} + fn send_chain_specific(&mut self, _who: PeerId, _message: Vec<u8>) {} + } + + // A mock gossip validator that never expires any message, allows + // setting whether messages should be allowed and keeps track of any + // messages passed to `message_allowed`. + struct MockValidator { + allow: AtomicBool, + messages: Arc<Mutex<Vec<(Vec<u8>, MessageIntent)>>>, + } + + impl MockValidator { + fn new() -> MockValidator { + MockValidator { + allow: AtomicBool::new(false), + messages: Arc::new(Mutex::new(Vec::new())), + } + } + } + + impl Validator<Block> for MockValidator { + fn validate( + &self, + _context: &mut dyn ValidatorContext<Block>, + _sender: &PeerId, + _data: &[u8], + ) -> ValidationResult<H256> { + ValidationResult::ProcessAndKeep(H256::default()) + } + + fn message_expired<'a>(&'a self) -> Box<dyn FnMut(H256, &[u8]) -> bool + 'a> { + Box::new(move |_topic, _data| false) + } + + fn message_allowed<'a>(&'a self) -> Box<dyn FnMut(&PeerId, MessageIntent, &H256, &[u8]) -> bool + 'a> { + let messages = self.messages.clone(); + Box::new(move |_, intent, _, data| { + messages.lock().push((data.to_vec(), intent)); + self.allow.load(Ordering::SeqCst) + }) + } + } + + // we setup an instance of the mock gossip validator, add a new peer to + // it and register a message. + let mut consensus = ConsensusGossip::<Block>::new(); + let validator = Arc::new(MockValidator::new()); + consensus.register_validator_internal([0, 0, 0, 0], validator.clone()); + consensus.new_peer( + &mut DummyNetworkContext, + PeerId::random(), + Roles::AUTHORITY, + ); + + let data = vec![1, 2, 3]; + let msg = ConsensusMessage { data: data.clone(), engine_id: [0, 0, 0, 0] }; + consensus.register_message(H256::default(), msg); + + // tick the gossip handler and make sure it triggers a message rebroadcast + let mut tick = || { + consensus.next_broadcast = std::time::Instant::now(); + consensus.tick(&mut DummyNetworkContext); + }; + + // by default we won't allow the message we registered, so everytime we + // tick the gossip handler, the message intent should be kept as + // `Broadcast` but the previous attempts should be incremented. + tick(); + assert_eq!( + validator.messages.lock().pop().unwrap(), + (data.clone(), MessageIntent::Broadcast { previous_attempts: 0 }), + ); + + tick(); + assert_eq!( + validator.messages.lock().pop().unwrap(), + (data.clone(), MessageIntent::Broadcast { previous_attempts: 1 }), + ); + + // we set the validator to allow the message to go through + validator.allow.store(true, Ordering::SeqCst); + + // we still get the same message intent but it should be delivered now + tick(); + assert_eq!( + validator.messages.lock().pop().unwrap(), + (data.clone(), MessageIntent::Broadcast { previous_attempts: 2 }), + ); + + // ticking the gossip handler again the message intent should change to + // `PeriodicRebroadcast` since it was sent. + tick(); + assert_eq!( + validator.messages.lock().pop().unwrap(), + (data.clone(), MessageIntent::PeriodicRebroadcast), + ); + } } -- GitLab