// Copyright 2022 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <https://www.gnu.org/licenses/>.
//! # Generalized Message Queue Pallet
//!
//! Provides generalized message queuing and processing capabilities on a per-queue basis for
//! arbitrary use-cases.
//!
//! # Design Goals
//!
//! 1. Minimal assumptions about `Message`s and `MessageOrigin`s. Both should be MEL bounded blobs.
//! This ensures the generality and reusability of the pallet.
//! 2. Well known and tightly limited pre-dispatch PoV weights, especially for message execution.
//! This is paramount for the success of the pallet since message execution is done in
//! `on_initialize` which must _never_ under-estimate its PoV weight. It also needs a frugal PoV
//! footprint since PoV is scarce and this is (possibly) done in every block. This must also hold
//! in the presence of unpredictable message size distributions.
//! 3. Usable as XCMP, DMP and UMP message/dispatch queue - possibly through adapter types.
//!
//! # Design
//!
//! The pallet has means to enqueue, store and process messages. This is implemented by having
//! *queues* which store enqueued messages and can be *served* to process said messages. A queue is
//! identified by its origin in the `BookStateFor`. Each message has an origin which defines into
//! which queue it will be stored. Messages are stored by being appended to the last [`Page`] of a
//! book. Each book keeps track of its pages by indexing `Pages`. The `ReadyRing` contains all
//! queues which hold at least one unprocessed message and are thereby *ready* to be serviced. The
//! `ServiceHead` indicates which *ready* queue is the next to be serviced.
//! The pallet implements [`frame_support::traits::EnqueueMessage`],
//! [`frame_support::traits::ServiceQueues`] and has [`frame_support::traits::ProcessMessage`] and
//! [`OnQueueChanged`] hooks to communicate with the outside world.
//!
//! NOTE: The storage items are not linked since they are not public.
//!
//! **Message Execution**
//!
//! Executing a message is offloaded to the [`Config::MessageProcessor`] which contains the actual
//! logic of how to handle the message since they are blobs. A message can be temporarily or
//! permanently overweight. The pallet will perpetually try to execute a temporarily overweight
//! message. A permanently overweight message is skipped and must be executed manually.
//!
//! **Pagination**
//!
//! Queues are stored in a *paged* manner by splitting their messages into [`Page`]s. This results
//! in a lot of complexity when implementing the pallet but is completely necessary to achieve the
//! second [design goal](#design-goals). The problem comes from the fact that a message can *possibly* be
//! quite large, let's say 64KiB. This then results in a *MEL* of at least 64KiB which results in a
//! PoV of at least 64KiB. Now we have the assumption that most messages are much shorter than their
//! maximum allowed length. This would result in most messages having a pre-dispatch PoV size which
//! is much larger than their post-dispatch PoV size, possibly by a factor of a thousand. Disregarding
//! this observation would cripple the processing power of the pallet since it cannot straighten out
//! this discrepancy at runtime. Conceptually, the implementation is packing as many messages into a
//! single bounded vec, as actually fit into the bounds. This reduces the wasted PoV.
//!
//! **Page Data Layout**
//!
//! A Page contains a heap which holds all its messages. The heap is built by concatenating
//! `(ItemHeader, Message)` pairs. The [`ItemHeader`] contains the length of the message which is
//! needed for retrieving it. This layout allows for constant access time of the next message and
//! linear access time for any message in the page. The header must remain minimal to reduce its PoV
//! impact.
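//!
//! As a rough illustration of this layout (a sketch only, assuming `Size = u32`; the pallet's own
//! accessors such as `Page::peek_index` do the real work), the heap can be walked like this:
//! ```ignore
//! let mut offset = 0usize;
//! while offset < heap.len() {
//!     // Each item starts with an encoded `ItemHeader` ...
//!     let header = ItemHeader::<u32>::decode(&mut &heap[offset..]).expect("valid heap");
//!     let header_len = ItemHeader::<u32>::max_encoded_len();
//!     // ... immediately followed by `payload_len` bytes of message data.
//!     let message = &heap[offset + header_len..offset + header_len + header.payload_len as usize];
//!     offset += header_len + header.payload_len as usize;
//! }
//! ```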
//!
//! **Weight Metering**
//!
//! The pallet utilizes the [`sp_weights::WeightMeter`] to manually track its consumption to always
//! stay within the required limit. This implies that the message processor hook can calculate the
//! weight of a message without executing it. This restricts the possible use-cases but is necessary
//! since the pallet runs in `on_initialize` which has a hard weight limit. The weight meter is used
//! in a way that `can_accrue` and `check_accrue` are always used to check the remaining weight of
//! an operation before committing to it. The process of exiting due to insufficient weight is
//! termed "bailing".
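//!
//! A minimal sketch of this "bailing" pattern (illustrative only; the real servicing logic also
//! accounts for per-queue and per-page overhead):
//! ```ignore
//! let mut meter = WeightMeter::from_limit(limit);
//! loop {
//!     // Check whether one more message fits into the remaining weight before committing to it.
//!     if !meter.check_accrue(T::WeightInfo::service_page_item()) {
//!         break // Bail: not enough weight left.
//!     }
//!     // ... process one message; its weight is now accounted for in `meter` ...
//! }
//! ```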
//!
//! # Scenario: Message enqueuing
//!
//! A message `m` is enqueued for origin `o` into queue `Q[o]` through
//! [`frame_support::traits::EnqueueMessage::enqueue_message`]`(m, o)`.
//!
//! First the queue is either loaded if it exists or otherwise created with empty default values.
//! The message is then inserted into the queue by appending it to its last `Page` or by creating a
//! new `Page` just for `m` if it does not fit in there. The number of messages in the `Book` is
//! incremented.
//!
//! `Q[o]` is now *ready* which will eventually result in `m` being processed.
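//!
//! A usage sketch (illustrative only; `origin` and the runtime pallet name `MessageQueue` are
//! placeholders):
//! ```ignore
//! use frame_support::traits::EnqueueMessage;
//! // Bound the raw bytes to the maximum message length before enqueueing.
//! let message = BoundedSlice::defensive_truncate_from(&b"hello"[..]);
//! MessageQueue::enqueue_message(message, origin);
//! ```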
//!
//! # Scenario: Message processing
//!
//! The pallet runs each block in `on_initialize` or when being manually called through
//! [`frame_support::traits::ServiceQueues::service_queues`].
//!
//! First it tries to "rotate" the `ReadyRing` by one through advancing the `ServiceHead` to the
//! next *ready* queue. It then starts to service this queue by servicing as many pages of it as
//! possible. Servicing a page means to execute as many messages of it as possible. Each executed
//! message is marked as *processed* if the [`Config::MessageProcessor`] returns `Ok`. An event
//! [`Event::Processed`] is emitted afterwards. It is possible that the weight limit of the pallet
//! will never allow a specific message to be executed. In this case it remains as unprocessed and
//! is skipped. This process stops if either there are no more messages in the queue or the
//! remaining weight became insufficient to service this queue. If there is enough weight it tries
//! to advance to the next *ready* queue and service it. This continues until there are no more
//! queues on which it can make progress or not enough weight to check that.
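//!
//! When servicing manually (a sketch; `MessageQueue` and `limit` are placeholders):
//! ```ignore
//! use frame_support::traits::ServiceQueues;
//! // Service as many ready queues as fit into `limit`; returns the weight actually consumed.
//! let consumed = MessageQueue::service_queues(limit);
//! ```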
//!
//! # Scenario: Overweight execution
//!
//! A permanently over-weight message which was skipped by the message processing will never be
//! executed automatically through `on_initialize` nor by calling
//! [`frame_support::traits::ServiceQueues::service_queues`].
//!
//! Manual intervention in the form of
//! [`frame_support::traits::ServiceQueues::execute_overweight`] is necessary. Overweight messages
//! emit an [`Event::OverweightEnqueued`] event which can be used to extract the arguments for
//! manual execution. This only works on permanently overweight messages. There is no guarantee that
//! this will work since the message could be part of a stale page and be reaped before execution
//! commences.
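//!
//! A sketch of such manual execution (the address components come from the
//! [`Event::OverweightEnqueued`] event; names are placeholders):
//! ```ignore
//! use frame_support::traits::ServiceQueues;
//! // `origin`, `page_index` and `message_index` were taken from the `OverweightEnqueued` event.
//! let consumed = MessageQueue::execute_overweight(weight_limit, (origin, page_index, message_index))?;
//! ```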
//!
//! # Terminology
//!
//! - `Message`: A blob of data into which the pallet has no introspection, defined as
//! [`BoundedSlice<u8, MaxMessageLenOf<T>>`]. The message length is limited by [`MaxMessageLenOf`]
//! which is calculated from [`Config::HeapSize`] and [`ItemHeader::max_encoded_len()`].
//! - `MessageOrigin`: A generic *origin* of a message, defined as [`MessageOriginOf`]. The
//! requirements for it are kept minimal to remain as generic as possible. The type is defined in
//! [`frame_support::traits::ProcessMessage::Origin`].
//! - `Page`: An array of `Message`s, see [`Page`]. Can never be empty.
//! - `Book`: A list of `Page`s, see [`BookState`]. Can be empty.
//! - `Queue`: A `Book` together with a `MessageOrigin` which can be part of the `ReadyRing`. Can
//! be empty.
//! - `ReadyRing`: A double-linked list which contains all *ready* `Queue`s. It chains together the
//! queues via their `ready_neighbours` fields. A `Queue` is *ready* if it contains at least one
//! `Message` which can be processed. Can be empty.
//! - `ServiceHead`: A pointer into the `ReadyRing` to the next `Queue` to be serviced.
//! - (`un`)`processed`: A message is marked as *processed* after it was executed by the pallet. A
//! message which was either not yet executed or could not be executed remains as `unprocessed`,
//! which is the default state for a message after being enqueued.
//! - `knitting`/`unknitting`: The means of adding or removing a `Queue` from the `ReadyRing`.
//! - `MEL`: The Max Encoded Length of a type, see [`codec::MaxEncodedLen`].
//!
//! # Properties
//!
//! **Liveness - Enqueueing**
//!
//! It is always possible to enqueue any message for any `MessageOrigin`.
//!
//! **Liveness - Processing**
//!
//! `on_initialize` always respects its finite weight-limit.
//!
//! **Progress - Enqueueing**
//!
//! An enqueued message immediately becomes *unprocessed* and thereby eligible for execution.
//!
//! **Progress - Processing**
//!
//! The pallet will execute at least one unprocessed message per block, if there is any. Ensuring
//! this property needs careful consideration of the concrete weights, since it is possible that the
//! weight limit of `on_initialize` never allows for the execution of even one message; trivially if
//! the limit is set to zero. `integrity_test` can be used to ensure that this property holds.
//!
//! **Fairness - Enqueuing**
//!
//! Enqueueing a message for a specific `MessageOrigin` does not influence the ability to enqueue a
//! message for the same or any other `MessageOrigin`; guaranteed by **Liveness - Enqueueing**.
//!
//! **Fairness - Processing**
//!
//! The average amount of weight available for message processing is the same for each queue if the
//! number of queues is constant. Creating a new queue must therefore be, possibly economically,
//! expensive. Currently this is achieved by having one queue per para-chain/thread, which keeps the
//! number of queues within `O(n)` and should be "good enough".
#![cfg_attr(not(feature = "std"), no_std)]
mod benchmarking;
mod integration_test;
mod mock;
pub mod mock_helpers;
mod tests;
pub mod weights;
use codec::{Codec, Decode, Encode, MaxEncodedLen};
use frame_support::{
defensive,
pallet_prelude::*,
traits::{
DefensiveTruncateFrom, EnqueueMessage, ExecuteOverweightError, Footprint, ProcessMessage,
ProcessMessageError, ServiceQueues,
},
BoundedSlice, CloneNoBound, DefaultNoBound,
};
use frame_system::pallet_prelude::*;
pub use pallet::*;
use scale_info::TypeInfo;
use sp_arithmetic::traits::{BaseArithmetic, Unsigned};
use sp_runtime::{
traits::{Hash, One, Zero},
SaturatedConversion, Saturating,
};
use sp_std::{fmt::Debug, ops::Deref, prelude::*, vec};
use sp_weights::WeightMeter;
pub use weights::WeightInfo;
/// Type for identifying a page.
type PageIndex = u32;
/// Data encoded and prefixed to the encoded `MessageItem`.
#[derive(Encode, Decode, PartialEq, MaxEncodedLen, Debug)]
pub struct ItemHeader<Size> {
/// The length of this item, not including the size of this header. The next item of the page
/// follows immediately after the payload of this item.
payload_len: Size,
/// Whether this item has been processed.
is_processed: bool,
}
/// A page of messages. Pages always contain at least one item.
#[derive(
CloneNoBound, Encode, Decode, RuntimeDebugNoBound, DefaultNoBound, TypeInfo, MaxEncodedLen,
)]
#[scale_info(skip_type_params(HeapSize))]
#[codec(mel_bound(Size: MaxEncodedLen))]
pub struct Page<Size: Into<u32> + Debug + Clone + Default, HeapSize: Get<Size>> {
/// Messages remaining to be processed; this includes overweight messages which have been
/// skipped.
remaining: Size,
/// The size of all remaining messages to be processed.
///
/// Includes overweight messages outside of the `first` to `last` window.
remaining_size: Size,
/// The number of items before the `first` item in this page.
first_index: Size,
/// The heap-offset of the header of the first message item in this page which is ready for
/// processing.
first: Size,
/// The heap-offset of the header of the last message item in this page.
last: Size,
/// The heap. If `self.offset == self.heap.len()` then the page is empty and should be deleted.
heap: BoundedVec<u8, IntoU32<HeapSize, Size>>,
}
impl<
Size: BaseArithmetic + Unsigned + Copy + Into<u32> + Codec + MaxEncodedLen + Debug + Default,
HeapSize: Get<Size>,
> Page<Size, HeapSize>
{
/// Create a [`Page`] from one unprocessed message.
fn from_message<T: Config>(message: BoundedSlice<u8, MaxMessageLenOf<T>>) -> Self {
let payload_len = message.len();
let data_len = ItemHeader::<Size>::max_encoded_len().saturating_add(payload_len);
let payload_len = payload_len.saturated_into();
let header = ItemHeader::<Size> { payload_len, is_processed: false };
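// Build the heap as the encoded item header followed by the raw message payload.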
let mut heap = Vec::with_capacity(data_len);
header.using_encoded(|h| heap.extend_from_slice(h));
heap.extend_from_slice(message.deref());
Page {
remaining: One::one(),
remaining_size: payload_len,
first_index: Zero::zero(),
first: Zero::zero(),
last: Zero::zero(),
heap: BoundedVec::defensive_truncate_from(heap),
}
}
/// Try to append one message to a page.
fn try_append_message<T: Config>(
&mut self,
message: BoundedSlice<u8, MaxMessageLenOf<T>>,
) -> Result<(), ()> {
let pos = self.heap.len();
let payload_len = message.len();
let data_len = ItemHeader::<Size>::max_encoded_len().saturating_add(payload_len);
let payload_len = payload_len.saturated_into();
let header = ItemHeader::<Size> { payload_len, is_processed: false };
let heap_size: u32 = HeapSize::get().into();
if (heap_size as usize).saturating_sub(self.heap.len()) < data_len {
// Can't fit.
return Err(())
}
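// Take the heap out of the bounded vec, append the new header and payload, then restore it.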
let mut heap = sp_std::mem::take(&mut self.heap).into_inner();
header.using_encoded(|h| heap.extend_from_slice(h));
heap.extend_from_slice(message.deref());
self.heap = BoundedVec::defensive_truncate_from(heap);
self.last = pos.saturated_into();
self.remaining.saturating_inc();
self.remaining_size.saturating_accrue(payload_len);
Ok(())
}
/// Returns the first message in the page without removing it.
///
/// SAFETY: Does not panic even on corrupted storage.
fn peek_first(&self) -> Option<BoundedSlice<u8, IntoU32<HeapSize, Size>>> {
if self.first > self.last {
return None
}
let f = (self.first.into() as usize).min(self.heap.len());
let mut item_slice = &self.heap[f..];
if let Ok(h) = ItemHeader::<Size>::decode(&mut item_slice) {
let payload_len = h.payload_len.into() as usize;
if payload_len <= item_slice.len() {
// impossible to truncate since is sliced up from `self.heap: BoundedVec`
return Some(BoundedSlice::defensive_truncate_from(&item_slice[..payload_len]))
}
}
defensive!("message-queue: heap corruption");
None
}
/// Point `first` at the next message, marking the first as processed if `is_processed` is true.
fn skip_first(&mut self, is_processed: bool) {
let f = (self.first.into() as usize).min(self.heap.len());
if let Ok(mut h) = ItemHeader::decode(&mut &self.heap[f..]) {
if is_processed && !h.is_processed {
h.is_processed = true;
h.using_encoded(|d| self.heap[f..f + d.len()].copy_from_slice(d));
self.remaining.saturating_dec();
self.remaining_size.saturating_reduce(h.payload_len);
}
self.first
.saturating_accrue(ItemHeader::<Size>::max_encoded_len().saturated_into());
self.first.saturating_accrue(h.payload_len);
self.first_index.saturating_inc();
}
}
/// Return the message with index `index` in the form of `(position, processed, message)`.
fn peek_index(&self, index: usize) -> Option<(usize, bool, &[u8])> {
let mut pos = 0;
let mut item_slice = &self.heap[..];
let header_len: usize = ItemHeader::<Size>::max_encoded_len().saturated_into();
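// Skip over the first `index` items: decode each header and jump over its payload.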
for _ in 0..index {
let h = ItemHeader::<Size>::decode(&mut item_slice).ok()?;
let item_len = h.payload_len.into() as usize;
if item_slice.len() < item_len {
return None
}
item_slice = &item_slice[item_len..];
pos.saturating_accrue(header_len.saturating_add(item_len));
}
let h = ItemHeader::<Size>::decode(&mut item_slice).ok()?;
if item_slice.len() < h.payload_len.into() as usize {
return None
}
item_slice = &item_slice[..h.payload_len.into() as usize];
Some((pos, h.is_processed, item_slice))
}
/// Set the `is_processed` flag for the item at `pos` to be `true` if not already and decrement
/// the `remaining` counter of the page.
///
/// Does nothing if no [`ItemHeader`] could be decoded at the given position.
fn note_processed_at_pos(&mut self, pos: usize) {
if let Ok(mut h) = ItemHeader::<Size>::decode(&mut &self.heap[pos..]) {
if !h.is_processed {
h.is_processed = true;
h.using_encoded(|d| self.heap[pos..pos + d.len()].copy_from_slice(d));
self.remaining.saturating_dec();
self.remaining_size.saturating_reduce(h.payload_len);
}
}
}
/// Returns whether the page is *complete* which means that no messages remain.
fn is_complete(&self) -> bool {
self.remaining.is_zero()
}
}
/// A single link in the double-linked Ready Ring list.
#[derive(Clone, Encode, Decode, MaxEncodedLen, TypeInfo, RuntimeDebug, PartialEq)]
pub struct Neighbours<MessageOrigin> {
/// The previous queue.
prev: MessageOrigin,
/// The next queue.
next: MessageOrigin,
}
/// The state of a queue as represented by a book of its pages.
///
/// Each queue has exactly one book which holds all of its pages. All pages of a book combined
/// contain all of the messages of its queue; hence the name *Book*.
/// Books can be chained together in a double-linked fashion through their `ready_neighbours` field.
#[derive(Clone, Encode, Decode, MaxEncodedLen, TypeInfo, RuntimeDebug)]
pub struct BookState<MessageOrigin> {
/// The first page with some items to be processed in it. If this is `>= end`, then there are
/// no pages with items to be processed in them.
begin: PageIndex,
/// One more than the last page with some items to be processed in it.
end: PageIndex,
/// The number of pages stored at present.
///
/// This might be larger than `end-begin`, because we keep pages with unprocessed overweight
/// messages outside of the end/begin window.
count: PageIndex,
/// If this book has any ready pages, then this will be `Some` with the previous and next
/// neighbours. This wraps around.
ready_neighbours: Option<Neighbours<MessageOrigin>>,
/// The number of unprocessed messages stored at present.
message_count: u64,
/// The total size of all unprocessed messages stored at present.
size: u64,
}
impl<MessageOrigin> Default for BookState<MessageOrigin> {
fn default() -> Self {
Self { begin: 0, end: 0, count: 0, ready_neighbours: None, message_count: 0, size: 0 }
}
}
/// Handler code for when the items in a queue change.
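///
/// A minimal sketch of a custom handler (illustrative only; the type and its behaviour are
/// placeholders):
///
/// ```ignore
/// struct NoteQueueFootprint;
/// impl OnQueueChanged<u32> for NoteQueueFootprint {
///     fn on_queue_changed(id: u32, items_count: u64, items_size: u64) {
///         // E.g. record the new footprint of queue `id` somewhere.
///         let _ = (id, items_count, items_size);
///     }
/// }
/// ```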
pub trait OnQueueChanged<Id> {
/// Note that the queue `id` now has `items_count` items in it, taking up `items_size` bytes.
fn on_queue_changed(id: Id, items_count: u64, items_size: u64);
}
impl<Id> OnQueueChanged<Id> for () {
fn on_queue_changed(_: Id, _: u64, _: u64) {}
}
#[frame_support::pallet]
pub mod pallet {
use super::*;
#[pallet::pallet]
#[pallet::generate_store(pub(super) trait Store)]
pub struct Pallet(_);
/// The module configuration trait.
#[pallet::config]
pub trait Config: frame_system::Config {
/// The overarching event type.
type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
/// Weight information for extrinsics in this pallet.
type WeightInfo: WeightInfo;
/// Processor for a message.
///
/// Must be set to [`mock_helpers::NoopMessageProcessor`] for benchmarking.
/// Other message processors that consume exactly `(1, 1)` weight for any given message will
/// work as well. Otherwise the benchmarking will also measure the weight of the message
/// processor, which is not desired.
type MessageProcessor: ProcessMessage;
/// Page/heap size type.
type Size: BaseArithmetic
+ Unsigned
+ Copy
+ Into<u32>
+ Member
+ Encode
+ Decode
+ MaxEncodedLen
+ TypeInfo
+ Default;
/// Code to be called when a message queue changes - either with items introduced or
/// removed.
type QueueChangeHandler: OnQueueChanged<<Self::MessageProcessor as ProcessMessage>::Origin>;
/// The size of the page; this implies the maximum message size which can be sent.
///
/// A good value depends on the expected message sizes, their weights, the weight that is
/// available for processing them and the maximal needed message size. The maximal message
/// size is slightly lower than this as defined by [`MaxMessageLenOf`].
#[pallet::constant]
type HeapSize: Get<Self::Size>;
/// The maximum number of stale pages (i.e. of overweight messages) allowed before culling
/// can happen. Once there are more stale pages than this, then historical pages may be
/// dropped, even if they contain unprocessed overweight messages.
#[pallet::constant]
type MaxStale: Get<u32>;
/// The amount of weight (if any) which should be provided to the message queue for
/// servicing enqueued items.
///
/// This may be legitimately `None` in the case that you will call
/// `ServiceQueues::service_queues` manually.
#[pallet::constant]
type ServiceWeight: Get