// Copyright (C) Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Polkadot is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . //! The inclusion pallet is responsible for inclusion and availability of scheduled parachains. //! //! It is responsible for carrying candidates from being backable to being backed, and then from //! backed to included. use crate::{ configuration::{self, HostConfiguration}, disputes, dmp, hrmp, paras, scheduler::{self, AvailabilityTimeoutStatus}, shared::{self, AllowedRelayParentsTracker}, }; use bitvec::{order::Lsb0 as BitOrderLsb0, vec::BitVec}; use frame_support::{ defensive, pallet_prelude::*, traits::{Defensive, EnqueueMessage}, BoundedSlice, }; use frame_system::pallet_prelude::*; use pallet_message_queue::OnQueueChanged; use parity_scale_codec::{Decode, Encode}; use primitives::{ effective_minimum_backing_votes, supermajority_threshold, well_known_keys, AvailabilityBitfield, BackedCandidate, CandidateCommitments, CandidateDescriptor, CandidateHash, CandidateReceipt, CommittedCandidateReceipt, CoreIndex, GroupIndex, Hash, HeadData, Id as ParaId, SignedAvailabilityBitfields, SigningContext, UpwardMessage, ValidatorId, ValidatorIndex, ValidityAttestation, }; use scale_info::TypeInfo; use sp_runtime::{traits::One, DispatchError, SaturatedConversion, Saturating}; #[cfg(feature = "std")] use sp_std::fmt; use sp_std::{ collections::{btree_map::BTreeMap, btree_set::BTreeSet}, prelude::*, }; pub use pallet::*; #[cfg(test)] pub(crate) mod tests; #[cfg(feature = "runtime-benchmarks")] mod benchmarking; pub trait WeightInfo { fn receive_upward_messages(i: u32) -> Weight; } pub struct TestWeightInfo; impl WeightInfo for TestWeightInfo { fn receive_upward_messages(_: u32) -> Weight { Weight::MAX } } impl WeightInfo for () { fn receive_upward_messages(_: u32) -> Weight { Weight::zero() } } /// Maximum value that `config.max_upward_message_size` can be set to. /// /// This is used for benchmarking sanely bounding relevant storage items. It is expected from the /// `configuration` pallet to check these values before setting. pub const MAX_UPWARD_MESSAGE_SIZE_BOUND: u32 = 128 * 1024; /// A bitfield signed by a validator indicating that it is keeping its piece of the erasure-coding /// for any backed candidates referred to by a `1` bit available. /// /// The bitfield's signature should be checked at the point of submission. Afterwards it can be /// dropped. #[derive(Encode, Decode, TypeInfo)] #[cfg_attr(test, derive(Debug))] pub struct AvailabilityBitfieldRecord { bitfield: AvailabilityBitfield, // one bit per core. submitted_at: N, // for accounting, as meaning of bits may change over time. } /// A backed candidate pending availability. #[derive(Encode, Decode, PartialEq, TypeInfo)] #[cfg_attr(test, derive(Debug))] pub struct CandidatePendingAvailability { /// The availability core this is assigned to. core: CoreIndex, /// The candidate hash. hash: CandidateHash, /// The candidate descriptor. descriptor: CandidateDescriptor, /// The received availability votes. One bit per validator. availability_votes: BitVec, /// The backers of the candidate pending availability. backers: BitVec, /// The block number of the relay-parent of the receipt. relay_parent_number: N, /// The block number of the relay-chain block this was backed in. backed_in_number: N, /// The group index backing this block. backing_group: GroupIndex, } impl CandidatePendingAvailability { /// Get the availability votes on the candidate. pub(crate) fn availability_votes(&self) -> &BitVec { &self.availability_votes } /// Get the relay-chain block number this was backed in. pub(crate) fn backed_in_number(&self) -> &N { &self.backed_in_number } /// Get the core index. pub(crate) fn core_occupied(&self) -> CoreIndex { self.core } /// Get the candidate hash. pub(crate) fn candidate_hash(&self) -> CandidateHash { self.hash } /// Get the candidate descriptor. pub(crate) fn candidate_descriptor(&self) -> &CandidateDescriptor { &self.descriptor } /// Get the candidate's relay parent's number. pub(crate) fn relay_parent_number(&self) -> N where N: Clone, { self.relay_parent_number.clone() } #[cfg(any(feature = "runtime-benchmarks", test))] pub(crate) fn new( core: CoreIndex, hash: CandidateHash, descriptor: CandidateDescriptor, availability_votes: BitVec, backers: BitVec, relay_parent_number: N, backed_in_number: N, backing_group: GroupIndex, ) -> Self { Self { core, hash, descriptor, availability_votes, backers, relay_parent_number, backed_in_number, backing_group, } } } /// A hook for applying validator rewards pub trait RewardValidators { // Reward the validators with the given indices for issuing backing statements. fn reward_backing(validators: impl IntoIterator); // Reward the validators with the given indices for issuing availability bitfields. // Validators are sent to this hook when they have contributed to the availability // of a candidate by setting a bit in their bitfield. fn reward_bitfields(validators: impl IntoIterator); } /// Helper return type for `process_candidates`. #[derive(Encode, Decode, PartialEq, TypeInfo)] #[cfg_attr(test, derive(Debug))] pub(crate) struct ProcessedCandidates { pub(crate) core_indices: Vec<(CoreIndex, ParaId)>, pub(crate) candidate_receipt_with_backing_validator_indices: Vec<(CandidateReceipt, Vec<(ValidatorIndex, ValidityAttestation)>)>, } impl Default for ProcessedCandidates { fn default() -> Self { Self { core_indices: Vec::new(), candidate_receipt_with_backing_validator_indices: Vec::new(), } } } /// Reads the footprint of queues for a specific origin type. pub trait QueueFootprinter { type Origin; fn message_count(origin: Self::Origin) -> u64; } impl QueueFootprinter for () { type Origin = UmpQueueId; fn message_count(_: Self::Origin) -> u64 { 0 } } /// Aggregate message origin for the `MessageQueue` pallet. /// /// Can be extended to serve further use-cases besides just UMP. Is stored in storage, so any change /// to existing values will require a migration. #[derive(Encode, Decode, Clone, MaxEncodedLen, Eq, PartialEq, RuntimeDebug, TypeInfo)] pub enum AggregateMessageOrigin { /// Inbound upward message. #[codec(index = 0)] Ump(UmpQueueId), } /// Identifies a UMP queue inside the `MessageQueue` pallet. /// /// It is written in verbose form since future variants like `Loopback` and `Bridged` are already /// forseeable. #[derive(Encode, Decode, Clone, MaxEncodedLen, Eq, PartialEq, RuntimeDebug, TypeInfo)] pub enum UmpQueueId { /// The message originated from this parachain. #[codec(index = 0)] Para(ParaId), } #[cfg(feature = "runtime-benchmarks")] impl From for AggregateMessageOrigin { fn from(n: u32) -> Self { // Some dummy for the benchmarks. Self::Ump(UmpQueueId::Para(n.into())) } } /// The maximal length of a UMP message. pub type MaxUmpMessageLenOf = <::MessageQueue as EnqueueMessage>::MaxMessageLen; #[frame_support::pallet] pub mod pallet { use super::*; #[pallet::pallet] #[pallet::without_storage_info] pub struct Pallet(_); #[pallet::config] pub trait Config: frame_system::Config + shared::Config + paras::Config + dmp::Config + hrmp::Config + configuration::Config + scheduler::Config { type RuntimeEvent: From> + IsType<::RuntimeEvent>; type DisputesHandler: disputes::DisputesHandler>; type RewardValidators: RewardValidators; /// The system message queue. /// /// The message queue provides general queueing and processing functionality. Currently it /// replaces the old `UMP` dispatch queue. Other use-cases can be implemented as well by /// adding new variants to `AggregateMessageOrigin`. type MessageQueue: EnqueueMessage; /// Weight info for the calls of this pallet. type WeightInfo: WeightInfo; } #[pallet::event] #[pallet::generate_deposit(pub(super) fn deposit_event)] pub enum Event { /// A candidate was backed. `[candidate, head_data]` CandidateBacked(CandidateReceipt, HeadData, CoreIndex, GroupIndex), /// A candidate was included. `[candidate, head_data]` CandidateIncluded(CandidateReceipt, HeadData, CoreIndex, GroupIndex), /// A candidate timed out. `[candidate, head_data]` CandidateTimedOut(CandidateReceipt, HeadData, CoreIndex), /// Some upward messages have been received and will be processed. UpwardMessagesReceived { from: ParaId, count: u32 }, } #[pallet::error] pub enum Error { /// Validator indices are out of order or contains duplicates. UnsortedOrDuplicateValidatorIndices, /// Dispute statement sets are out of order or contain duplicates. UnsortedOrDuplicateDisputeStatementSet, /// Backed candidates are out of order (core index) or contain duplicates. UnsortedOrDuplicateBackedCandidates, /// A different relay parent was provided compared to the on-chain stored one. UnexpectedRelayParent, /// Availability bitfield has unexpected size. WrongBitfieldSize, /// Bitfield consists of zeros only. BitfieldAllZeros, /// Multiple bitfields submitted by same validator or validators out of order by index. BitfieldDuplicateOrUnordered, /// Validator index out of bounds. ValidatorIndexOutOfBounds, /// Invalid signature InvalidBitfieldSignature, /// Candidate submitted but para not scheduled. UnscheduledCandidate, /// Candidate scheduled despite pending candidate already existing for the para. CandidateScheduledBeforeParaFree, /// Scheduled cores out of order. ScheduledOutOfOrder, /// Head data exceeds the configured maximum. HeadDataTooLarge, /// Code upgrade prematurely. PrematureCodeUpgrade, /// Output code is too large NewCodeTooLarge, /// The candidate's relay-parent was not allowed. Either it was /// not recent enough or it didn't advance based on the last parachain block. DisallowedRelayParent, /// Failed to compute group index for the core: either it's out of bounds /// or the relay parent doesn't belong to the current session. InvalidAssignment, /// Invalid group index in core assignment. InvalidGroupIndex, /// Insufficient (non-majority) backing. InsufficientBacking, /// Invalid (bad signature, unknown validator, etc.) backing. InvalidBacking, /// Collator did not sign PoV. NotCollatorSigned, /// The validation data hash does not match expected. ValidationDataHashMismatch, /// The downward message queue is not processed correctly. IncorrectDownwardMessageHandling, /// At least one upward message sent does not pass the acceptance criteria. InvalidUpwardMessages, /// The candidate didn't follow the rules of HRMP watermark advancement. HrmpWatermarkMishandling, /// The HRMP messages sent by the candidate is not valid. InvalidOutboundHrmp, /// The validation code hash of the candidate is not valid. InvalidValidationCodeHash, /// The `para_head` hash in the candidate descriptor doesn't match the hash of the actual /// para head in the commitments. ParaHeadMismatch, /// A bitfield that references a freed core, /// either intentionally or as part of a concluded /// invalid dispute. BitfieldReferencesFreedCore, } /// The latest bitfield for each validator, referred to by their index in the validator set. #[pallet::storage] pub(crate) type AvailabilityBitfields = StorageMap<_, Twox64Concat, ValidatorIndex, AvailabilityBitfieldRecord>>; /// Candidates pending availability by `ParaId`. #[pallet::storage] pub(crate) type PendingAvailability = StorageMap< _, Twox64Concat, ParaId, CandidatePendingAvailability>, >; /// The commitments of candidates pending availability, by `ParaId`. #[pallet::storage] pub(crate) type PendingAvailabilityCommitments = StorageMap<_, Twox64Concat, ParaId, CandidateCommitments>; #[pallet::call] impl Pallet {} } const LOG_TARGET: &str = "runtime::inclusion"; /// The reason that a candidate's outputs were rejected for. #[derive(derive_more::From)] #[cfg_attr(feature = "std", derive(Debug))] enum AcceptanceCheckErr { HeadDataTooLarge, /// Code upgrades are not permitted at the current time. PrematureCodeUpgrade, /// The new runtime blob is too large. NewCodeTooLarge, /// The candidate violated this DMP acceptance criteria. ProcessedDownwardMessages(dmp::ProcessedDownwardMessagesAcceptanceErr), /// The candidate violated this UMP acceptance criteria. UpwardMessages(UmpAcceptanceCheckErr), /// The candidate violated this HRMP watermark acceptance criteria. HrmpWatermark(hrmp::HrmpWatermarkAcceptanceErr), /// The candidate violated this outbound HRMP acceptance criteria. OutboundHrmp(hrmp::OutboundHrmpAcceptanceErr), } /// An error returned by [`Pallet::check_upward_messages`] that indicates a violation of one of /// acceptance criteria rules. #[cfg_attr(test, derive(PartialEq))] pub(crate) enum UmpAcceptanceCheckErr { /// The maximal number of messages that can be submitted in one batch was exceeded. MoreMessagesThanPermitted { sent: u32, permitted: u32 }, /// The maximal size of a single message was exceeded. MessageSize { idx: u32, msg_size: u32, max_size: u32 }, /// The allowed number of messages in the queue was exceeded. CapacityExceeded { count: u64, limit: u64 }, /// The allowed combined message size in the queue was exceeded. TotalSizeExceeded { total_size: u64, limit: u64 }, /// A para-chain cannot send UMP messages while it is offboarding. IsOffboarding, } #[cfg(feature = "std")] impl fmt::Debug for UmpAcceptanceCheckErr { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { match *self { UmpAcceptanceCheckErr::MoreMessagesThanPermitted { sent, permitted } => write!( fmt, "more upward messages than permitted by config ({} > {})", sent, permitted, ), UmpAcceptanceCheckErr::MessageSize { idx, msg_size, max_size } => write!( fmt, "upward message idx {} larger than permitted by config ({} > {})", idx, msg_size, max_size, ), UmpAcceptanceCheckErr::CapacityExceeded { count, limit } => write!( fmt, "the ump queue would have more items than permitted by config ({} > {})", count, limit, ), UmpAcceptanceCheckErr::TotalSizeExceeded { total_size, limit } => write!( fmt, "the ump queue would have grown past the max size permitted by config ({} > {})", total_size, limit, ), UmpAcceptanceCheckErr::IsOffboarding => write!(fmt, "upward message rejected because the para is off-boarding",), } } } impl Pallet { /// Block initialization logic, called by initializer. pub(crate) fn initializer_initialize(_now: BlockNumberFor) -> Weight { Weight::zero() } /// Block finalization logic, called by initializer. pub(crate) fn initializer_finalize() {} /// Handle an incoming session change. pub(crate) fn initializer_on_new_session( _notification: &crate::initializer::SessionChangeNotification>, outgoing_paras: &[ParaId], ) { // unlike most drain methods, drained elements are not cleared on `Drop` of the iterator // and require consumption. for _ in >::drain() {} for _ in >::drain() {} for _ in >::drain() {} Self::cleanup_outgoing_ump_dispatch_queues(outgoing_paras); } pub(crate) fn cleanup_outgoing_ump_dispatch_queues(outgoing: &[ParaId]) { for outgoing_para in outgoing { Self::cleanup_outgoing_ump_dispatch_queue(*outgoing_para); } } pub(crate) fn cleanup_outgoing_ump_dispatch_queue(para: ParaId) { T::MessageQueue::sweep_queue(AggregateMessageOrigin::Ump(UmpQueueId::Para(para))); } /// Extract the freed cores based on cores that became available. /// /// Bitfields are expected to have been sanitized already. E.g. via `sanitize_bitfields`! /// /// Updates storage items `PendingAvailability` and `AvailabilityBitfields`. /// /// Returns a `Vec` of `CandidateHash`es and their respective `AvailabilityCore`s that became /// available, and cores free. pub(crate) fn update_pending_availability_and_get_freed_cores( expected_bits: usize, validators: &[ValidatorId], signed_bitfields: SignedAvailabilityBitfields, core_lookup: F, ) -> Vec<(CoreIndex, CandidateHash)> where F: Fn(CoreIndex) -> Option, { let mut assigned_paras_record = (0..expected_bits) .map(|bit_index| core_lookup(CoreIndex::from(bit_index as u32))) .map(|opt_para_id| { opt_para_id.map(|para_id| (para_id, PendingAvailability::::get(¶_id))) }) .collect::>(); let now = >::block_number(); for (checked_bitfield, validator_index) in signed_bitfields.into_iter().map(|signed_bitfield| { let validator_idx = signed_bitfield.validator_index(); let checked_bitfield = signed_bitfield.into_payload(); (checked_bitfield, validator_idx) }) { for (bit_idx, _) in checked_bitfield.0.iter().enumerate().filter(|(_, is_av)| **is_av) { let pending_availability = if let Some((_, pending_availability)) = assigned_paras_record[bit_idx].as_mut() { pending_availability } else { // For honest validators, this happens in case of unoccupied cores, // which in turn happens in case of a disputed candidate. // A malicious one might include arbitrary indices, but they are represented // by `None` values and will be sorted out in the next if case. continue }; // defensive check - this is constructed by loading the availability bitfield // record, which is always `Some` if the core is occupied - that's why we're here. let validator_index = validator_index.0 as usize; if let Some(mut bit) = pending_availability.as_mut().and_then(|candidate_pending_availability| { candidate_pending_availability.availability_votes.get_mut(validator_index) }) { *bit = true; } } let record = AvailabilityBitfieldRecord { bitfield: checked_bitfield, submitted_at: now }; >::insert(&validator_index, record); } let threshold = availability_threshold(validators.len()); let mut freed_cores = Vec::with_capacity(expected_bits); for (para_id, pending_availability) in assigned_paras_record .into_iter() .flatten() .filter_map(|(id, p)| p.map(|p| (id, p))) { if pending_availability.availability_votes.count_ones() >= threshold { >::remove(¶_id); let commitments = match PendingAvailabilityCommitments::::take(¶_id) { Some(commitments) => commitments, None => { log::warn!( target: LOG_TARGET, "Inclusion::process_bitfields: PendingAvailability and PendingAvailabilityCommitments are out of sync, did someone mess with the storage?", ); continue }, }; let receipt = CommittedCandidateReceipt { descriptor: pending_availability.descriptor, commitments, }; let _weight = Self::enact_candidate( pending_availability.relay_parent_number, receipt, pending_availability.backers, pending_availability.availability_votes, pending_availability.core, pending_availability.backing_group, ); freed_cores.push((pending_availability.core, pending_availability.hash)); } else { >::insert(¶_id, &pending_availability); } } freed_cores } /// Process candidates that have been backed. Provide the relay storage root, a set of /// candidates and scheduled cores. /// /// Both should be sorted ascending by core index, and the candidates should be a subset of /// scheduled cores. If these conditions are not met, the execution of the function fails. pub(crate) fn process_candidates( allowed_relay_parents: &AllowedRelayParentsTracker>, candidates: Vec>, scheduled: &BTreeMap, group_validators: GV, ) -> Result, DispatchError> where GV: Fn(GroupIndex) -> Option>, { let now = >::block_number(); ensure!(candidates.len() <= scheduled.len(), Error::::UnscheduledCandidate); if scheduled.is_empty() { return Ok(ProcessedCandidates::default()) } let minimum_backing_votes = configuration::Pallet::::config().minimum_backing_votes; let validators = shared::Pallet::::active_validator_keys(); // Collect candidate receipts with backers. let mut candidate_receipt_with_backing_validator_indices = Vec::with_capacity(candidates.len()); // Do all checks before writing storage. let core_indices_and_backers = { let mut core_indices_and_backers = Vec::with_capacity(candidates.len()); let mut last_core = None; let mut check_assignment_in_order = |core_idx| -> DispatchResult { ensure!( last_core.map_or(true, |core| core_idx > core), Error::::ScheduledOutOfOrder, ); last_core = Some(core_idx); Ok(()) }; // We combine an outer loop over candidates with an inner loop over the scheduled, // where each iteration of the outer loop picks up at the position // in scheduled just after the past iteration left off. // // If the candidates appear in the same order as they appear in `scheduled`, // then they should always be found. If the end of `scheduled` is reached, // then the candidate was either not scheduled or out-of-order. // // In the meantime, we do certain sanity checks on the candidates and on the scheduled // list. for (candidate_idx, backed_candidate) in candidates.iter().enumerate() { let relay_parent_hash = backed_candidate.descriptor().relay_parent; let para_id = backed_candidate.descriptor().para_id; let prev_context = >::para_most_recent_context(para_id); let check_ctx = CandidateCheckContext::::new(prev_context); let signing_context = SigningContext { parent_hash: relay_parent_hash, session_index: shared::Pallet::::session_index(), }; let relay_parent_number = match check_ctx.verify_backed_candidate( &allowed_relay_parents, candidate_idx, backed_candidate, )? { Err(FailedToCreatePVD) => { log::debug!( target: LOG_TARGET, "Failed to create PVD for candidate {}", candidate_idx, ); // We don't want to error out here because it will // brick the relay-chain. So we return early without // doing anything. return Ok(ProcessedCandidates::default()) }, Ok(rpn) => rpn, }; let para_id = backed_candidate.descriptor().para_id; let mut backers = bitvec::bitvec![u8, BitOrderLsb0; 0; validators.len()]; let core_idx = *scheduled.get(¶_id).ok_or(Error::::UnscheduledCandidate)?; check_assignment_in_order(core_idx)?; ensure!( >::get(¶_id).is_none() && >::get(¶_id).is_none(), Error::::CandidateScheduledBeforeParaFree, ); // The candidate based upon relay parent `N` should be backed by a group // assigned to core at block `N + 1`. Thus, `relay_parent_number + 1` // will always land in the current session. let group_idx = >::group_assigned_to_core( core_idx, relay_parent_number + One::one(), ) .ok_or_else(|| { log::warn!( target: LOG_TARGET, "Failed to compute group index for candidate {}", candidate_idx ); Error::::InvalidAssignment })?; let group_vals = group_validators(group_idx).ok_or_else(|| Error::::InvalidGroupIndex)?; // check the signatures in the backing and that it is a majority. { let maybe_amount_validated = primitives::check_candidate_backing( &backed_candidate, &signing_context, group_vals.len(), |intra_group_vi| { group_vals .get(intra_group_vi) .and_then(|vi| validators.get(vi.0 as usize)) .map(|v| v.clone()) }, ); match maybe_amount_validated { Ok(amount_validated) => ensure!( amount_validated >= effective_minimum_backing_votes( group_vals.len(), minimum_backing_votes ), Error::::InsufficientBacking, ), Err(()) => { Err(Error::::InvalidBacking)?; }, } let mut backer_idx_and_attestation = Vec::<(ValidatorIndex, ValidityAttestation)>::with_capacity( backed_candidate.validator_indices.count_ones(), ); let candidate_receipt = backed_candidate.receipt(); for ((bit_idx, _), attestation) in backed_candidate .validator_indices .iter() .enumerate() .filter(|(_, signed)| **signed) .zip(backed_candidate.validity_votes.iter().cloned()) { let val_idx = group_vals.get(bit_idx).expect("this query succeeded above; qed"); backer_idx_and_attestation.push((*val_idx, attestation)); backers.set(val_idx.0 as _, true); } candidate_receipt_with_backing_validator_indices .push((candidate_receipt, backer_idx_and_attestation)); } core_indices_and_backers.push(( (core_idx, para_id), backers, group_idx, relay_parent_number, )); } core_indices_and_backers }; // one more sweep for actually writing to storage. let core_indices = core_indices_and_backers.iter().map(|(c, ..)| *c).collect(); for (candidate, (core, backers, group, relay_parent_number)) in candidates.into_iter().zip(core_indices_and_backers) { let para_id = candidate.descriptor().para_id; // initialize all availability votes to 0. let availability_votes: BitVec = bitvec::bitvec![u8, BitOrderLsb0; 0; validators.len()]; Self::deposit_event(Event::::CandidateBacked( candidate.candidate.to_plain(), candidate.candidate.commitments.head_data.clone(), core.0, group, )); let candidate_hash = candidate.candidate.hash(); let (descriptor, commitments) = (candidate.candidate.descriptor, candidate.candidate.commitments); >::insert( ¶_id, CandidatePendingAvailability { core: core.0, hash: candidate_hash, descriptor, availability_votes, relay_parent_number, backers: backers.to_bitvec(), backed_in_number: now, backing_group: group, }, ); >::insert(¶_id, commitments); } Ok(ProcessedCandidates:: { core_indices, candidate_receipt_with_backing_validator_indices, }) } /// Run the acceptance criteria checks on the given candidate commitments. pub(crate) fn check_validation_outputs_for_runtime_api( para_id: ParaId, relay_parent_number: BlockNumberFor, validation_outputs: primitives::CandidateCommitments, ) -> bool { let prev_context = >::para_most_recent_context(para_id); let check_ctx = CandidateCheckContext::::new(prev_context); if check_ctx .check_validation_outputs( para_id, relay_parent_number, &validation_outputs.head_data, &validation_outputs.new_validation_code, validation_outputs.processed_downward_messages, &validation_outputs.upward_messages, BlockNumberFor::::from(validation_outputs.hrmp_watermark), &validation_outputs.horizontal_messages, ) .is_err() { log::debug!( target: LOG_TARGET, "Validation outputs checking for parachain `{}` failed", u32::from(para_id), ); false } else { true } } fn enact_candidate( relay_parent_number: BlockNumberFor, receipt: CommittedCandidateReceipt, backers: BitVec, availability_votes: BitVec, core_index: CoreIndex, backing_group: GroupIndex, ) -> Weight { let plain = receipt.to_plain(); let commitments = receipt.commitments; let config = >::config(); T::RewardValidators::reward_backing( backers .iter() .enumerate() .filter(|(_, backed)| **backed) .map(|(i, _)| ValidatorIndex(i as _)), ); T::RewardValidators::reward_bitfields( availability_votes .iter() .enumerate() .filter(|(_, voted)| **voted) .map(|(i, _)| ValidatorIndex(i as _)), ); // initial weight is config read. let mut weight = T::DbWeight::get().reads_writes(1, 0); if let Some(new_code) = commitments.new_validation_code { // Block number of candidate's inclusion. let now = >::block_number(); weight.saturating_add(>::schedule_code_upgrade( receipt.descriptor.para_id, new_code, now, &config, )); } // enact the messaging facet of the candidate. weight.saturating_accrue(>::prune_dmq( receipt.descriptor.para_id, commitments.processed_downward_messages, )); weight.saturating_accrue(Self::receive_upward_messages( receipt.descriptor.para_id, commitments.upward_messages.as_slice(), )); weight.saturating_accrue(>::prune_hrmp( receipt.descriptor.para_id, BlockNumberFor::::from(commitments.hrmp_watermark), )); weight.saturating_accrue(>::queue_outbound_hrmp( receipt.descriptor.para_id, commitments.horizontal_messages, )); Self::deposit_event(Event::::CandidateIncluded( plain, commitments.head_data.clone(), core_index, backing_group, )); weight.saturating_add(>::note_new_head( receipt.descriptor.para_id, commitments.head_data, relay_parent_number, )) } pub(crate) fn relay_dispatch_queue_size(para_id: ParaId) -> (u32, u32) { let fp = T::MessageQueue::footprint(AggregateMessageOrigin::Ump(UmpQueueId::Para(para_id))); (fp.count as u32, fp.size as u32) } /// Check that all the upward messages sent by a candidate pass the acceptance criteria. pub(crate) fn check_upward_messages( config: &HostConfiguration>, para: ParaId, upward_messages: &[UpwardMessage], ) -> Result<(), UmpAcceptanceCheckErr> { // Cannot send UMP messages while off-boarding. if >::is_offboarding(para) { ensure!(upward_messages.is_empty(), UmpAcceptanceCheckErr::IsOffboarding); } let additional_msgs = upward_messages.len() as u32; if additional_msgs > config.max_upward_message_num_per_candidate { return Err(UmpAcceptanceCheckErr::MoreMessagesThanPermitted { sent: additional_msgs, permitted: config.max_upward_message_num_per_candidate, }) } let (para_queue_count, mut para_queue_size) = Self::relay_dispatch_queue_size(para); if para_queue_count.saturating_add(additional_msgs) > config.max_upward_queue_count { return Err(UmpAcceptanceCheckErr::CapacityExceeded { count: para_queue_count.saturating_add(additional_msgs).into(), limit: config.max_upward_queue_count.into(), }) } for (idx, msg) in upward_messages.into_iter().enumerate() { let msg_size = msg.len() as u32; if msg_size > config.max_upward_message_size { return Err(UmpAcceptanceCheckErr::MessageSize { idx: idx as u32, msg_size, max_size: config.max_upward_message_size, }) } // make sure that the queue is not overfilled. // we do it here only once since returning false invalidates the whole relay-chain // block. if para_queue_size.saturating_add(msg_size) > config.max_upward_queue_size { return Err(UmpAcceptanceCheckErr::TotalSizeExceeded { total_size: para_queue_size.saturating_add(msg_size).into(), limit: config.max_upward_queue_size.into(), }) } para_queue_size.saturating_accrue(msg_size); } Ok(()) } /// Enqueues `upward_messages` from a `para`'s accepted candidate block. /// /// This function is infallible since the candidate was already accepted and we therefore need /// to deal with the messages as given. Messages that are too long will be ignored since such /// candidates should have already been rejected in [`Self::check_upward_messages`]. pub(crate) fn receive_upward_messages(para: ParaId, upward_messages: &[Vec]) -> Weight { let bounded = upward_messages .iter() .filter_map(|d| { BoundedSlice::try_from(&d[..]) .map_err(|e| { defensive!("Accepted candidate contains too long msg, len=", d.len()); e }) .ok() }) .collect(); Self::receive_bounded_upward_messages(para, bounded) } /// Enqueues storage-bounded `upward_messages` from a `para`'s accepted candidate block. pub(crate) fn receive_bounded_upward_messages( para: ParaId, messages: Vec>>, ) -> Weight { let count = messages.len() as u32; if count == 0 { return Weight::zero() } T::MessageQueue::enqueue_messages( messages.into_iter(), AggregateMessageOrigin::Ump(UmpQueueId::Para(para)), ); let weight = ::WeightInfo::receive_upward_messages(count); Self::deposit_event(Event::UpwardMessagesReceived { from: para, count }); weight } /// Cleans up all paras pending availability that the predicate returns true for. /// /// The predicate accepts the index of the core and the block number the core has been occupied /// since (i.e. the block number the candidate was backed at in this fork of the relay chain). /// /// Returns a vector of cleaned-up core IDs. pub(crate) fn collect_pending( pred: impl Fn(BlockNumberFor) -> AvailabilityTimeoutStatus>, ) -> Vec { let mut cleaned_up_ids = Vec::new(); let mut cleaned_up_cores = Vec::new(); for (para_id, pending_record) in >::iter() { if pred(pending_record.backed_in_number).timed_out { cleaned_up_ids.push(para_id); cleaned_up_cores.push(pending_record.core); } } for para_id in cleaned_up_ids { let pending = >::take(¶_id); let commitments = >::take(¶_id); if let (Some(pending), Some(commitments)) = (pending, commitments) { // defensive: this should always be true. let candidate = CandidateReceipt { descriptor: pending.descriptor, commitments_hash: commitments.hash(), }; Self::deposit_event(Event::::CandidateTimedOut( candidate, commitments.head_data, pending.core, )); } } cleaned_up_cores } /// Cleans up all paras pending availability that are in the given list of disputed candidates. /// /// Returns a vector of cleaned-up core IDs. pub(crate) fn collect_disputed(disputed: &BTreeSet) -> Vec { let mut cleaned_up_ids = Vec::new(); let mut cleaned_up_cores = Vec::new(); for (para_id, pending_record) in >::iter() { if disputed.contains(&pending_record.hash) { cleaned_up_ids.push(para_id); cleaned_up_cores.push(pending_record.core); } } for para_id in cleaned_up_ids { let _ = >::take(¶_id); let _ = >::take(¶_id); } cleaned_up_cores } /// Forcibly enact the candidate with the given ID as though it had been deemed available /// by bitfields. /// /// Is a no-op if there is no candidate pending availability for this para-id. /// This should generally not be used but it is useful during execution of Runtime APIs, /// where the changes to the state are expected to be discarded directly after. pub(crate) fn force_enact(para: ParaId) { let pending = >::take(¶); let commitments = >::take(¶); if let (Some(pending), Some(commitments)) = (pending, commitments) { let candidate = CommittedCandidateReceipt { descriptor: pending.descriptor, commitments }; Self::enact_candidate( pending.relay_parent_number, candidate, pending.backers, pending.availability_votes, pending.core, pending.backing_group, ); } } /// Returns the `CommittedCandidateReceipt` pending availability for the para provided, if any. pub(crate) fn candidate_pending_availability( para: ParaId, ) -> Option> { >::get(¶) .map(|p| p.descriptor) .and_then(|d| >::get(¶).map(move |c| (d, c))) .map(|(d, c)| CommittedCandidateReceipt { descriptor: d, commitments: c }) } /// Returns the metadata around the candidate pending availability for the /// para provided, if any. pub(crate) fn pending_availability( para: ParaId, ) -> Option>> { >::get(¶) } } const fn availability_threshold(n_validators: usize) -> usize { supermajority_threshold(n_validators) } impl AcceptanceCheckErr { /// Returns the same error so that it can be threaded through a needle of `DispatchError` and /// ultimately returned from a `Dispatchable`. fn strip_into_dispatch_err(self) -> Error { use AcceptanceCheckErr::*; match self { HeadDataTooLarge => Error::::HeadDataTooLarge, PrematureCodeUpgrade => Error::::PrematureCodeUpgrade, NewCodeTooLarge => Error::::NewCodeTooLarge, ProcessedDownwardMessages(_) => Error::::IncorrectDownwardMessageHandling, UpwardMessages(_) => Error::::InvalidUpwardMessages, HrmpWatermark(_) => Error::::HrmpWatermarkMishandling, OutboundHrmp(_) => Error::::InvalidOutboundHrmp, } } } impl OnQueueChanged for Pallet { // Write back the remaining queue capacity into `relay_dispatch_queue_remaining_capacity`. fn on_queue_changed(origin: AggregateMessageOrigin, count: u64, size: u64) { let para = match origin { AggregateMessageOrigin::Ump(UmpQueueId::Para(p)) => p, }; let (count, size) = (count.saturated_into(), size.saturated_into()); // TODO paritytech/polkadot#6283: Remove all usages of `relay_dispatch_queue_size` #[allow(deprecated)] well_known_keys::relay_dispatch_queue_size_typed(para).set((count, size)); let config = >::config(); let remaining_count = config.max_upward_queue_count.saturating_sub(count); let remaining_size = config.max_upward_queue_size.saturating_sub(size); well_known_keys::relay_dispatch_queue_remaining_capacity(para) .set((remaining_count, remaining_size)); } } /// A collection of data required for checking a candidate. pub(crate) struct CandidateCheckContext { config: configuration::HostConfiguration>, prev_context: Option>, } /// An error indicating that creating Persisted Validation Data failed /// while checking a candidate's validity. pub(crate) struct FailedToCreatePVD; impl CandidateCheckContext { pub(crate) fn new(prev_context: Option>) -> Self { Self { config: >::config(), prev_context } } /// Execute verification of the candidate. /// /// Assures: /// * relay-parent in-bounds /// * collator signature check passes /// * code hash of commitments matches current code hash /// * para head in the descriptor and commitments match /// /// Returns the relay-parent block number. pub(crate) fn verify_backed_candidate( &self, allowed_relay_parents: &AllowedRelayParentsTracker>, candidate_idx: usize, backed_candidate: &BackedCandidate<::Hash>, ) -> Result, FailedToCreatePVD>, Error> { let para_id = backed_candidate.descriptor().para_id; let relay_parent = backed_candidate.descriptor().relay_parent; // Check that the relay-parent is one of the allowed relay-parents. let (relay_parent_storage_root, relay_parent_number) = { match allowed_relay_parents.acquire_info(relay_parent, self.prev_context) { None => return Err(Error::::DisallowedRelayParent), Some(info) => info, } }; { let persisted_validation_data = match crate::util::make_persisted_validation_data::( para_id, relay_parent_number, relay_parent_storage_root, ) .defensive_proof("the para is registered") { Some(l) => l, None => return Ok(Err(FailedToCreatePVD)), }; let expected = persisted_validation_data.hash(); ensure!( expected == backed_candidate.descriptor().persisted_validation_data_hash, Error::::ValidationDataHashMismatch, ); } ensure!( backed_candidate.descriptor().check_collator_signature().is_ok(), Error::::NotCollatorSigned, ); let validation_code_hash = >::current_code_hash(para_id) // A candidate for a parachain without current validation code is not scheduled. .ok_or_else(|| Error::::UnscheduledCandidate)?; ensure!( backed_candidate.descriptor().validation_code_hash == validation_code_hash, Error::::InvalidValidationCodeHash, ); ensure!( backed_candidate.descriptor().para_head == backed_candidate.candidate.commitments.head_data.hash(), Error::::ParaHeadMismatch, ); if let Err(err) = self.check_validation_outputs( para_id, relay_parent_number, &backed_candidate.candidate.commitments.head_data, &backed_candidate.candidate.commitments.new_validation_code, backed_candidate.candidate.commitments.processed_downward_messages, &backed_candidate.candidate.commitments.upward_messages, BlockNumberFor::::from(backed_candidate.candidate.commitments.hrmp_watermark), &backed_candidate.candidate.commitments.horizontal_messages, ) { log::debug!( target: LOG_TARGET, "Validation outputs checking during inclusion of a candidate {} for parachain `{}` failed", candidate_idx, u32::from(para_id), ); Err(err.strip_into_dispatch_err::())?; }; Ok(Ok(relay_parent_number)) } /// Check the given outputs after candidate validation on whether it passes the acceptance /// criteria. /// /// The things that are checked can be roughly divided into limits and minimums. /// /// Limits are things like max message queue sizes and max head data size. /// /// Minimums are things like the minimum amount of messages that must be processed /// by the parachain block. /// /// Limits are checked against the current state. The parachain block must be acceptable /// by the current relay-chain state regardless of whether it was acceptable at some relay-chain /// state in the past. /// /// Minimums are checked against the current state but modulated by /// considering the information available at the relay-parent of the parachain block. fn check_validation_outputs( &self, para_id: ParaId, relay_parent_number: BlockNumberFor, head_data: &HeadData, new_validation_code: &Option, processed_downward_messages: u32, upward_messages: &[primitives::UpwardMessage], hrmp_watermark: BlockNumberFor, horizontal_messages: &[primitives::OutboundHrmpMessage], ) -> Result<(), AcceptanceCheckErr>> { ensure!( head_data.0.len() <= self.config.max_head_data_size as _, AcceptanceCheckErr::HeadDataTooLarge, ); // if any, the code upgrade attempt is allowed. if let Some(new_validation_code) = new_validation_code { ensure!( >::can_upgrade_validation_code(para_id), AcceptanceCheckErr::PrematureCodeUpgrade, ); ensure!( new_validation_code.0.len() <= self.config.max_code_size as _, AcceptanceCheckErr::NewCodeTooLarge, ); } // check if the candidate passes the messaging acceptance criteria >::check_processed_downward_messages( para_id, relay_parent_number, processed_downward_messages, )?; Pallet::::check_upward_messages(&self.config, para_id, upward_messages)?; >::check_hrmp_watermark(para_id, relay_parent_number, hrmp_watermark)?; >::check_outbound_hrmp(&self.config, para_id, horizontal_messages)?; Ok(()) } } impl QueueFootprinter for Pallet { type Origin = UmpQueueId; fn message_count(origin: Self::Origin) -> u64 { T::MessageQueue::footprint(AggregateMessageOrigin::Ump(origin)).count } }