diff --git a/substrate/client/finality-grandpa/src/until_imported.rs b/substrate/client/finality-grandpa/src/until_imported.rs index c3804e1272ffa62ea5f3154aad6ccb520e980a90..223078ec92c27a7ab4884cd16ca1b333a6c9a857 100644 --- a/substrate/client/finality-grandpa/src/until_imported.rs +++ b/substrate/client/finality-grandpa/src/until_imported.rs @@ -47,30 +47,36 @@ use sp_finality_grandpa::AuthorityId; const LOG_PENDING_INTERVAL: Duration = Duration::from_secs(15); -// something which will block until imported. +/// Something that needs to be withheld until specific blocks are available. +/// +/// For example a GRANDPA commit message which is not of any use without the corresponding block +/// that it commits on. pub(crate) trait BlockUntilImported<Block: BlockT>: Sized { - // the type that is blocked on. + /// The type that is blocked on. type Blocked; - /// new incoming item. For all internal items, - /// check if they require to be waited for. - /// if so, call the `Wait` closure. - /// if they are ready, call the `Ready` closure. - fn schedule_wait<S, Wait, Ready>( + /// Check if a new incoming item needs awaiting until a block(s) is imported. + fn needs_waiting<S: BlockStatusT<Block>>( input: Self::Blocked, status_check: &S, - wait: Wait, - ready: Ready, - ) -> Result<(), Error> where - S: BlockStatusT<Block>, - Wait: FnMut(Block::Hash, NumberFor<Block>, Self), - Ready: FnMut(Self::Blocked); + ) -> Result<DiscardWaitOrReady<Block, Self, Self::Blocked>, Error>; /// called when the wait has completed. The canonical number is passed through /// for further checks. fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked>; } +/// Describes whether a given [`BlockUntilImported`] (a) should be discarded, (b) is waiting for +/// specific blocks to be imported or (c) is ready to be used. +/// +/// A reason for discarding a [`BlockUntilImported`] would be if a referenced block is perceived +/// under a different number than specified in the message. +pub(crate) enum DiscardWaitOrReady<Block: BlockT, W, R> { + Discard, + Wait(Vec<(Block::Hash, NumberFor<Block>, W)>), + Ready(R), +} + /// Buffering imported messages until blocks with given hashes are imported. #[pin_project::pin_project] pub(crate) struct UntilImported<Block: BlockT, BlockStatus, BlockSyncRequester, I, M: BlockUntilImported<Block>> { @@ -149,18 +155,19 @@ impl<Block, BStatus, BSyncRequester, I, M> Stream for UntilImported<Block, BStat Poll::Ready(Some(input)) => { // new input: schedule wait of any parts which require // blocks to be known. - let ready = &mut this.ready; - let pending = &mut this.pending; - M::schedule_wait( - input, - this.status_check, - |target_hash, target_number, wait| pending - .entry(target_hash) - .or_insert_with(|| (target_number, Instant::now(), Vec::new())) - .2 - .push(wait), - |ready_item| ready.push_back(ready_item), - )?; + match M::needs_waiting(input, this.status_check)? { + DiscardWaitOrReady::Discard => {}, + DiscardWaitOrReady::Wait(items) => { + for (target_hash, target_number, wait) in items { + this.pending + .entry(target_hash) + .or_insert_with(|| (target_number, Instant::now(), Vec::new())) + .2 + .push(wait) + } + }, + DiscardWaitOrReady::Ready(item) => this.ready.push_back(item), + } } Poll::Pending => break, } @@ -255,29 +262,22 @@ fn warn_authority_wrong_target<H: ::std::fmt::Display>(hash: H, id: AuthorityId) impl<Block: BlockT> BlockUntilImported<Block> for SignedMessage<Block> { type Blocked = Self; - fn schedule_wait<BlockStatus, Wait, Ready>( + fn needs_waiting<BlockStatus: BlockStatusT<Block>>( msg: Self::Blocked, status_check: &BlockStatus, - mut wait: Wait, - mut ready: Ready, - ) -> Result<(), Error> where - BlockStatus: BlockStatusT<Block>, - Wait: FnMut(Block::Hash, NumberFor<Block>, Self), - Ready: FnMut(Self::Blocked), - { + ) -> Result<DiscardWaitOrReady<Block, Self, Self::Blocked>, Error> { let (&target_hash, target_number) = msg.target(); if let Some(number) = status_check.block_number(target_hash)? { if number != target_number { warn_authority_wrong_target(target_hash, msg.id); + return Ok(DiscardWaitOrReady::Discard); } else { - ready(msg); + return Ok(DiscardWaitOrReady::Ready(msg)); } - } else { - wait(target_hash, target_number, msg) } - Ok(()) + return Ok(DiscardWaitOrReady::Wait(vec![(target_hash, target_number, msg)])) } fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked> { @@ -321,16 +321,10 @@ impl<Block: BlockT> Unpin for BlockGlobalMessage<Block> {} impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> { type Blocked = CommunicationIn<Block>; - fn schedule_wait<BlockStatus, Wait, Ready>( + fn needs_waiting<BlockStatus: BlockStatusT<Block>>( input: Self::Blocked, status_check: &BlockStatus, - mut wait: Wait, - mut ready: Ready, - ) -> Result<(), Error> where - BlockStatus: BlockStatusT<Block>, - Wait: FnMut(Block::Hash, NumberFor<Block>, Self), - Ready: FnMut(Self::Blocked), - { + ) -> Result<DiscardWaitOrReady<Block, Self, Self::Blocked>, Error> { use std::collections::hash_map::Entry; enum KnownOrUnknown<N> { @@ -348,7 +342,6 @@ impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> { } let mut checked_hashes: HashMap<_, KnownOrUnknown<NumberFor<Block>>> = HashMap::new(); - let mut unknown_count = 0; { // returns false when should early exit. @@ -363,7 +356,6 @@ impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> { } else { entry.insert(KnownOrUnknown::Unknown(perceived_number)); - unknown_count += 1; perceived_number } } @@ -388,7 +380,7 @@ impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> { for (target_number, target_hash) in precommit_targets { if !query_known(target_hash, target_number)? { - return Ok(()) + return Ok(DiscardWaitOrReady::Discard); } } }, @@ -406,38 +398,34 @@ impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> { for (target_number, target_hash) in targets { if !query_known(target_hash, target_number)? { - return Ok(()) + return Ok(DiscardWaitOrReady::Discard); } } }, }; } - // none of the hashes in the global message were unknown. - // we can just return the message directly. - if unknown_count == 0 { - ready(input); - return Ok(()) + let unknown_hashes = checked_hashes.into_iter().filter_map(|(hash, num)| match num { + KnownOrUnknown::Unknown(number) => Some((hash, number)), + KnownOrUnknown::Known(_) => None, + }).collect::<Vec<_>>(); + + if unknown_hashes.is_empty() { + // none of the hashes in the global message were unknown. + // we can just return the message directly. + return Ok(DiscardWaitOrReady::Ready(input)); } let locked_global = Arc::new(Mutex::new(Some(input))); + let items_to_await = unknown_hashes.into_iter().map(|(hash, target_number)| { + (hash, target_number, BlockGlobalMessage { inner: locked_global.clone(), target_number }) + }).collect(); + // schedule waits for all unknown messages. // when the last one of these has `wait_completed` called on it, // the global message will be returned. - // - // in the future, we may want to issue sync requests to the network - // if this is taking a long time. - for (hash, is_known) in checked_hashes { - if let KnownOrUnknown::Unknown(target_number) = is_known { - wait(hash, target_number, BlockGlobalMessage { - inner: locked_global.clone(), - target_number, - }) - } - } - - Ok(()) + Ok(DiscardWaitOrReady::Wait(items_to_await)) } fn wait_completed(self, canon_number: NumberFor<Block>) -> Option<Self::Blocked> {