// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// Cumulus is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Cumulus is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
//! A pallet which uses the XCMP transport layer to handle both incoming and outgoing XCM message
//! sending and dispatch, queuing, signalling and backpressure. To do so, it implements:
//! * `XcmpMessageHandler`
//! * `XcmpMessageSource`
//!
//! Also provides an implementation of `SendXcm` which can be placed in a router tuple for relaying
//! XCM over XCMP if the destination is `Parent/Parachain`. It requires an implementation of
//! `XcmExecutor` for dispatching incoming XCM messages.
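//!
//! For instance, a runtime's router tuple might be wired up roughly like the sketch below
//! (illustrative only; `UmpRouter` and the surrounding runtime names are assumptions, not items
//! provided by this pallet):
//!
//! ```ignore
//! pub type XcmRouter = (
//!     // Some router that handles `Parent` (relay chain) destinations, e.g. via UMP.
//!     UmpRouter,
//!     // This pallet, handling `(Parent, Parachain(id))` destinations over XCMP.
//!     XcmpQueue,
//! );
//! ```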
//!
//! To prevent out of memory errors on the `OutboundXcmpMessages` queue, an exponential fee factor
//! (`DeliveryFeeFactor`) is set, much like the one used in DMP.
//! The fee factor increases whenever the total size of messages in a particular channel passes a
//! threshold. This threshold is defined as a percentage of the maximum total size the channel can
//! have. More concretely, the threshold is `max_total_size` / `THRESHOLD_FACTOR`, where:
//! - `max_total_size` is the maximum size, in bytes, of the channel, not number of messages.
//! It is defined in the channel configuration.
//! - `THRESHOLD_FACTOR` just declares which fraction of the max size is the actual threshold.
//! If it's 2, then the threshold is half of the max size, if it's 4, it's a quarter, and so on.
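//!
//! For illustration (the numbers are made up, not taken from any real channel configuration):
//! with `max_total_size = 512 * 1024` bytes and `THRESHOLD_FACTOR = 2`, the fee factor for a
//! channel starts increasing once more than 256 KiB of outbound messages are queued for it.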
#![cfg_attr(not(feature = "std"), no_std)]
pub mod migration;
#[cfg(test)]
mod mock;
#[cfg(test)]
mod tests;
#[cfg(feature = "runtime-benchmarks")]
mod benchmarking;
#[cfg(feature = "bridging")]
pub mod bridging;
pub mod weights;
pub use weights::WeightInfo;
use bounded_collections::BoundedBTreeSet;
use codec::{Decode, DecodeLimit, Encode};
use cumulus_primitives_core::{
relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError,
ParaId, XcmpMessageFormat, XcmpMessageHandler, XcmpMessageSource,
};
use frame_support::{
defensive, defensive_assert,
traits::{EnqueueMessage, EnsureOrigin, Get, QueueFootprint, QueuePausedQuery},
weights::{Weight, WeightMeter},
BoundedVec,
};
use pallet_message_queue::OnQueueChanged;
use polkadot_runtime_common::xcm_sender::PriceForMessageDelivery;
use polkadot_runtime_parachains::FeeTracker;
use scale_info::TypeInfo;
use sp_core::MAX_POSSIBLE_ALLOCATION;
use sp_runtime::{FixedU128, RuntimeDebug, Saturating};
use sp_std::prelude::*;
use xcm::{latest::prelude::*, VersionedXcm, WrapVersion, MAX_XCM_DECODE_DEPTH};
use xcm_executor::traits::ConvertOrigin;
pub use pallet::*;
/// Index used to identify overweight XCMs.
pub type OverweightIndex = u64;
/// The max length of an XCMP message.
pub type MaxXcmpMessageLenOf<T> =
<<T as Config>::XcmpQueue as EnqueueMessage<ParaId>>::MaxMessageLen;
const LOG_TARGET: &str = "xcmp_queue";
const DEFAULT_POV_SIZE: u64 = 64 * 1024; // 64 KB
/// Constants related to delivery fee calculation
pub mod delivery_fee_constants {
use super::FixedU128;
/// Fees will start increasing when queue is half full
pub const THRESHOLD_FACTOR: u32 = 2;
/// The base number the delivery fee factor gets multiplied by every time it is increased.
/// Also, the number it gets divided by when decreased.
pub const EXPONENTIAL_FEE_BASE: FixedU128 = FixedU128::from_rational(105, 100); // 1.05
/// The contribution of each KB to a fee factor increase
pub const MESSAGE_SIZE_FEE_BASE: FixedU128 = FixedU128::from_rational(1, 1000); // 0.001
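// Taken together (see `increase_fee_factor`): once a channel is past its threshold, every
// enqueued fragment multiplies its `DeliveryFeeFactor` by
// `EXPONENTIAL_FEE_BASE + MESSAGE_SIZE_FEE_BASE * (fragment_size_in_bytes / 1024)`,
// e.g. roughly 1.052 for a 2 KiB fragment.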
}
#[frame_support::pallet]
pub mod pallet {
use super::*;
use frame_support::{pallet_prelude::*, Twox64Concat};
use frame_system::pallet_prelude::*;
#[pallet::pallet]
#[pallet::storage_version(migration::STORAGE_VERSION)]
#[pallet::without_storage_info]
pub struct Pallet(_);
#[pallet::config]
pub trait Config: frame_system::Config {
type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
/// Information on the available XCMP channels.
type ChannelInfo: GetChannelInfo;
/// Means of converting an `Xcm` into a `VersionedXcm`.
type VersionWrapper: WrapVersion;
/// Enqueue an inbound horizontal message for later processing.
///
/// This defines the maximal message length via [`crate::MaxXcmpMessageLenOf`]. The pallet
/// assumes that this hook will eventually process all the pushed messages.
type XcmpQueue: EnqueueMessage<ParaId>;
/// The maximum number of inbound XCMP channels that can be suspended simultaneously.
///
/// Any further channel suspensions will fail and messages may get dropped without further
/// notice. Choosing a high value (1000) is okay; the trade-off that is described in
/// [`InboundXcmpSuspended`] still applies at that scale.
#[pallet::constant]
type MaxInboundSuspended: Get<u32>;
/// The origin that is allowed to resume or suspend the XCMP queue.
type ControllerOrigin: EnsureOrigin<Self::RuntimeOrigin>;
/// The conversion function used to attempt to convert an XCM `Location` origin to a
/// superuser origin.
type ControllerOriginConverter: ConvertOrigin<Self::RuntimeOrigin>;
/// The price for delivering an XCM to a sibling parachain destination.
type PriceForSiblingDelivery: PriceForMessageDelivery<Id = ParaId>;
/// The weight information of this pallet.
type WeightInfo: WeightInfo;
}
#[pallet::call]
impl<T: Config> Pallet<T> {
/// Suspends all XCM executions for the XCMP queue, regardless of the sender's origin.
///
/// - `origin`: Must pass `ControllerOrigin`.
#[pallet::call_index(1)]
#[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
pub fn suspend_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
T::ControllerOrigin::ensure_origin(origin)?;
QueueSuspended::<T>::try_mutate(|suspended| {
if *suspended {
Err(Error::<T>::AlreadySuspended.into())
} else {
*suspended = true;
Ok(())
}
})
}
/// Resumes all XCM executions for the XCMP queue.
///
/// Note that this function doesn't change the status of the inbound/outbound channels.
///
/// - `origin`: Must pass `ControllerOrigin`.
#[pallet::call_index(2)]
#[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
pub fn resume_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
T::ControllerOrigin::ensure_origin(origin)?;
QueueSuspended::<T>::try_mutate(|suspended| {
if !*suspended {
Err(Error::<T>::AlreadyResumed.into())
} else {
*suspended = false;
Ok(())
}
})
}
/// Overwrites the number of pages which must be in the queue for the other side to be
/// told to suspend their sending.
///
/// - `origin`: Must pass `Root`.
/// - `new`: Desired value for `QueueConfigData.suspend_threshold`
#[pallet::call_index(3)]
#[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
pub fn update_suspend_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
ensure_root(origin)?;
QueueConfig::<T>::try_mutate(|data| {
data.suspend_threshold = new;
data.validate::<T>()
})
}
/// Overwrites the number of pages which must be in the queue after which we drop any
/// further messages from the channel.
///
/// - `origin`: Must pass `Root`.
/// - `new`: Desired value for `QueueConfigData.drop_threshold`
#[pallet::call_index(4)]
#[pallet::weight((T::WeightInfo::set_config_with_u32(),DispatchClass::Operational,))]
pub fn update_drop_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
ensure_root(origin)?;
QueueConfig::<T>::try_mutate(|data| {
data.drop_threshold = new;
data.validate::<T>()
})
}
/// Overwrites the number of pages which the queue must be reduced to before it signals
/// that message sending may recommence after it has been suspended.
///
/// - `origin`: Must pass `Root`.
/// - `new`: Desired value for `QueueConfigData.resume_threshold`
#[pallet::call_index(5)]
#[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
pub fn update_resume_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
ensure_root(origin)?;
QueueConfig::<T>::try_mutate(|data| {
data.resume_threshold = new;
data.validate::<T>()
})
}
}
#[pallet::hooks]
impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
fn integrity_test() {
let w = Self::on_idle_weight();
assert!(w != Weight::zero());
assert!(w.all_lte(T::BlockWeights::get().max_block));
}
fn on_idle(_block: BlockNumberFor<T>, limit: Weight) -> Weight {
let mut meter = WeightMeter::with_limit(limit);
if meter.try_consume(Self::on_idle_weight()).is_err() {
log::debug!(
"Not enough weight for on_idle. {} < {}",
Self::on_idle_weight(),
limit
);
return meter.consumed()
}
migration::v3::lazy_migrate_inbound_queue::<T>();
meter.consumed()
}
}
#[pallet::event]
#[pallet::generate_deposit(pub(super) fn deposit_event)]
pub enum Event<T: Config> {
/// An HRMP message was sent to a sibling parachain.
XcmpMessageSent { message_hash: XcmHash },
}
#[pallet::error]
pub enum Error<T> {
/// Setting the queue config failed since one of its values was invalid.
BadQueueConfig,
/// The execution is already suspended.
AlreadySuspended,
/// The execution is already resumed.
AlreadyResumed,
}
/// The suspended inbound XCMP channels. All others are not suspended.
///
/// This is a `StorageValue` instead of a `StorageMap` since we expect multiple reads per block
/// to different keys with a one byte payload. The access to `BoundedBTreeSet` will be cached
/// within the block and therefore only included once in the proof size.
///
/// NOTE: The PoV benchmarking cannot know this and will over-estimate, but the actual proof
/// will be smaller.
#[pallet::storage]
pub type InboundXcmpSuspended<T: Config> =
StorageValue<_, BoundedBTreeSet<ParaId, T::MaxInboundSuspended>, ValueQuery>;
/// The non-empty XCMP channels in order of becoming non-empty, and the index of the first
/// and last outbound message. If the two indices are equal, then it indicates an empty
/// queue and there must be a non-`Ok` `OutboundStatus`. We assume queues grow no greater
/// than 65535 items. Queue indices for normal messages begin at one; zero is reserved in
/// case of the need to send a high-priority signal message this block.
/// The bool is true if there is a signal message waiting to be sent.
#[pallet::storage]
pub(super) type OutboundXcmpStatus<T: Config> =
StorageValue<_, Vec<OutboundChannelDetails>, ValueQuery>;
// The new way of doing it:
/// The messages outbound in a given XCMP channel.
#[pallet::storage]
pub(super) type OutboundXcmpMessages<T: Config> =
StorageDoubleMap<_, Blake2_128Concat, ParaId, Twox64Concat, u16, Vec<u8>, ValueQuery>;
/// Any signal messages waiting to be sent.
#[pallet::storage]
pub(super) type SignalMessages<T: Config> =
StorageMap<_, Blake2_128Concat, ParaId, Vec<u8>, ValueQuery>;
/// The configuration which controls the dynamics of the outbound queue.
#[pallet::storage]
pub(super) type QueueConfig<T: Config> = StorageValue<_, QueueConfigData, ValueQuery>;
/// Whether the XCMP queue is suspended from executing incoming XCMs.
#[pallet::storage]
pub(super) type QueueSuspended<T: Config> = StorageValue<_, bool, ValueQuery>;
/// Initialization value for the DeliveryFee factor.
#[pallet::type_value]
pub fn InitialFactor() -> FixedU128 {
FixedU128::from_u32(1)
}
/// The factor to multiply the base delivery fee by.
#[pallet::storage]
pub(super) type DeliveryFeeFactor<T: Config> =
StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, InitialFactor>;
}
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)]
pub enum OutboundState {
Ok,
Suspended,
}
/// Struct containing detailed information about the outbound channel.
#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo)]
#[cfg_attr(feature = "std", derive(Debug))]
pub struct OutboundChannelDetails {
/// The `ParaId` of the parachain that this channel is connected with.
recipient: ParaId,
/// The state of the channel.
state: OutboundState,
/// Whether or not any signals exist in this channel.
signals_exist: bool,
/// The index of the first outbound message.
first_index: u16,
/// The index of the last outbound message.
last_index: u16,
}
impl OutboundChannelDetails {
pub fn new(recipient: ParaId) -> OutboundChannelDetails {
OutboundChannelDetails {
recipient,
state: OutboundState::Ok,
signals_exist: false,
first_index: 0,
last_index: 0,
}
}
pub fn with_signals(mut self) -> OutboundChannelDetails {
self.signals_exist = true;
self
}
pub fn with_suspended_state(mut self) -> OutboundChannelDetails {
self.state = OutboundState::Suspended;
self
}
}
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)]
pub struct QueueConfigData {
/// The number of pages which must be in the queue for the other side to be told to suspend
/// their sending.
suspend_threshold: u32,
/// The number of pages which must be in the queue after which we drop any further messages
/// from the channel. This should normally not happen since the `suspend_threshold` can be used
/// to suspend the channel.
drop_threshold: u32,
/// The number of pages which the queue must be reduced to before it signals that
/// message sending may recommence after it has been suspended.
resume_threshold: u32,
}
impl Default for QueueConfigData {
fn default() -> Self {
// NOTE that these default values are only used on genesis. They should give a rough idea of
// what to set these values to, but are in no way a requirement.
Self {
drop_threshold: 48, // 64KiB * 48 = 3MiB
suspend_threshold: 32, // 64KiB * 32 = 2MiB
resume_threshold: 8, // 64KiB * 8 = 512KiB
}
}
}
impl QueueConfigData {
/// Validate all assumptions about `Self`.
///
/// Should be called prior to accepting this as new config.
pub fn validate<T: Config>(&self) -> sp_runtime::DispatchResult {
if self.resume_threshold < self.suspend_threshold &&
self.suspend_threshold <= self.drop_threshold &&
self.resume_threshold > 0
{
Ok(())
} else {
Err(Error::<T>::BadQueueConfig.into())
}
}
}
#[derive(PartialEq, Eq, Copy, Clone, Encode, Decode, TypeInfo)]
pub enum ChannelSignal {
Suspend,
Resume,
}
impl<T: Config> Pallet<T> {
/// Place a message `fragment` on the outgoing XCMP queue for `recipient`.
///
/// Format is the type of aggregate message that the `fragment` may be safely encoded and
/// appended onto. Whether earlier unused space is used for the fragment at the risk of sending
/// it out of order is determined with `qos`. NOTE: For any two messages to be guaranteed to be
/// dispatched in order, then both must be sent with `ServiceQuality::Ordered`.
///
/// ## Background
///
/// For our purposes, one HRMP "message" is actually an aggregated block of XCM "messages".
///
/// For the sake of clarity, we distinguish between them as message AGGREGATEs versus
/// message FRAGMENTs.
///
/// So each AGGREGATE is comprised of one or more concatenated SCALE-encoded `Vec<u8>`
/// FRAGMENTs. Though each fragment is already probably a SCALE-encoded `Xcm`, we can't be
/// certain, so we SCALE encode each `Vec<u8>` fragment in order to ensure we have the
/// length prefixed and can thus decode each fragment from the aggregate stream. With this,
/// we can concatenate them into a single aggregate blob without needing to be concerned
/// about encoding fragment boundaries.
///
/// If successful, returns the number of pages in the outbound queue after enqueuing the new
/// fragment.
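///
/// For illustration only: a `ConcatenatedVersionedXcm` page is laid out roughly as
/// `format.encode() ++ fragment_1.encode() ++ fragment_2.encode() ++ ...`, so each
/// length-prefixed fragment can be decoded from the aggregate stream on its own.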
fn send_fragment<Fragment: Encode>(
recipient: ParaId,
format: XcmpMessageFormat,
fragment: Fragment,
) -> Result<u32, MessageSendError> {
let encoded_fragment = fragment.encode();
// Optimization note: `max_message_size` could potentially be stored in
// `OutboundXcmpMessages` once known; that way it's only accessed when a new page is needed.
let channel_info =
T::ChannelInfo::get_channel_info(recipient).ok_or(MessageSendError::NoChannel)?;
// Max message size refers to aggregates, or pages. Not to individual fragments.
let max_message_size = channel_info.max_message_size as usize;
let format_size = format.encoded_size();
// We check the encoded fragment length plus the format size against the max message size
// because the format is concatenated if a new page is needed.
let size_to_check = encoded_fragment
.len()
.checked_add(format_size)
.ok_or(MessageSendError::TooBig)?;
if size_to_check > max_message_size {
return Err(MessageSendError::TooBig)
}
let mut all_channels = <OutboundXcmpStatus<T>>::get();
let channel_details = if let Some(details) =
all_channels.iter_mut().find(|channel| channel.recipient == recipient)
{
details
} else {
all_channels.push(OutboundChannelDetails::new(recipient));
all_channels
.last_mut()
.expect("can't be empty; a new element was just pushed; qed")
};
let have_active = channel_details.last_index > channel_details.first_index;
// Try to append fragment to the last page, if there is enough space.
// We return the size of the last page inside of the option, to not calculate it again.
let appended_to_last_page = have_active
.then(|| {
<OutboundXcmpMessages<T>>::mutate(
recipient,
channel_details.last_index - 1,
|page| {
if XcmpMessageFormat::decode_with_depth_limit(
MAX_XCM_DECODE_DEPTH,
&mut &page[..],
) != Ok(format)
{
defensive!("Bad format in outbound queue; dropping message");
return None
}
if page.len() + encoded_fragment.len() > max_message_size {
return None
}
page.extend_from_slice(&encoded_fragment[..]);
Some(page.len())
},
)
})
.flatten();
let (number_of_pages, last_page_size) = if let Some(size) = appended_to_last_page {
let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
(number_of_pages, size)
} else {
// Need to add a new page.
let page_index = channel_details.last_index;
channel_details.last_index += 1;
let mut new_page = format.encode();
new_page.extend_from_slice(&encoded_fragment[..]);
let last_page_size = new_page.len();
let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
<OutboundXcmpMessages<T>>::insert(recipient, page_index, new_page);
<OutboundXcmpStatus<T>>::put(all_channels);
(number_of_pages, last_page_size)
};
// We have to count the total size here since `channel_info.total_size` is not updated at
// this point in time. We assume all previous pages are filled, which, in practice, is not
// always the case.
let total_size =
number_of_pages.saturating_sub(1) * max_message_size as u32 + last_page_size as u32;
let threshold = channel_info.max_total_size / delivery_fee_constants::THRESHOLD_FACTOR;
if total_size > threshold {
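// Note: the integer division below means fragments smaller than 1 KiB contribute no
// size-based component, so only `EXPONENTIAL_FEE_BASE` applies to them.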
let message_size_factor = FixedU128::from((encoded_fragment.len() / 1024) as u128)
.saturating_mul(delivery_fee_constants::MESSAGE_SIZE_FEE_BASE);
Self::increase_fee_factor(recipient, message_size_factor);
}
Ok(number_of_pages)
}
/// Sends a signal to the `dest` chain over XCMP. This is guaranteed to be dispatched on this
/// block.
fn send_signal(dest: ParaId, signal: ChannelSignal) {
let mut s = <OutboundXcmpStatus<T>>::get();
if let Some(details) = s.iter_mut().find(|item| item.recipient == dest) {
details.signals_exist = true;
} else {
s.push(OutboundChannelDetails::new(dest).with_signals());
}
<SignalMessages<T>>::mutate(dest, |page| {
*page = (XcmpMessageFormat::Signals, signal).encode();
});
<OutboundXcmpStatus<T>>::put(s);
}
fn suspend_channel(target: ParaId) {
<OutboundXcmpStatus<T>>::mutate(|s| {
if let Some(details) = s.iter_mut().find(|item| item.recipient == target) {
let ok = details.state == OutboundState::Ok;
defensive_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok.");
details.state = OutboundState::Suspended;
} else {
s.push(OutboundChannelDetails::new(target).with_suspended_state());
}
});
}
fn resume_channel(target: ParaId) {
<OutboundXcmpStatus<T>>::mutate(|s| {
if let Some(index) = s.iter().position(|item| item.recipient == target) {
let suspended = s[index].state == OutboundState::Suspended;
defensive_assert!(
suspended,
"WARNING: Attempt to resume channel that was not suspended."
);
if s[index].first_index == s[index].last_index {
s.remove(index);
} else {
s[index].state = OutboundState::Ok;
}
} else {
defensive!("WARNING: Attempt to resume channel that was not suspended.");
}
});
}
fn enqueue_xcmp_message(
sender: ParaId,
xcm: BoundedVec<u8, MaxXcmpMessageLenOf<T>>,
meter: &mut WeightMeter,
) -> Result<(), ()> {
if meter.try_consume(T::WeightInfo::enqueue_xcmp_message()).is_err() {
defensive!("Out of weight: cannot enqueue XCMP messages; dropping msg");
return Err(())
}
let QueueConfigData { drop_threshold, .. } = <QueueConfig<T>>::get();
let fp = T::XcmpQueue::footprint(sender);
// Assume that it will not fit into the current page:
let new_pages = fp.ready_pages.saturating_add(1);
if new_pages > drop_threshold {
// This should not happen since the channel should have been suspended in
// [`on_queue_changed`].
log::error!("XCMP queue for sibling {:?} is full; dropping messages.", sender);
return Err(())
}
T::XcmpQueue::enqueue_message(xcm.as_bounded_slice(), sender);
Ok(())
}
/// Split concatenated encoded `VersionedXcm`s or `MaybeDoubleEncodedVersionedXcm`s into
/// individual items.
///
/// We directly encode them again since that is needed later on.
pub(crate) fn take_first_concatenated_xcm(
data: &mut &[u8],
meter: &mut WeightMeter,
) -> Result<BoundedVec<u8, MaxXcmpMessageLenOf<T>>, ()> {
if data.is_empty() {
return Err(())
}
if meter.try_consume(T::WeightInfo::take_first_concatenated_xcm()).is_err() {
defensive!("Out of weight; could not decode all; dropping");
return Err(())
}
let xcm = VersionedXcm::<()>::decode_with_depth_limit(MAX_XCM_DECODE_DEPTH, data)
.map_err(|_| ())?;
xcm.encode().try_into().map_err(|_| ())
}
/// The worst-case weight of `on_idle`.
pub fn on_idle_weight() -> Weight {
<T as Config>::WeightInfo::on_idle_good_msg()
.max(<T as Config>::WeightInfo::on_idle_large_msg())
}
#[cfg(feature = "bridging")]
fn is_inbound_channel_suspended(sender: ParaId) -> bool {
<InboundXcmpSuspended<T>>::get().iter().any(|c| c == &sender)
}
#[cfg(feature = "bridging")]
/// Returns tuple of `OutboundState` and number of queued pages.
fn outbound_channel_state(target: ParaId) -> Option<(OutboundState, u16)> {
<OutboundXcmpStatus<T>>::get().iter().find(|c| c.recipient == target).map(|c| {
let queued_pages = c.last_index.saturating_sub(c.first_index);
(c.state, queued_pages)
})
}
}
impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> {
// Suspends/Resumes the queue when certain thresholds are reached.
fn on_queue_changed(para: ParaId, fp: QueueFootprint) {
let QueueConfigData { resume_threshold, suspend_threshold, .. } = <QueueConfig<T>>::get();
let mut suspended_channels = <InboundXcmpSuspended<T>>::get();
let suspended = suspended_channels.contains(¶);
if suspended && fp.ready_pages <= resume_threshold {
Self::send_signal(para, ChannelSignal::Resume);
suspended_channels.remove(¶);
<InboundXcmpSuspended<T>>::put(suspended_channels);
} else if !suspended && fp.ready_pages >= suspend_threshold {
log::warn!("XCMP queue for sibling {:?} is full; suspending channel.", para);
Self::send_signal(para, ChannelSignal::Suspend);
if let Err(err) = suspended_channels.try_insert(para) {
log::error!("Too many channels suspended; cannot suspend sibling {:?}: {:?}; further messages may be dropped.", para, err);
}
<InboundXcmpSuspended<T>>::put(suspended_channels);
}
}
}
impl<T: Config> QueuePausedQuery<ParaId> for Pallet<T> {
fn is_paused(para: &ParaId) -> bool {
if !QueueSuspended::<T>::get() {
return false
}
// Make an exception for the superuser queue:
let sender_origin = T::ControllerOriginConverter::convert_origin(
(Parent, Parachain((*para).into())),
OriginKind::Superuser,
);
let is_controller =
sender_origin.map_or(false, |origin| T::ControllerOrigin::try_origin(origin).is_ok());
!is_controller
}
}
impl<T: Config> XcmpMessageHandler for Pallet<T> {
fn handle_xcmp_messages<'a, I: Iterator<Item = (ParaId, RelayBlockNumber, &'a [u8])>>(
iter: I,
max_weight: Weight,
) -> Weight {
let mut meter = WeightMeter::with_limit(max_weight);
for (sender, _sent_at, mut data) in iter {
let format = match XcmpMessageFormat::decode(&mut data) {
Ok(f) => f,
Err(_) => {
defensive!("Unknown XCMP message format - dropping");
continue
},
};
match format {
XcmpMessageFormat::Signals =>
while !data.is_empty() {
if meter
.try_consume(
T::WeightInfo::suspend_channel()
.max(T::WeightInfo::resume_channel()),
)
.is_err()
{
defensive!("Not enough weight to process signals - dropping");
break
}
match ChannelSignal::decode(&mut data) {
Ok(ChannelSignal::Suspend) => Self::suspend_channel(sender),
Ok(ChannelSignal::Resume) => Self::resume_channel(sender),
Err(_) => {
defensive!("Undecodable channel signal - dropping");
break
},
}
},
XcmpMessageFormat::ConcatenatedVersionedXcm =>
while !data.is_empty() {
let Ok(xcm) = Self::take_first_concatenated_xcm(&mut data, &mut meter)
else {
defensive!("HRMP inbound decode stream broke; page will be dropped.",);
break
};
if let Err(()) = Self::enqueue_xcmp_message(sender, xcm, &mut meter) {
defensive!(
"Could not enqueue XCMP messages. Used weight: ",
meter.consumed_ratio()
);
break
}
},
XcmpMessageFormat::ConcatenatedEncodedBlob => {
defensive!("Blob messages are unhandled - dropping");
continue
},
}
}
meter.consumed()
}
}
impl<T: Config> XcmpMessageSource for Pallet<T> {
fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec<u8>)> {
let mut statuses = <OutboundXcmpStatus<T>>::get();
let old_statuses_len = statuses.len();
let max_message_count = statuses.len().min(maximum_channels);
let mut result = Vec::with_capacity(max_message_count);
for status in statuses.iter_mut() {
let OutboundChannelDetails {
recipient: para_id,
state: outbound_state,
mut signals_exist,
mut first_index,
mut last_index,
} = *status;
let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(para_id) {
ChannelStatus::Closed => {
// This means that there is no such channel anymore. Nothing to be done but
// swallow the messages and discard the status.
for i in first_index..last_index {
<OutboundXcmpMessages<T>>::remove(para_id, i);
}
if signals_exist {
<SignalMessages<T>>::remove(para_id);
}
*status = OutboundChannelDetails::new(para_id);
continue
},
ChannelStatus::Full => continue,
ChannelStatus::Ready(n, e) => (n, e),
};
// This is a hard limit from the host config; not even signals can bypass it.
if result.len() == max_message_count {
// We check this condition in the beginning of the loop so that we don't include
// a message where the limit is 0.
break
}
let page = if signals_exist {
let page = <SignalMessages<T>>::get(para_id);
defensive_assert!(!page.is_empty(), "Signals must exist");
if page.len() < max_size_now {
<SignalMessages<T>>::remove(para_id);
signals_exist = false;
page
} else {
defensive!("Signals should fit into a single page");
continue
}
} else if outbound_state == OutboundState::Suspended {
// Signals are exempt from suspension.
continue
} else if last_index > first_index {
let page = <OutboundXcmpMessages<T>>::get(para_id, first_index);
if page.len() < max_size_now {
<OutboundXcmpMessages<T>>::remove(para_id, first_index);
first_index += 1;
page
} else {
continue
}
} else {
continue
};
if first_index == last_index {
first_index = 0;
last_index = 0;
}
if page.len() > max_size_ever {
// TODO: #274 This means that the channel's max message size has changed since
// the message was sent. We should parse it and split into smaller messages but
// since it's so unlikely then for now we just drop it.
defensive!("WARNING: oversize message in queue - dropping");
} else {
result.push((para_id, page));
}
let max_total_size = match T::ChannelInfo::get_channel_info(para_id) {
Some(channel_info) => channel_info.max_total_size,
None => {
log::warn!("calling `get_channel_info` with no RelevantMessagingState?!");
MAX_POSSIBLE_ALLOCATION // We use this as a fallback in case the messaging state is not present
},
};
let threshold = max_total_size.saturating_div(delivery_fee_constants::THRESHOLD_FACTOR);
let remaining_total_size: usize = (first_index..last_index)
.map(|index| OutboundXcmpMessages::<T>::decode_len(para_id, index).unwrap())
.sum();
if remaining_total_size <= threshold as usize {
Self::decrease_fee_factor(para_id);
}
*status = OutboundChannelDetails {
recipient: para_id,
state: outbound_state,
signals_exist,
first_index,
last_index,
};
}
debug_assert!(!statuses.iter().any(|s| s.signals_exist), "Signals should be handled");
// Sort the outbound messages by ascending recipient para id to satisfy the acceptance
// criteria requirement.
result.sort_by_key(|m| m.0);
// Prune hrmp channels that became empty. Additionally, because it may so happen that we
// only gave attention to some channels in `non_empty_hrmp_channels` it's important to
// change the order. Otherwise, the next `on_finalize` we will again give attention
// only to those channels that happen to be in the beginning, until they are emptied.
// This leads to "starvation" of the channels near to the end.
//
// To mitigate this we shift all processed elements towards the end of the vector using
// `rotate_left`. To get intuition how it works see the examples in its rustdoc.
statuses.retain(|x| {
x.state == OutboundState::Suspended || x.signals_exist || x.first_index < x.last_index
});
// `old_statuses_len` must be >= `statuses.len()` since we never add anything to `statuses`.
let pruned = old_statuses_len - statuses.len();
// removing an item from status implies a message being sent, so the result messages must
// be no less than the pruned channels.
statuses.rotate_left(result.len().saturating_sub(pruned));
<OutboundXcmpStatus<T>>::put(statuses);
result
}
}
/// Xcm sender for sending to a sibling parachain.
impl<T: Config> SendXcm for Pallet<T> {
type Ticket = (ParaId, VersionedXcm<()>);
fn validate(
dest: &mut Option<Location>,
msg: &mut Option<Xcm<()>>,
) -> SendResult<(ParaId, VersionedXcm<()>)> {
let d = dest.take().ok_or(SendError::MissingArgument)?;
match d.unpack() {
// An HRMP message for a sibling parachain.
(1, [Parachain(id)]) => {
let xcm = msg.take().ok_or(SendError::MissingArgument)?;
let id = ParaId::from(*id);
let price = T::PriceForSiblingDelivery::price_for_delivery(id, &xcm);
let versioned_xcm = T::VersionWrapper::wrap_version(&d, xcm)
.map_err(|()| SendError::DestinationUnsupported)?;
validate_xcm_nesting(&versioned_xcm)
.map_err(|()| SendError::ExceedsMaxMessageSize)?;
Ok(((id, versioned_xcm), price))
},
_ => {
// Anything else is unhandled. This includes a message that is not meant for us.
// We need to make sure that dest/msg is not consumed here.
*dest = Some(d);
Err(SendError::NotApplicable)
},
}
}
fn deliver((id, xcm): (ParaId, VersionedXcm<()>)) -> Result<XcmHash, SendError> {
let hash = xcm.using_encoded(sp_io::hashing::blake2_256);
defensive_assert!(
validate_xcm_nesting(&xcm).is_ok(),
"Tickets are valid prior to delivery by trait XCM; qed"
);
match Self::send_fragment(id, XcmpMessageFormat::ConcatenatedVersionedXcm, xcm) {
Ok(_) => {
Self::deposit_event(Event::XcmpMessageSent { message_hash: hash });
Ok(hash)
},
Err(e) => {
log::error!(target: LOG_TARGET, "Deliver error: {e:?}");
Err(SendError::Transport(e.into()))
},
}
}
}
/// Checks that the XCM is decodable with `MAX_XCM_DECODE_DEPTH`.
///
/// Note that this uses the limit of the sender - not the receiver. It is best effort.
pub(crate) fn validate_xcm_nesting(xcm: &VersionedXcm<()>) -> Result<(), ()> {
xcm.using_encoded(|mut enc| {
VersionedXcm::<()>::decode_all_with_depth_limit(MAX_XCM_DECODE_DEPTH, &mut enc).map(|_| ())
})
.map_err(|_| ())
}
impl<T: Config> FeeTracker for Pallet<T> {
type Id = ParaId;
fn get_fee_factor(id: Self::Id) -> FixedU128 {
<DeliveryFeeFactor<T>>::get(id)
}
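/// Multiplies the channel's factor by `EXPONENTIAL_FEE_BASE + message_size_factor`, so
/// repeated calls compound until `decrease_fee_factor` divides it back down (never below
/// `InitialFactor`).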
fn increase_fee_factor(id: Self::Id, message_size_factor: FixedU128) -> FixedU128 {
<DeliveryFeeFactor<T>>::mutate(id, |f| {
*f = f.saturating_mul(
delivery_fee_constants::EXPONENTIAL_FEE_BASE.saturating_add(message_size_factor),
);
*f
})
}
fn decrease_fee_factor(id: Self::Id) -> FixedU128 {
<DeliveryFeeFactor<T>>::mutate(id, |f| {
*f = InitialFactor::get().max(*f / delivery_fee_constants::EXPONENTIAL_FEE_BASE);
*f
})
}
}