Skip to content
mod.rs 44.8 KiB
Newer Older
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The inclusion pallet is responsible for inclusion and availability of scheduled parachains.
//! It is responsible for carrying candidates from being backable to being backed, and then from
//! backed to included.
	configuration::{self, HostConfiguration},
	shared::{self, AllowedRelayParentsTracker},
	util::make_persisted_validation_data_with_parent,
};
use bitvec::{order::Lsb0 as BitOrderLsb0, vec::BitVec};
use frame_support::{
	defensive,
	pallet_prelude::*,
	traits::{EnqueueMessage, Footprint, QueueFootprint},
use frame_system::pallet_prelude::*;
use pallet_message_queue::OnQueueChanged;
use parity_scale_codec::{Decode, Encode};
	effective_minimum_backing_votes, supermajority_threshold, well_known_keys, BackedCandidate,
	CandidateCommitments, CandidateDescriptor, CandidateHash, CandidateReceipt,
	CommittedCandidateReceipt, CoreIndex, GroupIndex, Hash, HeadData, Id as ParaId,
	SignedAvailabilityBitfields, SigningContext, UpwardMessage, ValidatorId, ValidatorIndex,
	ValidityAttestation,
};
use scale_info::TypeInfo;
use sp_runtime::{traits::One, DispatchError, SaturatedConversion, Saturating};
#[cfg(feature = "std")]
use sp_std::fmt;
use sp_std::{
	collections::{btree_map::BTreeMap, btree_set::BTreeSet, vec_deque::VecDeque},
	prelude::*,
};

pub use pallet::*;

#[cfg(test)]
pub(crate) mod tests;

#[cfg(feature = "runtime-benchmarks")]
mod benchmarking;

/// Weight functions required by this pallet.
pub trait WeightInfo {
	/// Weight of receiving upward messages; `i` is the number of messages being enqueued.
	fn receive_upward_messages(i: u32) -> Weight;
}

/// A [`WeightInfo`] implementation that reports the maximum possible weight for every call.
pub struct TestWeightInfo;
impl WeightInfo for TestWeightInfo {
	/// Always returns `Weight::MAX`, irrespective of the message count.
	fn receive_upward_messages(_: u32) -> Weight {
		Weight::MAX
	}
}

/// A [`WeightInfo`] implementation that reports zero weight for every call.
impl WeightInfo for () {
	/// Always returns `Weight::zero()`, irrespective of the message count.
	fn receive_upward_messages(_: u32) -> Weight {
		Weight::zero()
	}
}

/// Maximum value that `config.max_upward_message_size` can be set to (128 KiB).
///
/// This is used for benchmarking and for sanely bounding relevant storage items. The
/// `configuration` pallet is expected to check values against this bound before setting them.
pub const MAX_UPWARD_MESSAGE_SIZE_BOUND: u32 = 128 * 1024;

/// A backed candidate pending availability.
///
/// `H` is the hash type parameterizing the candidate descriptor and `N` is the block number
/// type used for the relay-parent and backing block numbers.
#[derive(Encode, Decode, PartialEq, TypeInfo, Clone)]
#[cfg_attr(test, derive(Debug))]
pub struct CandidatePendingAvailability<H, N> {
	/// The availability core this is assigned to.
	core: CoreIndex,
	/// The candidate hash.
	hash: CandidateHash,
	/// The candidate descriptor.
	descriptor: CandidateDescriptor<H>,
	/// The candidate commitments.
	commitments: CandidateCommitments,
	/// The received availability votes. One bit per validator.
	availability_votes: BitVec<u8, BitOrderLsb0>,
	/// The backers of the candidate pending availability.
	backers: BitVec<u8, BitOrderLsb0>,
	/// The block number of the relay-parent of the receipt.
	relay_parent_number: N,
	/// The block number of the relay-chain block this was backed in.
	backed_in_number: N,
	/// The group index backing this block.
	backing_group: GroupIndex,
}

impl<H, N> CandidatePendingAvailability<H, N> {
	/// Get the availability votes on the candidate.
	pub(crate) fn availability_votes(&self) -> &BitVec<u8, BitOrderLsb0> {
		&self.availability_votes
	}

	/// Get the relay-chain block number this was backed in.
	pub(crate) fn backed_in_number(&self) -> N
	where
		N: Clone,
	{
		self.backed_in_number.clone()
	}

	/// Get the core index.
	pub(crate) fn core_occupied(&self) -> CoreIndex {
		self.core
	}

	/// Get the candidate hash.
	pub(crate) fn candidate_hash(&self) -> CandidateHash {
		self.hash
	}

	/// Get the candidate descriptor.
	pub(crate) fn candidate_descriptor(&self) -> &CandidateDescriptor<H> {
		&self.descriptor
	}

