diff --git a/substrate/client/beefy/src/tests.rs b/substrate/client/beefy/src/tests.rs index e8d32fe3e81279b2d68a6cf88816c447f3c1a9e8..e0058e5238d6a59db883650a557c574dd34cbe3d 100644 --- a/substrate/client/beefy/src/tests.rs +++ b/substrate/client/beefy/src/tests.rs @@ -811,3 +811,37 @@ fn beefy_importing_blocks() { })); } } + +#[test] +fn voter_initialization() { + sp_tracing::try_init_simple(); + // Regression test for voter initialization where finality notifications were dropped + // after waiting for BEEFY pallet availability. + + let mut runtime = Runtime::new().unwrap(); + let peers = &[BeefyKeyring::Alice, BeefyKeyring::Bob]; + let validator_set = ValidatorSet::new(make_beefy_ids(peers), 0).unwrap(); + let session_len = 5; + // Should vote on all mandatory blocks no matter the `min_block_delta`. + let min_block_delta = 10; + + let mut net = BeefyTestNet::new(2, 0); + let api = Arc::new(two_validators::TestApi {}); + let beefy_peers = peers.iter().enumerate().map(|(id, key)| (id, key, api.clone())).collect(); + runtime.spawn(initialize_beefy(&mut net, beefy_peers, min_block_delta)); + + // push 30 blocks + net.generate_blocks_and_sync(26, session_len, &validator_set, false); + let net = Arc::new(Mutex::new(net)); + + // Finalize multiple blocks at once to get a burst of finality notifications right from start. + // Need to finalize at least one block in each session, choose randomly. + // Expect voters to pick up all of them and BEEFY-finalize the mandatory blocks of each session. + finalize_block_and_wait_for_beefy( + &net, + peers, + &mut runtime, + &[1, 6, 10, 17, 24, 26], + &[1, 5, 10, 15, 20, 25], + ); +} diff --git a/substrate/client/beefy/src/worker.rs b/substrate/client/beefy/src/worker.rs index 9f54a300472de878d86a2c28240c776070d82afa..6e8c89d8049843747a474483c4a10a34f51861e5 100644 --- a/substrate/client/beefy/src/worker.rs +++ b/substrate/client/beefy/src/worker.rs @@ -24,10 +24,10 @@ use std::{ }; use codec::{Codec, Decode, Encode}; -use futures::StreamExt; +use futures::{stream::Fuse, StreamExt}; use log::{debug, error, info, log_enabled, trace, warn}; -use sc_client_api::{Backend, FinalityNotification, HeaderBackend}; +use sc_client_api::{Backend, FinalityNotification, FinalityNotifications, HeaderBackend}; use sc_network_gossip::GossipEngine; use sp_api::{BlockId, ProvideRuntimeApi}; @@ -723,12 +723,11 @@ where /// Wait for BEEFY runtime pallet to be available. /// Should be called only once during worker initialization. - async fn wait_for_runtime_pallet(&mut self) { + async fn wait_for_runtime_pallet(&mut self, finality: &mut Fuse<FinalityNotifications<B>>) { let mut gossip_engine = &mut self.gossip_engine; - let mut finality_stream = self.client.finality_notification_stream().fuse(); loop { futures::select! { - notif = finality_stream.next() => { + notif = finality.next() => { let notif = match notif { Some(notif) => notif, None => break @@ -762,11 +761,13 @@ where pub(crate) async fn run(mut self) { info!(target: "beefy", "🥩 run BEEFY worker, best grandpa: #{:?}.", self.best_grandpa_block_header.number()); let mut block_import_justif = self.links.from_block_import_justif_stream.subscribe().fuse(); + // Subscribe to finality notifications before waiting for runtime pallet and reuse stream, + // so we process notifications for all finalized blocks after pallet is available. + let mut finality_notifications = self.client.finality_notification_stream().fuse(); - self.wait_for_runtime_pallet().await; + self.wait_for_runtime_pallet(&mut finality_notifications).await; trace!(target: "beefy", "🥩 BEEFY pallet available, starting voter."); - let mut finality_notifications = self.client.finality_notification_stream().fuse(); let mut votes = Box::pin( self.gossip_engine .messages_for(topic::<B>())