// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! The inclusion pallet is responsible for inclusion and availability of scheduled parachains.
//!
//! It is responsible for carrying candidates from being backable to being backed, and then from
//! backed to included.

use crate::{
	configuration::{self, HostConfiguration},
	disputes, dmp, hrmp,
	paras::{self, UpgradeStrategy},
	scheduler,
	shared::{self, AllowedRelayParentsTracker},
	util::make_persisted_validation_data_with_parent,
};
use bitvec::{order::Lsb0 as BitOrderLsb0, vec::BitVec};
use frame_support::{
	defensive, pallet_prelude::*,
	traits::{EnqueueMessage, Footprint, QueueFootprint},
	BoundedSlice,
};
use frame_system::pallet_prelude::*;
use pallet_message_queue::OnQueueChanged;
use parity_scale_codec::{Decode, Encode};
use primitives::{
	effective_minimum_backing_votes, supermajority_threshold, well_known_keys, BackedCandidate,
	CandidateCommitments, CandidateDescriptor, CandidateHash, CandidateReceipt,
	CommittedCandidateReceipt, CoreIndex, GroupIndex, Hash, HeadData, Id as ParaId,
	SignedAvailabilityBitfields, SigningContext, UpwardMessage, ValidatorId, ValidatorIndex,
	ValidityAttestation,
};
use scale_info::TypeInfo;
use sp_runtime::{traits::One, DispatchError, SaturatedConversion, Saturating};
#[cfg(feature = "std")]
use sp_std::fmt;
use sp_std::{
	collections::{btree_map::BTreeMap, btree_set::BTreeSet, vec_deque::VecDeque},
	prelude::*,
};

pub use pallet::*;

#[cfg(test)]
pub(crate) mod tests;

#[cfg(feature = "runtime-benchmarks")]
mod benchmarking;

pub mod migration;

pub trait WeightInfo {
	fn receive_upward_messages(i: u32) -> Weight;
}

pub struct TestWeightInfo;
impl WeightInfo for TestWeightInfo {
	fn receive_upward_messages(_: u32) -> Weight {
		Weight::MAX
	}
}

impl WeightInfo for () {
	fn receive_upward_messages(_: u32) -> Weight {
		Weight::zero()
	}
}

/// Maximum value that `config.max_upward_message_size` can be set to.
///
/// This is used for benchmarking to sanely bound relevant storage items. The `configuration`
/// pallet is expected to check these values before setting them.
pub const MAX_UPWARD_MESSAGE_SIZE_BOUND: u32 = 128 * 1024;

/// A backed candidate pending availability.
#[derive(Encode, Decode, PartialEq, TypeInfo, Clone)]
#[cfg_attr(test, derive(Debug))]
pub struct CandidatePendingAvailability<H, N> {
	/// The availability core this is assigned to.
	core: CoreIndex,
	/// The candidate hash.
	hash: CandidateHash,
	/// The candidate descriptor.
	descriptor: CandidateDescriptor<H>,
	/// The candidate commitments.
	commitments: CandidateCommitments,
	/// The received availability votes. One bit per validator.
	availability_votes: BitVec<u8, BitOrderLsb0>,
	/// The backers of the candidate pending availability.
	backers: BitVec<u8, BitOrderLsb0>,
	/// The block number of the relay-parent of the receipt.
	relay_parent_number: N,
	/// The block number of the relay-chain block this was backed in.
	backed_in_number: N,
	/// The group index backing this block.
	backing_group: GroupIndex,
}

impl<H, N> CandidatePendingAvailability<H, N> {
	/// Get the availability votes on the candidate.
	pub(crate) fn availability_votes(&self) -> &BitVec<u8, BitOrderLsb0> {
		&self.availability_votes
	}

	/// Get the relay-chain block number this was backed in.
	pub(crate) fn backed_in_number(&self) -> N
	where
		N: Clone,
	{
		self.backed_in_number.clone()
	}

	/// Get the core index.
	pub(crate) fn core_occupied(&self) -> CoreIndex {
		self.core
	}

	/// Get the candidate hash.
	pub(crate) fn candidate_hash(&self) -> CandidateHash {
		self.hash
	}

	/// Get the candidate descriptor.
	pub(crate) fn candidate_descriptor(&self) -> &CandidateDescriptor<H> {
		&self.descriptor
	}

	/// Get the candidate commitments.
	pub(crate) fn candidate_commitments(&self) -> &CandidateCommitments {
		&self.commitments
	}

	/// Get the candidate's relay parent's number.
	pub(crate) fn relay_parent_number(&self) -> N
	where
		N: Clone,
	{
		self.relay_parent_number.clone()
	}

	/// Get the candidate backing group.
	pub(crate) fn backing_group(&self) -> GroupIndex {
		self.backing_group
	}

	/// Get the candidate's backers.
	pub(crate) fn backers(&self) -> &BitVec<u8, BitOrderLsb0> {
		&self.backers
	}

	#[cfg(any(feature = "runtime-benchmarks", test))]
	pub(crate) fn new(
		core: CoreIndex,
		hash: CandidateHash,
		descriptor: CandidateDescriptor<H>,
		commitments: CandidateCommitments,
		availability_votes: BitVec<u8, BitOrderLsb0>,
		backers: BitVec<u8, BitOrderLsb0>,
		relay_parent_number: N,
		backed_in_number: N,
		backing_group: GroupIndex,
	) -> Self {
		Self {
			core,
			hash,
			descriptor,
			commitments,
			availability_votes,
			backers,
			relay_parent_number,
			backed_in_number,
			backing_group,
		}
	}
}

/// A hook for applying validator rewards.
pub trait RewardValidators {
	// Reward the validators with the given indices for issuing backing statements.
	fn reward_backing(validators: impl IntoIterator<Item = ValidatorIndex>);
	// Reward the validators with the given indices for issuing availability bitfields.
	// Validators are sent to this hook when they have contributed to the availability
	// of a candidate by setting a bit in their bitfield.
	fn reward_bitfields(validators: impl IntoIterator<Item = ValidatorIndex>);
}

/// Helper return type for `process_candidates`.
#[derive(Encode, Decode, PartialEq, TypeInfo)]
#[cfg_attr(test, derive(Debug))]
pub(crate) struct ProcessedCandidates<H = Hash> {
	pub(crate) core_indices: Vec<(CoreIndex, ParaId)>,
	pub(crate) candidate_receipt_with_backing_validator_indices:
		Vec<(CandidateReceipt<H>, Vec<(ValidatorIndex, ValidityAttestation)>)>,
}

impl<H> Default for ProcessedCandidates<H> {
	fn default() -> Self {
		Self {
			core_indices: Vec::new(),
			candidate_receipt_with_backing_validator_indices: Vec::new(),
		}
	}
}

/// Reads the footprint of queues for a specific origin type.
pub trait QueueFootprinter {
	type Origin;

	fn message_count(origin: Self::Origin) -> u64;
}

impl QueueFootprinter for () {
	type Origin = UmpQueueId;

	fn message_count(_: Self::Origin) -> u64 {
		0
	}
}

/// Aggregate message origin for the `MessageQueue` pallet.
///
/// Can be extended to serve further use-cases besides just UMP. Is stored in storage, so any
/// change to existing values will require a migration.
#[derive(Encode, Decode, Clone, MaxEncodedLen, Eq, PartialEq, RuntimeDebug, TypeInfo)]
pub enum AggregateMessageOrigin {
	/// Inbound upward message.
	#[codec(index = 0)]
	Ump(UmpQueueId),
}

/// Identifies a UMP queue inside the `MessageQueue` pallet.
///
/// It is written in verbose form since future variants like `Here` and `Bridged` are already
/// foreseeable.
#[derive(Encode, Decode, Clone, MaxEncodedLen, Eq, PartialEq, RuntimeDebug, TypeInfo)]
pub enum UmpQueueId {
	/// The message originated from this parachain.
	#[codec(index = 0)]
	Para(ParaId),
}

#[cfg(feature = "runtime-benchmarks")]
impl From<u32> for AggregateMessageOrigin {
	fn from(n: u32) -> Self {
		// Some dummy for the benchmarks.
		Self::Ump(UmpQueueId::Para(n.into()))
	}
}

/// The maximal length of a UMP message.
pub type MaxUmpMessageLenOf<T> =
	<<T as Config>::MessageQueue as EnqueueMessage<AggregateMessageOrigin>>::MaxMessageLen;

#[frame_support::pallet]
pub mod pallet {
	use super::*;

	const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);

	#[pallet::pallet]
	#[pallet::without_storage_info]
	#[pallet::storage_version(STORAGE_VERSION)]
	pub struct Pallet<T>(_);

	#[pallet::config]
	pub trait Config:
		frame_system::Config
		+ shared::Config
		+ paras::Config
		+ dmp::Config
		+ hrmp::Config
		+ configuration::Config
		+ scheduler::Config
	{
		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
		type DisputesHandler: disputes::DisputesHandler<BlockNumberFor<Self>>;
		type RewardValidators: RewardValidators;

		/// The system message queue.
		///
		/// The message queue provides general queueing and processing functionality. Currently it
		/// replaces the old `UMP` dispatch queue. Other use-cases can be implemented as well by
		/// adding new variants to `AggregateMessageOrigin`.
		type MessageQueue: EnqueueMessage<AggregateMessageOrigin>;

		/// Weight info for the calls of this pallet.
		type WeightInfo: WeightInfo;
	}

	#[pallet::event]
	#[pallet::generate_deposit(pub(super) fn deposit_event)]
	pub enum Event<T: Config> {
		/// A candidate was backed. `[candidate, head_data]`
		CandidateBacked(CandidateReceipt<T::Hash>, HeadData, CoreIndex, GroupIndex),
		/// A candidate was included. `[candidate, head_data]`
		CandidateIncluded(CandidateReceipt<T::Hash>, HeadData, CoreIndex, GroupIndex),
		/// A candidate timed out. `[candidate, head_data]`
		CandidateTimedOut(CandidateReceipt<T::Hash>, HeadData, CoreIndex),
		/// Some upward messages have been received and will be processed.
		UpwardMessagesReceived { from: ParaId, count: u32 },
	}

	#[pallet::error]
	pub enum Error<T> {
		/// Validator index out of bounds.
		ValidatorIndexOutOfBounds,
		/// Candidate submitted but para not scheduled.
		UnscheduledCandidate,
		/// Head data exceeds the configured maximum.
		HeadDataTooLarge,
		/// Code upgrade attempted prematurely.
		PrematureCodeUpgrade,
		/// Output code is too large.
		NewCodeTooLarge,
		/// The candidate's relay-parent was not allowed. Either it was
		/// not recent enough or it didn't advance based on the last parachain block.
		DisallowedRelayParent,
		/// Failed to compute group index for the core: either it's out of bounds
		/// or the relay parent doesn't belong to the current session.
		InvalidAssignment,
		/// Invalid group index in core assignment.
		InvalidGroupIndex,
		/// Insufficient (non-majority) backing.
		InsufficientBacking,
		/// Invalid (bad signature, unknown validator, etc.) backing.
		InvalidBacking,
		/// Collator did not sign PoV.
		NotCollatorSigned,
		/// The validation data hash does not match expected.
		ValidationDataHashMismatch,
		/// The downward message queue is not processed correctly.
		IncorrectDownwardMessageHandling,
		/// At least one upward message sent does not pass the acceptance criteria.
		InvalidUpwardMessages,
		/// The candidate didn't follow the rules of HRMP watermark advancement.
		HrmpWatermarkMishandling,
		/// The HRMP messages sent by the candidate are not valid.
		InvalidOutboundHrmp,
		/// The validation code hash of the candidate is not valid.
		InvalidValidationCodeHash,
		/// The `para_head` hash in the candidate descriptor doesn't match the hash of the actual
		/// para head in the commitments.
		ParaHeadMismatch,
	}

	/// Candidates pending availability by `ParaId`. They form a chain starting from the latest
	/// included head of the para.
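	/// Within each queue, every candidate builds on the `head_data` committed by the entry before
	/// it (the first entry builds on the latest included head), so candidates can only be enacted
	/// front to back.
	///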
	/// Use a different prefix post-migration to v1, since the v0 `PendingAvailability` storage
	/// would otherwise have the exact same prefix which could cause undefined behaviour when doing
	/// the migration.
	#[pallet::storage]
	#[pallet::storage_prefix = "V1"]
	pub(crate) type PendingAvailability<T: Config> = StorageMap<
		_,
		Twox64Concat,
		ParaId,
		VecDeque<CandidatePendingAvailability<T::Hash, BlockNumberFor<T>>>,
	>;

	#[pallet::call]
	impl<T: Config> Pallet<T> {}
}

const LOG_TARGET: &str = "runtime::inclusion";

/// The reason a candidate's outputs were rejected.
#[cfg_attr(feature = "std", derive(Debug))]
enum AcceptanceCheckErr {
	HeadDataTooLarge,
	/// Code upgrades are not permitted at the current time.
	PrematureCodeUpgrade,
	/// The new runtime blob is too large.
	NewCodeTooLarge,
	/// The candidate violated the DMP acceptance criteria.
	ProcessedDownwardMessages,
	/// The candidate violated the UMP acceptance criteria.
	UpwardMessages,
	/// The candidate violated the HRMP watermark acceptance criteria.
	HrmpWatermark,
	/// The candidate violated the outbound HRMP acceptance criteria.
	OutboundHrmp,
}

impl From<dmp::ProcessedDownwardMessagesAcceptanceErr> for AcceptanceCheckErr {
	fn from(_: dmp::ProcessedDownwardMessagesAcceptanceErr) -> Self {
		Self::ProcessedDownwardMessages
	}
}

impl From<UmpAcceptanceCheckErr> for AcceptanceCheckErr {
	fn from(_: UmpAcceptanceCheckErr) -> Self {
		Self::UpwardMessages
	}
}

impl<BlockNumber> From<hrmp::HrmpWatermarkAcceptanceErr<BlockNumber>> for AcceptanceCheckErr {
	fn from(_: hrmp::HrmpWatermarkAcceptanceErr<BlockNumber>) -> Self {
		Self::HrmpWatermark
	}
}

impl From<hrmp::OutboundHrmpAcceptanceErr> for AcceptanceCheckErr {
	fn from(_: hrmp::OutboundHrmpAcceptanceErr) -> Self {
		Self::OutboundHrmp
	}
}

/// An error returned by [`Pallet::check_upward_messages`] that indicates a violation of one of
/// the acceptance criteria rules.
#[cfg_attr(test, derive(PartialEq))]
pub(crate) enum UmpAcceptanceCheckErr {
	/// The maximal number of messages that can be submitted in one batch was exceeded.
	MoreMessagesThanPermitted { sent: u32, permitted: u32 },
	/// The maximal size of a single message was exceeded.
	MessageSize { idx: u32, msg_size: u32, max_size: u32 },
	/// The allowed number of messages in the queue was exceeded.
	CapacityExceeded { count: u64, limit: u64 },
	/// The allowed combined message size in the queue was exceeded.
	TotalSizeExceeded { total_size: u64, limit: u64 },
	/// A para-chain cannot send UMP messages while it is offboarding.
	IsOffboarding,
}

#[cfg(feature = "std")]
impl fmt::Debug for UmpAcceptanceCheckErr {
	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
		match *self {
			UmpAcceptanceCheckErr::MoreMessagesThanPermitted { sent, permitted } => write!(
				fmt,
				"more upward messages than permitted by config ({} > {})",
				sent, permitted,
			),
			UmpAcceptanceCheckErr::MessageSize { idx, msg_size, max_size } => write!(
				fmt,
				"upward message idx {} larger than permitted by config ({} > {})",
				idx, msg_size, max_size,
			),
			UmpAcceptanceCheckErr::CapacityExceeded { count, limit } => write!(
				fmt,
				"the ump queue would have more items than permitted by config ({} > {})",
				count, limit,
			),
			UmpAcceptanceCheckErr::TotalSizeExceeded { total_size, limit } => write!(
				fmt,
				"the ump queue would have grown past the max size permitted by config ({} > {})",
				total_size, limit,
			),
			UmpAcceptanceCheckErr::IsOffboarding => {
				write!(fmt, "upward message rejected because the para is off-boarding")
			},
		}
	}
}

impl<T: Config> Pallet<T> {
	/// Block initialization logic, called by initializer.
	pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
		Weight::zero()
	}

	/// Block finalization logic, called by initializer.
	pub(crate) fn initializer_finalize() {}

	/// Handle an incoming session change.
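	///
	/// Drops all candidates still pending availability and sweeps the UMP dispatch queues of the
	/// outgoing paras.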
	pub(crate) fn initializer_on_new_session(
		_notification: &crate::initializer::SessionChangeNotification<BlockNumberFor<T>>,
		outgoing_paras: &[ParaId],
	) {
		// unlike most drain methods, drained elements are not cleared on `Drop` of the iterator
		// and require consumption.
		for _ in PendingAvailability::<T>::drain() {}

		Self::cleanup_outgoing_ump_dispatch_queues(outgoing_paras);
	}

	pub(crate) fn cleanup_outgoing_ump_dispatch_queues(outgoing: &[ParaId]) {
		for outgoing_para in outgoing {
			Self::cleanup_outgoing_ump_dispatch_queue(*outgoing_para);
		}
	}

	pub(crate) fn cleanup_outgoing_ump_dispatch_queue(para: ParaId) {
		T::MessageQueue::sweep_queue(AggregateMessageOrigin::Ump(UmpQueueId::Para(para)));
	}

	/// Extract the freed cores based on cores that became available.
	///
	/// Bitfields are expected to have been sanitized already, e.g. via `sanitize_bitfields`!
	///
	/// Updates the `PendingAvailability` storage item.
	///
	/// Returns a `Vec` of `CandidateHash`es and their respective `AvailabilityCore`s that became
	/// available and are now free.
	pub(crate) fn update_pending_availability_and_get_freed_cores(
		validators: &[ValidatorId],
		signed_bitfields: SignedAvailabilityBitfields,
	) -> Vec<(CoreIndex, CandidateHash)> {
		let threshold = availability_threshold(validators.len());

		let mut votes_per_core: BTreeMap<CoreIndex, BTreeSet<ValidatorIndex>> = BTreeMap::new();

		for (checked_bitfield, validator_index) in
			signed_bitfields.into_iter().map(|signed_bitfield| {
				let validator_idx = signed_bitfield.validator_index();
				let checked_bitfield = signed_bitfield.into_payload();
				(checked_bitfield, validator_idx)
			}) {
			for (bit_idx, _) in
				checked_bitfield.0.iter().enumerate().filter(|(_, is_av)| **is_av)
			{
				let core_index = CoreIndex(bit_idx as u32);

				votes_per_core
					.entry(core_index)
					.or_insert_with(|| BTreeSet::new())
					.insert(validator_index);
			}
		}

		let mut freed_cores = vec![];

		let pending_paraids: Vec<_> = PendingAvailability::<T>::iter_keys().collect();
		for paraid in pending_paraids {
			PendingAvailability::<T>::mutate(paraid, |candidates| {
				if let Some(candidates) = candidates {
					let mut last_enacted_index: Option<usize> = None;

					for (candidate_index, candidate) in candidates.iter_mut().enumerate() {
						if let Some(validator_indices) = votes_per_core.remove(&candidate.core) {
							for validator_index in validator_indices.iter() {
								// defensive check - this is constructed by loading the
								// availability bitfield record, which is always `Some` if
								// the core is occupied - that's why we're here.
								if let Some(mut bit) = candidate
									.availability_votes
									.get_mut(validator_index.0 as usize)
								{
									*bit = true;
								}
							}
						}

						// We check for the candidate's availability even if we didn't get any new
						// bitfields for its core, as it may have already been available at a
						// previous block but wasn't enacted due to its predecessors not being
						// available.
						if candidate.availability_votes.count_ones() >= threshold {
							// We can only enact a candidate if we've enacted all of its
							// predecessors already.
							let can_enact = if candidate_index == 0 {
								last_enacted_index == None
							} else {
								let prev_candidate_index = usize::try_from(candidate_index - 1)
									.expect("Previous `if` would have caught a 0 candidate index.");
								matches!(last_enacted_index, Some(old_index) if old_index == prev_candidate_index)
							};

							if can_enact {
								last_enacted_index = Some(candidate_index);
							}
						}
					}

					// Trim the pending availability candidates storage and enact candidates of
					// this para now.
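					// Candidates are enacted strictly in queue (dependency) order: everything up
					// to and including `last_enacted_index` is drained from the front, so a
					// candidate is never enacted before its parent.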
					if let Some(last_enacted_index) = last_enacted_index {
						let evicted_candidates = candidates.drain(0..=last_enacted_index);
						for candidate in evicted_candidates {
							freed_cores.push((candidate.core, candidate.hash));

							let receipt = CommittedCandidateReceipt {
								descriptor: candidate.descriptor,
								commitments: candidate.commitments,
							};

							let _weight = Self::enact_candidate(
								candidate.relay_parent_number,
								receipt,
								candidate.backers,
								candidate.availability_votes,
								candidate.core,
								candidate.backing_group,
							);
						}
					}
				}
			});
		}

		freed_cores
	}

	/// Process candidates that have been backed. Provide a set of
	/// candidates along with their scheduled cores.
	///
	/// Candidates of the same paraid should be sorted according to their dependency order (they
	/// should form a chain). If this condition is not met, this function will return an error.
	/// (This really should not happen here, if the candidates were properly sanitised in
	/// paras_inherent).
	pub(crate) fn process_candidates<GV>(
		allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
		candidates: &BTreeMap<ParaId, Vec<(BackedCandidate<T::Hash>, CoreIndex)>>,
		group_validators: GV,
		core_index_enabled: bool,
	) -> Result<ProcessedCandidates<T::Hash>, DispatchError>
	where
		GV: Fn(GroupIndex) -> Option<Vec<ValidatorIndex>>,
	{
		if candidates.is_empty() {
			return Ok(ProcessedCandidates::default())
		}

		let now = frame_system::Pallet::<T>::block_number();
		let validators = shared::ActiveValidatorKeys::<T>::get();

		// Collect candidate receipts with backers.
		let mut candidate_receipt_with_backing_validator_indices =
			Vec::with_capacity(candidates.len());
		let mut core_indices = Vec::with_capacity(candidates.len());

		for (para_id, para_candidates) in candidates {
			let mut latest_head_data = match Self::para_latest_head_data(para_id) {
				None => {
					defensive!("Latest included head data for paraid {:?} is None", para_id);
					continue
				},
				Some(latest_head_data) => latest_head_data,
			};

			for (candidate, core) in para_candidates.iter() {
				let candidate_hash = candidate.candidate().hash();

				let check_ctx = CandidateCheckContext::<T>::new(None);
				let relay_parent_number = check_ctx.verify_backed_candidate(
					&allowed_relay_parents,
					candidate.candidate(),
					latest_head_data.clone(),
				)?;

				// The candidate based upon relay parent `N` should be backed by a
				// group assigned to core at block `N + 1`. Thus,
				// `relay_parent_number + 1` will always land in the current
				// session.
				let group_idx = scheduler::Pallet::<T>::group_assigned_to_core(
					*core,
					relay_parent_number + One::one(),
				)
				.ok_or_else(|| {
					log::warn!(
						target: LOG_TARGET,
						"Failed to compute group index for candidate {:?}",
						candidate_hash
					);
					Error::<T>::InvalidAssignment
				})?;

				let group_vals =
					group_validators(group_idx).ok_or_else(|| Error::<T>::InvalidGroupIndex)?;

				// Check backing vote count and validity.
				let (backers, backer_idx_and_attestation) = Self::check_backing_votes(
					candidate,
					&validators,
					group_vals,
					core_index_enabled,
				)?;

				// Found a valid candidate.
				latest_head_data = candidate.candidate().commitments.head_data.clone();
				candidate_receipt_with_backing_validator_indices
					.push((candidate.receipt(), backer_idx_and_attestation));
				core_indices.push((*core, *para_id));

				// Update storage now
				PendingAvailability::<T>::mutate(&para_id, |pending_availability| {
					let new_candidate = CandidatePendingAvailability {
						core: *core,
						hash: candidate_hash,
						descriptor: candidate.candidate().descriptor.clone(),
						commitments: candidate.candidate().commitments.clone(),
						// initialize all availability votes to 0.
						availability_votes: bitvec::bitvec![u8, BitOrderLsb0; 0; validators.len()],
						relay_parent_number,
						backers: backers.to_bitvec(),
						backed_in_number: now,
						backing_group: group_idx,
					};

					if let Some(pending_availability) = pending_availability {
						pending_availability.push_back(new_candidate);
					} else {
						*pending_availability =
							Some([new_candidate].into_iter().collect::<VecDeque<_>>())
					}
				});

				// Deposit backed event.
				Self::deposit_event(Event::<T>::CandidateBacked(
					candidate.candidate().to_plain(),
					candidate.candidate().commitments.head_data.clone(),
					*core,
					group_idx,
				));
			}
		}

		Ok(ProcessedCandidates::<T::Hash> {
			core_indices,
			candidate_receipt_with_backing_validator_indices,
		})
	}

	// Get the latest backed output head data of this para.
	pub(crate) fn para_latest_head_data(para_id: &ParaId) -> Option<HeadData> {
		match PendingAvailability::<T>::get(para_id).and_then(|pending_candidates| {
			pending_candidates.back().map(|x| x.commitments.head_data.clone())
		}) {
			Some(head_data) => Some(head_data),
			None => paras::Heads::<T>::get(para_id),
		}
	}

	fn check_backing_votes(
		backed_candidate: &BackedCandidate<T::Hash>,
		validators: &[ValidatorId],
		group_vals: Vec<ValidatorIndex>,
		core_index_enabled: bool,
	) -> Result<(BitVec<u8, BitOrderLsb0>, Vec<(ValidatorIndex, ValidityAttestation)>), Error<T>> {
		let minimum_backing_votes = configuration::ActiveConfig::<T>::get().minimum_backing_votes;

		let mut backers = bitvec::bitvec![u8, BitOrderLsb0; 0; validators.len()];
		let signing_context = SigningContext {
			parent_hash: backed_candidate.descriptor().relay_parent,
			session_index: shared::CurrentSessionIndex::<T>::get(),
		};

		let (validator_indices, _) =
			backed_candidate.validator_indices_and_core_index(core_index_enabled);

		// check the signatures in the backing and that it is a majority.
		let maybe_amount_validated = primitives::check_candidate_backing(
			backed_candidate.candidate().hash(),
			backed_candidate.validity_votes(),
			validator_indices,
			&signing_context,
			group_vals.len(),
			|intra_group_vi| {
				group_vals
					.get(intra_group_vi)
					.and_then(|vi| validators.get(vi.0 as usize))
					.map(|v| v.clone())
			},
		);

		match maybe_amount_validated {
			Ok(amount_validated) => ensure!(
				amount_validated >=
					effective_minimum_backing_votes(group_vals.len(), minimum_backing_votes),
				Error::<T>::InsufficientBacking,
			),
			Err(()) => {
				Err(Error::<T>::InvalidBacking)?;
			},
		}

		let mut backer_idx_and_attestation =
			Vec::<(ValidatorIndex, ValidityAttestation)>::with_capacity(
				validator_indices.count_ones(),
			);

		for ((bit_idx, _), attestation) in validator_indices
			.iter()
			.enumerate()
			.filter(|(_, signed)| **signed)
			.zip(backed_candidate.validity_votes().iter().cloned())
		{
			let val_idx = group_vals.get(bit_idx).expect("this query succeeded above; qed");
			backer_idx_and_attestation.push((*val_idx, attestation));

			backers.set(val_idx.0 as _, true);
		}

		Ok((backers, backer_idx_and_attestation))
	}

	/// Run the acceptance criteria checks on the given candidate commitments.
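	///
	/// Returns `true` if the outputs pass all checks, and `false` (after logging a debug message)
	/// otherwise.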
	pub(crate) fn check_validation_outputs_for_runtime_api(
		para_id: ParaId,
		relay_parent_number: BlockNumberFor<T>,
		validation_outputs: primitives::CandidateCommitments,
	) -> bool {
		let prev_context = paras::MostRecentContext::<T>::get(para_id);
		let check_ctx = CandidateCheckContext::<T>::new(prev_context);

		if check_ctx
			.check_validation_outputs(
				para_id,
				relay_parent_number,
				&validation_outputs.head_data,
				&validation_outputs.new_validation_code,
				validation_outputs.processed_downward_messages,
				&validation_outputs.upward_messages,
				BlockNumberFor::<T>::from(validation_outputs.hrmp_watermark),
				&validation_outputs.horizontal_messages,
			)
			.is_err()
		{
			log::debug!(
				target: LOG_TARGET,
				"Validation outputs checking for parachain `{}` failed",
				u32::from(para_id),
			);
			false
		} else {
			true
		}
	}

	fn enact_candidate(
		relay_parent_number: BlockNumberFor<T>,
		receipt: CommittedCandidateReceipt<T::Hash>,
		backers: BitVec<u8, BitOrderLsb0>,
		availability_votes: BitVec<u8, BitOrderLsb0>,
		core_index: CoreIndex,
		backing_group: GroupIndex,
	) -> Weight {
		let plain = receipt.to_plain();
		let commitments = receipt.commitments;
		let config = configuration::ActiveConfig::<T>::get();

		T::RewardValidators::reward_backing(
			backers
				.iter()
				.enumerate()
				.filter(|(_, backed)| **backed)
				.map(|(i, _)| ValidatorIndex(i as _)),
		);
		T::RewardValidators::reward_bitfields(
			availability_votes
				.iter()
				.enumerate()
				.filter(|(_, voted)| **voted)
				.map(|(i, _)| ValidatorIndex(i as _)),
		);

		// initial weight is config read.
		let mut weight = T::DbWeight::get().reads_writes(1, 0);
		if let Some(new_code) = commitments.new_validation_code {
			// Block number of candidate's inclusion.
			let now = frame_system::Pallet::<T>::block_number();

			weight.saturating_accrue(paras::Pallet::<T>::schedule_code_upgrade(
				receipt.descriptor.para_id,
				new_code,
				now,
				&config,
				UpgradeStrategy::SetGoAheadSignal,
			));
		}

		// enact the messaging facet of the candidate.
		weight.saturating_accrue(dmp::Pallet::<T>::prune_dmq(
			receipt.descriptor.para_id,
			commitments.processed_downward_messages,
		));
		weight.saturating_accrue(Self::receive_upward_messages(
			receipt.descriptor.para_id,
			commitments.upward_messages.as_slice(),
		));
		weight.saturating_accrue(hrmp::Pallet::<T>::prune_hrmp(
			receipt.descriptor.para_id,
			BlockNumberFor::<T>::from(commitments.hrmp_watermark),
		));
		weight.saturating_accrue(hrmp::Pallet::<T>::queue_outbound_hrmp(
			receipt.descriptor.para_id,
			commitments.horizontal_messages,
		));

		Self::deposit_event(Event::<T>::CandidateIncluded(
			plain,
			commitments.head_data.clone(),
			core_index,
			backing_group,
		));

		weight.saturating_add(paras::Pallet::<T>::note_new_head(
			receipt.descriptor.para_id,
			commitments.head_data,
			relay_parent_number,
		))
	}

	pub(crate) fn relay_dispatch_queue_size(para_id: ParaId) -> (u32, u32) {
		let fp = T::MessageQueue::footprint(AggregateMessageOrigin::Ump(UmpQueueId::Para(para_id)));
		(fp.storage.count as u32, fp.storage.size as u32)
	}

	/// Check that all the upward messages sent by a candidate pass the acceptance criteria.
	pub(crate) fn check_upward_messages(
		config: &HostConfiguration<BlockNumberFor<T>>,
		para: ParaId,
		upward_messages: &[UpwardMessage],
	) -> Result<(), UmpAcceptanceCheckErr> {
		// Cannot send UMP messages while off-boarding.
		if paras::Pallet::<T>::is_offboarding(para) {
			ensure!(upward_messages.is_empty(), UmpAcceptanceCheckErr::IsOffboarding);
		}

		let additional_msgs = upward_messages.len() as u32;
		if additional_msgs > config.max_upward_message_num_per_candidate {
			return Err(UmpAcceptanceCheckErr::MoreMessagesThanPermitted {
				sent: additional_msgs,
				permitted: config.max_upward_message_num_per_candidate,
			})
		}

		let (para_queue_count, mut para_queue_size) = Self::relay_dispatch_queue_size(para);

		if para_queue_count.saturating_add(additional_msgs) > config.max_upward_queue_count {
			return Err(UmpAcceptanceCheckErr::CapacityExceeded {
				count: para_queue_count.saturating_add(additional_msgs).into(),
				limit: config.max_upward_queue_count.into(),
			})
		}

		for (idx, msg) in upward_messages.into_iter().enumerate() {
			let msg_size = msg.len() as u32;
			if msg_size > config.max_upward_message_size {
				return Err(UmpAcceptanceCheckErr::MessageSize {
					idx: idx as u32,
					msg_size,
					max_size: config.max_upward_message_size,
				})
			}
			// make sure that the queue is not overfilled.
			// we do it here only once since returning false invalidates the whole relay-chain
			// block.
			if para_queue_size.saturating_add(msg_size) > config.max_upward_queue_size {
				return Err(UmpAcceptanceCheckErr::TotalSizeExceeded {
					total_size: para_queue_size.saturating_add(msg_size).into(),
					limit: config.max_upward_queue_size.into(),
				})
			}
			para_queue_size.saturating_accrue(msg_size);
		}

		Ok(())
	}

	/// Enqueues `upward_messages` from a `para`'s accepted candidate block.
	///
	/// This function is infallible since the candidate was already accepted and we therefore need
	/// to deal with the messages as given. Messages that are too long will be ignored since such
	/// candidates should have already been rejected in [`Self::check_upward_messages`].
	pub(crate) fn receive_upward_messages(para: ParaId, upward_messages: &[Vec<u8>]) -> Weight {
		let bounded = upward_messages
			.iter()
			.filter_map(|d| {
				BoundedSlice::try_from(&d[..])
					.map_err(|e| {
						defensive!("Accepted candidate contains too long msg, len=", d.len());
						e
					})
					.ok()
			})
			.collect();

		Self::receive_bounded_upward_messages(para, bounded)
	}

	/// Enqueues storage-bounded `upward_messages` from a `para`'s accepted candidate block.
	pub(crate) fn receive_bounded_upward_messages(
		para: ParaId,
		messages: Vec<BoundedSlice<'_, u8, MaxUmpMessageLenOf<T>>>,
	) -> Weight {
		let count = messages.len() as u32;
		if count == 0 {
			return Weight::zero()
		}

		T::MessageQueue::enqueue_messages(
			messages.into_iter(),
			AggregateMessageOrigin::Ump(UmpQueueId::Para(para)),
		);
		let weight = <T as Config>::WeightInfo::receive_upward_messages(count);
		Self::deposit_event(Event::UpwardMessagesReceived { from: para, count });
		weight
	}

	/// Cleans up all timed out candidates as well as their descendant candidates.
	///
	/// Returns a vector of cleaned-up core IDs.
	pub(crate) fn free_timedout() -> Vec<CoreIndex> {
		let timeout_pred = scheduler::Pallet::<T>::availability_timeout_predicate();

		let timed_out: Vec<_> = Self::free_failed_cores(
			|candidate| timeout_pred(candidate.backed_in_number).timed_out,
			None,
		)
		.collect();

		let mut timed_out_cores = Vec::with_capacity(timed_out.len());
		for candidate in timed_out.iter() {
			timed_out_cores.push(candidate.core);

			let receipt = CandidateReceipt {
				descriptor: candidate.descriptor.clone(),
				commitments_hash: candidate.commitments.hash(),
			};

			Self::deposit_event(Event::<T>::CandidateTimedOut(
				receipt,
				candidate.commitments.head_data.clone(),
				candidate.core,
			));
		}

		timed_out_cores
	}

	/// Cleans up all cores pending availability occupied by one of the disputed candidates or
	/// which are descendants of a disputed candidate.
	///
	/// Returns a vector of cleaned-up core IDs, along with the evicted candidate hashes.
	pub(crate) fn free_disputed(
		disputed: &BTreeSet<CandidateHash>,
	) -> Vec<(CoreIndex, CandidateHash)> {
		Self::free_failed_cores(
			|candidate| disputed.contains(&candidate.hash),
			Some(disputed.len()),
		)
		.map(|candidate| (candidate.core, candidate.hash))
		.collect()
	}

	// Clean up cores whose candidates are deemed failed by the predicate. `pred` returns true if
	// a candidate is considered failed.
	// A failed candidate also frees all subsequent cores which hold descendants of said candidate.
	fn free_failed_cores<
		P: Fn(&CandidatePendingAvailability<T::Hash, BlockNumberFor<T>>) -> bool,
	>(
		pred: P,
		capacity_hint: Option<usize>,
	) -> impl Iterator<Item = CandidatePendingAvailability<T::Hash, BlockNumberFor<T>>> {
		let mut earliest_dropped_indices: BTreeMap<ParaId, usize> = BTreeMap::new();

		for (para_id, pending_candidates) in PendingAvailability::<T>::iter() {
			// We assume that pending candidates are stored in dependency order. So we need to
			// store the earliest dropped candidate. All others that follow will get freed as well.
			let mut earliest_dropped_idx = None;
			for (index, candidate) in pending_candidates.iter().enumerate() {
				if pred(candidate) {
					earliest_dropped_idx = Some(index);
					// Since we're looping the candidates in dependency order, we've found the
					// earliest failed index for this paraid.
					break;
				}
			}

			if let Some(earliest_dropped_idx) = earliest_dropped_idx {
				earliest_dropped_indices.insert(para_id, earliest_dropped_idx);
			}
		}

		let mut cleaned_up_cores =
			if let Some(capacity) = capacity_hint { Vec::with_capacity(capacity) } else { vec![] };

		for (para_id, earliest_dropped_idx) in earliest_dropped_indices {
			// Do cleanups and record the cleaned up cores
			PendingAvailability::<T>::mutate(&para_id, |record| {
				if let Some(record) = record {
					let cleaned_up = record.drain(earliest_dropped_idx..);
					cleaned_up_cores.extend(cleaned_up);
				}
			});
		}

		cleaned_up_cores.into_iter()
	}

	/// Forcibly enact the pending candidates of the given paraid as though they had been deemed
	/// available by bitfields.
	///
	/// Is a no-op if there is no candidate pending availability for this para-id.
	/// If there are multiple candidates pending availability for this para-id, it will enact all
	/// of them. This should generally not be used but it is useful during execution of Runtime
	/// APIs, where the changes to the state are expected to be discarded directly after.
	pub(crate) fn force_enact(para: ParaId) {
		PendingAvailability::<T>::mutate(&para, |candidates| {
			if let Some(candidates) = candidates {
				for candidate in candidates.drain(..) {
					let receipt = CommittedCandidateReceipt {
						descriptor: candidate.descriptor,
						commitments: candidate.commitments,
					};

					Self::enact_candidate(
						candidate.relay_parent_number,
						receipt,
						candidate.backers,
						candidate.availability_votes,
						candidate.core,
						candidate.backing_group,
					);
				}
			}
		});
	}

	/// Returns the first `CommittedCandidateReceipt` pending availability for the para provided,
	/// if any.
	pub(crate) fn candidate_pending_availability(
		para: ParaId,
	) -> Option<CommittedCandidateReceipt<T::Hash>> {
		PendingAvailability::<T>::get(&para).and_then(|p| {
			p.get(0).map(|p| CommittedCandidateReceipt {
				descriptor: p.descriptor.clone(),
				commitments: p.commitments.clone(),
			})
		})
	}

	/// Returns all the `CommittedCandidateReceipt` pending availability for the para provided, if
	/// any.
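	///
	/// The receipts are returned in dependency order, i.e. the earliest pending candidate comes
	/// first.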
	pub(crate) fn candidates_pending_availability(
		para: ParaId,
	) -> Vec<CommittedCandidateReceipt<T::Hash>> {
		<PendingAvailability<T>>::get(&para)
			.map(|candidates| {
				candidates
					.into_iter()
					.map(|candidate| CommittedCandidateReceipt {
						descriptor: candidate.descriptor.clone(),
						commitments: candidate.commitments.clone(),
					})
					.collect()
			})
			.unwrap_or_default()
	}

	/// Returns the metadata around the first candidate pending availability for the
	/// para provided, if any.
	pub(crate) fn pending_availability(
		para: ParaId,
	) -> Option<CandidatePendingAvailability<T::Hash, BlockNumberFor<T>>> {
		PendingAvailability::<T>::get(&para).and_then(|p| p.get(0).cloned())
	}

	/// Returns the metadata around the candidate pending availability occupying the supplied
	/// core, if any.
	pub(crate) fn pending_availability_with_core(
		para: ParaId,
		core: CoreIndex,
	) -> Option<CandidatePendingAvailability<T::Hash, BlockNumberFor<T>>> {
		PendingAvailability::<T>::get(&para)
			.and_then(|p| p.iter().find(|c| c.core == core).cloned())
	}
}

const fn availability_threshold(n_validators: usize) -> usize {
	supermajority_threshold(n_validators)
}

impl AcceptanceCheckErr {
	/// Returns the same error so that it can be threaded through a needle of `DispatchError` and
	/// ultimately returned from a `Dispatchable`.
	fn strip_into_dispatch_err<T: Config>(self) -> Error<T> {
		use AcceptanceCheckErr::*;
		match self {
			HeadDataTooLarge => Error::<T>::HeadDataTooLarge,
			PrematureCodeUpgrade => Error::<T>::PrematureCodeUpgrade,
			NewCodeTooLarge => Error::<T>::NewCodeTooLarge,
			ProcessedDownwardMessages => Error::<T>::IncorrectDownwardMessageHandling,
			UpwardMessages => Error::<T>::InvalidUpwardMessages,
			HrmpWatermark => Error::<T>::HrmpWatermarkMishandling,
			OutboundHrmp => Error::<T>::InvalidOutboundHrmp,
		}
	}
}

impl<T: Config> OnQueueChanged<AggregateMessageOrigin> for Pallet<T> {
	// Write back the remaining queue capacity into `relay_dispatch_queue_remaining_capacity`.
	fn on_queue_changed(origin: AggregateMessageOrigin, fp: QueueFootprint) {
		let para = match origin {
			AggregateMessageOrigin::Ump(UmpQueueId::Para(p)) => p,
		};
		let QueueFootprint { storage: Footprint { count, size }, .. } = fp;
		let (count, size) = (count.saturated_into(), size.saturated_into());
		// TODO paritytech/polkadot#6283: Remove all usages of `relay_dispatch_queue_size`
		#[allow(deprecated)]
		well_known_keys::relay_dispatch_queue_size_typed(para).set((count, size));

		let config = configuration::ActiveConfig::<T>::get();
		let remaining_count = config.max_upward_queue_count.saturating_sub(count);
		let remaining_size = config.max_upward_queue_size.saturating_sub(size);
		well_known_keys::relay_dispatch_queue_remaining_capacity(para)
			.set((remaining_count, remaining_size));
	}
}

/// A collection of data required for checking a candidate.
pub(crate) struct CandidateCheckContext<T: Config> {
	config: configuration::HostConfiguration<BlockNumberFor<T>>,
	prev_context: Option<BlockNumberFor<T>>,
}

impl<T: Config> CandidateCheckContext<T> {
	pub(crate) fn new(prev_context: Option<BlockNumberFor<T>>) -> Self {
		Self { config: configuration::ActiveConfig::<T>::get(), prev_context }
	}

	/// Execute verification of the candidate.
	///
	/// Assures:
	///  * relay-parent in-bounds
	///  * collator signature check passes
	///  * code hash of commitments matches current code hash
	///  * para head in the descriptor and commitments match
	///
	/// Returns the relay-parent block number.
	pub(crate) fn verify_backed_candidate(
		&self,
		allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
		backed_candidate_receipt: &CommittedCandidateReceipt<<T as frame_system::Config>::Hash>,
		parent_head_data: HeadData,
	) -> Result<BlockNumberFor<T>, Error<T>> {
		let para_id = backed_candidate_receipt.descriptor().para_id;
		let relay_parent = backed_candidate_receipt.descriptor().relay_parent;

		// Check that the relay-parent is one of the allowed relay-parents.
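		// The tracker also returns the storage root and block number of the relay-parent, which
		// are needed below to reconstruct the `PersistedValidationData` and check its hash.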
		let (relay_parent_storage_root, relay_parent_number) = {
			match allowed_relay_parents.acquire_info(relay_parent, self.prev_context) {
				None => return Err(Error::<T>::DisallowedRelayParent),
				Some(info) => info,
			}
		};

		{
			let persisted_validation_data = make_persisted_validation_data_with_parent::<T>(
				relay_parent_number,
				relay_parent_storage_root,
				parent_head_data,
			);

			let expected = persisted_validation_data.hash();

			ensure!(
				expected == backed_candidate_receipt.descriptor().persisted_validation_data_hash,
				Error::<T>::ValidationDataHashMismatch,
			);
		}

		ensure!(
			backed_candidate_receipt.descriptor().check_collator_signature().is_ok(),
			Error::<T>::NotCollatorSigned,
		);

		let validation_code_hash = paras::CurrentCodeHash::<T>::get(para_id)
			// A candidate for a parachain without current validation code is not scheduled.
			.ok_or_else(|| Error::<T>::UnscheduledCandidate)?;
		ensure!(
			backed_candidate_receipt.descriptor().validation_code_hash == validation_code_hash,
			Error::<T>::InvalidValidationCodeHash,
		);

		ensure!(
			backed_candidate_receipt.descriptor().para_head ==
				backed_candidate_receipt.commitments.head_data.hash(),
			Error::<T>::ParaHeadMismatch,
		);

		if let Err(err) = self.check_validation_outputs(
			para_id,
			relay_parent_number,
			&backed_candidate_receipt.commitments.head_data,
			&backed_candidate_receipt.commitments.new_validation_code,
			backed_candidate_receipt.commitments.processed_downward_messages,
			&backed_candidate_receipt.commitments.upward_messages,
			BlockNumberFor::<T>::from(backed_candidate_receipt.commitments.hrmp_watermark),
			&backed_candidate_receipt.commitments.horizontal_messages,
		) {
			log::debug!(
				target: LOG_TARGET,
				"Validation outputs checking during inclusion of a candidate {:?} for parachain `{}` failed",
				backed_candidate_receipt.hash(),
				u32::from(para_id),
			);
			Err(err.strip_into_dispatch_err::<T>())?;
		};

		Ok(relay_parent_number)
	}

	/// Check the given outputs after candidate validation on whether they pass the acceptance
	/// criteria.
	///
	/// The things that are checked can be roughly divided into limits and minimums.
	///
	/// Limits are things like max message queue sizes and max head data size.
	///
	/// Minimums are things like the minimum amount of messages that must be processed
	/// by the parachain block.
	///
	/// Limits are checked against the current state. The parachain block must be acceptable
	/// by the current relay-chain state regardless of whether it was acceptable at some
	/// relay-chain state in the past.
	///
	/// Minimums are checked against the current state but modulated by
	/// considering the information available at the relay-parent of the parachain block.
	fn check_validation_outputs(
		&self,
		para_id: ParaId,
		relay_parent_number: BlockNumberFor<T>,
		head_data: &HeadData,
		new_validation_code: &Option<primitives::ValidationCode>,
		processed_downward_messages: u32,
		upward_messages: &[primitives::UpwardMessage],
		hrmp_watermark: BlockNumberFor<T>,
		horizontal_messages: &[primitives::OutboundHrmpMessage<ParaId>],
	) -> Result<(), AcceptanceCheckErr> {
		ensure!(
			head_data.0.len() <= self.config.max_head_data_size as _,
			AcceptanceCheckErr::HeadDataTooLarge,
		);

		// If a code upgrade was attempted, check that it is currently allowed and within the size
		// limit.
		if let Some(new_validation_code) = new_validation_code {
			ensure!(
				paras::Pallet::<T>::can_upgrade_validation_code(para_id),
				AcceptanceCheckErr::PrematureCodeUpgrade,
			);
			ensure!(
				new_validation_code.0.len() <= self.config.max_code_size as _,
				AcceptanceCheckErr::NewCodeTooLarge,
			);
		}

		// check if the candidate passes the messaging acceptance criteria
		dmp::Pallet::<T>::check_processed_downward_messages(
			para_id,
			relay_parent_number,
			processed_downward_messages,
		)?;
		Pallet::<T>::check_upward_messages(&self.config, para_id, upward_messages)?;
		hrmp::Pallet::<T>::check_hrmp_watermark(para_id, relay_parent_number, hrmp_watermark)?;
		hrmp::Pallet::<T>::check_outbound_hrmp(&self.config, para_id, horizontal_messages)?;

		Ok(())
	}
}

impl<T: Config> QueueFootprinter for Pallet<T> {
	type Origin = UmpQueueId;

	fn message_count(origin: Self::Origin) -> u64 {
		T::MessageQueue::footprint(AggregateMessageOrigin::Ump(origin)).storage.count
	}
}