	/// Get the candidate commitments.
	pub(crate) fn candidate_commitments(&self) -> &CandidateCommitments {
		&self.commitments
	}

	/// Get the candidate's relay parent's number.
	pub(crate) fn relay_parent_number(&self) -> N
	where
		N: Clone,
	{
		self.relay_parent_number.clone()
	}

	/// Get the candidate backing group.
	pub(crate) fn backing_group(&self) -> GroupIndex {
		self.backing_group
	}

	/// Get the candidate's backers.
	pub(crate) fn backers(&self) -> &BitVec<u8, BitOrderLsb0> {
		&self.backers
	}

	/// Assemble a pending-availability record from its individual parts.
	///
	/// Only compiled for tests and runtime benchmarks. Every argument is moved verbatim into the
	/// field of the same name.
	#[cfg(any(feature = "runtime-benchmarks", test))]
	pub(crate) fn new(
		core: CoreIndex,
		hash: CandidateHash,
		descriptor: CandidateDescriptor<H>,
		commitments: CandidateCommitments,
		availability_votes: BitVec<u8, BitOrderLsb0>,
		backers: BitVec<u8, BitOrderLsb0>,
		relay_parent_number: N,
		backed_in_number: N,
		backing_group: GroupIndex,
	) -> Self {
		Self {
			core,
			hash,
			descriptor,
			// Without this initializer the struct literal is incomplete and does not compile.
			commitments,
			availability_votes,
			backers,
			relay_parent_number,
			backed_in_number,
			backing_group,
		}
	}
}

/// A hook for applying validator rewards
pub trait RewardValidators {
	/// Reward the validators with the given indices for issuing backing statements.
	fn reward_backing(validators: impl IntoIterator<Item = ValidatorIndex>);
	/// Reward the validators with the given indices for issuing availability bitfields.
	/// Validators are sent to this hook when they have contributed to the availability
	/// of a candidate by setting a bit in their bitfield.
	fn reward_bitfields(validators: impl IntoIterator<Item = ValidatorIndex>);
}

/// Helper return type for `process_candidates`.
#[derive(Encode, Decode, PartialEq, TypeInfo)]
#[cfg_attr(test, derive(Debug))]
pub(crate) struct ProcessedCandidates<H = Hash> {
	/// The core index and para id of every candidate that was processed.
	pub(crate) core_indices: Vec<(CoreIndex, ParaId)>,
	/// The receipt of every processed candidate, paired with the indices and attestations of
	/// the validators that backed it.
	pub(crate) candidate_receipt_with_backing_validator_indices:
		Vec<(CandidateReceipt<H>, Vec<(ValidatorIndex, ValidityAttestation)>)>,
}

impl<H> Default for ProcessedCandidates<H> {
	/// An empty result: no cores and no candidate receipts.
	fn default() -> Self {
		let core_indices = Vec::new();
		let candidate_receipt_with_backing_validator_indices = Vec::new();
		Self { core_indices, candidate_receipt_with_backing_validator_indices }
	}
}

/// Reads the footprint of queues for a specific origin type.
pub trait QueueFootprinter {
	/// The type used to identify a queue.
	type Origin;

	/// Number of messages reported for the queue identified by `origin`.
	fn message_count(origin: Self::Origin) -> u64;
}

/// A [`QueueFootprinter`] over `UmpQueueId` that reports every queue as empty.
impl QueueFootprinter for () {
	type Origin = UmpQueueId;

	/// Always returns `0`, irrespective of the origin.
	fn message_count(_: Self::Origin) -> u64 {
		0
	}
}

/// Aggregate message origin for the `MessageQueue` pallet.
///
/// Can be extended to serve further use-cases besides just UMP. Is stored in storage, so any change
/// to existing values will require a migration.
#[derive(Encode, Decode, Clone, MaxEncodedLen, Eq, PartialEq, RuntimeDebug, TypeInfo)]
pub enum AggregateMessageOrigin {
	/// Inbound upward message.
	// The codec index is spelled out explicitly; per the note above, changing the encoding of an
	// existing variant would require a storage migration.
	#[codec(index = 0)]
	Ump(UmpQueueId),
}

/// Identifies a UMP queue inside the `MessageQueue` pallet.
///
/// It is written in verbose form since future variants like `Here` and `Bridged` are already
sfuhfds's avatar
sfuhfds committed
/// foreseeable.
#[derive(Encode, Decode, Clone, MaxEncodedLen, Eq, PartialEq, RuntimeDebug, TypeInfo)]
pub enum UmpQueueId {
	/// The message originated from this parachain.
	#[codec(index = 0)]
	Para(ParaId),
}

#[cfg(feature = "runtime-benchmarks")]
impl From<u32> for AggregateMessageOrigin {
	/// Build a UMP origin whose para id is derived from `n`. Only available for benchmarks.
	fn from(n: u32) -> Self {
		// Some dummy for the benchmarks.
		let para = ParaId::from(n);
		AggregateMessageOrigin::Ump(UmpQueueId::Para(para))
	}
}

/// The maximal length of a UMP message.
///
/// This is the `MaxMessageLen` associated type of the runtime's configured `MessageQueue`,
/// taken from its `EnqueueMessage<AggregateMessageOrigin>` implementation.
pub type MaxUmpMessageLenOf<T> =
	<<T as Config>::MessageQueue as EnqueueMessage<AggregateMessageOrigin>>::MaxMessageLen;

#[frame_support::pallet]
pub mod pallet {
	use super::*;

	const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);
	#[pallet::pallet]
	#[pallet::without_storage_info]
	#[pallet::storage_version(STORAGE_VERSION)]
	pub struct Pallet<T>(_);

	#[pallet::config]
	pub trait Config:
		frame_system::Config
		+ shared::Config
		+ paras::Config
		+ dmp::Config
		+ hrmp::Config
		+ configuration::Config
		+ scheduler::Config
		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
		type DisputesHandler: disputes::DisputesHandler<BlockNumberFor<Self>>;
		type RewardValidators: RewardValidators;

		/// The system message queue.
		///
		/// The message queue provides general queueing and processing functionality. Currently it
		/// replaces the old `UMP` dispatch queue. Other use-cases can be implemented as well by
		/// adding new variants to `AggregateMessageOrigin`.
		type MessageQueue: EnqueueMessage<AggregateMessageOrigin>;

		/// Weight info for the calls of this pallet.
		type WeightInfo: WeightInfo;
	}

	// Events deposited by this pallet through the generated `deposit_event` function.
	// NOTE(review): the `[candidate, head_data]` hints in the variant docs name only the first
	// two fields. `CandidateBacked` and `CandidateIncluded` additionally carry a `CoreIndex` and
	// a `GroupIndex`; `CandidateTimedOut` additionally carries a `CoreIndex`.
	#[pallet::event]
	#[pallet::generate_deposit(pub(super) fn deposit_event)]
	pub enum Event<T: Config> {
		/// A candidate was backed. `[candidate, head_data]`
		CandidateBacked(CandidateReceipt<T::Hash>, HeadData, CoreIndex, GroupIndex),
		/// A candidate was included. `[candidate, head_data]`
		CandidateIncluded(CandidateReceipt<T::Hash>, HeadData, CoreIndex, GroupIndex),
		/// A candidate timed out. `[candidate, head_data]`
		CandidateTimedOut(CandidateReceipt<T::Hash>, HeadData, CoreIndex),
		/// Some upward messages have been received and will be processed.
		UpwardMessagesReceived { from: ParaId, count: u32 },
	}

	#[pallet::error]
	pub enum Error<T> {
		/// Validator index out of bounds.
		ValidatorIndexOutOfBounds,
		/// Candidate submitted but para not scheduled.
		UnscheduledCandidate,
		/// Head data exceeds the configured maximum.
		HeadDataTooLarge,
		/// Code upgrade prematurely.
		PrematureCodeUpgrade,
		/// Output code is too large
		NewCodeTooLarge,
		/// The candidate's relay-parent was not allowed. Either it was
		/// not recent enough or it didn't advance based on the last parachain block.
		DisallowedRelayParent,
		/// Failed to compute group index for the core: either it's out of bounds
		/// or the relay parent doesn't belong to the current session.
		InvalidAssignment,
		/// Invalid group index in core assignment.
		InvalidGroupIndex,
		/// Insufficient (non-majority) backing.
		InsufficientBacking,
		/// Invalid (bad signature, unknown validator, etc.) backing.
		InvalidBacking,
		/// Collator did not sign PoV.
		NotCollatorSigned,
		/// The validation data hash does not match expected.
		ValidationDataHashMismatch,
		/// The downward message queue is not processed correctly.
		IncorrectDownwardMessageHandling,
		/// At least one upward message sent does not pass the acceptance criteria.
		InvalidUpwardMessages,
		/// The candidate didn't follow the rules of HRMP watermark advancement.
		HrmpWatermarkMishandling,
		/// The HRMP messages sent by the candidate is not valid.
		InvalidOutboundHrmp,
		/// The validation code hash of the candidate is not valid.
		InvalidValidationCodeHash,
		/// The `para_head` hash in the candidate descriptor doesn't match the hash of the actual
		/// para head in the commitments.
	/// Candidates pending availability by `ParaId`. They form a chain starting from the latest
	/// included head of the para.
	/// Use a different prefix post-migration to v1, since the v0 `PendingAvailability` storage
	/// would otherwise have the exact same prefix which could cause undefined behaviour when doing
	/// the migration.
	#[pallet::storage]
	#[pallet::storage_prefix = "V1"]
	pub(crate) type PendingAvailability<T: Config> = StorageMap<
		_,
		Twox64Concat,
		ParaId,
		VecDeque<CandidatePendingAvailability<T::Hash, BlockNumberFor<T>>>,

	#[pallet::call]
	impl<T: Config> Pallet<T> {}
}

