Skip to content
Snippets Groups Projects
Commit 1e2424ec authored by Adrian Catangiu's avatar Adrian Catangiu Committed by GitHub
Browse files

BEEFY voter bugfixes (#11335)


* beefy: gadget should always use current validator set

The gadget/client-voter was using previous' session validator set
to sign the 1st block in the new session (to have chained validator
set handoffs).

This is not necessary because:
1. BEEFY piggy-backs on GRANDPA and only works on canonical chain,
   so it need not concern itself with the validity of the block header
   (which contains digest with the new session's validator set). It
   can safely assume header is valid and simply use new validator set.
2. The BEEFY payload itself already contains a merkle root for the
   next validator set keys. So at the BEEFY-payload layer we already
   have a validated/trusted hand-off of authority.

Signed-off-by: default avataracatangiu <adrian@parity.io>

* beefy: buffer votes for not yet finalized blocks

Signed-off-by: default avataracatangiu <adrian@parity.io>

* beefy: add buffered votes regression test
parent 9fd4df9c
No related merge requests found
......@@ -73,7 +73,6 @@ pub(crate) struct Rounds<Payload, B: Block> {
best_done: Option<NumberFor<B>>,
session_start: NumberFor<B>,
validator_set: ValidatorSet<Public>,
prev_validator_set: ValidatorSet<Public>,
}
impl<P, B> Rounds<P, B>
......@@ -81,18 +80,8 @@ where
P: Ord + Hash + Clone,
B: Block,
{
pub(crate) fn new(
session_start: NumberFor<B>,
validator_set: ValidatorSet<Public>,
prev_validator_set: ValidatorSet<Public>,
) -> Self {
Rounds {
rounds: BTreeMap::new(),
best_done: None,
session_start,
validator_set,
prev_validator_set,
}
pub(crate) fn new(session_start: NumberFor<B>, validator_set: ValidatorSet<Public>) -> Self {
Rounds { rounds: BTreeMap::new(), best_done: None, session_start, validator_set }
}
}
......@@ -101,24 +90,12 @@ where
P: Ord + Hash + Clone,
B: Block,
{
pub(crate) fn validator_set_id_for(&self, block_number: NumberFor<B>) -> ValidatorSetId {
if block_number > self.session_start {
self.validator_set.id()
} else {
self.prev_validator_set.id()
}
}
pub(crate) fn validators_for(&self, block_number: NumberFor<B>) -> &[Public] {
if block_number > self.session_start {
self.validator_set.validators()
} else {
self.prev_validator_set.validators()
}
pub(crate) fn validator_set_id(&self) -> ValidatorSetId {
self.validator_set.id()
}
pub(crate) fn validator_set(&self) -> &ValidatorSet<Public> {
&self.validator_set
pub(crate) fn validators(&self) -> &[Public] {
self.validator_set.validators()
}
pub(crate) fn session_start(&self) -> &NumberFor<B> {
......@@ -143,7 +120,7 @@ where
round.1
);
false
} else if !self.validator_set.validators().iter().any(|id| vote.0 == *id) {
} else if !self.validators().iter().any(|id| vote.0 == *id) {
debug!(
target: "beefy",
"🥩 received vote {:?} from validator that is not in the validator set, ignoring",
......@@ -170,12 +147,11 @@ where
// remove this and older (now stale) rounds
let signatures = self.rounds.remove(round)?.votes;
self.rounds.retain(|&(_, number), _| number > round.1);
self.best_done = self.best_done.clone().max(Some(round.1.clone()));
self.best_done = self.best_done.max(Some(round.1));
debug!(target: "beefy", "🥩 Concluded round #{}", round.1);
Some(
self.validator_set
.validators()
self.validators()
.iter()
.map(|authority_id| signatures.get(authority_id).cloned())
.collect(),
......@@ -247,13 +223,13 @@ mod tests {
.unwrap();
let session_start = 1u64.into();
let rounds = Rounds::<H256, Block>::new(session_start, validators.clone(), validators);
let rounds = Rounds::<H256, Block>::new(session_start, validators);
assert_eq!(42, rounds.validator_set_id_for(session_start));
assert_eq!(42, rounds.validator_set_id());
assert_eq!(1, *rounds.session_start());
assert_eq!(
&vec![Keyring::Alice.public(), Keyring::Bob.public(), Keyring::Charlie.public()],
rounds.validators_for(session_start)
rounds.validators()
);
}
......@@ -274,7 +250,7 @@ mod tests {
let round = (H256::from_low_u64_le(1), 1);
let session_start = 1u64.into();
let mut rounds = Rounds::<H256, Block>::new(session_start, validators.clone(), validators);
let mut rounds = Rounds::<H256, Block>::new(session_start, validators);
// no self vote yet, should self vote
assert!(rounds.should_self_vote(&round));
......@@ -347,7 +323,7 @@ mod tests {
.unwrap();
let session_start = 1u64.into();
let mut rounds = Rounds::<H256, Block>::new(session_start, validators.clone(), validators);
let mut rounds = Rounds::<H256, Block>::new(session_start, validators);
// round 1
assert!(rounds.add_vote(
......
......@@ -469,8 +469,8 @@ fn finalize_block_and_wait_for_beefy(
}
if expected_beefy.is_empty() {
// run for 1 second then verify no new best beefy block available
let timeout = Some(Duration::from_millis(500));
// run for quarter second then verify no new best beefy block available
let timeout = Some(Duration::from_millis(250));
streams_empty_after_timeout(best_blocks, &net, runtime, timeout);
streams_empty_after_timeout(signed_commitments, &net, runtime, None);
} else {
......@@ -535,8 +535,8 @@ fn lagging_validators() {
let beefy_peers = peers.iter().enumerate().map(|(id, key)| (id, key, api.clone())).collect();
runtime.spawn(initialize_beefy(&mut net, beefy_peers, min_block_delta));
// push 42 blocks including `AuthorityChange` digests every 30 blocks.
net.generate_blocks(42, session_len, &validator_set, true);
// push 62 blocks including `AuthorityChange` digests every 30 blocks.
net.generate_blocks(62, session_len, &validator_set, true);
net.block_until_sync();
let net = Arc::new(Mutex::new(net));
......@@ -550,7 +550,7 @@ fn lagging_validators() {
let (best_blocks, signed_commitments) = get_beefy_streams(&mut *net.lock(), peers);
net.lock().peer(0).client().as_client().finalize_block(finalize, None).unwrap();
// verify nothing gets finalized by BEEFY
let timeout = Some(Duration::from_millis(500));
let timeout = Some(Duration::from_millis(250));
streams_empty_after_timeout(best_blocks, &net, &mut runtime, timeout);
streams_empty_after_timeout(signed_commitments, &net, &mut runtime, None);
......@@ -563,6 +563,26 @@ fn lagging_validators() {
// Both finalize #30 (mandatory session) and #32 -> BEEFY finalize #30 (mandatory), #31, #32
finalize_block_and_wait_for_beefy(&net, peers, &mut runtime, &[30, 32], &[30, 31, 32]);
// Verify that session-boundary votes get buffered by client and only processed once
// session-boundary block is GRANDPA-finalized (this guarantees authenticity for the new session
// validator set).
// Alice finalizes session-boundary mandatory block #60, Bob lags behind
let (best_blocks, signed_commitments) = get_beefy_streams(&mut *net.lock(), peers);
let finalize = BlockId::number(60);
net.lock().peer(0).client().as_client().finalize_block(finalize, None).unwrap();
// verify nothing gets finalized by BEEFY
let timeout = Some(Duration::from_millis(250));
streams_empty_after_timeout(best_blocks, &net, &mut runtime, timeout);
streams_empty_after_timeout(signed_commitments, &net, &mut runtime, None);
// Bob catches up and also finalizes #60 (and should have buffered Alice's vote on #60)
let (best_blocks, signed_commitments) = get_beefy_streams(&mut *net.lock(), peers);
net.lock().peer(1).client().as_client().finalize_block(finalize, None).unwrap();
// verify beefy skips intermediary votes, and successfully finalizes mandatory block #40
wait_for_best_beefy_blocks(best_blocks, &net, &mut runtime, &[60]);
wait_for_beefy_signed_commitments(signed_commitments, &net, &mut runtime, &[60]);
}
#[test]
......@@ -624,7 +644,7 @@ fn correct_beefy_payload() {
.unwrap();
// verify consensus is _not_ reached
let timeout = Some(Duration::from_millis(500));
let timeout = Some(Duration::from_millis(250));
streams_empty_after_timeout(best_blocks, &net, &mut runtime, timeout);
streams_empty_after_timeout(signed_commitments, &net, &mut runtime, None);
......
......@@ -16,7 +16,13 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use std::{collections::BTreeSet, fmt::Debug, marker::PhantomData, sync::Arc, time::Duration};
use std::{
collections::{BTreeMap, BTreeSet},
fmt::Debug,
marker::PhantomData,
sync::Arc,
time::Duration,
};
use codec::{Codec, Decode, Encode};
use futures::{future, FutureExt, StreamExt};
......@@ -27,7 +33,7 @@ use sc_client_api::{Backend, FinalityNotification, FinalityNotifications};
use sc_network_gossip::GossipEngine;
use sp_api::{BlockId, ProvideRuntimeApi};
use sp_arithmetic::traits::AtLeast32Bit;
use sp_arithmetic::traits::{AtLeast32Bit, Saturating};
use sp_consensus::SyncOracle;
use sp_mmr_primitives::MmrApi;
use sp_runtime::{
......@@ -80,6 +86,8 @@ pub(crate) struct BeefyWorker<B: Block, BE, C, R, SO> {
min_block_delta: u32,
metrics: Option<Metrics>,
rounds: Option<Rounds<Payload, B>>,
/// Buffer holding votes for blocks that the client hasn't seen finality for.
pending_votes: BTreeMap<NumberFor<B>, Vec<VoteMessage<NumberFor<B>, AuthorityId, Signature>>>,
finality_notifications: FinalityNotifications<B>,
/// Best block we received a GRANDPA notification for
best_grandpa_block_header: <B as Block>::Header,
......@@ -141,6 +149,7 @@ where
min_block_delta: min_block_delta.max(1),
metrics,
rounds: None,
pending_votes: BTreeMap::new(),
finality_notifications: client.finality_notification_stream(),
best_grandpa_block_header: last_finalized_header,
best_beefy_block: None,
......@@ -238,7 +247,11 @@ where
}
/// Handle session changes by starting new voting round for mandatory blocks.
fn init_session_at(&mut self, active: ValidatorSet<AuthorityId>, session_start: NumberFor<B>) {
fn init_session_at(
&mut self,
active: ValidatorSet<AuthorityId>,
new_session_start: NumberFor<B>,
) {
debug!(target: "beefy", "🥩 New active validator set: {:?}", active);
metric_set!(self, beefy_validator_set_id, active.id());
// BEEFY should produce a signed commitment for each session
......@@ -246,23 +259,22 @@ where
active.id() != GENESIS_AUTHORITY_SET_ID &&
self.last_signed_id != 0
{
debug!(
target: "beefy", "🥩 Detected skipped session: active-id {:?}, last-signed-id {:?}",
active.id(),
self.last_signed_id,
);
metric_inc!(self, beefy_skipped_sessions);
}
if log_enabled!(target: "beefy", log::Level::Debug) {
// verify the new validator set - only do it if we're also logging the warning
let _ = self.verify_validator_set(&session_start, &active);
let _ = self.verify_validator_set(&new_session_start, &active);
}
let prev_validator_set = if let Some(r) = &self.rounds {
r.validator_set().clone()
} else {
// no previous rounds present use new validator set instead (genesis case)
active.clone()
};
let id = active.id();
self.rounds = Some(Rounds::new(session_start, active, prev_validator_set));
info!(target: "beefy", "🥩 New Rounds for validator set id: {:?} with session_start {:?}", id, session_start);
self.rounds = Some(Rounds::new(new_session_start, active));
info!(target: "beefy", "🥩 New Rounds for validator set id: {:?} with session_start {:?}", id, new_session_start);
}
fn handle_finality_notification(&mut self, notification: &FinalityNotification<B>) {
......@@ -287,12 +299,36 @@ where
self.init_session_at(new_validator_set, *header.number());
}
// Handle any pending votes for now finalized blocks.
self.check_pending_votes();
// Vote if there's now a new vote target.
if let Some(target_number) = self.current_vote_target() {
self.do_vote(target_number);
}
}
// Handles all buffered votes for now finalized blocks.
fn check_pending_votes(&mut self) {
let not_finalized = self.best_grandpa_block_header.number().saturating_add(1u32.into());
let still_pending = self.pending_votes.split_off(&not_finalized);
let votes_to_handle = std::mem::replace(&mut self.pending_votes, still_pending);
for (num, votes) in votes_to_handle.into_iter() {
if Some(num) > self.best_beefy_block {
debug!(target: "beefy", "🥩 Handling buffered votes for now GRANDPA finalized block: {:?}.", num);
for v in votes.into_iter() {
self.handle_vote(
(v.commitment.payload, v.commitment.block_number),
(v.id, v.signature),
false,
);
}
} else {
debug!(target: "beefy", "🥩 Dropping outdated buffered votes for now BEEFY finalized block: {:?}.", num);
}
}
}
fn handle_vote(
&mut self,
round: (Payload, NumberFor<B>),
......@@ -313,7 +349,7 @@ where
self.gossip_validator.conclude_round(round.1);
// id is stored for skipped session metric calculation
self.last_signed_id = rounds.validator_set_id_for(round.1);
self.last_signed_id = rounds.validator_set_id();
let block_num = round.1;
let commitment = Commitment {
......@@ -390,7 +426,7 @@ where
debug!(target: "beefy", "🥩 Don't double vote for block number: {:?}", target_number);
return
}
(rounds.validators_for(target_number), rounds.validator_set_id_for(target_number))
(rounds.validators(), rounds.validator_set_id())
} else {
debug!(target: "beefy", "🥩 Missing validator set - can't vote for: {:?}", target_hash);
return
......@@ -506,11 +542,23 @@ where
},
vote = votes.next().fuse() => {
if let Some(vote) = vote {
self.handle_vote(
(vote.commitment.payload, vote.commitment.block_number),
(vote.id, vote.signature),
false
);
let block_num = vote.commitment.block_number;
if block_num > *self.best_grandpa_block_header.number() {
// Only handle votes for blocks we _know_ have been finalized.
// Buffer vote to be handled later.
debug!(
target: "beefy",
"🥩 Buffering vote for not (yet) finalized block: {:?}.",
block_num
);
self.pending_votes.entry(block_num).or_default().push(vote);
} else {
self.handle_vote(
(vote.commitment.payload, vote.commitment.block_number),
(vote.id, vote.signature),
false
);
}
} else {
return;
}
......@@ -854,8 +902,7 @@ pub(crate) mod tests {
worker.best_grandpa_block_header = grandpa_header;
worker.best_beefy_block = best_beefy;
worker.min_block_delta = min_delta;
worker.rounds =
Some(Rounds::new(session_start, validator_set.clone(), validator_set.clone()));
worker.rounds = Some(Rounds::new(session_start, validator_set.clone()));
};
// under min delta
......@@ -970,11 +1017,10 @@ pub(crate) mod tests {
worker.init_session_at(validator_set.clone(), 1);
let worker_rounds = worker.rounds.as_ref().unwrap();
assert_eq!(worker_rounds.validator_set(), &validator_set);
assert_eq!(worker_rounds.session_start(), &1);
// in genesis case both current and prev validator sets are the same
assert_eq!(worker_rounds.validator_set_id_for(1), validator_set.id());
assert_eq!(worker_rounds.validator_set_id_for(2), validator_set.id());
assert_eq!(worker_rounds.validators(), validator_set.validators());
assert_eq!(worker_rounds.validator_set_id(), validator_set.id());
// new validator set
let keys = &[Keyring::Bob];
......@@ -984,11 +1030,8 @@ pub(crate) mod tests {
worker.init_session_at(new_validator_set.clone(), 11);
let worker_rounds = worker.rounds.as_ref().unwrap();
assert_eq!(worker_rounds.validator_set(), &new_validator_set);
assert_eq!(worker_rounds.session_start(), &11);
// mandatory block gets prev set, further blocks get new set
assert_eq!(worker_rounds.validator_set_id_for(11), validator_set.id());
assert_eq!(worker_rounds.validator_set_id_for(12), new_validator_set.id());
assert_eq!(worker_rounds.validator_set_id_for(13), new_validator_set.id());
assert_eq!(worker_rounds.validators(), new_validator_set.validators());
assert_eq!(worker_rounds.validator_set_id(), new_validator_set.id());
}
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment