// This file is part of Substrate.

// Copyright (C) 2017-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Helper stream for waiting until one or more blocks are imported before
//! passing through inner items. This is done in a generic way to support
//! many different kinds of items.
//!
//! This is used for votes and commit messages currently.

use super::{
    BlockStatus as BlockStatusT,
    BlockSyncRequester as BlockSyncRequesterT,
    CommunicationIn,
    Error,
    SignedMessage,
};

use log::{debug, warn};
use sp_utils::mpsc::TracingUnboundedReceiver;
use futures::prelude::*;
use futures::stream::{Fuse, StreamExt};
use futures_timer::Delay;
use finality_grandpa::voter;
use parking_lot::Mutex;
use prometheus_endpoint::{
    Gauge, U64, PrometheusError, register, Registry,
};
use sc_client_api::{BlockImportNotification, ImportNotifications};
use sp_finality_grandpa::AuthorityId;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};

use std::collections::{HashMap, VecDeque};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

/// Minimum time between two consecutive "still waiting" log entries (and the accompanying sync
/// requests) for the same not yet imported block.
const LOG_PENDING_INTERVAL: Duration = Duration::from_secs(15);

/// Something that needs to be withheld until specific blocks are available.
///
/// For example a GRANDPA commit message which is not of any use without the corresponding block
/// that it commits on.
pub(crate) trait BlockUntilImported<Block: BlockT>: Sized {
    /// The type that is blocked on.
    type Blocked;

    /// Check if a new incoming item needs awaiting until a block(s) is imported.
    ///
    /// `status_check` is used to look up which of the referenced blocks are already known.
    /// Returns whether `input` should be discarded, has to wait for blocks, or is ready.
    fn needs_waiting<S: BlockStatusT<Block>>(
        input: Self::Blocked,
        status_check: &S,
    ) -> Result<DiscardWaitOrReady<Block, Self, Self::Blocked>, Error>;

    /// Called when the wait has completed. The canonical number is passed through
    /// for further checks.
    ///
    /// Returns `None` when nothing should be forwarded for this waiter.
    fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked>;
}

/// Describes whether a given [`BlockUntilImported`] (a) should be discarded, (b) is waiting for
/// specific blocks to be imported or (c) is ready to be used.
///
/// A reason for discarding a [`BlockUntilImported`] would be if a referenced block is perceived
/// under a different number than specified in the message.
pub(crate) enum DiscardWaitOrReady<Block: BlockT, W, R> {
    /// The item must not be forwarded.
    Discard,
    /// The item has to wait. Each entry is the hash and expected number of a block that is
    /// awaited, together with the waiter to notify once that block is available.
    Wait(Vec<(Block::Hash, NumberFor<Block>, W)>),
    /// The item can be forwarded right away.
    Ready(R),
}

