From 832060000bbdbc87cac233bde8954ae7263f6500 Mon Sep 17 00:00:00 2001 From: Adrian Catangiu <adrian@parity.io> Date: Fri, 13 Oct 2023 10:12:30 +0300 Subject: [PATCH] sc-consensus-beefy: improve gossip logic (#1852) - Remove cached messages used for deduplication in `GossipValidator` since they're already deduplicated in upper layer `NetworkGossip`. - Add cache for "justified rounds" to quickly discard any further (even if potentially different) justifications at the gossip level, once a valid one (for a respective round) is submitted to the worker. - Add short-circuit in worker `finalize()` method to not attempt to finalize same block multiple times (for example when we get justifications for same block from multiple components like block-import, gossip or on-demand). - Change a test which had A LOT of latency in syncing blocks for some weird reason and would only run after ~150seconds. It now runs instantly. Fixes https://github.com/paritytech/polkadot-sdk/issues/1728 --- .../beefy/src/communication/gossip.rs | 156 ++++++------------ .../consensus/beefy/src/communication/mod.rs | 2 +- substrate/client/consensus/beefy/src/tests.rs | 12 +- .../client/consensus/beefy/src/worker.rs | 11 +- 4 files changed, 67 insertions(+), 114 deletions(-) diff --git a/substrate/client/consensus/beefy/src/communication/gossip.rs b/substrate/client/consensus/beefy/src/communication/gossip.rs index 8c025ca0676..342cd0511a5 100644 --- a/substrate/client/consensus/beefy/src/communication/gossip.rs +++ b/substrate/client/consensus/beefy/src/communication/gossip.rs @@ -16,11 +16,10 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. -use std::{collections::BTreeMap, sync::Arc, time::Duration}; +use std::{collections::BTreeSet, sync::Arc, time::Duration}; use sc_network::{PeerId, ReputationChange}; use sc_network_gossip::{MessageIntent, ValidationResult, Validator, ValidatorContext}; -use sp_core::hashing::twox_64; use sp_runtime::traits::{Block, Hash, Header, NumberFor}; use codec::{Decode, DecodeAll, Encode}; @@ -115,9 +114,6 @@ where <<B::Header as Header>::Hashing as Hash>::hash(b"beefy-justifications") } -/// A type that represents hash of the message. -pub type MessageHash = [u8; 8]; - #[derive(Clone, Debug)] pub(crate) struct GossipFilterCfg<'a, B: Block> { pub start: NumberFor<B>, @@ -133,18 +129,21 @@ struct FilterInner<B: Block> { } struct Filter<B: Block> { + // specifies live rounds inner: Option<FilterInner<B>>, - live_votes: BTreeMap<NumberFor<B>, fnv::FnvHashSet<MessageHash>>, + // cache of seen valid justifications in active rounds + rounds_with_valid_proofs: BTreeSet<NumberFor<B>>, } impl<B: Block> Filter<B> { pub fn new() -> Self { - Self { inner: None, live_votes: BTreeMap::new() } + Self { inner: None, rounds_with_valid_proofs: BTreeSet::new() } } /// Update filter to new `start` and `set_id`. fn update(&mut self, cfg: GossipFilterCfg<B>) { - self.live_votes.retain(|&round, _| round >= cfg.start && round <= cfg.end); + self.rounds_with_valid_proofs + .retain(|&round| round >= cfg.start && round <= cfg.end); // only clone+overwrite big validator_set if set_id changed match self.inner.as_mut() { Some(f) if f.validator_set.id() == cfg.validator_set.id() => { @@ -203,14 +202,14 @@ impl<B: Block> Filter<B> { .unwrap_or(Consider::RejectOutOfScope) } - /// Add new _known_ `hash` to the round's known votes. - fn add_known_vote(&mut self, round: NumberFor<B>, hash: MessageHash) { - self.live_votes.entry(round).or_default().insert(hash); + /// Add new _known_ `round` to the set of seen valid justifications. + fn mark_round_as_proven(&mut self, round: NumberFor<B>) { + self.rounds_with_valid_proofs.insert(round); } - /// Check if `hash` is already part of round's known votes. - fn is_known_vote(&self, round: NumberFor<B>, hash: &MessageHash) -> bool { - self.live_votes.get(&round).map(|known| known.contains(hash)).unwrap_or(false) + /// Check if `round` is already part of seen valid justifications. + fn is_already_proven(&self, round: NumberFor<B>) -> bool { + self.rounds_with_valid_proofs.contains(&round) } fn validator_set(&self) -> Option<&ValidatorSet<AuthorityId>> { @@ -273,16 +272,13 @@ where &self, vote: VoteMessage<NumberFor<B>, AuthorityId, Signature>, sender: &PeerId, - data: &[u8], ) -> Action<B::Hash> { - let msg_hash = twox_64(data); let round = vote.commitment.block_number; let set_id = vote.commitment.validator_set_id; self.known_peers.lock().note_vote_for(*sender, round); // Verify general usefulness of the message. - // We are going to discard old votes right away (without verification) - // Also we keep track of already received votes to avoid verifying duplicates. + // We are going to discard old votes right away (without verification). { let filter = self.gossip_filter.read(); @@ -293,10 +289,6 @@ where Consider::Accept => {}, } - if filter.is_known_vote(round, &msg_hash) { - return Action::Keep(self.votes_topic, benefit::KNOWN_VOTE_MESSAGE) - } - // ensure authority is part of the set. if !filter .validator_set() @@ -309,7 +301,6 @@ where } if BeefyKeystore::verify(&vote.id, &vote.signature, &vote.commitment.encode()) { - self.gossip_filter.write().add_known_vote(round, msg_hash); Action::Keep(self.votes_topic, benefit::VOTE_MESSAGE) } else { debug!( @@ -328,34 +319,46 @@ where let (round, set_id) = proof_block_num_and_set_id::<B>(&proof); self.known_peers.lock().note_vote_for(*sender, round); - let guard = self.gossip_filter.read(); - // Verify general usefulness of the justification. - match guard.consider_finality_proof(round, set_id) { - Consider::RejectPast => return Action::Discard(cost::OUTDATED_MESSAGE), - Consider::RejectFuture => return Action::Discard(cost::FUTURE_MESSAGE), - Consider::RejectOutOfScope => return Action::Discard(cost::OUT_OF_SCOPE_MESSAGE), - Consider::Accept => {}, + let action = { + let guard = self.gossip_filter.read(); + + // Verify general usefulness of the justification. + match guard.consider_finality_proof(round, set_id) { + Consider::RejectPast => return Action::Discard(cost::OUTDATED_MESSAGE), + Consider::RejectFuture => return Action::Discard(cost::FUTURE_MESSAGE), + Consider::RejectOutOfScope => return Action::Discard(cost::OUT_OF_SCOPE_MESSAGE), + Consider::Accept => {}, + } + + if guard.is_already_proven(round) { + return Action::Discard(benefit::NOT_INTERESTED) + } + + // Verify justification signatures. + guard + .validator_set() + .map(|validator_set| { + if let Err((_, signatures_checked)) = + verify_with_validator_set::<B>(round, validator_set, &proof) + { + debug!( + target: LOG_TARGET, + "🥩 Bad signatures on message: {:?}, from: {:?}", proof, sender + ); + let mut cost = cost::INVALID_PROOF; + cost.value += + cost::PER_SIGNATURE_CHECKED.saturating_mul(signatures_checked as i32); + Action::Discard(cost) + } else { + Action::Keep(self.justifs_topic, benefit::VALIDATED_PROOF) + } + }) + .unwrap_or(Action::Discard(cost::OUT_OF_SCOPE_MESSAGE)) + }; + if matches!(action, Action::Keep(_, _)) { + self.gossip_filter.write().mark_round_as_proven(round); } - // Verify justification signatures. - guard - .validator_set() - .map(|validator_set| { - if let Err((_, signatures_checked)) = - verify_with_validator_set::<B>(round, validator_set, &proof) - { - debug!( - target: LOG_TARGET, - "🥩 Bad signatures on message: {:?}, from: {:?}", proof, sender - ); - let mut cost = cost::INVALID_PROOF; - cost.value += - cost::PER_SIGNATURE_CHECKED.saturating_mul(signatures_checked as i32); - Action::Discard(cost) - } else { - Action::Keep(self.justifs_topic, benefit::VALIDATED_PROOF) - } - }) - .unwrap_or(Action::Discard(cost::OUT_OF_SCOPE_MESSAGE)) + action } } @@ -375,7 +378,7 @@ where ) -> ValidationResult<B::Hash> { let raw = data; let action = match GossipMessage::<B>::decode_all(&mut data) { - Ok(GossipMessage::Vote(msg)) => self.validate_vote(msg, sender, raw), + Ok(GossipMessage::Vote(msg)) => self.validate_vote(msg, sender), Ok(GossipMessage::FinalityProof(proof)) => self.validate_finality_proof(proof, sender), Err(e) => { debug!(target: LOG_TARGET, "Error decoding message: {}", e); @@ -483,41 +486,6 @@ pub(crate) mod tests { }; use sp_keystore::{testing::MemoryKeystore, Keystore}; - #[test] - fn known_votes_insert_remove() { - let mut filter = Filter::<Block>::new(); - let msg_hash = twox_64(b"data"); - let keys = vec![Keyring::Alice.public()]; - let validator_set = ValidatorSet::<AuthorityId>::new(keys.clone(), 1).unwrap(); - - filter.add_known_vote(1, msg_hash); - filter.add_known_vote(1, msg_hash); - filter.add_known_vote(2, msg_hash); - assert_eq!(filter.live_votes.len(), 2); - - filter.add_known_vote(3, msg_hash); - assert!(filter.is_known_vote(3, &msg_hash)); - assert!(!filter.is_known_vote(3, &twox_64(b"other"))); - assert!(!filter.is_known_vote(4, &msg_hash)); - assert_eq!(filter.live_votes.len(), 3); - - assert!(filter.inner.is_none()); - assert_eq!(filter.consider_vote(1, 1), Consider::RejectOutOfScope); - - filter.update(GossipFilterCfg { start: 3, end: 10, validator_set: &validator_set }); - assert_eq!(filter.live_votes.len(), 1); - assert!(filter.live_votes.contains_key(&3)); - assert_eq!(filter.consider_vote(2, 1), Consider::RejectPast); - assert_eq!(filter.consider_vote(3, 1), Consider::Accept); - assert_eq!(filter.consider_vote(4, 1), Consider::Accept); - assert_eq!(filter.consider_vote(20, 1), Consider::RejectFuture); - assert_eq!(filter.consider_vote(4, 2), Consider::RejectFuture); - - let validator_set = ValidatorSet::<AuthorityId>::new(keys, 2).unwrap(); - filter.update(GossipFilterCfg { start: 5, end: 10, validator_set: &validator_set }); - assert!(filter.live_votes.is_empty()); - } - struct TestContext; impl<B: sp_runtime::traits::Block> ValidatorContext<B> for TestContext { fn broadcast_topic(&mut self, _topic: B::Hash, _force: bool) { @@ -610,20 +578,6 @@ pub(crate) mod tests { assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); expected_report.cost_benefit = benefit::VOTE_MESSAGE; assert_eq!(report_stream.try_recv().unwrap(), expected_report); - assert_eq!( - gv.gossip_filter - .read() - .live_votes - .get(&vote.commitment.block_number) - .map(|x| x.len()), - Some(1) - ); - - // second time we should hit the cache - let res = gv.validate(&mut context, &sender, &encoded); - assert!(matches!(res, ValidationResult::ProcessAndKeep(_))); - expected_report.cost_benefit = benefit::KNOWN_VOTE_MESSAGE; - assert_eq!(report_stream.try_recv().unwrap(), expected_report); // reject vote, voter not in validator set let mut bad_vote = vote.clone(); @@ -692,7 +646,7 @@ pub(crate) mod tests { // reject proof, bad signatures (Bob instead of Alice) let bad_validator_set = ValidatorSet::<AuthorityId>::new(vec![Keyring::Bob.public()], 0).unwrap(); - let proof = dummy_proof(20, &bad_validator_set); + let proof = dummy_proof(21, &bad_validator_set); let encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode(); let res = gv.validate(&mut context, &sender, &encoded_proof); assert!(matches!(res, ValidationResult::Discard)); diff --git a/substrate/client/consensus/beefy/src/communication/mod.rs b/substrate/client/consensus/beefy/src/communication/mod.rs index 7f9535bfc23..10a6071aae6 100644 --- a/substrate/client/consensus/beefy/src/communication/mod.rs +++ b/substrate/client/consensus/beefy/src/communication/mod.rs @@ -102,7 +102,7 @@ mod cost { mod benefit { use sc_network::ReputationChange as Rep; pub(super) const VOTE_MESSAGE: Rep = Rep::new(100, "BEEFY: Round vote message"); - pub(super) const KNOWN_VOTE_MESSAGE: Rep = Rep::new(50, "BEEFY: Known vote"); + pub(super) const NOT_INTERESTED: Rep = Rep::new(10, "BEEFY: Not interested in round"); pub(super) const VALIDATED_PROOF: Rep = Rep::new(100, "BEEFY: Justification"); } diff --git a/substrate/client/consensus/beefy/src/tests.rs b/substrate/client/consensus/beefy/src/tests.rs index 3bb65e9d57f..90b63c9cd44 100644 --- a/substrate/client/consensus/beefy/src/tests.rs +++ b/substrate/client/consensus/beefy/src/tests.rs @@ -1302,7 +1302,7 @@ async fn gossipped_finality_proofs() { // Only Alice and Bob are running the voter -> finality threshold not reached let peers = [BeefyKeyring::Alice, BeefyKeyring::Bob]; let validator_set = ValidatorSet::new(make_beefy_ids(&validators), 0).unwrap(); - let session_len = 30; + let session_len = 10; let min_block_delta = 1; let mut net = BeefyTestNet::new(3); @@ -1332,14 +1332,8 @@ async fn gossipped_finality_proofs() { let net = Arc::new(Mutex::new(net)); - // Pump net + Charlie gossip to see peers. - let timeout = Box::pin(tokio::time::sleep(Duration::from_millis(200))); - let gossip_engine_pump = &mut charlie_gossip_engine; - let pump_with_timeout = future::select(gossip_engine_pump, timeout); - run_until(pump_with_timeout, &net).await; - - // push 10 blocks - let hashes = net.lock().generate_blocks_and_sync(10, session_len, &validator_set, true).await; + // push 42 blocks + let hashes = net.lock().generate_blocks_and_sync(42, session_len, &validator_set, true).await; let peers = peers.into_iter().enumerate(); diff --git a/substrate/client/consensus/beefy/src/worker.rs b/substrate/client/consensus/beefy/src/worker.rs index a239e34030c..309d8c5135b 100644 --- a/substrate/client/consensus/beefy/src/worker.rs +++ b/substrate/client/consensus/beefy/src/worker.rs @@ -604,6 +604,11 @@ where VersionedFinalityProof::V1(ref sc) => sc.commitment.block_number, }; + if block_num <= self.persisted_state.voting_oracle.best_beefy_block { + // we've already finalized this round before, short-circuit. + return Ok(()) + } + // Finalize inner round and update voting_oracle state. self.persisted_state.voting_oracle.finalize(block_num)?; @@ -629,7 +634,7 @@ where self.backend .append_justification(hash, (BEEFY_ENGINE_ID, finality_proof.encode())) }) { - error!( + debug!( target: LOG_TARGET, "🥩 Error {:?} on appending justification: {:?}", e, finality_proof ); @@ -648,7 +653,7 @@ where } /// Handle previously buffered justifications, that now land in the voting interval. - fn try_pending_justififactions(&mut self) -> Result<(), Error> { + fn try_pending_justifications(&mut self) -> Result<(), Error> { // Interval of blocks for which we can process justifications and votes right now. let (start, end) = self.voting_oracle().accepted_interval()?; // Process pending justifications. @@ -782,7 +787,7 @@ where fn process_new_state(&mut self) { // Handle pending justifications and/or votes for now GRANDPA finalized blocks. - if let Err(err) = self.try_pending_justififactions() { + if let Err(err) = self.try_pending_justifications() { debug!(target: LOG_TARGET, "🥩 {}", err); } -- GitLab