// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see .
//! The inclusion pallet is responsible for inclusion and availability of scheduled parachains.
//!
//! It is responsible for carrying candidates from being backable to being backed, and then from
//! backed to included.
use crate::{
configuration::{self, HostConfiguration},
disputes, dmp, hrmp,
paras::{self, UpgradeStrategy},
scheduler,
shared::{self, AllowedRelayParentsTracker},
util::make_persisted_validation_data_with_parent,
};
use bitvec::{order::Lsb0 as BitOrderLsb0, vec::BitVec};
use frame_support::{
defensive,
pallet_prelude::*,
traits::{EnqueueMessage, Footprint, QueueFootprint},
BoundedSlice,
};
use frame_system::pallet_prelude::*;
use pallet_message_queue::OnQueueChanged;
use parity_scale_codec::{Decode, Encode};
use primitives::{
effective_minimum_backing_votes, supermajority_threshold, well_known_keys, BackedCandidate,
CandidateCommitments, CandidateDescriptor, CandidateHash, CandidateReceipt,
CommittedCandidateReceipt, CoreIndex, GroupIndex, Hash, HeadData, Id as ParaId,
SignedAvailabilityBitfields, SigningContext, UpwardMessage, ValidatorId, ValidatorIndex,
ValidityAttestation,
};
use scale_info::TypeInfo;
use sp_runtime::{traits::One, DispatchError, SaturatedConversion, Saturating};
#[cfg(feature = "std")]
use sp_std::fmt;
use sp_std::{
collections::{btree_map::BTreeMap, btree_set::BTreeSet, vec_deque::VecDeque},
prelude::*,
};
pub use pallet::*;
#[cfg(test)]
pub(crate) mod tests;
#[cfg(feature = "runtime-benchmarks")]
mod benchmarking;
pub mod migration;
pub trait WeightInfo {
fn receive_upward_messages(i: u32) -> Weight;
}
pub struct TestWeightInfo;
impl WeightInfo for TestWeightInfo {
fn receive_upward_messages(_: u32) -> Weight {
Weight::MAX
}
}
impl WeightInfo for () {
fn receive_upward_messages(_: u32) -> Weight {
Weight::zero()
}
}
/// Maximum value that `config.max_upward_message_size` can be set to.
///
/// This is used for benchmarking sanely bounding relevant storage items. It is expected from the
/// `configuration` pallet to check these values before setting.
pub const MAX_UPWARD_MESSAGE_SIZE_BOUND: u32 = 128 * 1024;
/// A backed candidate pending availability.
#[derive(Encode, Decode, PartialEq, TypeInfo, Clone)]
#[cfg_attr(test, derive(Debug))]
pub struct CandidatePendingAvailability {
/// The availability core this is assigned to.
core: CoreIndex,
/// The candidate hash.
hash: CandidateHash,
/// The candidate descriptor.
descriptor: CandidateDescriptor,
/// The candidate commitments.
commitments: CandidateCommitments,
/// The received availability votes. One bit per validator.
availability_votes: BitVec,
/// The backers of the candidate pending availability.
backers: BitVec,
/// The block number of the relay-parent of the receipt.
relay_parent_number: N,
/// The block number of the relay-chain block this was backed in.
backed_in_number: N,
/// The group index backing this block.
backing_group: GroupIndex,
}
impl CandidatePendingAvailability {
/// Get the availability votes on the candidate.
pub(crate) fn availability_votes(&self) -> &BitVec {
&self.availability_votes
}
/// Get the relay-chain block number this was backed in.
pub(crate) fn backed_in_number(&self) -> N
where
N: Clone,
{
self.backed_in_number.clone()
}
/// Get the core index.
pub(crate) fn core_occupied(&self) -> CoreIndex {
self.core
}
/// Get the candidate hash.
pub(crate) fn candidate_hash(&self) -> CandidateHash {
self.hash
}
/// Get the candidate descriptor.
pub(crate) fn candidate_descriptor(&self) -> &CandidateDescriptor {
&self.descriptor
}
/// Get the candidate commitments.
pub(crate) fn candidate_commitments(&self) -> &CandidateCommitments {
&self.commitments
}
/// Get the candidate's relay parent's number.
pub(crate) fn relay_parent_number(&self) -> N
where
N: Clone,
{
self.relay_parent_number.clone()
}
/// Get the candidate backing group.
pub(crate) fn backing_group(&self) -> GroupIndex {
self.backing_group
}
/// Get the candidate's backers.
pub(crate) fn backers(&self) -> &BitVec {
&self.backers
}
#[cfg(any(feature = "runtime-benchmarks", test))]
pub(crate) fn new(
core: CoreIndex,
hash: CandidateHash,
descriptor: CandidateDescriptor,
commitments: CandidateCommitments,
availability_votes: BitVec,
backers: BitVec,
relay_parent_number: N,
backed_in_number: N,
backing_group: GroupIndex,
) -> Self {
Self {
core,
hash,
descriptor,
commitments,
availability_votes,
backers,
relay_parent_number,
backed_in_number,
backing_group,
}
}
}
/// A hook for applying validator rewards
pub trait RewardValidators {
// Reward the validators with the given indices for issuing backing statements.
fn reward_backing(validators: impl IntoIterator- );
// Reward the validators with the given indices for issuing availability bitfields.
// Validators are sent to this hook when they have contributed to the availability
// of a candidate by setting a bit in their bitfield.
fn reward_bitfields(validators: impl IntoIterator
- );
}
/// Helper return type for `process_candidates`.
#[derive(Encode, Decode, PartialEq, TypeInfo)]
#[cfg_attr(test, derive(Debug))]
pub(crate) struct ProcessedCandidates {
pub(crate) core_indices: Vec<(CoreIndex, ParaId)>,
pub(crate) candidate_receipt_with_backing_validator_indices:
Vec<(CandidateReceipt, Vec<(ValidatorIndex, ValidityAttestation)>)>,
}
impl Default for ProcessedCandidates {
fn default() -> Self {
Self {
core_indices: Vec::new(),
candidate_receipt_with_backing_validator_indices: Vec::new(),
}
}
}
/// Reads the footprint of queues for a specific origin type.
pub trait QueueFootprinter {
type Origin;
fn message_count(origin: Self::Origin) -> u64;
}
impl QueueFootprinter for () {
type Origin = UmpQueueId;
fn message_count(_: Self::Origin) -> u64 {
0
}
}
/// Aggregate message origin for the `MessageQueue` pallet.
///
/// Can be extended to serve further use-cases besides just UMP. Is stored in storage, so any change
/// to existing values will require a migration.
#[derive(Encode, Decode, Clone, MaxEncodedLen, Eq, PartialEq, RuntimeDebug, TypeInfo)]
pub enum AggregateMessageOrigin {
/// Inbound upward message.
#[codec(index = 0)]
Ump(UmpQueueId),
}
/// Identifies a UMP queue inside the `MessageQueue` pallet.
///
/// It is written in verbose form since future variants like `Here` and `Bridged` are already
/// forseeable.
#[derive(Encode, Decode, Clone, MaxEncodedLen, Eq, PartialEq, RuntimeDebug, TypeInfo)]
pub enum UmpQueueId {
/// The message originated from this parachain.
#[codec(index = 0)]
Para(ParaId),
}
#[cfg(feature = "runtime-benchmarks")]
impl From for AggregateMessageOrigin {
fn from(n: u32) -> Self {
// Some dummy for the benchmarks.
Self::Ump(UmpQueueId::Para(n.into()))
}
}
/// The maximal length of a UMP message.
pub type MaxUmpMessageLenOf =
<::MessageQueue as EnqueueMessage>::MaxMessageLen;
#[frame_support::pallet]
pub mod pallet {
use super::*;
const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);
#[pallet::pallet]
#[pallet::without_storage_info]
#[pallet::storage_version(STORAGE_VERSION)]
pub struct Pallet(_);
#[pallet::config]
pub trait Config:
frame_system::Config
+ shared::Config
+ paras::Config
+ dmp::Config
+ hrmp::Config
+ configuration::Config
+ scheduler::Config
{
type RuntimeEvent: From> + IsType<::RuntimeEvent>;
type DisputesHandler: disputes::DisputesHandler>;
type RewardValidators: RewardValidators;
/// The system message queue.
///
/// The message queue provides general queueing and processing functionality. Currently it
/// replaces the old `UMP` dispatch queue. Other use-cases can be implemented as well by
/// adding new variants to `AggregateMessageOrigin`.
type MessageQueue: EnqueueMessage;
/// Weight info for the calls of this pallet.
type WeightInfo: WeightInfo;
}
#[pallet::event]
#[pallet::generate_deposit(pub(super) fn deposit_event)]
pub enum Event {
/// A candidate was backed. `[candidate, head_data]`
CandidateBacked(CandidateReceipt, HeadData, CoreIndex, GroupIndex),
/// A candidate was included. `[candidate, head_data]`
CandidateIncluded(CandidateReceipt, HeadData, CoreIndex, GroupIndex),
/// A candidate timed out. `[candidate, head_data]`
CandidateTimedOut(CandidateReceipt, HeadData, CoreIndex),
/// Some upward messages have been received and will be processed.
UpwardMessagesReceived { from: ParaId, count: u32 },
}
#[pallet::error]
pub enum Error {
/// Validator index out of bounds.
ValidatorIndexOutOfBounds,
/// Candidate submitted but para not scheduled.
UnscheduledCandidate,
/// Head data exceeds the configured maximum.
HeadDataTooLarge,
/// Code upgrade prematurely.
PrematureCodeUpgrade,
/// Output code is too large
NewCodeTooLarge,
/// The candidate's relay-parent was not allowed. Either it was
/// not recent enough or it didn't advance based on the last parachain block.
DisallowedRelayParent,
/// Failed to compute group index for the core: either it's out of bounds
/// or the relay parent doesn't belong to the current session.
InvalidAssignment,
/// Invalid group index in core assignment.
InvalidGroupIndex,
/// Insufficient (non-majority) backing.
InsufficientBacking,
/// Invalid (bad signature, unknown validator, etc.) backing.
InvalidBacking,
/// Collator did not sign PoV.
NotCollatorSigned,
/// The validation data hash does not match expected.
ValidationDataHashMismatch,
/// The downward message queue is not processed correctly.
IncorrectDownwardMessageHandling,
/// At least one upward message sent does not pass the acceptance criteria.
InvalidUpwardMessages,
/// The candidate didn't follow the rules of HRMP watermark advancement.
HrmpWatermarkMishandling,
/// The HRMP messages sent by the candidate is not valid.
InvalidOutboundHrmp,
/// The validation code hash of the candidate is not valid.
InvalidValidationCodeHash,
/// The `para_head` hash in the candidate descriptor doesn't match the hash of the actual
/// para head in the commitments.
ParaHeadMismatch,
}
/// Candidates pending availability by `ParaId`. They form a chain starting from the latest
/// included head of the para.
/// Use a different prefix post-migration to v1, since the v0 `PendingAvailability` storage
/// would otherwise have the exact same prefix which could cause undefined behaviour when doing
/// the migration.
#[pallet::storage]
#[pallet::storage_prefix = "V1"]
pub(crate) type PendingAvailability = StorageMap<
_,
Twox64Concat,
ParaId,
VecDeque>>,
>;
#[pallet::call]
impl Pallet {}
}
const LOG_TARGET: &str = "runtime::inclusion";
/// The reason that a candidate's outputs were rejected for.
#[cfg_attr(feature = "std", derive(Debug))]
enum AcceptanceCheckErr {
HeadDataTooLarge,
/// Code upgrades are not permitted at the current time.
PrematureCodeUpgrade,
/// The new runtime blob is too large.
NewCodeTooLarge,
/// The candidate violated this DMP acceptance criteria.
ProcessedDownwardMessages,
/// The candidate violated this UMP acceptance criteria.
UpwardMessages,
/// The candidate violated this HRMP watermark acceptance criteria.
HrmpWatermark,
/// The candidate violated this outbound HRMP acceptance criteria.
OutboundHrmp,
}
impl From for AcceptanceCheckErr {
fn from(_: dmp::ProcessedDownwardMessagesAcceptanceErr) -> Self {
Self::ProcessedDownwardMessages
}
}
impl From for AcceptanceCheckErr {
fn from(_: UmpAcceptanceCheckErr) -> Self {
Self::UpwardMessages
}
}
impl From> for AcceptanceCheckErr {
fn from(_: hrmp::HrmpWatermarkAcceptanceErr) -> Self {
Self::HrmpWatermark
}
}
impl From for AcceptanceCheckErr {
fn from(_: hrmp::OutboundHrmpAcceptanceErr) -> Self {
Self::OutboundHrmp
}
}
/// An error returned by [`Pallet::check_upward_messages`] that indicates a violation of one of
/// acceptance criteria rules.
#[cfg_attr(test, derive(PartialEq))]
pub(crate) enum UmpAcceptanceCheckErr {
/// The maximal number of messages that can be submitted in one batch was exceeded.
MoreMessagesThanPermitted { sent: u32, permitted: u32 },
/// The maximal size of a single message was exceeded.
MessageSize { idx: u32, msg_size: u32, max_size: u32 },
/// The allowed number of messages in the queue was exceeded.
CapacityExceeded { count: u64, limit: u64 },
/// The allowed combined message size in the queue was exceeded.
TotalSizeExceeded { total_size: u64, limit: u64 },
/// A para-chain cannot send UMP messages while it is offboarding.
IsOffboarding,
}
#[cfg(feature = "std")]
impl fmt::Debug for UmpAcceptanceCheckErr {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
UmpAcceptanceCheckErr::MoreMessagesThanPermitted { sent, permitted } => write!(
fmt,
"more upward messages than permitted by config ({} > {})",
sent, permitted,
),
UmpAcceptanceCheckErr::MessageSize { idx, msg_size, max_size } => write!(
fmt,
"upward message idx {} larger than permitted by config ({} > {})",
idx, msg_size, max_size,
),
UmpAcceptanceCheckErr::CapacityExceeded { count, limit } => write!(
fmt,
"the ump queue would have more items than permitted by config ({} > {})",
count, limit,
),
UmpAcceptanceCheckErr::TotalSizeExceeded { total_size, limit } => write!(
fmt,
"the ump queue would have grown past the max size permitted by config ({} > {})",
total_size, limit,
),
UmpAcceptanceCheckErr::IsOffboarding => {
write!(fmt, "upward message rejected because the para is off-boarding")
},
}
}
}
impl Pallet {
/// Block initialization logic, called by initializer.
pub(crate) fn initializer_initialize(_now: BlockNumberFor) -> Weight {
Weight::zero()
}
/// Block finalization logic, called by initializer.
pub(crate) fn initializer_finalize() {}
/// Handle an incoming session change.
pub(crate) fn initializer_on_new_session(
_notification: &crate::initializer::SessionChangeNotification>,
outgoing_paras: &[ParaId],
) {
// unlike most drain methods, drained elements are not cleared on `Drop` of the iterator
// and require consumption.
for _ in PendingAvailability::::drain() {}
Self::cleanup_outgoing_ump_dispatch_queues(outgoing_paras);
}
pub(crate) fn cleanup_outgoing_ump_dispatch_queues(outgoing: &[ParaId]) {
for outgoing_para in outgoing {
Self::cleanup_outgoing_ump_dispatch_queue(*outgoing_para);
}
}
pub(crate) fn cleanup_outgoing_ump_dispatch_queue(para: ParaId) {
T::MessageQueue::sweep_queue(AggregateMessageOrigin::Ump(UmpQueueId::Para(para)));
}
/// Extract the freed cores based on cores that became available.
///
/// Bitfields are expected to have been sanitized already. E.g. via `sanitize_bitfields`!
///
/// Updates storage items `PendingAvailability`.
///
/// Returns a `Vec` of `CandidateHash`es and their respective `AvailabilityCore`s that became
/// available, and cores free.
pub(crate) fn update_pending_availability_and_get_freed_cores(
validators: &[ValidatorId],
signed_bitfields: SignedAvailabilityBitfields,
) -> Vec<(CoreIndex, CandidateHash)> {
let threshold = availability_threshold(validators.len());
let mut votes_per_core: BTreeMap> = BTreeMap::new();
for (checked_bitfield, validator_index) in
signed_bitfields.into_iter().map(|signed_bitfield| {
let validator_idx = signed_bitfield.validator_index();
let checked_bitfield = signed_bitfield.into_payload();
(checked_bitfield, validator_idx)
}) {
for (bit_idx, _) in checked_bitfield.0.iter().enumerate().filter(|(_, is_av)| **is_av) {
let core_index = CoreIndex(bit_idx as u32);
votes_per_core
.entry(core_index)
.or_insert_with(|| BTreeSet::new())
.insert(validator_index);
}
}
let mut freed_cores = vec![];
let pending_paraids: Vec<_> = PendingAvailability::::iter_keys().collect();
for paraid in pending_paraids {
PendingAvailability::::mutate(paraid, |candidates| {
if let Some(candidates) = candidates {
let mut last_enacted_index: Option = None;
for (candidate_index, candidate) in candidates.iter_mut().enumerate() {
if let Some(validator_indices) = votes_per_core.remove(&candidate.core) {
for validator_index in validator_indices.iter() {
// defensive check - this is constructed by loading the
// availability bitfield record, which is always `Some` if
// the core is occupied - that's why we're here.
if let Some(mut bit) =
candidate.availability_votes.get_mut(validator_index.0 as usize)
{
*bit = true;
}
}
}
// We check for the candidate's availability even if we didn't get any new
// bitfields for its core, as it may have already been available at a
// previous block but wasn't enacted due to its predecessors not being
// available.
if candidate.availability_votes.count_ones() >= threshold {
// We can only enact a candidate if we've enacted all of its
// predecessors already.
let can_enact = if candidate_index == 0 {
last_enacted_index == None
} else {
let prev_candidate_index = usize::try_from(candidate_index - 1)
.expect("Previous `if` would have caught a 0 candidate index.");
matches!(last_enacted_index, Some(old_index) if old_index == prev_candidate_index)
};
if can_enact {
last_enacted_index = Some(candidate_index);
}
}
}
// Trim the pending availability candidates storage and enact candidates of this
// para now.
if let Some(last_enacted_index) = last_enacted_index {
let evicted_candidates = candidates.drain(0..=last_enacted_index);
for candidate in evicted_candidates {
freed_cores.push((candidate.core, candidate.hash));
let receipt = CommittedCandidateReceipt {
descriptor: candidate.descriptor,
commitments: candidate.commitments,
};
let _weight = Self::enact_candidate(
candidate.relay_parent_number,
receipt,
candidate.backers,
candidate.availability_votes,
candidate.core,
candidate.backing_group,
);
}
}
}
});
}
freed_cores
}
/// Process candidates that have been backed. Provide a set of
/// candidates along with their scheduled cores.
///
/// Candidates of the same paraid should be sorted according to their dependency order (they
/// should form a chain). If this condition is not met, this function will return an error.
/// (This really should not happen here, if the candidates were properly sanitised in
/// paras_inherent).
pub(crate) fn process_candidates(
allowed_relay_parents: &AllowedRelayParentsTracker>,
candidates: &BTreeMap, CoreIndex)>>,
group_validators: GV,
core_index_enabled: bool,
) -> Result, DispatchError>
where
GV: Fn(GroupIndex) -> Option>,
{
if candidates.is_empty() {
return Ok(ProcessedCandidates::default())
}
let now = frame_system::Pallet::::block_number();
let validators = shared::ActiveValidatorKeys::::get();
// Collect candidate receipts with backers.
let mut candidate_receipt_with_backing_validator_indices =
Vec::with_capacity(candidates.len());
let mut core_indices = Vec::with_capacity(candidates.len());
for (para_id, para_candidates) in candidates {
let mut latest_head_data = match Self::para_latest_head_data(para_id) {
None => {
defensive!("Latest included head data for paraid {:?} is None", para_id);
continue
},
Some(latest_head_data) => latest_head_data,
};
for (candidate, core) in para_candidates.iter() {
let candidate_hash = candidate.candidate().hash();
let check_ctx = CandidateCheckContext::::new(None);
let relay_parent_number = check_ctx.verify_backed_candidate(
&allowed_relay_parents,
candidate.candidate(),
latest_head_data.clone(),
)?;
// The candidate based upon relay parent `N` should be backed by a
// group assigned to core at block `N + 1`. Thus,
// `relay_parent_number + 1` will always land in the current
// session.
let group_idx = scheduler::Pallet::::group_assigned_to_core(
*core,
relay_parent_number + One::one(),
)
.ok_or_else(|| {
log::warn!(
target: LOG_TARGET,
"Failed to compute group index for candidate {:?}",
candidate_hash
);
Error::::InvalidAssignment
})?;
let group_vals =
group_validators(group_idx).ok_or_else(|| Error::::InvalidGroupIndex)?;
// Check backing vote count and validity.
let (backers, backer_idx_and_attestation) = Self::check_backing_votes(
candidate,
&validators,
group_vals,
core_index_enabled,
)?;
// Found a valid candidate.
latest_head_data = candidate.candidate().commitments.head_data.clone();
candidate_receipt_with_backing_validator_indices
.push((candidate.receipt(), backer_idx_and_attestation));
core_indices.push((*core, *para_id));
// Update storage now
PendingAvailability::::mutate(¶_id, |pending_availability| {
let new_candidate = CandidatePendingAvailability {
core: *core,
hash: candidate_hash,
descriptor: candidate.candidate().descriptor.clone(),
commitments: candidate.candidate().commitments.clone(),
// initialize all availability votes to 0.
availability_votes: bitvec::bitvec![u8, BitOrderLsb0; 0; validators.len()],
relay_parent_number,
backers: backers.to_bitvec(),
backed_in_number: now,
backing_group: group_idx,
};
if let Some(pending_availability) = pending_availability {
pending_availability.push_back(new_candidate);
} else {
*pending_availability =
Some([new_candidate].into_iter().collect::>())
}
});
// Deposit backed event.
Self::deposit_event(Event::::CandidateBacked(
candidate.candidate().to_plain(),
candidate.candidate().commitments.head_data.clone(),
*core,
group_idx,
));
}
}
Ok(ProcessedCandidates:: {
core_indices,
candidate_receipt_with_backing_validator_indices,
})
}
// Get the latest backed output head data of this para.
pub(crate) fn para_latest_head_data(para_id: &ParaId) -> Option {
match PendingAvailability::::get(para_id).and_then(|pending_candidates| {
pending_candidates.back().map(|x| x.commitments.head_data.clone())
}) {
Some(head_data) => Some(head_data),
None => paras::Heads::::get(para_id),
}
}
fn check_backing_votes(
backed_candidate: &BackedCandidate,
validators: &[ValidatorId],
group_vals: Vec,
core_index_enabled: bool,
) -> Result<(BitVec, Vec<(ValidatorIndex, ValidityAttestation)>), Error> {
let minimum_backing_votes = configuration::ActiveConfig::::get().minimum_backing_votes;
let mut backers = bitvec::bitvec![u8, BitOrderLsb0; 0; validators.len()];
let signing_context = SigningContext {
parent_hash: backed_candidate.descriptor().relay_parent,
session_index: shared::CurrentSessionIndex::::get(),
};
let (validator_indices, _) =
backed_candidate.validator_indices_and_core_index(core_index_enabled);
// check the signatures in the backing and that it is a majority.
let maybe_amount_validated = primitives::check_candidate_backing(
backed_candidate.candidate().hash(),
backed_candidate.validity_votes(),
validator_indices,
&signing_context,
group_vals.len(),
|intra_group_vi| {
group_vals
.get(intra_group_vi)
.and_then(|vi| validators.get(vi.0 as usize))
.map(|v| v.clone())
},
);
match maybe_amount_validated {
Ok(amount_validated) => ensure!(
amount_validated >=
effective_minimum_backing_votes(group_vals.len(), minimum_backing_votes),
Error::::InsufficientBacking,
),
Err(()) => {
Err(Error::::InvalidBacking)?;
},
}
let mut backer_idx_and_attestation =
Vec::<(ValidatorIndex, ValidityAttestation)>::with_capacity(
validator_indices.count_ones(),
);
for ((bit_idx, _), attestation) in validator_indices
.iter()
.enumerate()
.filter(|(_, signed)| **signed)
.zip(backed_candidate.validity_votes().iter().cloned())
{
let val_idx = group_vals.get(bit_idx).expect("this query succeeded above; qed");
backer_idx_and_attestation.push((*val_idx, attestation));
backers.set(val_idx.0 as _, true);
}
Ok((backers, backer_idx_and_attestation))
}
/// Run the acceptance criteria checks on the given candidate commitments.
pub(crate) fn check_validation_outputs_for_runtime_api(
para_id: ParaId,
relay_parent_number: BlockNumberFor,
validation_outputs: primitives::CandidateCommitments,
) -> bool {
let prev_context = paras::MostRecentContext::::get(para_id);
let check_ctx = CandidateCheckContext::::new(prev_context);
if check_ctx
.check_validation_outputs(
para_id,
relay_parent_number,
&validation_outputs.head_data,
&validation_outputs.new_validation_code,
validation_outputs.processed_downward_messages,
&validation_outputs.upward_messages,
BlockNumberFor::::from(validation_outputs.hrmp_watermark),
&validation_outputs.horizontal_messages,
)
.is_err()
{
log::debug!(
target: LOG_TARGET,
"Validation outputs checking for parachain `{}` failed",
u32::from(para_id),
);
false
} else {
true
}
}
fn enact_candidate(
relay_parent_number: BlockNumberFor,
receipt: CommittedCandidateReceipt,
backers: BitVec,
availability_votes: BitVec,
core_index: CoreIndex,
backing_group: GroupIndex,
) -> Weight {
let plain = receipt.to_plain();
let commitments = receipt.commitments;
let config = configuration::ActiveConfig::::get();
T::RewardValidators::reward_backing(
backers
.iter()
.enumerate()
.filter(|(_, backed)| **backed)
.map(|(i, _)| ValidatorIndex(i as _)),
);
T::RewardValidators::reward_bitfields(
availability_votes
.iter()
.enumerate()
.filter(|(_, voted)| **voted)
.map(|(i, _)| ValidatorIndex(i as _)),
);
// initial weight is config read.
let mut weight = T::DbWeight::get().reads_writes(1, 0);
if let Some(new_code) = commitments.new_validation_code {
// Block number of candidate's inclusion.
let now = frame_system::Pallet::::block_number();
weight.saturating_add(paras::Pallet::::schedule_code_upgrade(
receipt.descriptor.para_id,
new_code,
now,
&config,
UpgradeStrategy::SetGoAheadSignal,
));
}
// enact the messaging facet of the candidate.
weight.saturating_accrue(dmp::Pallet::::prune_dmq(
receipt.descriptor.para_id,
commitments.processed_downward_messages,
));
weight.saturating_accrue(Self::receive_upward_messages(
receipt.descriptor.para_id,
commitments.upward_messages.as_slice(),
));
weight.saturating_accrue(hrmp::Pallet::::prune_hrmp(
receipt.descriptor.para_id,
BlockNumberFor::::from(commitments.hrmp_watermark),
));
weight.saturating_accrue(hrmp::Pallet::::queue_outbound_hrmp(
receipt.descriptor.para_id,
commitments.horizontal_messages,
));
Self::deposit_event(Event::::CandidateIncluded(
plain,
commitments.head_data.clone(),
core_index,
backing_group,
));
weight.saturating_add(paras::Pallet::::note_new_head(
receipt.descriptor.para_id,
commitments.head_data,
relay_parent_number,
))
}
pub(crate) fn relay_dispatch_queue_size(para_id: ParaId) -> (u32, u32) {
let fp = T::MessageQueue::footprint(AggregateMessageOrigin::Ump(UmpQueueId::Para(para_id)));
(fp.storage.count as u32, fp.storage.size as u32)
}
/// Check that all the upward messages sent by a candidate pass the acceptance criteria.
pub(crate) fn check_upward_messages(
config: &HostConfiguration>,
para: ParaId,
upward_messages: &[UpwardMessage],
) -> Result<(), UmpAcceptanceCheckErr> {
// Cannot send UMP messages while off-boarding.
if paras::Pallet::::is_offboarding(para) {
ensure!(upward_messages.is_empty(), UmpAcceptanceCheckErr::IsOffboarding);
}
let additional_msgs = upward_messages.len() as u32;
if additional_msgs > config.max_upward_message_num_per_candidate {
return Err(UmpAcceptanceCheckErr::MoreMessagesThanPermitted {
sent: additional_msgs,
permitted: config.max_upward_message_num_per_candidate,
})
}
let (para_queue_count, mut para_queue_size) = Self::relay_dispatch_queue_size(para);
if para_queue_count.saturating_add(additional_msgs) > config.max_upward_queue_count {
return Err(UmpAcceptanceCheckErr::CapacityExceeded {
count: para_queue_count.saturating_add(additional_msgs).into(),
limit: config.max_upward_queue_count.into(),
})
}
for (idx, msg) in upward_messages.into_iter().enumerate() {
let msg_size = msg.len() as u32;
if msg_size > config.max_upward_message_size {
return Err(UmpAcceptanceCheckErr::MessageSize {
idx: idx as u32,
msg_size,
max_size: config.max_upward_message_size,
})
}
// make sure that the queue is not overfilled.
// we do it here only once since returning false invalidates the whole relay-chain
// block.
if para_queue_size.saturating_add(msg_size) > config.max_upward_queue_size {
return Err(UmpAcceptanceCheckErr::TotalSizeExceeded {
total_size: para_queue_size.saturating_add(msg_size).into(),
limit: config.max_upward_queue_size.into(),
})
}
para_queue_size.saturating_accrue(msg_size);
}
Ok(())
}
/// Enqueues `upward_messages` from a `para`'s accepted candidate block.
///
/// This function is infallible since the candidate was already accepted and we therefore need
/// to deal with the messages as given. Messages that are too long will be ignored since such
/// candidates should have already been rejected in [`Self::check_upward_messages`].
pub(crate) fn receive_upward_messages(para: ParaId, upward_messages: &[Vec]) -> Weight {
let bounded = upward_messages
.iter()
.filter_map(|d| {
BoundedSlice::try_from(&d[..])
.map_err(|e| {
defensive!("Accepted candidate contains too long msg, len=", d.len());
e
})
.ok()
})
.collect();
Self::receive_bounded_upward_messages(para, bounded)
}
/// Enqueues storage-bounded `upward_messages` from a `para`'s accepted candidate block.
pub(crate) fn receive_bounded_upward_messages(
para: ParaId,
messages: Vec>>,
) -> Weight {
let count = messages.len() as u32;
if count == 0 {
return Weight::zero()
}
T::MessageQueue::enqueue_messages(
messages.into_iter(),
AggregateMessageOrigin::Ump(UmpQueueId::Para(para)),
);
let weight = ::WeightInfo::receive_upward_messages(count);
Self::deposit_event(Event::UpwardMessagesReceived { from: para, count });
weight
}
/// Cleans up all timed out candidates as well as their descendant candidates.
///
/// Returns a vector of cleaned-up core IDs.
pub(crate) fn free_timedout() -> Vec {
let timeout_pred = scheduler::Pallet::::availability_timeout_predicate();
let timed_out: Vec<_> = Self::free_failed_cores(
|candidate| timeout_pred(candidate.backed_in_number).timed_out,
None,
)
.collect();
let mut timed_out_cores = Vec::with_capacity(timed_out.len());
for candidate in timed_out.iter() {
timed_out_cores.push(candidate.core);
let receipt = CandidateReceipt {
descriptor: candidate.descriptor.clone(),
commitments_hash: candidate.commitments.hash(),
};
Self::deposit_event(Event::::CandidateTimedOut(
receipt,
candidate.commitments.head_data.clone(),
candidate.core,
));
}
timed_out_cores
}
/// Cleans up all cores pending availability occupied by one of the disputed candidates or which
/// are descendants of a disputed candidate.
///
/// Returns a vector of cleaned-up core IDs, along with the evicted candidate hashes.
pub(crate) fn free_disputed(
disputed: &BTreeSet,
) -> Vec<(CoreIndex, CandidateHash)> {
Self::free_failed_cores(
|candidate| disputed.contains(&candidate.hash),
Some(disputed.len()),
)
.map(|candidate| (candidate.core, candidate.hash))
.collect()
}
// Clean up cores whose candidates are deemed as failed by the predicate. `pred` returns true if
// a candidate is considered failed.
// A failed candidate also frees all subsequent cores which hold descendants of said candidate.
fn free_failed_cores<
P: Fn(&CandidatePendingAvailability>) -> bool,
>(
pred: P,
capacity_hint: Option,
) -> impl Iterator
- >> {
let mut earliest_dropped_indices: BTreeMap = BTreeMap::new();
for (para_id, pending_candidates) in PendingAvailability::::iter() {
// We assume that pending candidates are stored in dependency order. So we need to store
// the earliest dropped candidate. All others that follow will get freed as well.
let mut earliest_dropped_idx = None;
for (index, candidate) in pending_candidates.iter().enumerate() {
if pred(candidate) {
earliest_dropped_idx = Some(index);
// Since we're looping the candidates in dependency order, we've found the
// earliest failed index for this paraid.
break;
}
}
if let Some(earliest_dropped_idx) = earliest_dropped_idx {
earliest_dropped_indices.insert(para_id, earliest_dropped_idx);
}
}
let mut cleaned_up_cores =
if let Some(capacity) = capacity_hint { Vec::with_capacity(capacity) } else { vec![] };
for (para_id, earliest_dropped_idx) in earliest_dropped_indices {
// Do cleanups and record the cleaned up cores
PendingAvailability::::mutate(¶_id, |record| {
if let Some(record) = record {
let cleaned_up = record.drain(earliest_dropped_idx..);
cleaned_up_cores.extend(cleaned_up);
}
});
}
cleaned_up_cores.into_iter()
}
/// Forcibly enact the pending candidates of the given paraid as though they had been deemed
/// available by bitfields.
///
/// Is a no-op if there is no candidate pending availability for this para-id.
/// If there are multiple candidates pending availability for this para-id, it will enact all of
/// them. This should generally not be used but it is useful during execution of Runtime APIs,
/// where the changes to the state are expected to be discarded directly after.
pub(crate) fn force_enact(para: ParaId) {
PendingAvailability::::mutate(¶, |candidates| {
if let Some(candidates) = candidates {
for candidate in candidates.drain(..) {
let receipt = CommittedCandidateReceipt {
descriptor: candidate.descriptor,
commitments: candidate.commitments,
};
Self::enact_candidate(
candidate.relay_parent_number,
receipt,
candidate.backers,
candidate.availability_votes,
candidate.core,
candidate.backing_group,
);
}
}
});
}
/// Returns the first `CommittedCandidateReceipt` pending availability for the para provided, if
/// any.
pub(crate) fn candidate_pending_availability(
para: ParaId,
) -> Option> {
PendingAvailability::::get(¶).and_then(|p| {
p.get(0).map(|p| CommittedCandidateReceipt {
descriptor: p.descriptor.clone(),
commitments: p.commitments.clone(),
})
})
}
/// Returns all the `CommittedCandidateReceipt` pending availability for the para provided, if
/// any.
pub(crate) fn candidates_pending_availability(
para: ParaId,
) -> Vec> {
>::get(¶)
.map(|candidates| {
candidates
.into_iter()
.map(|candidate| CommittedCandidateReceipt {
descriptor: candidate.descriptor.clone(),
commitments: candidate.commitments.clone(),
})
.collect()
})
.unwrap_or_default()
}
/// Returns the metadata around the first candidate pending availability for the
/// para provided, if any.
pub(crate) fn pending_availability(
para: ParaId,
) -> Option>> {
PendingAvailability::::get(¶).and_then(|p| p.get(0).cloned())
}
/// Returns the metadata around the candidate pending availability occupying the supplied core,
/// if any.
pub(crate) fn pending_availability_with_core(
para: ParaId,
core: CoreIndex,
) -> Option>> {
PendingAvailability::::get(¶)
.and_then(|p| p.iter().find(|c| c.core == core).cloned())
}
}
const fn availability_threshold(n_validators: usize) -> usize {
supermajority_threshold(n_validators)
}
impl AcceptanceCheckErr {
/// Returns the same error so that it can be threaded through a needle of `DispatchError` and
/// ultimately returned from a `Dispatchable`.
fn strip_into_dispatch_err(self) -> Error {
use AcceptanceCheckErr::*;
match self {
HeadDataTooLarge => Error::::HeadDataTooLarge,
PrematureCodeUpgrade => Error::::PrematureCodeUpgrade,
NewCodeTooLarge => Error::::NewCodeTooLarge,
ProcessedDownwardMessages => Error::::IncorrectDownwardMessageHandling,
UpwardMessages => Error::::InvalidUpwardMessages,
HrmpWatermark => Error::::HrmpWatermarkMishandling,
OutboundHrmp => Error::::InvalidOutboundHrmp,
}
}
}
impl OnQueueChanged for Pallet {
// Write back the remaining queue capacity into `relay_dispatch_queue_remaining_capacity`.
fn on_queue_changed(origin: AggregateMessageOrigin, fp: QueueFootprint) {
let para = match origin {
AggregateMessageOrigin::Ump(UmpQueueId::Para(p)) => p,
};
let QueueFootprint { storage: Footprint { count, size }, .. } = fp;
let (count, size) = (count.saturated_into(), size.saturated_into());
// TODO paritytech/polkadot#6283: Remove all usages of `relay_dispatch_queue_size`
#[allow(deprecated)]
well_known_keys::relay_dispatch_queue_size_typed(para).set((count, size));
let config = configuration::ActiveConfig::::get();
let remaining_count = config.max_upward_queue_count.saturating_sub(count);
let remaining_size = config.max_upward_queue_size.saturating_sub(size);
well_known_keys::relay_dispatch_queue_remaining_capacity(para)
.set((remaining_count, remaining_size));
}
}
/// A collection of data required for checking a candidate.
pub(crate) struct CandidateCheckContext {
config: configuration::HostConfiguration>,
prev_context: Option>,
}
impl CandidateCheckContext {
pub(crate) fn new(prev_context: Option>) -> Self {
Self { config: configuration::ActiveConfig::::get(), prev_context }
}
/// Execute verification of the candidate.
///
/// Assures:
/// * relay-parent in-bounds
/// * collator signature check passes
/// * code hash of commitments matches current code hash
/// * para head in the descriptor and commitments match
///
/// Returns the relay-parent block number.
pub(crate) fn verify_backed_candidate(
&self,
allowed_relay_parents: &AllowedRelayParentsTracker>,
backed_candidate_receipt: &CommittedCandidateReceipt<::Hash>,
parent_head_data: HeadData,
) -> Result, Error> {
let para_id = backed_candidate_receipt.descriptor().para_id;
let relay_parent = backed_candidate_receipt.descriptor().relay_parent;
// Check that the relay-parent is one of the allowed relay-parents.
let (relay_parent_storage_root, relay_parent_number) = {
match allowed_relay_parents.acquire_info(relay_parent, self.prev_context) {
None => return Err(Error::::DisallowedRelayParent),
Some(info) => info,
}
};
{
let persisted_validation_data = make_persisted_validation_data_with_parent::(
relay_parent_number,
relay_parent_storage_root,
parent_head_data,
);
let expected = persisted_validation_data.hash();
ensure!(
expected == backed_candidate_receipt.descriptor().persisted_validation_data_hash,
Error::::ValidationDataHashMismatch,
);
}
ensure!(
backed_candidate_receipt.descriptor().check_collator_signature().is_ok(),
Error::::NotCollatorSigned,
);
let validation_code_hash = paras::CurrentCodeHash::::get(para_id)
// A candidate for a parachain without current validation code is not scheduled.
.ok_or_else(|| Error::::UnscheduledCandidate)?;
ensure!(
backed_candidate_receipt.descriptor().validation_code_hash == validation_code_hash,
Error::::InvalidValidationCodeHash,
);
ensure!(
backed_candidate_receipt.descriptor().para_head ==
backed_candidate_receipt.commitments.head_data.hash(),
Error::::ParaHeadMismatch,
);
if let Err(err) = self.check_validation_outputs(
para_id,
relay_parent_number,
&backed_candidate_receipt.commitments.head_data,
&backed_candidate_receipt.commitments.new_validation_code,
backed_candidate_receipt.commitments.processed_downward_messages,
&backed_candidate_receipt.commitments.upward_messages,
BlockNumberFor::::from(backed_candidate_receipt.commitments.hrmp_watermark),
&backed_candidate_receipt.commitments.horizontal_messages,
) {
log::debug!(
target: LOG_TARGET,
"Validation outputs checking during inclusion of a candidate {:?} for parachain `{}` failed",
backed_candidate_receipt.hash(),
u32::from(para_id),
);
Err(err.strip_into_dispatch_err::())?;
};
Ok(relay_parent_number)
}
/// Check the given outputs after candidate validation on whether it passes the acceptance
/// criteria.
///
/// The things that are checked can be roughly divided into limits and minimums.
///
/// Limits are things like max message queue sizes and max head data size.
///
/// Minimums are things like the minimum amount of messages that must be processed
/// by the parachain block.
///
/// Limits are checked against the current state. The parachain block must be acceptable
/// by the current relay-chain state regardless of whether it was acceptable at some relay-chain
/// state in the past.
///
/// Minimums are checked against the current state but modulated by
/// considering the information available at the relay-parent of the parachain block.
fn check_validation_outputs(
&self,
para_id: ParaId,
relay_parent_number: BlockNumberFor,
head_data: &HeadData,
new_validation_code: &Option,
processed_downward_messages: u32,
upward_messages: &[primitives::UpwardMessage],
hrmp_watermark: BlockNumberFor,
horizontal_messages: &[primitives::OutboundHrmpMessage],
) -> Result<(), AcceptanceCheckErr> {
ensure!(
head_data.0.len() <= self.config.max_head_data_size as _,
AcceptanceCheckErr::HeadDataTooLarge,
);
// if any, the code upgrade attempt is allowed.
if let Some(new_validation_code) = new_validation_code {
ensure!(
paras::Pallet::::can_upgrade_validation_code(para_id),
AcceptanceCheckErr::PrematureCodeUpgrade,
);
ensure!(
new_validation_code.0.len() <= self.config.max_code_size as _,
AcceptanceCheckErr::NewCodeTooLarge,
);
}
// check if the candidate passes the messaging acceptance criteria
dmp::Pallet::::check_processed_downward_messages(
para_id,
relay_parent_number,
processed_downward_messages,
)?;
Pallet::::check_upward_messages(&self.config, para_id, upward_messages)?;
hrmp::Pallet::::check_hrmp_watermark(para_id, relay_parent_number, hrmp_watermark)?;
hrmp::Pallet::::check_outbound_hrmp(&self.config, para_id, horizontal_messages)?;
Ok(())
}
}
impl QueueFootprinter for Pallet {
type Origin = UmpQueueId;
fn message_count(origin: Self::Origin) -> u64 {
T::MessageQueue::footprint(AggregateMessageOrigin::Ump(origin)).storage.count
}
}