From 53a482146bfd75d0d51ec31e9f3c41a1f73a2ad9 Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan <arkady.paronyan@gmail.com> Date: Tue, 26 Nov 2019 18:17:14 +0100 Subject: [PATCH] Time-based gradual gossip (#4176) --- .../src/communication/gossip.rs | 222 ++++++++++++------ .../network/src/protocol/consensus_gossip.rs | 153 +----------- 2 files changed, 152 insertions(+), 223 deletions(-) diff --git a/substrate/client/finality-grandpa/src/communication/gossip.rs b/substrate/client/finality-grandpa/src/communication/gossip.rs index 9a07d010e09..3e0f2a7ac4f 100644 --- a/substrate/client/finality-grandpa/src/communication/gossip.rs +++ b/substrate/client/finality-grandpa/src/communication/gossip.rs @@ -92,12 +92,12 @@ use substrate_telemetry::{telemetry, CONSENSUS_DEBUG}; use log::{trace, debug, warn}; use futures::prelude::*; use futures::sync::mpsc; -use rand::Rng; +use rand::seq::SliceRandom; use crate::{environment, CatchUp, CompactCommit, SignedMessage}; use super::{cost, benefit, Round, SetId}; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, VecDeque, HashSet}; use std::time::{Duration, Instant}; const REBROADCAST_AFTER: Duration = Duration::from_secs(60 * 5); @@ -107,6 +107,13 @@ const CATCH_UP_PROCESS_TIMEOUT: Duration = Duration::from_secs(30); /// catch up request. const CATCH_UP_THRESHOLD: u64 = 2; +const PROPAGATION_ALL: u32 = 4; //in rounds; +const PROPAGATION_ALL_AUTHORITIES: u32 = 2; //in rounds; +const PROPAGATION_SOME_NON_AUTHORITIES: u32 = 3; //in rounds; +const ROUND_DURATION: u32 = 4; // measured in gossip durations + +const MIN_LUCKY: usize = 5; + type Report = (PeerId, i32); /// An outcome of examining a message. @@ -417,21 +424,37 @@ impl<N> PeerInfo<N> { /// The peers we're connected do in gossip. struct Peers<N> { inner: HashMap<PeerId, PeerInfo<N>>, + lucky_peers: HashSet<PeerId>, + lucky_authorities: HashSet<PeerId>, } impl<N> Default for Peers<N> { fn default() -> Self { - Peers { inner: HashMap::new() } + Peers { + inner: HashMap::new(), + lucky_peers: HashSet::new(), + lucky_authorities: HashSet::new(), + } } } impl<N: Ord> Peers<N> { fn new_peer(&mut self, who: PeerId, roles: Roles) { + if roles.is_authority() && self.lucky_authorities.len() < MIN_LUCKY { + self.lucky_authorities.insert(who.clone()); + } + if !roles.is_authority() && self.lucky_peers.len() < MIN_LUCKY { + self.lucky_peers.insert(who.clone()); + } self.inner.insert(who, PeerInfo::new(roles)); } fn peer_disconnected(&mut self, who: &PeerId) { self.inner.remove(who); + // This does not happen often enough compared to round duration, + // so we don't reshuffle. + self.lucky_peers.remove(who); + self.lucky_authorities.remove(who); } // returns a reference to the new view, if the peer is known. @@ -492,6 +515,37 @@ impl<N: Ord> Peers<N> { fn non_authorities(&self) -> usize { self.inner.iter().filter(|(_, info)| !info.roles.is_authority()).count() } + + fn reshuffle(&mut self) { + let mut lucky_peers : Vec<_> = self.inner + .iter() + .filter_map(|(id, info)| if !info.roles.is_authority() { Some(id.clone()) } else { None }) + .collect(); + let mut lucky_authorities: Vec<_> = self.inner + .iter() + .filter_map(|(id, info)| if info.roles.is_authority() { Some(id.clone()) } else { None }) + .collect(); + + let num_non_authorities = ((lucky_peers.len() as f32).sqrt() as usize) + .max(MIN_LUCKY) + .min(lucky_peers.len()); + + let num_authorities = ((lucky_authorities.len() as f32).sqrt() as usize) + .max(MIN_LUCKY) + .min(lucky_authorities.len()); + + lucky_peers.partial_shuffle(&mut rand::thread_rng(), num_non_authorities); + lucky_peers.truncate(num_non_authorities); + + lucky_authorities.partial_shuffle(&mut rand::thread_rng(), num_authorities); + lucky_authorities.truncate(num_authorities); + + self.lucky_peers.clear(); + self.lucky_peers.extend(lucky_peers.into_iter()); + + self.lucky_authorities.clear(); + self.lucky_authorities.extend(lucky_authorities.into_iter()); + } } #[derive(Debug, PartialEq)] @@ -559,6 +613,7 @@ struct Inner<Block: BlockT> { local_view: Option<View<NumberFor<Block>>>, peers: Peers<NumberFor<Block>>, live_topics: KeepTopics<Block>, + round_start: Instant, authorities: Vec<AuthorityId>, config: crate::Config, next_rebroadcast: Instant, @@ -591,6 +646,7 @@ impl<Block: BlockT> Inner<Block> { local_view: None, peers: Peers::default(), live_topics: KeepTopics::new(), + round_start: Instant::now(), next_rebroadcast: Instant::now() + REBROADCAST_AFTER, authorities: Vec::new(), pending_catch_up: PendingCatchUp::None, @@ -619,6 +675,8 @@ impl<Block: BlockT> Inner<Block> { local_view.round = round; self.live_topics.push(round, set_id); + self.round_start = Instant::now(); + self.peers.reshuffle(); } self.multicast_neighbor_packet() } @@ -1001,10 +1059,14 @@ impl<Block: BlockT> Inner<Block> { /// /// Transitions will be triggered on repropagation attempts by the /// underlying gossip layer, which should happen every 30 seconds. - fn round_message_allowed<N>(&self, peer: &PeerInfo<N>, mut previous_attempts: usize) -> bool { - const MIN_AUTHORITIES: usize = 5; + fn round_message_allowed<N>(&self, who: &PeerId, peer: &PeerInfo<N>) -> bool { + let round_duration = self.config.gossip_duration * ROUND_DURATION; + let round_elapsed = self.round_start.elapsed(); - if !self.config.is_authority && previous_attempts == 0 { + + if !self.config.is_authority + && round_elapsed < round_duration * PROPAGATION_ALL + { // non-authority nodes don't gossip any messages right away. we // assume that authorities (and sentries) are strongly connected, so // it should be unnecessary for non-authorities to gossip all @@ -1012,24 +1074,16 @@ impl<Block: BlockT> Inner<Block> { return false; } - if !self.config.is_authority { - // since the node is not an authority we skipped the initial attempt - // to gossip the message, therefore we decrement `previous_attempts` - // so that the state machine below works the same way it does for - // authority nodes. - previous_attempts -= 1; - } - if peer.roles.is_authority() { let authorities = self.peers.authorities(); - // the target node is an authority, on the first attempt we start by + // the target node is an authority, on the first round duration we start by // sending the message to only `sqrt(authorities)` (if we're - // connected to at least `MIN_AUTHORITIES`). - if previous_attempts == 0 && authorities > MIN_AUTHORITIES { - let authorities = authorities as f64; - let p = (authorities.sqrt()).max(MIN_AUTHORITIES as f64) / authorities; - rand::thread_rng().gen_bool(p) + // connected to at least `MIN_LUCKY`). + if round_elapsed < round_duration * PROPAGATION_ALL_AUTHORITIES + && authorities > MIN_LUCKY + { + self.peers.lucky_authorities.contains(who) } else { // otherwise we already went through the step above, so // we won't filter the message and send it to all @@ -1038,15 +1092,13 @@ impl<Block: BlockT> Inner<Block> { } } else { // the node is not an authority so we apply stricter filters - if previous_attempts >= 3 { - // if we previously tried to send this message 3 (or more) - // times, then it is allowed to be sent to all peers. + if round_elapsed >= round_duration * PROPAGATION_ALL { + // if we waited for 3 (or more) rounds + // then it is allowed to be sent to all peers. true - } else if previous_attempts == 2 { + } else if round_elapsed >= round_duration * PROPAGATION_SOME_NON_AUTHORITIES { // otherwise we only send it to `sqrt(non-authorities)`. - let non_authorities = self.peers.non_authorities() as f64; - let p = non_authorities.sqrt() / non_authorities; - rand::thread_rng().gen_bool(p) + self.peers.lucky_peers.contains(who) } else { false } @@ -1067,19 +1119,20 @@ impl<Block: BlockT> Inner<Block> { /// /// Transitions will be triggered on repropagation attempts by the /// underlying gossip layer, which should happen every 30 seconds. - fn global_message_allowed<N>(&self, peer: &PeerInfo<N>, previous_attempts: usize) -> bool { - const MIN_PEERS: usize = 5; + fn global_message_allowed<N>(&self, who: &PeerId, peer: &PeerInfo<N>) -> bool { + let round_duration = self.config.gossip_duration * ROUND_DURATION; + let round_elapsed = self.round_start.elapsed(); if peer.roles.is_authority() { let authorities = self.peers.authorities(); - // the target node is an authority, on the first attempt we start by + // the target node is an authority, on the first round duration we start by // sending the message to only `sqrt(authorities)` (if we're - // connected to at least `MIN_PEERS`). - if previous_attempts == 0 && authorities > MIN_PEERS { - let authorities = authorities as f64; - let p = (authorities.sqrt()).max(MIN_PEERS as f64) / authorities; - rand::thread_rng().gen_bool(p) + // connected to at least `MIN_LUCKY`). + if round_elapsed < round_duration * PROPAGATION_ALL_AUTHORITIES + && authorities > MIN_LUCKY + { + self.peers.lucky_authorities.contains(who) } else { // otherwise we already went through the step above, so // we won't filter the message and send it to all @@ -1090,13 +1143,13 @@ impl<Block: BlockT> Inner<Block> { let non_authorities = self.peers.non_authorities(); // the target node is not an authority, on the first and second - // attempt we start by sending the message to only + // round duration we start by sending the message to only // `sqrt(non_authorities)` (if we're connected to at least - // `MIN_PEERS`). - if previous_attempts <= 1 && non_authorities > MIN_PEERS { - let non_authorities = non_authorities as f64; - let p = (non_authorities.sqrt()).max(MIN_PEERS as f64) / non_authorities ; - rand::thread_rng().gen_bool(p) + // `MIN_LUCKY`). + if round_elapsed < round_duration * PROPAGATION_SOME_NON_AUTHORITIES + && non_authorities > MIN_LUCKY + { + self.peers.lucky_peers.contains(who) } else { // otherwise we already went through the step above, so // we won't filter the message and send it to all @@ -1315,14 +1368,14 @@ impl<Block: BlockT> network_gossip::Validator<Block> for GossipValidator<Block> Some(x) => x, }; - if let MessageIntent::Broadcast { previous_attempts } = intent { + if let MessageIntent::Broadcast = intent { if maybe_round.is_some() { - if !inner.round_message_allowed(peer, previous_attempts) { + if !inner.round_message_allowed(who, peer) { // early return if the vote message isn't allowed at this stage. return false; } } else { - if !inner.global_message_allowed(peer, previous_attempts) { + if !inner.global_message_allowed(who, peer) { // early return if the global message isn't allowed at this stage. return false; } @@ -2132,8 +2185,12 @@ mod tests { #[test] fn progressively_gossips_to_more_peers() { + let mut config = config(); + config.gossip_duration = Duration::from_secs(300); // Set to high value to prevent test race + let round_duration = config.gossip_duration * ROUND_DURATION; + let (val, _) = GossipValidator::<Block>::new( - config(), + config, voter_set_state(), ); @@ -2152,7 +2209,9 @@ mod tests { val.inner.write().peers.new_peer(full_nodes[i].clone(), Roles::FULL); } - let test = |previous_attempts, peers| { + let test = |num_round, peers| { + // rewind n round durations + val.inner.write().round_start = Instant::now() - round_duration * num_round; let mut message_allowed = val.message_allowed(); move || { @@ -2160,7 +2219,7 @@ mod tests { for peer in peers { if message_allowed( peer, - MessageIntent::Broadcast { previous_attempts }, + MessageIntent::Broadcast, &crate::communication::round_topic::<Block>(1, 0), &[], ) { @@ -2187,22 +2246,22 @@ mod tests { // on the first attempt we will only gossip to `sqrt(authorities)`, // which should average out to 5 peers after a couple of trials - assert_eq!(trial(test(0, &authorities)), 5); + assert_eq!(trial(test(1, &authorities)), 5); // on the second (and subsequent attempts) we should gossip to all // authorities we're connected to. - assert_eq!(trial(test(1, &authorities)), 30); assert_eq!(trial(test(2, &authorities)), 30); + assert_eq!(trial(test(3, &authorities)), 30); // we should only gossip to non-authorities after the third attempt - assert_eq!(trial(test(0, &full_nodes)), 0); assert_eq!(trial(test(1, &full_nodes)), 0); + assert_eq!(trial(test(2, &full_nodes)), 0); // and only to `sqrt(non-authorities)` - assert_eq!(trial(test(2, &full_nodes)), 5); + assert_eq!(trial(test(3, &full_nodes)), 5); // only on the fourth attempt should we gossip to all non-authorities - assert_eq!(trial(test(3, &full_nodes)), 30); + assert_eq!(trial(test(4, &full_nodes)), 30); } #[test] @@ -2231,7 +2290,7 @@ mod tests { assert!( message_allowed( authority, - MessageIntent::Broadcast { previous_attempts: 0 }, + MessageIntent::Broadcast, &crate::communication::round_topic::<Block>(1, 0), &[], ) @@ -2240,9 +2299,11 @@ mod tests { } #[test] - fn non_authorities_never_gossip_messages_on_first_attempt() { + fn non_authorities_never_gossip_messages_on_first_round_duration() { let mut config = config(); + config.gossip_duration = Duration::from_secs(300); // Set to high value to prevent test race config.is_authority = false; + let round_duration = config.gossip_duration * ROUND_DURATION; let (val, _) = GossipValidator::<Block>::new( config, @@ -2259,32 +2320,37 @@ mod tests { authorities.push(peer_id); } - let mut message_allowed = val.message_allowed(); - - // since our node is not an authority we should **never** gossip any - // messages on the first attempt. - for authority in &authorities { - assert!( - !message_allowed( - authority, - MessageIntent::Broadcast { previous_attempts: 0 }, - &crate::communication::round_topic::<Block>(1, 0), - &[], - ) - ); + { + let mut message_allowed = val.message_allowed(); + // since our node is not an authority we should **never** gossip any + // messages on the first attempt. + for authority in &authorities { + assert!( + !message_allowed( + authority, + MessageIntent::Broadcast, + &crate::communication::round_topic::<Block>(1, 0), + &[], + ) + ); + } } - // on the third attempt we should allow messages to authorities - // (on the second attempt we would do `sqrt(authorities)`) - for authority in &authorities { - assert!( - message_allowed( - authority, - MessageIntent::Broadcast { previous_attempts: 2 }, - &crate::communication::round_topic::<Block>(1, 0), - &[], - ) - ); + { + val.inner.write().round_start = Instant::now() - round_duration * 4; + let mut message_allowed = val.message_allowed(); + // on the fourth round duration we should allow messages to authorities + // (on the second we would do `sqrt(authorities)`) + for authority in &authorities { + assert!( + message_allowed( + authority, + MessageIntent::Broadcast, + &crate::communication::round_topic::<Block>(1, 0), + &[], + ) + ); + } } } } diff --git a/substrate/client/network/src/protocol/consensus_gossip.rs b/substrate/client/network/src/protocol/consensus_gossip.rs index b1f56bc8f50..8cde3c66feb 100644 --- a/substrate/client/network/src/protocol/consensus_gossip.rs +++ b/substrate/client/network/src/protocol/consensus_gossip.rs @@ -73,7 +73,6 @@ const UNREGISTERED_TOPIC_REPUTATION_CHANGE: i32 = -(1 << 10); struct PeerConsensus<H> { known_messages: HashSet<H>, - filtered_messages: HashMap<H, usize>, roles: Roles, } @@ -108,11 +107,7 @@ pub enum MessageRecipient { #[cfg_attr(test, derive(Debug))] pub enum MessageIntent { /// Requested broadcast. - Broadcast { - /// How many times this message was previously filtered by the gossip - /// validator when trying to propagate to a given peer. - previous_attempts: usize - }, + Broadcast, /// Requested broadcast to all peers. ForcedBroadcast, /// Periodic rebroadcast of all messages to all peers. @@ -131,7 +126,7 @@ pub enum ValidationResult<H> { impl MessageIntent { fn broadcast() -> MessageIntent { - MessageIntent::Broadcast { previous_attempts: 0 } + MessageIntent::Broadcast } } @@ -190,7 +185,8 @@ fn propagate<'a, B: BlockT, I>( peers: &mut HashMap<PeerId, PeerConsensus<B::Hash>>, validators: &HashMap<ConsensusEngineId, Arc<dyn Validator<B>>>, ) - where I: Clone + IntoIterator<Item=(&'a B::Hash, &'a B::Hash, &'a ConsensusMessage)>, // (msg_hash, topic, message) + // (msg_hash, topic, message) + where I: Clone + IntoIterator<Item=(&'a B::Hash, &'a B::Hash, &'a ConsensusMessage)>, { let mut check_fns = HashMap::new(); let mut message_allowed = move |who: &PeerId, intent: MessageIntent, topic: &B::Hash, message: &ConsensusMessage| { @@ -209,17 +205,12 @@ fn propagate<'a, B: BlockT, I>( for (id, ref mut peer) in peers.iter_mut() { let mut batch = Vec::new(); for (message_hash, topic, message) in messages.clone() { - let previous_attempts = peer.filtered_messages - .get(&message_hash) - .cloned() - .unwrap_or(0); - let intent = match intent { MessageIntent::Broadcast { .. } => if peer.known_messages.contains(&message_hash) { continue; } else { - MessageIntent::Broadcast { previous_attempts } + MessageIntent::Broadcast }, MessageIntent::PeriodicRebroadcast => if peer.known_messages.contains(&message_hash) { @@ -227,22 +218,15 @@ fn propagate<'a, B: BlockT, I>( } else { // peer doesn't know message, so the logic should treat it as an // initial broadcast. - MessageIntent::Broadcast { previous_attempts } + MessageIntent::Broadcast }, other => other, }; if !message_allowed(id, intent, &topic, &message) { - let count = peer.filtered_messages - .entry(message_hash.clone()) - .or_insert(0); - - *count += 1; - continue; } - peer.filtered_messages.remove(message_hash); peer.known_messages.insert(message_hash.clone()); trace!(target: "gossip", "Propagating to {}: {:?}", id, message); @@ -338,7 +322,6 @@ impl<B: BlockT> ConsensusGossip<B> { trace!(target:"gossip", "Registering {:?} {}", roles, who); self.peers.insert(who.clone(), PeerConsensus { known_messages: HashSet::new(), - filtered_messages: HashMap::new(), roles, }); for (engine_id, v) in self.validators.clone() { @@ -448,7 +431,6 @@ impl<B: BlockT> ConsensusGossip<B> { for (_, ref mut peer) in self.peers.iter_mut() { peer.known_messages.retain(|h| known_messages.contains(h)); - peer.filtered_messages.retain(|h, _| known_messages.contains(h)); } } @@ -566,12 +548,7 @@ impl<B: BlockT> ConsensusGossip<B> { let intent = if force { MessageIntent::ForcedBroadcast } else { - let previous_attempts = peer.filtered_messages - .get(&entry.message_hash) - .cloned() - .unwrap_or(0); - - MessageIntent::Broadcast { previous_attempts } + MessageIntent::Broadcast }; if !force && peer.known_messages.contains(&entry.message_hash) { @@ -579,16 +556,9 @@ impl<B: BlockT> ConsensusGossip<B> { } if !message_allowed(who, intent, &entry.topic, &entry.message.data) { - let count = peer.filtered_messages - .entry(entry.message_hash) - .or_insert(0); - - *count += 1; - continue; } - peer.filtered_messages.remove(&entry.message_hash); peer.known_messages.insert(entry.message_hash.clone()); trace!(target: "gossip", "Sending topic message to {}: {:?}", who, entry.message); @@ -632,7 +602,6 @@ impl<B: BlockT> ConsensusGossip<B> { trace!(target: "gossip", "Sending direct to {}: {:?}", who, message); - peer.filtered_messages.remove(&message_hash); peer.known_messages.insert(message_hash); protocol.send_consensus(who.clone(), vec![message.clone()]); } @@ -662,8 +631,7 @@ impl<B: BlockT> Validator<B> for DiscardAll { #[cfg(test)] mod tests { - use std::sync::{Arc, atomic::{AtomicBool, Ordering}}; - use parking_lot::Mutex; + use std::sync::Arc; use sr_primitives::testing::{H256, Block as RawBlock, ExtrinsicWrapper}; use futures03::executor::block_on_stream; @@ -812,109 +780,4 @@ mod tests { let _ = consensus.live_message_sinks.remove(&([0, 0, 0, 0], topic)); assert_eq!(stream.next(), None); } - - #[test] - fn keeps_track_of_broadcast_attempts() { - struct DummyNetworkContext; - impl<B: BlockT> Context<B> for DummyNetworkContext { - fn report_peer(&mut self, _who: PeerId, _reputation: i32) {} - fn disconnect_peer(&mut self, _who: PeerId) {} - fn send_consensus(&mut self, _who: PeerId, _consensus: Vec<ConsensusMessage>) {} - fn send_chain_specific(&mut self, _who: PeerId, _message: Vec<u8>) {} - } - - // A mock gossip validator that never expires any message, allows - // setting whether messages should be allowed and keeps track of any - // messages passed to `message_allowed`. - struct MockValidator { - allow: AtomicBool, - messages: Arc<Mutex<Vec<(Vec<u8>, MessageIntent)>>>, - } - - impl MockValidator { - fn new() -> MockValidator { - MockValidator { - allow: AtomicBool::new(false), - messages: Arc::new(Mutex::new(Vec::new())), - } - } - } - - impl Validator<Block> for MockValidator { - fn validate( - &self, - _context: &mut dyn ValidatorContext<Block>, - _sender: &PeerId, - _data: &[u8], - ) -> ValidationResult<H256> { - ValidationResult::ProcessAndKeep(H256::default()) - } - - fn message_expired<'a>(&'a self) -> Box<dyn FnMut(H256, &[u8]) -> bool + 'a> { - Box::new(move |_topic, _data| false) - } - - fn message_allowed<'a>(&'a self) -> Box<dyn FnMut(&PeerId, MessageIntent, &H256, &[u8]) -> bool + 'a> { - let messages = self.messages.clone(); - Box::new(move |_, intent, _, data| { - messages.lock().push((data.to_vec(), intent)); - self.allow.load(Ordering::SeqCst) - }) - } - } - - // we setup an instance of the mock gossip validator, add a new peer to - // it and register a message. - let mut consensus = ConsensusGossip::<Block>::new(); - let validator = Arc::new(MockValidator::new()); - consensus.register_validator_internal([0, 0, 0, 0], validator.clone()); - consensus.new_peer( - &mut DummyNetworkContext, - PeerId::random(), - Roles::AUTHORITY, - ); - - let data = vec![1, 2, 3]; - let msg = ConsensusMessage { data: data.clone(), engine_id: [0, 0, 0, 0] }; - consensus.register_message(H256::default(), msg); - - // tick the gossip handler and make sure it triggers a message rebroadcast - let mut tick = || { - consensus.next_broadcast = std::time::Instant::now(); - consensus.tick(&mut DummyNetworkContext); - }; - - // by default we won't allow the message we registered, so everytime we - // tick the gossip handler, the message intent should be kept as - // `Broadcast` but the previous attempts should be incremented. - tick(); - assert_eq!( - validator.messages.lock().pop().unwrap(), - (data.clone(), MessageIntent::Broadcast { previous_attempts: 0 }), - ); - - tick(); - assert_eq!( - validator.messages.lock().pop().unwrap(), - (data.clone(), MessageIntent::Broadcast { previous_attempts: 1 }), - ); - - // we set the validator to allow the message to go through - validator.allow.store(true, Ordering::SeqCst); - - // we still get the same message intent but it should be delivered now - tick(); - assert_eq!( - validator.messages.lock().pop().unwrap(), - (data.clone(), MessageIntent::Broadcast { previous_attempts: 2 }), - ); - - // ticking the gossip handler again the message intent should change to - // `PeriodicRebroadcast` since it was sent. - tick(); - assert_eq!( - validator.messages.lock().pop().unwrap(), - (data.clone(), MessageIntent::PeriodicRebroadcast), - ); - } } -- GitLab