/// Prometheus metrics for the `UntilImported` queue.
//
// At a given point in time there can be more than one `UntilImported` queue. One can not register a
// metric twice, thus queues need to share the same Prometheus metrics instead of instantiating
// their own ones.
//
// When a queue is dropped it might still contain messages. In order for those to not distort the
// Prometheus metrics, the `Metrics` struct cleans up after itself within its `Drop` implementation
// by subtracting the local_waiting_messages (the amount of messages left in the queue about to
// be dropped) from the global_waiting_messages gauge.
pub(crate) struct Metrics { global_waiting_messages: Gauge<U64>, local_waiting_messages: u64, } impl Metrics { pub(crate) fn register(registry: &Registry) -> Result<Self, PrometheusError> { Ok(Self { global_waiting_messages: register(Gauge::new( "finality_grandpa_until_imported_waiting_messages_number", "Number of finality grandpa messages waiting within the until imported queue.", )?, registry)?, local_waiting_messages: 0, }) } fn waiting_messages_inc(&mut self) { self.local_waiting_messages += 1; self.global_waiting_messages.inc(); } fn waiting_messages_dec(&mut self) { self.local_waiting_messages -= 1; self.global_waiting_messages.dec(); } } impl Clone for Metrics { fn clone(&self) -> Self { Metrics { global_waiting_messages: self.global_waiting_messages.clone(), // When cloned, reset local_waiting_messages, so the global counter is not reduced a // second time for the same messages on `drop` of the clone. local_waiting_messages: 0, } } } impl Drop for Metrics { fn drop(&mut self) { // Reduce the global counter by the amount of messages that were still left in the dropped // queue. self.global_waiting_messages.sub(self.local_waiting_messages) } } /// Buffering incoming messages until blocks with given hashes are imported. pub(crate) struct UntilImported<Block, BlockStatus, BlockSyncRequester, I, M> where Block: BlockT, I: Stream<Item = M::Blocked> + Unpin, M: BlockUntilImported<Block>, { import_notifications: Fuse<TracingUnboundedReceiver<BlockImportNotification<Block>>>, block_sync_requester: BlockSyncRequester, status_check: BlockStatus, incoming_messages: Fuse<I>, ready: VecDeque<M::Blocked>, /// Interval at which to check status of each awaited block. check_pending: Pin<Box<dyn Stream<Item = Result<(), std::io::Error>> + Send + Sync>>, /// Mapping block hashes to their block number, the point in time it was /// first encountered (Instant) and a list of GRANDPA messages referencing /// the block hash. 
pending: HashMap<Block::Hash, (NumberFor<Block>, Instant, Vec<M>)>, /// Queue identifier for differentiation in logs. identifier: &'static str, /// Prometheus metrics. metrics: Option<Metrics>, } impl<Block, BlockStatus, BlockSyncRequester, I, M> Unpin for UntilImported<Block, BlockStatus, BlockSyncRequester, I, M > where Block: BlockT, I: Stream<Item = M::Blocked> + Unpin, M: BlockUntilImported<Block>, {} impl<Block, BlockStatus, BlockSyncRequester, I, M> UntilImported<Block, BlockStatus, BlockSyncRequester, I, M> where Block: BlockT, BlockStatus: BlockStatusT<Block>, BlockSyncRequester: BlockSyncRequesterT<Block>, I: Stream<Item = M::Blocked> + Unpin, M: BlockUntilImported<Block>, { /// Create a new `UntilImported` wrapper. pub(crate) fn new( import_notifications: ImportNotifications<Block>, block_sync_requester: BlockSyncRequester, status_check: BlockStatus, incoming_messages: I, identifier: &'static str, metrics: Option<Metrics>, ) -> Self { // how often to check if pending messages that are waiting for blocks to be // imported can be checked. 
// // the import notifications interval takes care of most of this; this is // used in the event of missed import notifications const CHECK_PENDING_INTERVAL: Duration = Duration::from_secs(5); let check_pending = futures::stream::unfold(Delay::new(CHECK_PENDING_INTERVAL), |delay| Box::pin(async move { delay.await; Some((Ok(()), Delay::new(CHECK_PENDING_INTERVAL))) })); UntilImported { import_notifications: import_notifications.fuse(), block_sync_requester, status_check, incoming_messages: incoming_messages.fuse(), ready: VecDeque::new(), check_pending: Box::pin(check_pending), pending: HashMap::new(), identifier, metrics, } } } impl<Block, BStatus, BSyncRequester, I, M> Stream for UntilImported<Block, BStatus, BSyncRequester, I, M> where Block: BlockT, BStatus: BlockStatusT<Block>, BSyncRequester: BlockSyncRequesterT<Block>, I: Stream<Item = M::Blocked> + Unpin, M: BlockUntilImported<Block>, { type Item = Result<M::Blocked, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { // We are using a `this` variable in order to allow multiple simultaneous mutable borrow to // `self`. let this = &mut *self; loop { match StreamExt::poll_next_unpin(&mut this.incoming_messages, cx) { Poll::Ready(None) => return Poll::Ready(None), Poll::Ready(Some(input)) => { // new input: schedule wait of any parts which require // blocks to be known. match M::needs_waiting(input, &this.status_check)? 
{ DiscardWaitOrReady::Discard => {}, DiscardWaitOrReady::Wait(items) => { for (target_hash, target_number, wait) in items { this.pending .entry(target_hash) .or_insert_with(|| (target_number, Instant::now(), Vec::new())) .2 .push(wait) } }, DiscardWaitOrReady::Ready(item) => this.ready.push_back(item), } if let Some(metrics) = &mut this.metrics { metrics.waiting_messages_inc(); } } Poll::Pending => break, } } loop { match StreamExt::poll_next_unpin(&mut this.import_notifications, cx) { Poll::Ready(None) => return Poll::Ready(None), Poll::Ready(Some(notification)) => { // new block imported. queue up all messages tied to that hash. if let Some((_, _, messages)) = this.pending.remove(¬ification.hash) { let canon_number = *notification.header.number(); let ready_messages = messages.into_iter() .filter_map(|m| m.wait_completed(canon_number)); this.ready.extend(ready_messages); } } Poll::Pending => break, } } let mut update_interval = false; while let Poll::Ready(Some(Ok(()))) = this.check_pending.poll_next_unpin(cx) { update_interval = true; } if update_interval { let mut known_keys = Vec::new(); for (&block_hash, &mut (block_number, ref mut last_log, ref v)) in this.pending.iter_mut() { if let Some(number) = this.status_check.block_number(block_hash)? { known_keys.push((block_hash, number)); } else { let next_log = *last_log + LOG_PENDING_INTERVAL; if Instant::now() >= next_log { debug!( target: "afg", "Waiting to import block {} before {} {} messages can be imported. \ Requesting network sync service to retrieve block from. \ Possible fork?", block_hash, v.len(), this.identifier, ); // NOTE: when sending an empty vec of peers the // underlying should make a best effort to sync the // block from any peers it knows about. 
this.block_sync_requester.set_sync_fork_request( vec![], block_hash, block_number, ); *last_log = next_log; } } } for (known_hash, canon_number) in known_keys { if let Some((_, _, pending_messages)) = this.pending.remove(&known_hash) { let ready_messages = pending_messages.into_iter() .filter_map(|m| m.wait_completed(canon_number)); this.ready.extend(ready_messages); } } } if let Some(ready) = this.ready.pop_front() { if let Some(metrics) = &mut this.metrics { metrics.waiting_messages_dec(); } return Poll::Ready(Some(Ok(ready))) } if this.import_notifications.is_done() && this.incoming_messages.is_done() { Poll::Ready(None) } else { Poll::Pending } } } fn warn_authority_wrong_target<H: ::std::fmt::Display>(hash: H, id: AuthorityId) { warn!( target: "afg", "Authority {:?} signed GRANDPA message with \ wrong block number for hash {}", id, hash, ); } impl<Block: BlockT> BlockUntilImported<Block> for SignedMessage<Block> { type Blocked = Self; fn needs_waiting<BlockStatus: BlockStatusT<Block>>( msg: Self::Blocked, status_check: &BlockStatus, ) -> Result<DiscardWaitOrReady<Block, Self, Self::Blocked>, Error> { let (&target_hash, target_number) = msg.target(); if let Some(number) = status_check.block_number(target_hash)? { if number != target_number { warn_authority_wrong_target(target_hash, msg.id); return Ok(DiscardWaitOrReady::Discard); } else { return Ok(DiscardWaitOrReady::Ready(msg)); } } Ok(DiscardWaitOrReady::Wait(vec![(target_hash, target_number, msg)])) } fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked> { let (&target_hash, target_number) = self.target(); if canon_number != target_number { warn_authority_wrong_target(target_hash, self.id); None } else { Some(self) } } } /// Helper type definition for the stream which waits until vote targets for /// signed messages are imported. 
pub(crate) type UntilVoteTargetImported<Block, BlockStatus, BlockSyncRequester, I> = UntilImported< Block, BlockStatus, BlockSyncRequester, I, SignedMessage<Block>, >; /// This blocks a global message import, i.e. a commit or catch up messages, /// until all blocks referenced in its votes are known. /// /// This is used for compact commits and catch up messages which have already /// been checked for structural soundness (e.g. valid signatures). /// /// We use the `Arc`'s reference count to implicitly count the number of outstanding blocks that we /// are waiting on for the same message (i.e. other `BlockGlobalMessage` instances with the same /// `inner`). pub(crate) struct BlockGlobalMessage<Block: BlockT> { inner: Arc<Mutex<Option<CommunicationIn<Block>>>>, target_number: NumberFor<Block>, } impl<Block: BlockT> Unpin for BlockGlobalMessage<Block> {} impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> { type Blocked = CommunicationIn<Block>; fn needs_waiting<BlockStatus: BlockStatusT<Block>>( input: Self::Blocked, status_check: &BlockStatus, ) -> Result<DiscardWaitOrReady<Block, Self, Self::Blocked>, Error> { use std::collections::hash_map::Entry; enum KnownOrUnknown<N> { Known(N), Unknown(N), } impl<N> KnownOrUnknown<N> { fn number(&self) -> &N { match *self { KnownOrUnknown::Known(ref n) => n, KnownOrUnknown::Unknown(ref n) => n, } } } let mut checked_hashes: HashMap<_, KnownOrUnknown<NumberFor<Block>>> = HashMap::new(); { // returns false when should early exit. let mut query_known = |target_hash, perceived_number| -> Result<bool, Error> { // check integrity: all votes for same hash have same number. let canon_number = match checked_hashes.entry(target_hash) { Entry::Occupied(entry) => *entry.get().number(), Entry::Vacant(entry) => { if let Some(number) = status_check.block_number(target_hash)? 
{ entry.insert(KnownOrUnknown::Known(number)); number } else { entry.insert(KnownOrUnknown::Unknown(perceived_number)); perceived_number } } }; if canon_number != perceived_number { // invalid global message: messages targeting wrong number // or at least different from other vote in same global // message. return Ok(false); } Ok(true) }; match input { voter::CommunicationIn::Commit(_, ref commit, ..) => { // add known hashes from all precommits. let precommit_targets = commit.precommits .iter() .map(|c| (c.target_number, c.target_hash)); for (target_number, target_hash) in precommit_targets { if !query_known(target_hash, target_number)? { return Ok(DiscardWaitOrReady::Discard); } } }, voter::CommunicationIn::CatchUp(ref catch_up, ..) => { // add known hashes from all prevotes and precommits. let prevote_targets = catch_up.prevotes .iter() .map(|s| (s.prevote.target_number, s.prevote.target_hash)); let precommit_targets = catch_up.precommits .iter() .map(|s| (s.precommit.target_number, s.precommit.target_hash)); let targets = prevote_targets.chain(precommit_targets); for (target_number, target_hash) in targets { if !query_known(target_hash, target_number)? { return Ok(DiscardWaitOrReady::Discard); } } }, }; } let unknown_hashes = checked_hashes.into_iter().filter_map(|(hash, num)| match num { KnownOrUnknown::Unknown(number) => Some((hash, number)), KnownOrUnknown::Known(_) => None, }).collect::<Vec<_>>(); if unknown_hashes.is_empty() { // none of the hashes in the global message were unknown. // we can just return the message directly. return Ok(DiscardWaitOrReady::Ready(input)); } let locked_global = Arc::new(Mutex::new(Some(input))); let items_to_await = unknown_hashes.into_iter().map(|(hash, target_number)| { (hash, target_number, BlockGlobalMessage { inner: locked_global.clone(), target_number }) }).collect(); // schedule waits for all unknown messages. // when the last one of these has `wait_completed` called on it, // the global message will be returned. 
Ok(DiscardWaitOrReady::Wait(items_to_await)) } fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked> { if self.target_number != canon_number { // Delete the inner message so it won't ever be forwarded. Future calls to // `wait_completed` on the same `inner` will ignore it. *self.inner.lock() = None; return None; } match Arc::try_unwrap(self.inner) { // This is the last reference and thus the last outstanding block to be awaited. `inner` // is either `Some(_)` or `None`. The latter implies that a previous `wait_completed` // call witnessed a block number mismatch (see above). Ok(inner) => Mutex::into_inner(inner), // There are still other strong references to this `Arc`, thus the message is blocked on // other blocks to be imported. Err(_) => None, } } } /// A stream which gates off incoming global messages, i.e. commit and catch up /// messages, until all referenced block hashes have been imported. pub(crate) type UntilGlobalMessageBlocksImported<Block, BlockStatus, BlockSyncRequester, I> = UntilImported< Block, BlockStatus, BlockSyncRequester, I, BlockGlobalMessage<Block>, >; #[cfg(test)] mod tests { use super::*; use crate::{CatchUp, CompactCommit}; use substrate_test_runtime_client::runtime::{Block, Hash, Header}; use sp_consensus::BlockOrigin; use sc_client_api::BlockImportNotification; use futures::future::Either; use futures_timer::Delay; use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender}; use finality_grandpa::Precommit; #[derive(Clone)] struct TestChainState { sender: TracingUnboundedSender<BlockImportNotification<Block>>, known_blocks: Arc<Mutex<HashMap<Hash, u64>>>, } impl TestChainState { fn new() -> (Self, ImportNotifications<Block>) { let (tx, rx) = tracing_unbounded("test"); let state = TestChainState { sender: tx, known_blocks: Arc::new(Mutex::new(HashMap::new())), }; (state, rx) } fn block_status(&self) -> TestBlockStatus { TestBlockStatus { inner: self.known_blocks.clone() } } fn import_header(&self, header: 
Header) { let hash = header.hash(); let number = header.number().clone(); self.known_blocks.lock().insert(hash, number); self.sender.unbounded_send(BlockImportNotification { hash, origin: BlockOrigin::File, header, is_new_best: false, tree_route: None, }).unwrap(); } } struct TestBlockStatus { inner: Arc<Mutex<HashMap<Hash, u64>>>, } impl BlockStatusT<Block> for TestBlockStatus { fn block_number(&self, hash: Hash) -> Result<Option<u64>, Error> { Ok(self.inner.lock().get(&hash).map(|x| x.clone())) } } #[derive(Clone)] struct TestBlockSyncRequester { requests: Arc<Mutex<Vec<(Hash, NumberFor<Block>)>>>, } impl Default for TestBlockSyncRequester { fn default() -> Self { TestBlockSyncRequester { requests: Arc::new(Mutex::new(Vec::new())), } } } impl BlockSyncRequesterT<Block> for TestBlockSyncRequester { fn set_sync_fork_request(&self, _peers: Vec<sc_network::PeerId>, hash: Hash, number: NumberFor<Block>) { self.requests.lock().push((hash, number)); } } fn make_header(number: u64) -> Header { Header::new( number, Default::default(), Default::default(), Default::default(), Default::default(), ) } // unwrap the commit from `CommunicationIn` returning its fields in a tuple, // panics if the given message isn't a commit fn unapply_commit(msg: CommunicationIn<Block>) -> (u64, CompactCommit::<Block>) { match msg { voter::CommunicationIn::Commit(round, commit, ..) => (round, commit), _ => panic!("expected commit"), } } // unwrap the catch up from `CommunicationIn` returning its inner representation, // panics if the given message isn't a catch up fn unapply_catch_up(msg: CommunicationIn<Block>) -> CatchUp<Block> { match msg { voter::CommunicationIn::CatchUp(catch_up, ..) 
=> catch_up, _ => panic!("expected catch up"), } } fn message_all_dependencies_satisfied<F>( msg: CommunicationIn<Block>, enact_dependencies: F, ) -> CommunicationIn<Block> where F: FnOnce(&TestChainState), { let (chain_state, import_notifications) = TestChainState::new(); let block_status = chain_state.block_status(); // enact all dependencies before importing the message enact_dependencies(&chain_state); let (global_tx, global_rx) = tracing_unbounded("test"); let until_imported = UntilGlobalMessageBlocksImported::new( import_notifications, TestBlockSyncRequester::default(), block_status, global_rx, "global", None, ); global_tx.unbounded_send(msg).unwrap(); let work = until_imported.into_future(); futures::executor::block_on(work).0.unwrap().unwrap() } fn blocking_message_on_dependencies<F>( msg: CommunicationIn<Block>, enact_dependencies: F, ) -> CommunicationIn<Block> where F: FnOnce(&TestChainState), { let (chain_state, import_notifications) = TestChainState::new(); let block_status = chain_state.block_status(); let (global_tx, global_rx) = tracing_unbounded("test"); let until_imported = UntilGlobalMessageBlocksImported::new( import_notifications, TestBlockSyncRequester::default(), block_status, global_rx, "global", None, ); global_tx.unbounded_send(msg).unwrap(); // NOTE: needs to be cloned otherwise it is moved to the stream and // dropped too early. let inner_chain_state = chain_state.clone(); let work = future::select(until_imported.into_future(), Delay::new(Duration::from_millis(100))) .then(move |res| match res { Either::Left(_) => panic!("timeout should have fired first"), Either::Right((_, until_imported)) => { // timeout fired. push in the headers. 
enact_dependencies(&inner_chain_state); until_imported } }); futures::executor::block_on(work).0.unwrap().unwrap() } #[test] fn blocking_commit_message() { let h1 = make_header(5); let h2 = make_header(6); let h3 = make_header(7); let unknown_commit = CompactCommit::<Block> { target_hash: h1.hash(), target_number: 5, precommits: vec![ Precommit { target_hash: h2.hash(), target_number: 6, }, Precommit { target_hash: h3.hash(), target_number: 7, }, ], auth_data: Vec::new(), // not used }; let unknown_commit = || voter::CommunicationIn::Commit( 0, unknown_commit.clone(), voter::Callback::Blank, ); let res = blocking_message_on_dependencies( unknown_commit(), |chain_state| { chain_state.import_header(h1); chain_state.import_header(h2); chain_state.import_header(h3); }, ); assert_eq!( unapply_commit(res), unapply_commit(unknown_commit()), ); } #[test] fn commit_message_all_known() { let h1 = make_header(5); let h2 = make_header(6); let h3 = make_header(7); let known_commit = CompactCommit::<Block> { target_hash: h1.hash(), target_number: 5, precommits: vec![ Precommit { target_hash: h2.hash(), target_number: 6, }, Precommit { target_hash: h3.hash(), target_number: 7, }, ], auth_data: Vec::new(), // not used }; let known_commit = || voter::CommunicationIn::Commit( 0, known_commit.clone(), voter::Callback::Blank, ); let res = message_all_dependencies_satisfied( known_commit(), |chain_state| { chain_state.import_header(h1); chain_state.import_header(h2); chain_state.import_header(h3); }, ); assert_eq!( unapply_commit(res), unapply_commit(known_commit()), ); } #[test] fn blocking_catch_up_message() { let h1 = make_header(5); let h2 = make_header(6); let h3 = make_header(7); let signed_prevote = |header: &Header| { finality_grandpa::SignedPrevote { id: Default::default(), signature: Default::default(), prevote: finality_grandpa::Prevote { target_hash: header.hash(), target_number: *header.number(), }, } }; let signed_precommit = |header: &Header| { 
finality_grandpa::SignedPrecommit { id: Default::default(), signature: Default::default(), precommit: finality_grandpa::Precommit { target_hash: header.hash(), target_number: *header.number(), }, } }; let prevotes = vec![ signed_prevote(&h1), signed_prevote(&h3), ]; let precommits = vec![ signed_precommit(&h1), signed_precommit(&h2), ]; let unknown_catch_up = finality_grandpa::CatchUp { round_number: 1, prevotes, precommits, base_hash: h1.hash(), base_number: *h1.number(), }; let unknown_catch_up = || voter::CommunicationIn::CatchUp( unknown_catch_up.clone(), voter::Callback::Blank, ); let res = blocking_message_on_dependencies( unknown_catch_up(), |chain_state| { chain_state.import_header(h1); chain_state.import_header(h2); chain_state.import_header(h3); }, ); assert_eq!( unapply_catch_up(res), unapply_catch_up(unknown_catch_up()), ); } #[test] fn catch_up_message_all_known() { let h1 = make_header(5); let h2 = make_header(6); let h3 = make_header(7); let signed_prevote = |header: &Header| { finality_grandpa::SignedPrevote { id: Default::default(), signature: Default::default(), prevote: finality_grandpa::Prevote { target_hash: header.hash(), target_number: *header.number(), }, } }; let signed_precommit = |header: &Header| { finality_grandpa::SignedPrecommit { id: Default::default(), signature: Default::default(), precommit: finality_grandpa::Precommit { target_hash: header.hash(), target_number: *header.number(), }, } }; let prevotes = vec![ signed_prevote(&h1), signed_prevote(&h3), ]; let precommits = vec![ signed_precommit(&h1), signed_precommit(&h2), ]; let unknown_catch_up = finality_grandpa::CatchUp { round_number: 1, prevotes, precommits, base_hash: h1.hash(), base_number: *h1.number(), }; let unknown_catch_up = || voter::CommunicationIn::CatchUp( unknown_catch_up.clone(), voter::Callback::Blank, ); let res = message_all_dependencies_satisfied( unknown_catch_up(), |chain_state| { chain_state.import_header(h1); chain_state.import_header(h2); 
chain_state.import_header(h3); }, ); assert_eq!( unapply_catch_up(res), unapply_catch_up(unknown_catch_up()), ); } #[test] fn request_block_sync_for_needed_blocks() { let (chain_state, import_notifications) = TestChainState::new(); let block_status = chain_state.block_status(); let (global_tx, global_rx) = tracing_unbounded("test"); let block_sync_requester = TestBlockSyncRequester::default(); let until_imported = UntilGlobalMessageBlocksImported::new( import_notifications, block_sync_requester.clone(), block_status, global_rx, "global", None, ); let h1 = make_header(5); let h2 = make_header(6); let h3 = make_header(7); // we create a commit message, with precommits for blocks 6 and 7 which // we haven't imported. let unknown_commit = CompactCommit::<Block> { target_hash: h1.hash(), target_number: 5, precommits: vec![ Precommit { target_hash: h2.hash(), target_number: 6, }, Precommit { target_hash: h3.hash(), target_number: 7, }, ], auth_data: Vec::new(), // not used }; let unknown_commit = || voter::CommunicationIn::Commit( 0, unknown_commit.clone(), voter::Callback::Blank, ); // we send the commit message and spawn the until_imported stream global_tx.unbounded_send(unknown_commit()).unwrap(); let threads_pool = futures::executor::ThreadPool::new().unwrap(); threads_pool.spawn_ok(until_imported.into_future().map(|_| ())); // assert that we will make sync requests let assert = futures::future::poll_fn(|_| { let block_sync_requests = block_sync_requester.requests.lock(); // we request blocks targeted by the precommits that aren't imported if block_sync_requests.contains(&(h2.hash(), *h2.number())) && block_sync_requests.contains(&(h3.hash(), *h3.number())) { return Poll::Ready(()); } Poll::Pending }); // the `until_imported` stream doesn't request the blocks immediately, // but it should request them after a small timeout let timeout = Delay::new(Duration::from_secs(60)); let test = future::select(assert, timeout).map(|res| match res { Either::Left(_) => {}, 
Either::Right(_) => panic!("timed out waiting for block sync request"), }).map(drop); futures::executor::block_on(test); } fn test_catch_up() -> Arc<Mutex<Option<CommunicationIn<Block>>>> { let header = make_header(5); let unknown_catch_up = finality_grandpa::CatchUp { round_number: 1, precommits: vec![], prevotes: vec![], base_hash: header.hash(), base_number: *header.number(), }; let catch_up = voter::CommunicationIn::CatchUp( unknown_catch_up.clone(), voter::Callback::Blank, ); Arc::new(Mutex::new(Some(catch_up))) } #[test] fn block_global_message_wait_completed_return_when_all_awaited() { let msg_inner = test_catch_up(); let waiting_block_1 = BlockGlobalMessage::<Block> { inner: msg_inner.clone(), target_number: 1, }; let waiting_block_2 = BlockGlobalMessage::<Block> { inner: msg_inner, target_number: 2, }; // waiting_block_2 is still waiting for block 2, thus this should return `None`. assert!(waiting_block_1.wait_completed(1).is_none()); // Message only depended on block 1 and 2. Both have been imported, thus this should yield // the message. assert!(waiting_block_2.wait_completed(2).is_some()); } #[test] fn block_global_message_wait_completed_return_none_on_block_number_missmatch() { let msg_inner = test_catch_up(); let waiting_block_1 = BlockGlobalMessage::<Block> { inner: msg_inner.clone(), target_number: 1, }; let waiting_block_2 = BlockGlobalMessage::<Block> { inner: msg_inner, target_number: 2, }; // Calling wait_completed with wrong block number should yield None. assert!(waiting_block_1.wait_completed(1234).is_none()); // All blocks, that the message depended on, have been imported. Still, given the above // block number mismatch this should return None. assert!(waiting_block_2.wait_completed(2).is_none()); } #[test] fn metrics_cleans_up_after_itself() { let r = Registry::new(); let mut m1 = Metrics::register(&r).unwrap(); let m2 = m1.clone(); // Add a new message to the 'queue' of m1. 
m1.waiting_messages_inc(); // m1 and m2 are synced through the shared atomic. assert_eq!(1, m2.global_waiting_messages.get()); // Drop 'queue' m1. drop(m1); // Make sure m1 cleaned up after itself, removing all messages that were left in its queue // when dropped from the global metric. assert_eq!(0, m2.global_waiting_messages.get()); } }