/// Log target passed as `target:` to the `log` macros in this module.
const LOG_TARGET: &str = "runtime::inclusion";

/// The reason that a candidate's outputs were rejected for.
#[cfg_attr(feature = "std", derive(Debug))]
	HeadDataTooLarge,
	/// Code upgrades are not permitted at the current time.
	PrematureCodeUpgrade,
	/// The new runtime blob is too large.
	NewCodeTooLarge,
	/// The candidate violated this DMP acceptance criteria.
	/// The candidate violated this UMP acceptance criteria.
	/// The candidate violated this HRMP watermark acceptance criteria.
	/// The candidate violated this outbound HRMP acceptance criteria.
	OutboundHrmp,
}

impl From<dmp::ProcessedDownwardMessagesAcceptanceErr> for AcceptanceCheckErr {
	fn from(_: dmp::ProcessedDownwardMessagesAcceptanceErr) -> Self {
		Self::ProcessedDownwardMessages
	}
}

impl From<UmpAcceptanceCheckErr> for AcceptanceCheckErr {
	fn from(_: UmpAcceptanceCheckErr) -> Self {
		Self::UpwardMessages
	}
}

impl<BlockNumber> From<hrmp::HrmpWatermarkAcceptanceErr<BlockNumber>> for AcceptanceCheckErr {
	fn from(_: hrmp::HrmpWatermarkAcceptanceErr<BlockNumber>) -> Self {
		Self::HrmpWatermark
	}
}

impl From<hrmp::OutboundHrmpAcceptanceErr> for AcceptanceCheckErr {
	fn from(_: hrmp::OutboundHrmpAcceptanceErr) -> Self {
		Self::OutboundHrmp
	}
/// An error returned by [`Pallet::check_upward_messages`] that indicates a violation of one of
/// the acceptance criteria rules.
#[cfg_attr(test, derive(PartialEq))]
pub(crate) enum UmpAcceptanceCheckErr {
	/// The maximal number of messages that can be submitted in one batch was exceeded.
	MoreMessagesThanPermitted { sent: u32, permitted: u32 },
	/// The maximal size of a single message was exceeded.
	MessageSize { idx: u32, msg_size: u32, max_size: u32 },
	/// The allowed number of messages in the queue was exceeded.
	CapacityExceeded { count: u64, limit: u64 },
	/// The allowed combined message size in the queue was exceeded.
	TotalSizeExceeded { total_size: u64, limit: u64 },
	/// A para-chain cannot send UMP messages while it is offboarding.
	IsOffboarding,
}

#[cfg(feature = "std")]
impl fmt::Debug for UmpAcceptanceCheckErr {
	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
		match *self {
			UmpAcceptanceCheckErr::MoreMessagesThanPermitted { sent, permitted } => write!(
				fmt,
				"more upward messages than permitted by config ({} > {})",
				sent, permitted,
			),
			UmpAcceptanceCheckErr::MessageSize { idx, msg_size, max_size } => write!(
				fmt,
				"upward message idx {} larger than permitted by config ({} > {})",
				idx, msg_size, max_size,
			),
			UmpAcceptanceCheckErr::CapacityExceeded { count, limit } => write!(
				fmt,
				"the ump queue would have more items than permitted by config ({} > {})",
				count, limit,
			),
			UmpAcceptanceCheckErr::TotalSizeExceeded { total_size, limit } => write!(
				fmt,
				"the ump queue would have grown past the max size permitted by config ({} > {})",
				total_size, limit,
			),
			UmpAcceptanceCheckErr::IsOffboarding => {
				write!(fmt, "upward message rejected because the para is off-boarding")
			},
impl<T: Config> Pallet<T> {
	/// Block initialization logic, called by initializer.
	///
	/// Performs no work and reports zero weight; the block number argument is unused.
	pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
		Weight::zero()
	}

	/// Block finalization logic, called by initializer.
	///
	/// Currently a no-op.
	pub(crate) fn initializer_finalize() {}

	/// React to a session change.
	///
	/// Drops every entry of `PendingAvailability` and then sweeps the UMP dispatch queues of all
	/// `outgoing_paras`. The notification itself is not inspected.
	pub(crate) fn initializer_on_new_session(
		_notification: &crate::initializer::SessionChangeNotification<BlockNumberFor<T>>,
		outgoing_paras: &[ParaId],
	) {
		// unlike most drain methods, drained elements are not cleared on `Drop` of the iterator
		// and require consumption, so the iterator is driven to completion here.
		PendingAvailability::<T>::drain().for_each(drop);

		Self::cleanup_outgoing_ump_dispatch_queues(outgoing_paras);
	}

	/// Sweep the UMP dispatch queue of every para in `outgoing`, in slice order.
	pub(crate) fn cleanup_outgoing_ump_dispatch_queues(outgoing: &[ParaId]) {
		outgoing.iter().copied().for_each(Self::cleanup_outgoing_ump_dispatch_queue);
	}

	/// Ask the message queue to sweep the UMP queue that belongs to `para`.
	pub(crate) fn cleanup_outgoing_ump_dispatch_queue(para: ParaId) {
		let origin = AggregateMessageOrigin::Ump(UmpQueueId::Para(para));
		T::MessageQueue::sweep_queue(origin);
	}

	/// Extract the freed cores based on cores that became available.
	///
	/// Bitfields are expected to have been sanitized already. E.g. via `sanitize_bitfields`!
	///
	/// Updates storage items `PendingAvailability`.
	/// Returns a `Vec` of `CandidateHash`es and their respective `AvailabilityCore`s that became
	/// available, and cores free.
	pub(crate) fn update_pending_availability_and_get_freed_cores(
		validators: &[ValidatorId],
		signed_bitfields: SignedAvailabilityBitfields,
	) -> Vec<(CoreIndex, CandidateHash)> {
		let threshold = availability_threshold(validators.len());

		let mut votes_per_core: BTreeMap<CoreIndex, BTreeSet<ValidatorIndex>> = BTreeMap::new();

		for (checked_bitfield, validator_index) in
			signed_bitfields.into_iter().map(|signed_bitfield| {
				let validator_idx = signed_bitfield.validator_index();
				let checked_bitfield = signed_bitfield.into_payload();
				(checked_bitfield, validator_idx)
			}) {
			for (bit_idx, _) in checked_bitfield.0.iter().enumerate().filter(|(_, is_av)| **is_av) {
				let core_index = CoreIndex(bit_idx as u32);
				votes_per_core
					.entry(core_index)
					.or_insert_with(|| BTreeSet::new())
					.insert(validator_index);
		let pending_paraids: Vec<_> = PendingAvailability::<T>::iter_keys().collect();
		for paraid in pending_paraids {
			PendingAvailability::<T>::mutate(paraid, |candidates| {
				if let Some(candidates) = candidates {
					let mut last_enacted_index: Option<usize> = None;

					for (candidate_index, candidate) in candidates.iter_mut().enumerate() {
						if let Some(validator_indices) = votes_per_core.remove(&candidate.core) {
							for validator_index in validator_indices.iter() {
								// defensive check - this is constructed by loading the
								// availability bitfield record, which is always `Some` if
								// the core is occupied - that's why we're here.
								if let Some(mut bit) =
									candidate.availability_votes.get_mut(validator_index.0 as usize)
								{
									*bit = true;
								}
							}
						}

						// We check for the candidate's availability even if we didn't get any new
						// bitfields for its core, as it may have already been available at a
						// previous block but wasn't enacted due to its predecessors not being
						// available.
						if candidate.availability_votes.count_ones() >= threshold {
							// We can only enact a candidate if we've enacted all of its
							// predecessors already.
							let can_enact = if candidate_index == 0 {
								last_enacted_index == None
							} else {
								let prev_candidate_index = usize::try_from(candidate_index - 1)
									.expect("Previous `if` would have caught a 0 candidate index.");
								matches!(last_enacted_index, Some(old_index) if old_index == prev_candidate_index)
							};

							if can_enact {
								last_enacted_index = Some(candidate_index);
							}
						}
					}
					// Trim the pending availability candidates storage and enact candidates of this
					// para now.
					if let Some(last_enacted_index) = last_enacted_index {
						let evicted_candidates = candidates.drain(0..=last_enacted_index);
						for candidate in evicted_candidates {
							freed_cores.push((candidate.core, candidate.hash));

							let receipt = CommittedCandidateReceipt {
								descriptor: candidate.descriptor,
								commitments: candidate.commitments,
							};
							let _weight = Self::enact_candidate(
								candidate.relay_parent_number,
								receipt,
								candidate.backers,
								candidate.availability_votes,
								candidate.core,
								candidate.backing_group,
							);
						}
					}
				}
			});
	/// Process candidates that have been backed. Provide a set of
	/// candidates along with their scheduled cores.
	/// Candidates of the same paraid should be sorted according to their dependency order (they
	/// should form a chain). If this condition is not met, this function will return an error.
	/// (This really should not happen here, if the candidates were properly sanitised in
	/// paras_inherent).
	pub(crate) fn process_candidates<GV>(
		allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
		candidates: &BTreeMap<ParaId, Vec<(BackedCandidate<T::Hash>, CoreIndex)>>,
		group_validators: GV,
	) -> Result<ProcessedCandidates<T::Hash>, DispatchError>
	where
		GV: Fn(GroupIndex) -> Option<Vec<ValidatorIndex>>,
	{
			return Ok(ProcessedCandidates::default())
		}

		let now = frame_system::Pallet::<T>::block_number();
		let validators = shared::ActiveValidatorKeys::<T>::get();

		// Collect candidate receipts with backers.
		let mut candidate_receipt_with_backing_validator_indices =
			Vec::with_capacity(candidates.len());
		let mut core_indices = Vec::with_capacity(candidates.len());
		for (para_id, para_candidates) in candidates {
			let mut latest_head_data = match Self::para_latest_head_data(para_id) {
				None => {
					defensive!("Latest included head data for paraid {:?} is None", para_id);
					continue
				},
				Some(latest_head_data) => latest_head_data,
			for (candidate, core) in para_candidates.iter() {
				let candidate_hash = candidate.candidate().hash();

				let check_ctx = CandidateCheckContext::<T>::new(None);
				let relay_parent_number = check_ctx.verify_backed_candidate(
					&allowed_relay_parents,
					candidate.candidate(),
					latest_head_data.clone(),
				)?;

				// The candidate based upon relay parent `N` should be backed by a
				// group assigned to core at block `N + 1`. Thus,
				// `relay_parent_number + 1` will always land in the current
				// session.
				let group_idx = scheduler::Pallet::<T>::group_assigned_to_core(
					relay_parent_number + One::one(),
				)
				.ok_or_else(|| {
					log::warn!(
						target: LOG_TARGET,
						"Failed to compute group index for candidate {:?}",
						candidate_hash
					);
					Error::<T>::InvalidAssignment
				})?;
				let group_vals =
					group_validators(group_idx).ok_or_else(|| Error::<T>::InvalidGroupIndex)?;

				// Check backing vote count and validity.
				let (backers, backer_idx_and_attestation) = Self::check_backing_votes(
					candidate,
					&validators,
					group_vals,
					core_index_enabled,
				)?;

				// Found a valid candidate.
				latest_head_data = candidate.candidate().commitments.head_data.clone();
				candidate_receipt_with_backing_validator_indices
					.push((candidate.receipt(), backer_idx_and_attestation));
				core_indices.push((*core, *para_id));

				// Update storage now
				PendingAvailability::<T>::mutate(&para_id, |pending_availability| {
					let new_candidate = CandidatePendingAvailability {
						core: *core,
						hash: candidate_hash,
						descriptor: candidate.candidate().descriptor.clone(),
						commitments: candidate.candidate().commitments.clone(),
						// initialize all availability votes to 0.
						availability_votes: bitvec::bitvec![u8, BitOrderLsb0; 0; validators.len()],
						relay_parent_number,
						backers: backers.to_bitvec(),
						backed_in_number: now,
						backing_group: group_idx,
					};

					if let Some(pending_availability) = pending_availability {
						pending_availability.push_back(new_candidate);
					} else {
						*pending_availability =
							Some([new_candidate].into_iter().collect::<VecDeque<_>>())
				// Deposit backed event.
				Self::deposit_event(Event::<T>::CandidateBacked(
					candidate.candidate().to_plain(),
					candidate.candidate().commitments.head_data.clone(),
					*core,
		Ok(ProcessedCandidates::<T::Hash> {
			core_indices,
			candidate_receipt_with_backing_validator_indices,
		})
	}
	// Get the latest backed output head data of this para.
	pub(crate) fn para_latest_head_data(para_id: &ParaId) -> Option<HeadData> {
		match PendingAvailability::<T>::get(para_id).and_then(|pending_candidates| {
			pending_candidates.back().map(|x| x.commitments.head_data.clone())
		}) {
			Some(head_data) => Some(head_data),
			None => paras::Heads::<T>::get(para_id),
	fn check_backing_votes(
		backed_candidate: &BackedCandidate<T::Hash>,
		validators: &[ValidatorId],
		group_vals: Vec<ValidatorIndex>,
		core_index_enabled: bool,
	) -> Result<(BitVec<u8, BitOrderLsb0>, Vec<(ValidatorIndex, ValidityAttestation)>), Error<T>> {
		let minimum_backing_votes = configuration::ActiveConfig::<T>::get().minimum_backing_votes;
		let mut backers = bitvec::bitvec![u8, BitOrderLsb0; 0; validators.len()];
		let signing_context = SigningContext {
			parent_hash: backed_candidate.descriptor().relay_parent,
			session_index: shared::CurrentSessionIndex::<T>::get(),
		let (validator_indices, _) =
			backed_candidate.validator_indices_and_core_index(core_index_enabled);

		// check the signatures in the backing and that it is a majority.
		let maybe_amount_validated = primitives::check_candidate_backing(
			backed_candidate.candidate().hash(),
			backed_candidate.validity_votes(),
			validator_indices,
			&signing_context,
			group_vals.len(),
			|intra_group_vi| {
				group_vals
					.get(intra_group_vi)
					.and_then(|vi| validators.get(vi.0 as usize))
					.map(|v| v.clone())
			},
		);
		match maybe_amount_validated {
			Ok(amount_validated) => ensure!(
				amount_validated >=
					effective_minimum_backing_votes(group_vals.len(), minimum_backing_votes),
				Error::<T>::InsufficientBacking,
			),
			Err(()) => {
				Err(Error::<T>::InvalidBacking)?;
			},
		}
		let mut backer_idx_and_attestation =
			Vec::<(ValidatorIndex, ValidityAttestation)>::with_capacity(
				validator_indices.count_ones(),

		for ((bit_idx, _), attestation) in validator_indices
			.iter()
			.enumerate()
			.filter(|(_, signed)| **signed)
			.zip(backed_candidate.validity_votes().iter().cloned())
		{
			let val_idx = group_vals.get(bit_idx).expect("this query succeeded above; qed");
			backer_idx_and_attestation.push((*val_idx, attestation));

			backers.set(val_idx.0 as _, true);
		Ok((backers, backer_idx_and_attestation))
	}

	/// Run the acceptance criteria checks on the given candidate commitments.
	pub(crate) fn check_validation_outputs_for_runtime_api(
		para_id: ParaId,
		relay_parent_number: BlockNumberFor<T>,
		validation_outputs: primitives::CandidateCommitments,
		let prev_context = paras::MostRecentContext::<T>::get(para_id);
		let check_ctx = CandidateCheckContext::<T>::new(prev_context);
		if check_ctx
			.check_validation_outputs(
				para_id,
				relay_parent_number,
				&validation_outputs.head_data,
				&validation_outputs.new_validation_code,
				validation_outputs.processed_downward_messages,
				&validation_outputs.upward_messages,
				BlockNumberFor::<T>::from(validation_outputs.hrmp_watermark),
				&validation_outputs.horizontal_messages,
			)
			.is_err()
		{
			log::debug!(
				target: LOG_TARGET,
				"Validation outputs checking for parachain `{}` failed",
				u32::from(para_id),
			);
			false
		} else {
			true
		}
	}

	fn enact_candidate(
		relay_parent_number: BlockNumberFor<T>,
		receipt: CommittedCandidateReceipt<T::Hash>,
		backers: BitVec<u8, BitOrderLsb0>,
		availability_votes: BitVec<u8, BitOrderLsb0>,
		core_index: CoreIndex,
		backing_group: GroupIndex,
	) -> Weight {
		let plain = receipt.to_plain();
		let commitments = receipt.commitments;
		let config = configuration::ActiveConfig::<T>::get();

		T::RewardValidators::reward_backing(
			backers
				.iter()
				.enumerate()
				.filter(|(_, backed)| **backed)
				.map(|(i, _)| ValidatorIndex(i as _)),
		);

		T::RewardValidators::reward_bitfields(
			availability_votes
				.iter()
				.enumerate()
				.filter(|(_, voted)| **voted)
				.map(|(i, _)| ValidatorIndex(i as _)),
		);

		// initial weight is config read.
		let mut weight = T::DbWeight::get().reads_writes(1, 0);
		if let Some(new_code) = commitments.new_validation_code {
			// Block number of candidate's inclusion.
			let now = frame_system::Pallet::<T>::block_number();
			weight.saturating_add(paras::Pallet::<T>::schedule_code_upgrade(
				receipt.descriptor.para_id,
				new_code,
				UpgradeStrategy::SetGoAheadSignal,
		}

		// enact the messaging facet of the candidate.
		weight.saturating_accrue(dmp::Pallet::<T>::prune_dmq(
			receipt.descriptor.para_id,
			commitments.processed_downward_messages,
		));
		weight.saturating_accrue(Self::receive_upward_messages(
			receipt.descriptor.para_id,
			commitments.upward_messages.as_slice(),
		));
		weight.saturating_accrue(hrmp::Pallet::<T>::prune_hrmp(
			receipt.descriptor.para_id,
			BlockNumberFor::<T>::from(commitments.hrmp_watermark),
		weight.saturating_accrue(hrmp::Pallet::<T>::queue_outbound_hrmp(
			receipt.descriptor.para_id,
			commitments.horizontal_messages,

		Self::deposit_event(Event::<T>::CandidateIncluded(
			plain,
			commitments.head_data.clone(),
			core_index,
			backing_group,
		));

		weight.saturating_add(paras::Pallet::<T>::note_new_head(
			receipt.descriptor.para_id,
			commitments.head_data,
			relay_parent_number,
		))
	}

	pub(crate) fn relay_dispatch_queue_size(para_id: ParaId) -> (u32, u32) {
		let fp = T::MessageQueue::footprint(AggregateMessageOrigin::Ump(UmpQueueId::Para(para_id)));
		(fp.storage.count as u32, fp.storage.size as u32)
	/// Check that all the upward messages sent by a candidate pass the acceptance criteria.
	pub(crate) fn check_upward_messages(
		config: &HostConfiguration<BlockNumberFor<T>>,
		para: ParaId,
		upward_messages: &[UpwardMessage],
	) -> Result<(), UmpAcceptanceCheckErr> {
		// Cannot send UMP messages while off-boarding.
		if paras::Pallet::<T>::is_offboarding(para) {
			ensure!(upward_messages.is_empty(), UmpAcceptanceCheckErr::IsOffboarding);
		}

		let additional_msgs = upward_messages.len() as u32;
		if additional_msgs > config.max_upward_message_num_per_candidate {
			return Err(UmpAcceptanceCheckErr::MoreMessagesThanPermitted {
				sent: additional_msgs,
				permitted: config.max_upward_message_num_per_candidate,
			})
		}

		let (para_queue_count, mut para_queue_size) = Self::relay_dispatch_queue_size(para);
		if para_queue_count.saturating_add(additional_msgs) > config.max_upward_queue_count {
			return Err(UmpAcceptanceCheckErr::CapacityExceeded {
				count: para_queue_count.saturating_add(additional_msgs).into(),
				limit: config.max_upward_queue_count.into(),
			})
		}

		for (idx, msg) in upward_messages.into_iter().enumerate() {
			let msg_size = msg.len() as u32;
			if msg_size > config.max_upward_message_size {
				return Err(UmpAcceptanceCheckErr::MessageSize {
					idx: idx as u32,
					max_size: config.max_upward_message_size,
				})
			}
			// make sure that the queue is not overfilled.
			// we do it here only once since returning false invalidates the whole relay-chain
			// block.
			if para_queue_size.saturating_add(msg_size) > config.max_upward_queue_size {
				return Err(UmpAcceptanceCheckErr::TotalSizeExceeded {
					total_size: para_queue_size.saturating_add(msg_size).into(),
					limit: config.max_upward_queue_size.into(),
			para_queue_size.saturating_accrue(msg_size);
		}

		Ok(())
	}

	/// Enqueues `upward_messages` from a `para`'s accepted candidate block.
	///
	/// Infallible by design: the candidate has already been accepted, so the messages must be
	/// dealt with as given. Each message is converted into a bounded slice; a message that does
	/// not fit the bound triggers a `defensive!` report and is left out, since such candidates
	/// should have already been rejected in [`Self::check_upward_messages`]. The remaining
	/// messages are handed to [`Self::receive_bounded_upward_messages`], whose weight is returned.
	pub(crate) fn receive_upward_messages(para: ParaId, upward_messages: &[Vec<u8>]) -> Weight {
		let bounded = upward_messages
			.iter()
			.filter_map(|msg| match BoundedSlice::try_from(msg.as_slice()) {
				Ok(slice) => Some(slice),
				Err(_) => {
					defensive!("Accepted candidate contains too long msg, len=", msg.len());
					None
				},
			})
			.collect();
		Self::receive_bounded_upward_messages(para, bounded)
	}

	/// Enqueues storage-bounded `upward_messages` from a `para`'s accepted candidate block.
	pub(crate) fn receive_bounded_upward_messages(
		para: ParaId,
		messages: Vec<BoundedSlice<'_, u8, MaxUmpMessageLenOf<T>>>,
	) -> Weight {
		let count = messages.len() as u32;
		if count == 0 {
			return Weight::zero()
		}

		T::MessageQueue::enqueue_messages(
			messages.into_iter(),
			AggregateMessageOrigin::Ump(UmpQueueId::Para(para)),
		);
		let weight = <T as Config>::WeightInfo::receive_upward_messages(count);
		Self::deposit_event(Event::UpwardMessagesReceived { from: para, count });
		weight
	/// Cleans up all timed out candidates as well as their descendant candidates.
	///
	/// Returns a vector of cleaned-up core IDs.
	pub(crate) fn free_timedout() -> Vec<CoreIndex> {