From 647a9e6df924bb3e8c71d8d73309d913c3feed10 Mon Sep 17 00:00:00 2001 From: Keith Yeung <kungfukeith11@gmail.com> Date: Sat, 8 May 2021 13:18:01 -0700 Subject: [PATCH] Upgrade pallets to FRAMEv2 (#404) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Upgrade parachain info pallet to FRAMEv2 * Upgrade parachain system pallet to FRAMEv2 * Use Pallet<T> instead of Module<T> * Upgrade XCMP queue pallet to FRAMEv2 * Correctly specify the metadata for events in xcmp-queue pallet * Apply suggestions from code review * Update pallets/parachain-system/src/tests.rs Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> --- cumulus/pallets/parachain-system/src/lib.rs | 1702 +++++------------ cumulus/pallets/parachain-system/src/tests.rs | 935 +++++++++ cumulus/pallets/xcmp-queue/src/lib.rs | 457 +++-- .../pallets/parachain-info/src/lib.rs | 61 +- cumulus/rococo-parachains/runtime/src/lib.rs | 2 +- .../shell-runtime/src/lib.rs | 2 +- 6 files changed, 1675 insertions(+), 1484 deletions(-) create mode 100755 cumulus/pallets/parachain-system/src/tests.rs diff --git a/cumulus/pallets/parachain-system/src/lib.rs b/cumulus/pallets/parachain-system/src/lib.rs index a2c97250fdf..3962445738b 100644 --- a/cumulus/pallets/parachain-system/src/lib.rs +++ b/cumulus/pallets/parachain-system/src/lib.rs @@ -16,9 +16,9 @@ #![cfg_attr(not(feature = "std"), no_std)] -//! cumulus-pallet-parachain-system is a base module for cumulus-based parachains. +//! cumulus-pallet-parachain-system is a base pallet for cumulus-based parachains. //! -//! This module handles low-level details of being a parachain. It's responsibilities include: +//! This pallet handles low-level details of being a parachain. It's responsibilities include: //! //! - ingestion of the parachain validation data //! - ingestion of incoming downward and lateral messages and dispatching them @@ -37,13 +37,12 @@ use cumulus_primitives_core::{ }; use cumulus_primitives_parachain_inherent::ParachainInherentData; use frame_support::{ - decl_error, decl_event, decl_module, decl_storage, - dispatch::{DispatchResult, DispatchResultWithPostInfo}, ensure, - inherent::{InherentData, InherentIdentifier, ProvideInherent}, + dispatch::{DispatchError, DispatchResult}, storage, traits::Get, - weights::{DispatchClass, Pays, PostDispatchInfo, Weight}, + weights::{PostDispatchInfo, Weight, Pays}, + inherent::{InherentData, InherentIdentifier, ProvideInherent}, }; use frame_system::{ensure_none, ensure_root}; use polkadot_parachain::primitives::RelayChainBlockNumber; @@ -54,135 +53,220 @@ use sp_runtime::{ InvalidTransaction, TransactionLongevity, TransactionSource, TransactionValidity, ValidTransaction, }, - DispatchError, }; use sp_std::{cmp, collections::btree_map::BTreeMap, prelude::*}; mod relay_state_snapshot; #[macro_use] pub mod validate_block; +#[cfg(test)] +mod tests; -/// The pallet's configuration trait. -pub trait Config: frame_system::Config<OnSetCode = ParachainSetCode<Self>> { - /// The overarching event type. - type Event: From<Event<Self>> + Into<<Self as frame_system::Config>::Event>; +pub use pallet::*; - /// Something which can be notified when the validation data is set. - type OnValidationData: OnValidationData; +#[frame_support::pallet] +pub mod pallet { + use super::*; + use frame_support::pallet_prelude::*; + use frame_system::pallet_prelude::*; - /// Returns the parachain ID we are running with. - type SelfParaId: Get<ParaId>; + #[pallet::pallet] + pub struct Pallet<T>(_); - /// The place where outbound XCMP messages come from. This is queried in `finalize_block`. - type OutboundXcmpMessageSource: XcmpMessageSource; + #[pallet::config] + pub trait Config: frame_system::Config<OnSetCode = ParachainSetCode<Self>> { + /// The overarching event type. + type Event: From<Event<Self>> + IsType<<Self as frame_system::Config>::Event>; - /// The message handler that will be invoked when messages are received via DMP. - type DmpMessageHandler: DmpMessageHandler; + /// Something which can be notified when the validation data is set. + type OnValidationData: OnValidationData; - /// The weight we reserve at the beginning of the block for processing DMP messages. - type ReservedDmpWeight: Get<Weight>; + /// Returns the parachain ID we are running with. + type SelfParaId: Get<ParaId>; - /// The message handler that will be invoked when messages are received via XCMP. - /// - /// The messages are dispatched in the order they were relayed by the relay chain. If multiple - /// messages were relayed at one block, these will be dispatched in ascending order of the - /// sender's para ID. - type XcmpMessageHandler: XcmpMessageHandler; + /// The place where outbound XCMP messages come from. This is queried in `finalize_block`. + type OutboundXcmpMessageSource: XcmpMessageSource; - /// The weight we reserve at the beginning of the block for processing XCMP messages. - type ReservedXcmpWeight: Get<Weight>; -} + /// The message handler that will be invoked when messages are received via DMP. + type DmpMessageHandler: DmpMessageHandler; -// This pallet's storage items. -decl_storage! { - trait Store for Module<T: Config> as ParachainSystem { - /// We need to store the new validation function for the span between - /// setting it and applying it. If it has a - /// value, then [`PendingValidationFunction`] must have a real value, and - /// together will coordinate the block number where the upgrade will happen. - PendingRelayChainBlockNumber: Option<RelayChainBlockNumber>; + /// The weight we reserve at the beginning of the block for processing DMP messages. + type ReservedDmpWeight: Get<Weight>; - /// The new validation function we will upgrade to when the relay chain - /// reaches [`PendingRelayChainBlockNumber`]. A real validation function must - /// exist here as long as [`PendingRelayChainBlockNumber`] is set. - PendingValidationFunction get(fn new_validation_function): Vec<u8>; + /// The message handler that will be invoked when messages are received via XCMP. + /// + /// The messages are dispatched in the order they were relayed by the relay chain. If + /// multiple messages were relayed at one block, these will be dispatched in ascending + /// order of the sender's para ID. + type XcmpMessageHandler: XcmpMessageHandler; - /// The [`PersistedValidationData`] set for this block. - ValidationData get(fn validation_data): Option<PersistedValidationData>; + /// The weight we reserve at the beginning of the block for processing XCMP messages. + type ReservedXcmpWeight: Get<Weight>; + } - /// Were the validation data set to notify the relay chain? - DidSetValidationCode: bool; + #[pallet::hooks] + impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> { + fn on_finalize(_: T::BlockNumber) { + <DidSetValidationCode<T>>::kill(); - /// The last relay parent block number at which we signalled the code upgrade. - LastUpgrade: relay_chain::BlockNumber; + let host_config = match Self::host_configuration() { + Some(ok) => ok, + None => { + debug_assert!( + false, + "host configuration is promised to set until `on_finalize`; qed", + ); + return; + } + }; + let relevant_messaging_state = + match Self::relevant_messaging_state() { + Some(ok) => ok, + None => { + debug_assert!( + false, + "relevant messaging state is promised to be set until `on_finalize`; \ + qed", + ); + return; + } + }; - /// The snapshot of some state related to messaging relevant to the current parachain as per - /// the relay parent. - /// - /// This field is meant to be updated each block with the validation data inherent. Therefore, - /// before processing of the inherent, e.g. in `on_initialize` this data may be stale. - /// - /// This data is also absent from the genesis. - RelevantMessagingState get(fn relevant_messaging_state): Option<MessagingStateSnapshot>; - /// The parachain host configuration that was obtained from the relay parent. - /// - /// This field is meant to be updated each block with the validation data inherent. Therefore, - /// before processing of the inherent, e.g. in `on_initialize` this data may be stale. - /// - /// This data is also absent from the genesis. - HostConfiguration get(fn host_configuration): Option<AbridgedHostConfiguration>; + <PendingUpwardMessages<T>>::mutate(|up| { + let (count, size) = relevant_messaging_state.relay_dispatch_queue_size; - /// The last downward message queue chain head we have observed. - /// - /// This value is loaded before and saved after processing inbound downward messages carried - /// by the system inherent. - LastDmqMqcHead: MessageQueueChain; - /// The message queue chain heads we have observed per each channel incoming channel. - /// - /// This value is loaded before and saved after processing inbound downward messages carried - /// by the system inherent. - LastHrmpMqcHeads: BTreeMap<ParaId, MessageQueueChain>; + let available_capacity = cmp::min( + host_config.max_upward_queue_count.saturating_sub(count), + host_config.max_upward_message_num_per_candidate, + ); + let available_size = host_config.max_upward_queue_size.saturating_sub(size); - PendingUpwardMessages: Vec<UpwardMessage>; + // Count the number of messages we can possibly fit in the given constraints, i.e. + // available_capacity and available_size. + let num = up + .iter() + .scan( + (available_capacity as usize, available_size as usize), + |state, msg| { + let (cap_left, size_left) = *state; + match (cap_left.checked_sub(1), size_left.checked_sub(msg.len())) { + (Some(new_cap), Some(new_size)) => { + *state = (new_cap, new_size); + Some(()) + } + _ => None, + } + }, + ) + .count(); - /// The number of HRMP messages we observed in `on_initialize` and thus used that number for - /// announcing the weight of `on_initialize` and `on_finalize`. - AnnouncedHrmpMessagesPerCandidate: u32; + // TODO: #274 Return back messages that do not longer fit into the queue. - /// The weight we reserve at the beginning of the block for processing XCMP messages. This - /// overrides the amount set in the Config trait. - ReservedXcmpWeightOverride: Option<Weight>; + storage::unhashed::put(well_known_keys::UPWARD_MESSAGES, &up[0..num]); + *up = up.split_off(num); + }); - /// The weight we reserve at the beginning of the block for processing DMP messages. This - /// overrides the amount set in the Config trait. - ReservedDmpWeightOverride: Option<Weight>; + // Sending HRMP messages is a little bit more involved. There are the following + // constraints: + // + // - a channel should exist (and it can be closed while a message is buffered), + // - at most one message can be sent in a channel, + // - the sent out messages should be ordered by ascension of recipient para id. + // - the capacity and total size of the channel is limited, + // - the maximum size of a message is limited (and can potentially be changed), - /// The next authorized upgrade, if there is one. - AuthorizedUpgrade: Option<T::Hash>; - } -} + let maximum_channels = host_config + .hrmp_max_message_num_per_candidate + .min(<AnnouncedHrmpMessagesPerCandidate<T>>::take()) as usize; -// The pallet's dispatchable functions. -decl_module! { - pub struct Module<T: Config> for enum Call where origin: T::Origin { - type Error = Error<T>; + let outbound_messages = + T::OutboundXcmpMessageSource::take_outbound_messages(maximum_channels); - // Initializing events - // this is needed only if you are using events in your pallet - fn deposit_event() = default; + // Note conversion to the `OutboundHrmpMessage` isn't needed since the data that + // `take_outbound_messages` returns encodes equivalently. + // + // The following code is a smoke test to check that the `OutboundHrmpMessage` type + // doesn't accidentally change (e.g. by having a field added to it). If the following + // line breaks, then we'll need to revisit the assumption that the result of + // `take_outbound_messages` can be placed into `HRMP_OUTBOUND_MESSAGES` directly + // without a decode/encode round-trip. + let _ = OutboundHrmpMessage { + recipient: ParaId::from(0), + data: vec![], + }; + + storage::unhashed::put(well_known_keys::HRMP_OUTBOUND_MESSAGES, &outbound_messages); + } + + fn on_initialize(_n: T::BlockNumber) -> Weight { + // To prevent removing `NEW_VALIDATION_CODE` that was set by another `on_initialize` + // like for example from scheduler, we only kill the storage entry if it was not yet + // updated in the current block. + if !<DidSetValidationCode<T>>::get() { + storage::unhashed::kill(NEW_VALIDATION_CODE); + } + + // Remove the validation from the old block. + <ValidationData<T>>::kill(); + + let mut weight = T::DbWeight::get().writes(3); + storage::unhashed::kill(well_known_keys::HRMP_WATERMARK); + storage::unhashed::kill(well_known_keys::UPWARD_MESSAGES); + storage::unhashed::kill(well_known_keys::HRMP_OUTBOUND_MESSAGES); + // Here, in `on_initialize` we must report the weight for both `on_initialize` and + // `on_finalize`. + // + // One complication here, is that the `host_configuration` is updated by an inherent + // and those are processed after the block initialization phase. Therefore, we have to + // be content only with the configuration as per the previous block. That means that + // the configuration can be either stale (or be abscent altogether in case of the + // beginning of the chain). + // + // In order to mitigate this, we do the following. At the time, we are only concerned + // about `hrmp_max_message_num_per_candidate`. We reserve the amount of weight to + // process the number of HRMP messages according to the potentially stale + // configuration. In `on_finalize` we will process only the maximum between the + // announced number of messages and the actual received in the fresh configuration. + // + // In the common case, they will be the same. In the case the actual value is smaller + // than the announced, we would waste some of weight. In the case the actual value is + // greater than the announced, we will miss opportunity to send a couple of messages. + weight += T::DbWeight::get().reads_writes(1, 1); + let hrmp_max_message_num_per_candidate = Self::host_configuration() + .map(|cfg| cfg.hrmp_max_message_num_per_candidate) + .unwrap_or(0); + <AnnouncedHrmpMessagesPerCandidate<T>>::put(hrmp_max_message_num_per_candidate); + + // NOTE that the actual weight consumed by `on_finalize` may turn out lower. + weight += T::DbWeight::get().reads_writes( + 3 + hrmp_max_message_num_per_candidate as u64, + 4 + hrmp_max_message_num_per_candidate as u64, + ); + + weight + } + } + + #[pallet::call] + impl<T: Config> Pallet<T> { /// Force an already scheduled validation function upgrade to happen on a particular block. /// - /// Note that coordinating this block for the upgrade has to happen independently on the relay - /// chain and this parachain. Synchronizing the block for the upgrade is sensitive, and this - /// bypasses all checks and and normal protocols. Very easy to brick your chain if done wrong. - #[weight = (0, DispatchClass::Operational)] - pub fn set_upgrade_block(origin, relay_chain_block: RelayChainBlockNumber) { + /// Note that coordinating this block for the upgrade has to happen independently on the + /// relay chain and this parachain. Synchronizing the block for the upgrade is sensitive, + /// and this bypasses all checks and and normal protocols. Very easy to brick your chain + /// if done wrong. + #[pallet::weight((0, DispatchClass::Operational))] + pub fn set_upgrade_block( + origin: OriginFor<T>, + relay_chain_block: RelayChainBlockNumber, + ) -> DispatchResult { ensure_root(origin)?; - if let Some(_old_block) = PendingRelayChainBlockNumber::get() { - PendingRelayChainBlockNumber::put(relay_chain_block); + if <PendingRelayChainBlockNumber<T>>::get().is_some() { + <PendingRelayChainBlockNumber<T>>::put(relay_chain_block); + Ok(()) } else { - return Err(Error::<T>::NotScheduled.into()) + Err(Error::<T>::NotScheduled.into()) } } @@ -195,12 +279,15 @@ decl_module! { /// /// As a side effect, this function upgrades the current validation function /// if the appropriate time has come. - #[weight = (0, DispatchClass::Mandatory)] + #[pallet::weight((0, DispatchClass::Mandatory))] // TODO: This weight should be corrected. - pub fn set_validation_data(origin, data: ParachainInherentData) -> DispatchResultWithPostInfo { + pub fn set_validation_data( + origin: OriginFor<T>, + data: ParachainInherentData, + ) -> DispatchResultWithPostInfo { ensure_none(origin)?; assert!( - !ValidationData::exists(), + !<ValidationData<T>>::exists(), "ValidationData must be updated only once in a block", ); @@ -216,13 +303,13 @@ decl_module! { // initialization logic: we know that this runs exactly once every block, // which means we can put the initialization logic here to remove the // sequencing problem. - if let Some(apply_block) = PendingRelayChainBlockNumber::get() { + if let Some(apply_block) = <PendingRelayChainBlockNumber<T>>::get() { if vfp.relay_parent_number >= apply_block { - PendingRelayChainBlockNumber::kill(); - let validation_function = PendingValidationFunction::take(); - LastUpgrade::put(&apply_block); + <PendingRelayChainBlockNumber<T>>::kill(); + let validation_function = <PendingValidationFunction<T>>::take(); + <LastUpgrade<T>>::put(&apply_block); Self::put_parachain_code(&validation_function); - Self::deposit_event(RawEvent::ValidationFunctionApplied(vfp.relay_parent_number)); + Self::deposit_event(Event::ValidationFunctionApplied(vfp.relay_parent_number)); } } @@ -230,7 +317,7 @@ decl_module! { match relay_state_snapshot::extract_from_proof( T::SelfParaId::get(), vfp.relay_parent_storage_root, - relay_chain_state + relay_chain_state, ) { Ok(r) => r, Err(err) => { @@ -238,9 +325,9 @@ decl_module! { } }; - ValidationData::put(&vfp); - RelevantMessagingState::put(relevant_messaging_state.clone()); - HostConfiguration::put(host_config); + <ValidationData<T>>::put(&vfp); + <RelevantMessagingState<T>>::put(relevant_messaging_state.clone()); + <HostConfiguration<T>>::put(host_config); <T::OnValidationData as OnValidationData>::on_validation_data(&vfp); @@ -255,167 +342,192 @@ decl_module! { horizontal_messages, ); - Ok(PostDispatchInfo { actual_weight: Some(total_weight), pays_fee: Pays::No }) + Ok(PostDispatchInfo { + actual_weight: Some(total_weight), + pays_fee: Pays::No, + }) } - #[weight = (1_000, DispatchClass::Operational)] - fn sudo_send_upward_message(origin, message: UpwardMessage) { + #[pallet::weight((1_000, DispatchClass::Operational))] + fn sudo_send_upward_message( + origin: OriginFor<T>, + message: UpwardMessage, + ) -> DispatchResult { ensure_root(origin)?; let _ = Self::send_upward_message(message); + Ok(()) } - #[weight = (1_000_000, DispatchClass::Operational)] - fn authorize_upgrade(origin, code_hash: T::Hash) { + #[pallet::weight((1_000_000, DispatchClass::Operational))] + fn authorize_upgrade(origin: OriginFor<T>, code_hash: T::Hash) -> DispatchResult { ensure_root(origin)?; AuthorizedUpgrade::<T>::put(&code_hash); - Self::deposit_event(RawEvent::UpgradeAuthorized(code_hash)); + Self::deposit_event(Event::UpgradeAuthorized(code_hash)); + Ok(()) } - #[weight = 1_000_000] - fn enact_authorized_upgrade(_origin, code: Vec<u8>) -> DispatchResultWithPostInfo { - // No ensure origin on purpose. We validate by checking the code vs hash in storage. + #[pallet::weight(1_000_000)] + fn enact_authorized_upgrade(_: OriginFor<T>, code: Vec<u8>) -> DispatchResultWithPostInfo { Self::validate_authorized_upgrade(&code[..])?; Self::set_code_impl(code)?; AuthorizedUpgrade::<T>::kill(); Ok(Pays::No.into()) } + } - fn on_finalize() { - DidSetValidationCode::kill(); - - let host_config = match Self::host_configuration() { - Some(ok) => ok, - None => { - debug_assert!(false, "host configuration is promised to set until `on_finalize`; qed"); - return - } - }; - let relevant_messaging_state = match Self::relevant_messaging_state() { - Some(ok) => ok, - None => { - debug_assert!(false, "relevant messaging state is promised to be set until `on_finalize`; qed"); - return - } - }; - - <Self as Store>::PendingUpwardMessages::mutate(|up| { - let (count, size) = relevant_messaging_state.relay_dispatch_queue_size; - - let available_capacity = cmp::min( - host_config.max_upward_queue_count.saturating_sub(count), - host_config.max_upward_message_num_per_candidate, - ); - let available_size = host_config.max_upward_queue_size.saturating_sub(size); + #[pallet::event] + #[pallet::generate_deposit(pub(super) fn deposit_event)] + #[pallet::metadata(T::Hash = "Hash")] + pub enum Event<T: Config> { + /// The validation function has been scheduled to apply as of the contained relay chain + /// block number. + ValidationFunctionStored(RelayChainBlockNumber), + /// The validation function was applied as of the contained relay chain block number. + ValidationFunctionApplied(RelayChainBlockNumber), + /// An upgrade has been authorized. + UpgradeAuthorized(T::Hash), + /// Some downward messages have been received and will be processed. + /// \[ count \] + DownwardMessagesReceived(u32), + /// Downward messages were processed using the given weight. + /// \[ weight_used, result_mqc_head \] + DownwardMessagesProcessed(Weight, relay_chain::Hash), + } - // Count the number of messages we can possibly fit in the given constraints, i.e. - // available_capacity and available_size. - let num = up - .iter() - .scan( - (available_capacity as usize, available_size as usize), - |state, msg| { - let (cap_left, size_left) = *state; - match (cap_left.checked_sub(1), size_left.checked_sub(msg.len())) { - (Some(new_cap), Some(new_size)) => { - *state = (new_cap, new_size); - Some(()) - } - _ => None, - } - }, - ) - .count(); + #[pallet::error] + pub enum Error<T> { + /// Attempt to upgrade validation function while existing upgrade pending + OverlappingUpgrades, + /// Polkadot currently prohibits this parachain from upgrading its validation function + ProhibitedByPolkadot, + /// The supplied validation function has compiled into a blob larger than Polkadot is + /// willing to run + TooBig, + /// The inherent which supplies the validation data did not run this block + ValidationDataNotAvailable, + /// The inherent which supplies the host configuration did not run this block + HostConfigurationNotAvailable, + /// No validation function upgrade is currently scheduled. + NotScheduled, + /// No code upgrade has been authorized. + NothingAuthorized, + /// The given code upgrade has not been authorized. + Unauthorized, + } - // TODO: #274 Return back messages that do not longer fit into the queue. + /// We need to store the new validation function for the span between + /// setting it and applying it. If it has a + /// value, then [`PendingValidationFunction`] must have a real value, and + /// together will coordinate the block number where the upgrade will happen. + #[pallet::storage] + pub(super) type PendingRelayChainBlockNumber<T: Config> = + StorageValue<_, RelayChainBlockNumber>; + + /// The new validation function we will upgrade to when the relay chain + /// reaches [`PendingRelayChainBlockNumber`]. A real validation function must + /// exist here as long as [`PendingRelayChainBlockNumber`] is set. + #[pallet::storage] + #[pallet::getter(fn new_validation_function)] + pub(super) type PendingValidationFunction<T: Config> = StorageValue<_, Vec<u8>, ValueQuery>; + + /// The [`PersistedValidationData`] set for this block. + #[pallet::storage] + #[pallet::getter(fn validation_data)] + pub(super) type ValidationData<T: Config> = StorageValue<_, PersistedValidationData>; + + /// Were the validation data set to notify the relay chain? + #[pallet::storage] + pub(super) type DidSetValidationCode<T: Config> = StorageValue<_, bool, ValueQuery>; + + /// The last relay parent block number at which we signalled the code upgrade. + #[pallet::storage] + pub(super) type LastUpgrade<T: Config> = StorageValue<_, relay_chain::BlockNumber, ValueQuery>; + + /// The snapshot of some state related to messaging relevant to the current parachain as per + /// the relay parent. + /// + /// This field is meant to be updated each block with the validation data inherent. Therefore, + /// before processing of the inherent, e.g. in `on_initialize` this data may be stale. + /// + /// This data is also absent from the genesis. + #[pallet::storage] + #[pallet::getter(fn relevant_messaging_state)] + pub(super) type RelevantMessagingState<T: Config> = StorageValue<_, MessagingStateSnapshot>; - storage::unhashed::put(well_known_keys::UPWARD_MESSAGES, &up[0..num]); - *up = up.split_off(num); - }); + /// The parachain host configuration that was obtained from the relay parent. + /// + /// This field is meant to be updated each block with the validation data inherent. Therefore, + /// before processing of the inherent, e.g. in `on_initialize` this data may be stale. + /// + /// This data is also absent from the genesis. + #[pallet::storage] + #[pallet::getter(fn host_configuration)] + pub(super) type HostConfiguration<T: Config> = StorageValue<_, AbridgedHostConfiguration>; - // Sending HRMP messages is a little bit more involved. There are the following - // constraints: - // - // - a channel should exist (and it can be closed while a message is buffered), - // - at most one message can be sent in a channel, - // - the sent out messages should be ordered by ascension of recipient para id. - // - the capacity and total size of the channel is limited, - // - the maximum size of a message is limited (and can potentially be changed), + /// The last downward message queue chain head we have observed. + /// + /// This value is loaded before and saved after processing inbound downward messages carried + /// by the system inherent. + #[pallet::storage] + pub(super) type LastDmqMqcHead<T: Config> = StorageValue<_, MessageQueueChain, ValueQuery>; - let maximum_channels = host_config.hrmp_max_message_num_per_candidate - .min(AnnouncedHrmpMessagesPerCandidate::take()) as usize; + /// The message queue chain heads we have observed per each channel incoming channel. + /// + /// This value is loaded before and saved after processing inbound downward messages carried + /// by the system inherent. + #[pallet::storage] + pub(super) type LastHrmpMqcHeads<T: Config> = + StorageValue<_, BTreeMap<ParaId, MessageQueueChain>, ValueQuery>; - let outbound_messages = T::OutboundXcmpMessageSource::take_outbound_messages( - maximum_channels, - ); + #[pallet::storage] + pub(super) type PendingUpwardMessages<T: Config> = + StorageValue<_, Vec<UpwardMessage>, ValueQuery>; - // Note conversion to the `OutboundHrmpMessage` isn't needed since the data that - // `take_outbound_messages` returns encodes equivalently. - // - // The following code is a smoke test to check that the `OutboundHrmpMessage` type - // doesn't accidentally change (e.g. by having a field added to it). If the following - // line breaks, then we'll need to revisit the assumption that the result of - // `take_outbound_messages` can be placed into `HRMP_OUTBOUND_MESSAGES` directly without - // a decode/encode round-trip. - let _ = OutboundHrmpMessage { recipient: ParaId::from(0), data: vec![] }; + /// The number of HRMP messages we observed in `on_initialize` and thus used that number for + /// announcing the weight of `on_initialize` and `on_finalize`. + #[pallet::storage] + pub(super) type AnnouncedHrmpMessagesPerCandidate<T: Config> = StorageValue<_, u32, ValueQuery>; - storage::unhashed::put(well_known_keys::HRMP_OUTBOUND_MESSAGES, &outbound_messages); - } + /// The weight we reserve at the beginning of the block for processing XCMP messages. This + /// overrides the amount set in the Config trait. + #[pallet::storage] + pub(super) type ReservedXcmpWeightOverride<T: Config> = StorageValue<_, Weight>; - fn on_initialize(_n: T::BlockNumber) -> Weight { - // To prevent removing `NEW_VALIDATION_CODE` that was set by another `on_initialize` like - // for example from scheduler, we only kill the storage entry if it was not yet updated - // in the current block. - if !DidSetValidationCode::get() { - storage::unhashed::kill(NEW_VALIDATION_CODE); - } + /// The weight we reserve at the beginning of the block for processing DMP messages. This + /// overrides the amount set in the Config trait. + #[pallet::storage] + pub(super) type ReservedDmpWeightOverride<T: Config> = StorageValue<_, Weight>; - // Remove the validation from the old block. - ValidationData::kill(); + /// The next authorized upgrade, if there is one. + #[pallet::storage] + pub(super) type AuthorizedUpgrade<T: Config> = StorageValue<_, T::Hash>; - let mut weight = T::DbWeight::get().writes(3); - storage::unhashed::kill(well_known_keys::HRMP_WATERMARK); - storage::unhashed::kill(well_known_keys::UPWARD_MESSAGES); - storage::unhashed::kill(well_known_keys::HRMP_OUTBOUND_MESSAGES); + #[pallet::inherent] + impl<T: Config> ProvideInherent for Pallet<T> { + type Call = Call<T>; + type Error = sp_inherents::MakeFatalError<()>; + const INHERENT_IDENTIFIER: InherentIdentifier = + cumulus_primitives_parachain_inherent::INHERENT_IDENTIFIER; - // Here, in `on_initialize` we must report the weight for both `on_initialize` and - // `on_finalize`. - // - // One complication here, is that the `host_configuration` is updated by an inherent and - // those are processed after the block initialization phase. Therefore, we have to be - // content only with the configuration as per the previous block. That means that - // the configuration can be either stale (or be abscent altogether in case of the - // beginning of the chain). - // - // In order to mitigate this, we do the following. At the time, we are only concerned - // about `hrmp_max_message_num_per_candidate`. We reserve the amount of weight to process - // the number of HRMP messages according to the potentially stale configuration. In - // `on_finalize` we will process only the maximum between the announced number of messages - // and the actual received in the fresh configuration. - // - // In the common case, they will be the same. In the case the actual value is smaller - // than the announced, we would waste some of weight. In the case the actual value is - // greater than the announced, we will miss opportunity to send a couple of messages. - weight += T::DbWeight::get().reads_writes(1, 1); - let hrmp_max_message_num_per_candidate = - Self::host_configuration() - .map(|cfg| cfg.hrmp_max_message_num_per_candidate) - .unwrap_or(0); - AnnouncedHrmpMessagesPerCandidate::put(hrmp_max_message_num_per_candidate); + fn create_inherent(data: &InherentData) -> Option<Self::Call> { + let data: ParachainInherentData = data + .get_data(&Self::INHERENT_IDENTIFIER) + .ok() + .flatten() + .expect("validation function params are always injected into inherent data; qed"); - // NOTE that the actual weight consumed by `on_finalize` may turn out lower. - weight += T::DbWeight::get().reads_writes( - 3 + hrmp_max_message_num_per_candidate as u64, - 4 + hrmp_max_message_num_per_candidate as u64, - ); + Some(Call::set_validation_data(data)) + } - weight + fn is_inherent(call: &Self::Call) -> bool { + matches!(call, Call::set_validation_data(_)) } } } -impl<T: Config> Module<T> { +impl<T: Config> Pallet<T> { fn validate_authorized_upgrade(code: &[u8]) -> Result<T::Hash, DispatchError> { let required_hash = AuthorizedUpgrade::<T>::get().ok_or(Error::<T>::NothingAuthorized)?; let actual_hash = T::Hashing::hash(&code[..]); @@ -424,7 +536,7 @@ impl<T: Config> Module<T> { } } -impl<T: Config> sp_runtime::traits::ValidateUnsigned for Module<T> { +impl<T: Config> sp_runtime::traits::ValidateUnsigned for Pallet<T> { type Call = Call<T>; fn validate_unsigned(_source: TransactionSource, call: &Self::Call) -> TransactionValidity { @@ -446,21 +558,22 @@ impl<T: Config> sp_runtime::traits::ValidateUnsigned for Module<T> { } } -impl<T: Config> GetChannelInfo for Module<T> { +impl<T: Config> GetChannelInfo for Pallet<T> { fn get_channel_status(id: ParaId) -> ChannelStatus { // Note, that we are using `relevant_messaging_state` which may be from the previous - // block, in case this is called from `on_initialize`, i.e. before the inherent with fresh - // data is submitted. + // block, in case this is called from `on_initialize`, i.e. before the inherent with + // fresh data is submitted. // - // That shouldn't be a problem though because this is anticipated and already can happen. - // This is because sending implies that a message is buffered until there is space to send - // a message in the candidate. After a while waiting in a buffer, it may be discovered that - // the channel to which a message were addressed is now closed. Another possibility, is that - // the maximum message size was decreased so that a message in the buffer doesn't fit. Should - // any of that happen the sender should be notified about the message was discarded. + // That shouldn't be a problem though because this is anticipated and already can + // happen. This is because sending implies that a message is buffered until there is + // space to send a message in the candidate. After a while waiting in a buffer, it may + // be discovered that the channel to which a message were addressed is now closed. + // Another possibility, is that the maximum message size was decreased so that a + // message in the buffer doesn't fit. Should any of that happen the sender should be + // notified about the message was discarded. // - // Here it a similar case, with the difference that the realization that the channel is closed - // came the same block. + // Here it a similar case, with the difference that the realization that the channel is + // closed came the same block. let channels = match Self::relevant_messaging_state() { None => { log::warn!("calling `get_channel_status` with no RelevantMessagingState?!"); @@ -468,8 +581,8 @@ impl<T: Config> GetChannelInfo for Module<T> { } Some(d) => d.egress_channels, }; - // ^^^ NOTE: This storage field should carry over from the previous block. So if it's None - // then it must be that this is an edge-case where a message is attempted to be + // ^^^ NOTE: This storage field should carry over from the previous block. So if it's + // None then it must be that this is an edge-case where a message is attempted to be // sent at the first block. It should be safe to assume that there are no channels // opened at all so early. At least, relying on this assumption seems to be a better // tradeoff, compared to introducing an error variant that the clients should be @@ -495,7 +608,7 @@ impl<T: Config> GetChannelInfo for Module<T> { } } -impl<T: Config> Module<T> { +impl<T: Config> Pallet<T> { /// Validate the given [`PersistedValidationData`] against the /// [`ValidationParams`](polkadot_parachain::primitives::ValidationParams). /// @@ -523,23 +636,23 @@ impl<T: Config> Module<T> { /// Process all inbound downward messages relayed by the collator. /// - /// Checks if the sequence of the messages is valid, dispatches them and communicates the number - /// of processed messages to the collator via a storage update. + /// Checks if the sequence of the messages is valid, dispatches them and communicates the + /// number of processed messages to the collator via a storage update. /// - /// **Panics** if it turns out that after processing all messages the Message Queue Chain hash - /// doesn't match the expected. + /// **Panics** if it turns out that after processing all messages the Message Queue Chain + /// hash doesn't match the expected. fn process_inbound_downward_messages( expected_dmq_mqc_head: relay_chain::Hash, downward_messages: Vec<InboundDownwardMessage>, ) -> Weight { let dm_count = downward_messages.len() as u32; - let mut dmq_head = LastDmqMqcHead::get(); + let mut dmq_head = <LastDmqMqcHead<T>>::get(); let mut weight_used = 0; if dm_count != 0 { - Self::deposit_event(RawEvent::DownwardMessagesReceived(dm_count)); - let max_weight = - ReservedDmpWeightOverride::get().unwrap_or_else(T::ReservedDmpWeight::get); + Self::deposit_event(Event::DownwardMessagesReceived(dm_count)); + let max_weight = <ReservedDmpWeightOverride<T>>::get() + .unwrap_or_else(T::ReservedDmpWeight::get); let message_iter = downward_messages .into_iter() @@ -548,16 +661,16 @@ impl<T: Config> Module<T> { }) .map(|m| (m.sent_at, m.msg)); weight_used += T::DmpMessageHandler::handle_dmp_messages(message_iter, max_weight); - LastDmqMqcHead::put(&dmq_head); + <LastDmqMqcHead<T>>::put(&dmq_head); - Self::deposit_event(RawEvent::DownwardMessagesProcessed(weight_used, dmq_head.0)); - }; + Self::deposit_event(Event::DownwardMessagesProcessed(weight_used, dmq_head.0)); + } - // After hashing each message in the message queue chain submitted by the collator, we should - // arrive to the MQC head provided by the relay chain. + // After hashing each message in the message queue chain submitted by the collator, we + // should arrive to the MQC head provided by the relay chain. // - // A mismatch means that at least some of the submitted messages were altered, omitted or added - // improperly. + // A mismatch means that at least some of the submitted messages were altered, omitted or + // added improperly. assert_eq!(dmq_head.0, expected_dmq_mqc_head); // Store the processed_downward_messages here so that it will be accessible from @@ -572,30 +685,33 @@ impl<T: Config> Module<T> { /// This is similar to [`process_inbound_downward_messages`], but works on multiple inbound /// channels. /// - /// **Panics** if either any of horizontal messages submitted by the collator was sent from a - /// para which has no open channel to this parachain or if after processing messages - /// across all inbound channels MQCs were obtained which do not correspond to the - /// ones found on the relay-chain. + /// **Panics** if either any of horizontal messages submitted by the collator was sent from + /// a para which has no open channel to this parachain or if after processing + /// messages across all inbound channels MQCs were obtained which do not + /// correspond to the ones found on the relay-chain. fn process_inbound_horizontal_messages( ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)], horizontal_messages: BTreeMap<ParaId, Vec<InboundHrmpMessage>>, ) -> Weight { - // First, check that all submitted messages are sent from channels that exist. The channel - // exists if its MQC head is present in `vfp.hrmp_mqc_heads`. + // First, check that all submitted messages are sent from channels that exist. The + // channel exists if its MQC head is present in `vfp.hrmp_mqc_heads`. for sender in horizontal_messages.keys() { - // A violation of the assertion below indicates that one of the messages submitted by - // the collator was sent from a sender that doesn't have a channel opened to this parachain, - // according to the relay-parent state. - assert!(ingress_channels - .binary_search_by_key(sender, |&(s, _)| s) - .is_ok(),); + // A violation of the assertion below indicates that one of the messages submitted + // by the collator was sent from a sender that doesn't have a channel opened to + // this parachain, according to the relay-parent state. + assert!( + ingress_channels + .binary_search_by_key(sender, |&(s, _)| s) + .is_ok(), + ); } // Second, prepare horizontal messages for a more convenient processing: // - // instead of a mapping from a para to a list of inbound HRMP messages, we will have a list - // of tuples `(sender, message)` first ordered by `sent_at` (the relay chain block number - // in which the message hit the relay-chain) and second ordered by para id ascending. + // instead of a mapping from a para to a list of inbound HRMP messages, we will have a + // list of tuples `(sender, message)` first ordered by `sent_at` (the relay chain block + // number in which the message hit the relay-chain) and second ordered by para id + // ascending. // // The messages will be dispatched in this order. let mut horizontal_messages = horizontal_messages @@ -614,7 +730,7 @@ impl<T: Config> Module<T> { } }); - let last_mqc_heads = LastHrmpMqcHeads::get(); + let last_mqc_heads = <LastHrmpMqcHeads<T>>::get(); let mut running_mqc_heads = BTreeMap::new(); let mut hrmp_watermark = None; @@ -638,11 +754,11 @@ impl<T: Config> Module<T> { .map(|&(sender, ref message)| (sender, message.sent_at, &message.data[..])); let max_weight = - ReservedXcmpWeightOverride::get().unwrap_or_else(T::ReservedXcmpWeight::get); + <ReservedXcmpWeightOverride<T>>::get().unwrap_or_else(T::ReservedXcmpWeight::get); let weight_used = T::XcmpMessageHandler::handle_xcmp_messages(message_iter, max_weight); - // Check that the MQC heads for each channel provided by the relay chain match the MQC heads - // we have after processing all incoming messages. + // Check that the MQC heads for each channel provided by the relay chain match the MQC + // heads we have after processing all incoming messages. // // Along the way we also carry over the relevant entries from the `last_mqc_heads` to // `running_mqc_heads`. Otherwise, in a block where no messages were sent in a channel @@ -654,10 +770,11 @@ impl<T: Config> Module<T> { .or_insert_with(|| last_mqc_heads.get(&sender).cloned().unwrap_or_default()) .head(); let target_head = channel.mqc_head.unwrap_or_default(); + assert!(cur_head == target_head); } - LastHrmpMqcHeads::put(running_mqc_heads); + <LastHrmpMqcHeads<T>>::put(running_mqc_heads); // If we processed at least one message, then advance watermark to that location. if let Some(hrmp_watermark) = hrmp_watermark { @@ -672,7 +789,7 @@ impl<T: Config> Module<T> { /// upgrade has been scheduled. fn notify_polkadot_of_pending_upgrade(code: &[u8]) { storage::unhashed::put_raw(NEW_VALIDATION_CODE, code); - DidSetValidationCode::put(true); + <DidSetValidationCode<T>>::put(true); } /// Put a new validation function into a particular location where this @@ -685,7 +802,7 @@ impl<T: Config> Module<T> { /// /// Returns `None` if the relay chain parachain host configuration hasn't been submitted yet. pub fn max_code_size() -> Option<u32> { - HostConfiguration::get().map(|cfg| cfg.max_code_size) + <HostConfiguration<T>>::get().map(|cfg| cfg.max_code_size) } /// Returns if a PVF/runtime upgrade could be signalled at the current block, and if so @@ -694,13 +811,14 @@ impl<T: Config> Module<T> { vfp: &PersistedValidationData, cfg: &AbridgedHostConfiguration, ) -> Option<relay_chain::BlockNumber> { - if PendingRelayChainBlockNumber::get().is_some() { + if <PendingRelayChainBlockNumber<T>>::get().is_some() { // There is already upgrade scheduled. Upgrade is not allowed. return None; } - let relay_blocks_since_last_upgrade = - vfp.relay_parent_number.saturating_sub(LastUpgrade::get()); + let relay_blocks_since_last_upgrade = vfp + .relay_parent_number + .saturating_sub(<LastUpgrade<T>>::get()); if relay_blocks_since_last_upgrade <= cfg.validation_upgrade_frequency { // The cooldown after the last upgrade hasn't elapsed yet. Upgrade is not allowed. @@ -713,11 +831,12 @@ impl<T: Config> Module<T> { /// The implementation of the runtime upgrade functionality for parachains. fn set_code_impl(validation_function: Vec<u8>) -> DispatchResult { ensure!( - !PendingValidationFunction::exists(), + !<PendingValidationFunction<T>>::exists(), Error::<T>::OverlappingUpgrades ); let vfp = Self::validation_data().ok_or(Error::<T>::ValidationDataNotAvailable)?; - let cfg = Self::host_configuration().ok_or(Error::<T>::HostConfigurationNotAvailable)?; + let cfg = + Self::host_configuration().ok_or(Error::<T>::HostConfigurationNotAvailable)?; ensure!( validation_function.len() <= cfg.max_code_size as usize, Error::<T>::TooBig @@ -733,9 +852,9 @@ impl<T: Config> Module<T> { // storage keeps track locally for the parachain upgrade, which will // be applied later. Self::notify_polkadot_of_pending_upgrade(&validation_function); - PendingRelayChainBlockNumber::put(apply_block); - PendingValidationFunction::put(validation_function); - Self::deposit_event(RawEvent::ValidationFunctionStored(apply_block)); + <PendingRelayChainBlockNumber<T>>::put(apply_block); + <PendingValidationFunction<T>>::put(validation_function); + Self::deposit_event(Event::ValidationFunctionStored(apply_block)); Ok(()) } @@ -745,14 +864,14 @@ pub struct ParachainSetCode<T>(sp_std::marker::PhantomData<T>); impl<T: Config> frame_system::SetCode for ParachainSetCode<T> { fn set_code(code: Vec<u8>) -> DispatchResult { - Module::<T>::set_code_impl(code) + Pallet::<T>::set_code_impl(code) } } /// This struct provides ability to extend a message queue chain (MQC) and compute a new head. /// -/// MQC is an instance of a [hash chain] applied to a message queue. Using a hash chain it's possible -/// to represent a sequence of messages using only a single hash. +/// MQC is an instance of a [hash chain] applied to a message queue. Using a hash chain it's +/// possible to represent a sequence of messages using only a single hash. /// /// A head for an empty chain is agreed to be a zero hash. /// @@ -786,7 +905,7 @@ impl MessageQueueChain { } } -impl<T: Config> Module<T> { +impl<T: Config> Pallet<T> { pub fn send_upward_message(message: UpwardMessage) -> Result<u32, MessageSendError> { // Check if the message fits into the relay-chain constraints. // @@ -811,987 +930,20 @@ impl<T: Config> Module<T> { // then it must be that this is an edge-case where a message is attempted to be // sent at the first block. // - // Let's pass this message through. I think it's not unreasonable to expect that the - // message is not huge and it comes through, but if it doesn't it can be returned - // back to the sender. + // Let's pass this message through. I think it's not unreasonable to expect that + // the message is not huge and it comes through, but if it doesn't it can be + // returned back to the sender. // // Thus fall through here. } }; - <Self as Store>::PendingUpwardMessages::append(message); + <PendingUpwardMessages<T>>::append(message); Ok(0) } } -impl<T: Config> UpwardMessageSender for Module<T> { +impl<T: Config> UpwardMessageSender for Pallet<T> { fn send_upward_message(message: UpwardMessage) -> Result<u32, MessageSendError> { Self::send_upward_message(message) } } - -impl<T: Config> ProvideInherent for Module<T> { - type Call = Call<T>; - type Error = sp_inherents::MakeFatalError<()>; - const INHERENT_IDENTIFIER: InherentIdentifier = - cumulus_primitives_parachain_inherent::INHERENT_IDENTIFIER; - - fn create_inherent(data: &InherentData) -> Option<Self::Call> { - let data: ParachainInherentData = data - .get_data(&Self::INHERENT_IDENTIFIER) - .ok() - .flatten() - .expect("validation function params are always injected into inherent data; qed"); - - Some(Call::set_validation_data(data)) - } - - fn is_inherent(call: &Self::Call) -> bool { - matches!(call, Call::set_validation_data(_)) - } -} - -decl_event! { - pub enum Event<T> where Hash = <T as frame_system::Config>::Hash { - /// The validation function has been scheduled to apply as of the contained relay chain block number. - ValidationFunctionStored(RelayChainBlockNumber), - /// The validation function was applied as of the contained relay chain block number. - ValidationFunctionApplied(RelayChainBlockNumber), - /// An upgrade has been authorized. - UpgradeAuthorized(Hash), - /// Some downward messages have been received and will be processed. - /// \[ count \] - DownwardMessagesReceived(u32), - /// Downward messages were processed using the given weight. - /// \[ weight_used, result_mqc_head \] - DownwardMessagesProcessed(Weight, relay_chain::Hash), - } -} - -decl_error! { - pub enum Error for Module<T: Config> { - /// Attempt to upgrade validation function while existing upgrade pending - OverlappingUpgrades, - /// Polkadot currently prohibits this parachain from upgrading its validation function - ProhibitedByPolkadot, - /// The supplied validation function has compiled into a blob larger than Polkadot is willing to run - TooBig, - /// The inherent which supplies the validation data did not run this block - ValidationDataNotAvailable, - /// The inherent which supplies the host configuration did not run this block - HostConfigurationNotAvailable, - /// No validation function upgrade is currently scheduled. - NotScheduled, - /// No code upgrade has been authorized. - NothingAuthorized, - /// The given code upgrade has not been authorized. - Unauthorized, - } -} - -/// tests for this pallet -#[cfg(test)] -mod tests { - use super::*; - - use codec::Encode; - use cumulus_primitives_core::{ - relay_chain::BlockNumber as RelayBlockNumber, AbridgedHrmpChannel, InboundDownwardMessage, - InboundHrmpMessage, PersistedValidationData, - }; - use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder; - use frame_support::{ - assert_ok, - dispatch::UnfilteredDispatchable, - parameter_types, - traits::{OnFinalize, OnInitialize}, - }; - use frame_system::{InitKind, RawOrigin}; - use hex_literal::hex; - use relay_chain::v1::HrmpChannelId; - use sp_core::H256; - use sp_runtime::{testing::Header, traits::IdentityLookup}; - use sp_version::RuntimeVersion; - use std::cell::RefCell; - - use crate as parachain_system; - - type UncheckedExtrinsic = frame_system::mocking::MockUncheckedExtrinsic<Test>; - type Block = frame_system::mocking::MockBlock<Test>; - - frame_support::construct_runtime!( - pub enum Test where - Block = Block, - NodeBlock = Block, - UncheckedExtrinsic = UncheckedExtrinsic, - { - System: frame_system::{Pallet, Call, Config, Storage, Event<T>}, - ParachainSystem: parachain_system::{Pallet, Call, Storage, Event<T>}, - } - ); - - parameter_types! { - pub const BlockHashCount: u64 = 250; - pub Version: RuntimeVersion = RuntimeVersion { - spec_name: sp_version::create_runtime_str!("test"), - impl_name: sp_version::create_runtime_str!("system-test"), - authoring_version: 1, - spec_version: 1, - impl_version: 1, - apis: sp_version::create_apis_vec!([]), - transaction_version: 1, - }; - pub const ParachainId: ParaId = ParaId::new(200); - pub const ReservedXcmpWeight: Weight = 0; - pub const ReservedDmpWeight: Weight = 0; - } - impl frame_system::Config for Test { - type Origin = Origin; - type Call = Call; - type Index = u64; - type BlockNumber = u64; - type Hash = H256; - type Hashing = BlakeTwo256; - type AccountId = u64; - type Lookup = IdentityLookup<Self::AccountId>; - type Header = Header; - type Event = Event; - type BlockHashCount = BlockHashCount; - type BlockLength = (); - type BlockWeights = (); - type Version = Version; - type PalletInfo = PalletInfo; - type AccountData = (); - type OnNewAccount = (); - type OnKilledAccount = (); - type DbWeight = (); - type BaseCallFilter = (); - type SystemWeightInfo = (); - type SS58Prefix = (); - type OnSetCode = ParachainSetCode<Self>; - } - impl Config for Test { - type Event = Event; - type OnValidationData = (); - type SelfParaId = ParachainId; - type OutboundXcmpMessageSource = FromThreadLocal; - type DmpMessageHandler = SaveIntoThreadLocal; - type ReservedDmpWeight = ReservedDmpWeight; - type XcmpMessageHandler = SaveIntoThreadLocal; - type ReservedXcmpWeight = ReservedXcmpWeight; - } - - pub struct FromThreadLocal; - pub struct SaveIntoThreadLocal; - - std::thread_local! { - static HANDLED_DMP_MESSAGES: RefCell<Vec<(relay_chain::BlockNumber, Vec<u8>)>> = RefCell::new(Vec::new()); - static HANDLED_XCMP_MESSAGES: RefCell<Vec<(ParaId, relay_chain::BlockNumber, Vec<u8>)>> = RefCell::new(Vec::new()); - static SENT_MESSAGES: RefCell<Vec<(ParaId, Vec<u8>)>> = RefCell::new(Vec::new()); - } - - fn send_message(dest: ParaId, message: Vec<u8>) { - SENT_MESSAGES.with(|m| m.borrow_mut().push((dest, message))); - } - - impl XcmpMessageSource for FromThreadLocal { - fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec<u8>)> { - let mut ids = std::collections::BTreeSet::<ParaId>::new(); - let mut taken = 0; - let mut result = Vec::new(); - SENT_MESSAGES.with(|ms| { - ms.borrow_mut().retain(|m| { - let status = <Module<Test> as GetChannelInfo>::get_channel_status(m.0); - let ready = matches!(status, ChannelStatus::Ready(..)); - if ready && !ids.contains(&m.0) && taken < maximum_channels { - ids.insert(m.0); - taken += 1; - result.push(m.clone()); - false - } else { - true - } - }) - }); - result - } - } - - impl DmpMessageHandler for SaveIntoThreadLocal { - fn handle_dmp_messages( - iter: impl Iterator<Item = (RelayBlockNumber, Vec<u8>)>, - _max_weight: Weight, - ) -> Weight { - HANDLED_DMP_MESSAGES.with(|m| { - for i in iter { - m.borrow_mut().push(i); - } - 0 - }) - } - } - - impl XcmpMessageHandler for SaveIntoThreadLocal { - fn handle_xcmp_messages<'a, I: Iterator<Item = (ParaId, RelayBlockNumber, &'a [u8])>>( - iter: I, - _max_weight: Weight, - ) -> Weight { - HANDLED_XCMP_MESSAGES.with(|m| { - for (sender, sent_at, message) in iter { - m.borrow_mut().push((sender, sent_at, message.to_vec())); - } - 0 - }) - } - } - - // This function basically just builds a genesis storage key/value store according to - // our desired mockup. - fn new_test_ext() -> sp_io::TestExternalities { - HANDLED_DMP_MESSAGES.with(|m| m.borrow_mut().clear()); - HANDLED_XCMP_MESSAGES.with(|m| m.borrow_mut().clear()); - - frame_system::GenesisConfig::default() - .build_storage::<Test>() - .unwrap() - .into() - } - - struct CallInWasm(Vec<u8>); - - impl sp_core::traits::CallInWasm for CallInWasm { - fn call_in_wasm( - &self, - _wasm_code: &[u8], - _code_hash: Option<Vec<u8>>, - _method: &str, - _call_data: &[u8], - _ext: &mut dyn sp_externalities::Externalities, - _missing_host_functions: sp_core::traits::MissingHostFunctions, - ) -> Result<Vec<u8>, String> { - Ok(self.0.clone()) - } - } - - fn wasm_ext() -> sp_io::TestExternalities { - let version = RuntimeVersion { - spec_name: "test".into(), - spec_version: 2, - impl_version: 1, - ..Default::default() - }; - let call_in_wasm = CallInWasm(version.encode()); - - let mut ext = new_test_ext(); - ext.register_extension(sp_core::traits::CallInWasmExt::new(call_in_wasm)); - ext - } - - struct BlockTest { - n: <Test as frame_system::Config>::BlockNumber, - within_block: Box<dyn Fn()>, - after_block: Option<Box<dyn Fn()>>, - } - - /// BlockTests exist to test blocks with some setup: we have to assume that - /// `validate_block` will mutate and check storage in certain predictable - /// ways, for example, and we want to always ensure that tests are executed - /// in the context of some particular block number. - #[derive(Default)] - struct BlockTests { - tests: Vec<BlockTest>, - pending_upgrade: Option<RelayChainBlockNumber>, - ran: bool, - relay_sproof_builder_hook: - Option<Box<dyn Fn(&BlockTests, RelayChainBlockNumber, &mut RelayStateSproofBuilder)>>, - persisted_validation_data_hook: - Option<Box<dyn Fn(&BlockTests, RelayChainBlockNumber, &mut PersistedValidationData)>>, - inherent_data_hook: - Option<Box<dyn Fn(&BlockTests, RelayChainBlockNumber, &mut ParachainInherentData)>>, - } - - impl BlockTests { - fn new() -> BlockTests { - Default::default() - } - - fn add_raw(mut self, test: BlockTest) -> Self { - self.tests.push(test); - self - } - - fn add<F>(self, n: <Test as frame_system::Config>::BlockNumber, within_block: F) -> Self - where - F: 'static + Fn(), - { - self.add_raw(BlockTest { - n, - within_block: Box::new(within_block), - after_block: None, - }) - } - - fn add_with_post_test<F1, F2>( - self, - n: <Test as frame_system::Config>::BlockNumber, - within_block: F1, - after_block: F2, - ) -> Self - where - F1: 'static + Fn(), - F2: 'static + Fn(), - { - self.add_raw(BlockTest { - n, - within_block: Box::new(within_block), - after_block: Some(Box::new(after_block)), - }) - } - - fn with_relay_sproof_builder<F>(mut self, f: F) -> Self - where - F: 'static + Fn(&BlockTests, RelayChainBlockNumber, &mut RelayStateSproofBuilder), - { - self.relay_sproof_builder_hook = Some(Box::new(f)); - self - } - - #[allow(dead_code)] // might come in handy in future. If now is future and it still hasn't - feel free. - fn with_validation_data<F>(mut self, f: F) -> Self - where - F: 'static + Fn(&BlockTests, RelayChainBlockNumber, &mut PersistedValidationData), - { - self.persisted_validation_data_hook = Some(Box::new(f)); - self - } - - fn with_inherent_data<F>(mut self, f: F) -> Self - where - F: 'static + Fn(&BlockTests, RelayChainBlockNumber, &mut ParachainInherentData), - { - self.inherent_data_hook = Some(Box::new(f)); - self - } - - fn run(&mut self) { - self.ran = true; - wasm_ext().execute_with(|| { - for BlockTest { - n, - within_block, - after_block, - } in self.tests.iter() - { - // clear pending updates, as applicable - if let Some(upgrade_block) = self.pending_upgrade { - if n >= &upgrade_block.into() { - self.pending_upgrade = None; - } - } - - // begin initialization - System::initialize( - &n, - &Default::default(), - &Default::default(), - InitKind::Full, - ); - - // now mess with the storage the way validate_block does - let mut sproof_builder = RelayStateSproofBuilder::default(); - if let Some(ref hook) = self.relay_sproof_builder_hook { - hook(self, *n as RelayChainBlockNumber, &mut sproof_builder); - } - let (relay_parent_storage_root, relay_chain_state) = - sproof_builder.into_state_root_and_proof(); - let mut vfp = PersistedValidationData { - relay_parent_number: *n as RelayChainBlockNumber, - relay_parent_storage_root, - ..Default::default() - }; - if let Some(ref hook) = self.persisted_validation_data_hook { - hook(self, *n as RelayChainBlockNumber, &mut vfp); - } - - ValidationData::put(&vfp); - storage::unhashed::kill(NEW_VALIDATION_CODE); - - // It is insufficient to push the validation function params - // to storage; they must also be included in the inherent data. - let inherent_data = { - let mut inherent_data = InherentData::default(); - let mut system_inherent_data = ParachainInherentData { - validation_data: vfp.clone(), - relay_chain_state, - downward_messages: Default::default(), - horizontal_messages: Default::default(), - }; - if let Some(ref hook) = self.inherent_data_hook { - hook(self, *n as RelayChainBlockNumber, &mut system_inherent_data); - } - inherent_data - .put_data( - cumulus_primitives_parachain_inherent::INHERENT_IDENTIFIER, - &system_inherent_data, - ) - .expect("failed to put VFP inherent"); - inherent_data - }; - - // execute the block - ParachainSystem::on_initialize(*n); - ParachainSystem::create_inherent(&inherent_data) - .expect("got an inherent") - .dispatch_bypass_filter(RawOrigin::None.into()) - .expect("dispatch succeeded"); - within_block(); - ParachainSystem::on_finalize(*n); - - // did block execution set new validation code? - if storage::unhashed::exists(NEW_VALIDATION_CODE) { - if self.pending_upgrade.is_some() { - panic!("attempted to set validation code while upgrade was pending"); - } - } - - // clean up - System::finalize(); - if let Some(after_block) = after_block { - after_block(); - } - } - }); - } - } - - impl Drop for BlockTests { - fn drop(&mut self) { - if !self.ran { - self.run(); - } - } - } - - #[test] - #[should_panic] - fn block_tests_run_on_drop() { - BlockTests::new().add(123, || { - panic!("if this test passes, block tests run properly") - }); - } - - #[test] - fn events() { - BlockTests::new() - .with_relay_sproof_builder(|_, _, builder| { - builder.host_config.validation_upgrade_delay = 1000; - }) - .add_with_post_test( - 123, - || { - assert_ok!(System::set_code(RawOrigin::Root.into(), Default::default())); - }, - || { - let events = System::events(); - assert_eq!( - events[0].event, - Event::parachain_system( - crate::RawEvent::ValidationFunctionStored(1123).into() - ) - ); - }, - ) - .add_with_post_test( - 1234, - || {}, - || { - let events = System::events(); - assert_eq!( - events[0].event, - Event::parachain_system( - crate::RawEvent::ValidationFunctionApplied(1234).into() - ) - ); - }, - ); - } - - #[test] - fn non_overlapping() { - BlockTests::new() - .with_relay_sproof_builder(|_, _, builder| { - builder.host_config.validation_upgrade_delay = 1000; - }) - .add(123, || { - assert_ok!(System::set_code(RawOrigin::Root.into(), Default::default())); - }) - .add(234, || { - assert_eq!( - System::set_code(RawOrigin::Root.into(), Default::default()), - Err(Error::<Test>::OverlappingUpgrades.into()), - ) - }); - } - - #[test] - fn manipulates_storage() { - BlockTests::new() - .add(123, || { - assert!( - !PendingValidationFunction::exists(), - "validation function must not exist yet" - ); - assert_ok!(System::set_code(RawOrigin::Root.into(), Default::default())); - assert!( - PendingValidationFunction::exists(), - "validation function must now exist" - ); - }) - .add_with_post_test( - 1234, - || {}, - || { - assert!( - !PendingValidationFunction::exists(), - "validation function must have been unset" - ); - }, - ); - } - - #[test] - fn checks_size() { - BlockTests::new() - .with_relay_sproof_builder(|_, _, builder| { - builder.host_config.max_code_size = 8; - }) - .add(123, || { - assert_eq!( - System::set_code(RawOrigin::Root.into(), vec![0; 64]), - Err(Error::<Test>::TooBig.into()), - ); - }); - } - - #[test] - fn send_upward_message_num_per_candidate() { - BlockTests::new() - .with_relay_sproof_builder(|_, _, sproof| { - sproof.host_config.max_upward_message_num_per_candidate = 1; - sproof.relay_dispatch_queue_size = None; - }) - .add_with_post_test( - 1, - || { - ParachainSystem::send_upward_message(b"Mr F was here".to_vec()).unwrap(); - ParachainSystem::send_upward_message(b"message 2".to_vec()).unwrap(); - }, - || { - let v: Option<Vec<Vec<u8>>> = - storage::unhashed::get(well_known_keys::UPWARD_MESSAGES); - assert_eq!(v, Some(vec![b"Mr F was here".to_vec()]),); - }, - ) - .add_with_post_test( - 2, - || { /* do nothing within block */ }, - || { - let v: Option<Vec<Vec<u8>>> = - storage::unhashed::get(well_known_keys::UPWARD_MESSAGES); - assert_eq!(v, Some(vec![b"message 2".to_vec()]),); - }, - ); - } - - #[test] - fn send_upward_message_relay_bottleneck() { - BlockTests::new() - .with_relay_sproof_builder(|_, relay_block_num, sproof| { - sproof.host_config.max_upward_message_num_per_candidate = 2; - sproof.host_config.max_upward_queue_count = 5; - - match relay_block_num { - 1 => sproof.relay_dispatch_queue_size = Some((5, 0)), - 2 => sproof.relay_dispatch_queue_size = Some((4, 0)), - _ => unreachable!(), - } - }) - .add_with_post_test( - 1, - || { - ParachainSystem::send_upward_message(vec![0u8; 8]).unwrap(); - }, - || { - // The message won't be sent because there is already one message in queue. - let v: Option<Vec<Vec<u8>>> = - storage::unhashed::get(well_known_keys::UPWARD_MESSAGES); - assert_eq!(v, Some(vec![]),); - }, - ) - .add_with_post_test( - 2, - || { /* do nothing within block */ }, - || { - let v: Option<Vec<Vec<u8>>> = - storage::unhashed::get(well_known_keys::UPWARD_MESSAGES); - assert_eq!(v, Some(vec![vec![0u8; 8]]),); - }, - ); - } - - #[test] - fn send_hrmp_message_buffer_channel_close() { - BlockTests::new() - .with_relay_sproof_builder(|_, relay_block_num, sproof| { - // - // Base case setup - // - sproof.para_id = ParaId::from(200); - sproof.hrmp_egress_channel_index = Some(vec![ParaId::from(300), ParaId::from(400)]); - sproof.hrmp_channels.insert( - HrmpChannelId { - sender: ParaId::from(200), - recipient: ParaId::from(300), - }, - AbridgedHrmpChannel { - max_capacity: 1, - msg_count: 1, // <- 1/1 means the channel is full - max_total_size: 1024, - max_message_size: 8, - total_size: 0, - mqc_head: Default::default(), - }, - ); - sproof.hrmp_channels.insert( - HrmpChannelId { - sender: ParaId::from(200), - recipient: ParaId::from(400), - }, - AbridgedHrmpChannel { - max_capacity: 1, - msg_count: 1, - max_total_size: 1024, - max_message_size: 8, - total_size: 0, - mqc_head: Default::default(), - }, - ); - - // - // Adjustment according to block - // - match relay_block_num { - 1 => {} - 2 => {} - 3 => { - // The channel 200->400 ceases to exist at the relay chain block 3 - sproof - .hrmp_egress_channel_index - .as_mut() - .unwrap() - .retain(|n| n != &ParaId::from(400)); - sproof.hrmp_channels.remove(&HrmpChannelId { - sender: ParaId::from(200), - recipient: ParaId::from(400), - }); - - // We also free up space for a message in the 200->300 channel. - sproof - .hrmp_channels - .get_mut(&HrmpChannelId { - sender: ParaId::from(200), - recipient: ParaId::from(300), - }) - .unwrap() - .msg_count = 0; - } - _ => unreachable!(), - } - }) - .add_with_post_test( - 1, - || { - send_message(ParaId::from(300), b"1".to_vec()); - send_message(ParaId::from(400), b"2".to_vec()); - }, - || {}, - ) - .add_with_post_test( - 2, - || {}, - || { - // both channels are at capacity so we do not expect any messages. - let v: Option<Vec<OutboundHrmpMessage>> = - storage::unhashed::get(well_known_keys::HRMP_OUTBOUND_MESSAGES); - assert_eq!(v, Some(vec![])); - }, - ) - .add_with_post_test( - 3, - || {}, - || { - let v: Option<Vec<OutboundHrmpMessage>> = - storage::unhashed::get(well_known_keys::HRMP_OUTBOUND_MESSAGES); - assert_eq!( - v, - Some(vec![OutboundHrmpMessage { - recipient: ParaId::from(300), - data: b"1".to_vec(), - }]) - ); - }, - ); - } - - #[test] - fn message_queue_chain() { - assert_eq!(MessageQueueChain::default().head(), H256::zero()); - - // Note that the resulting hashes are the same for HRMP and DMP. That's because even though - // the types are nominally different, they have the same structure and computation of the - // new head doesn't differ. - // - // These cases are taken from https://github.com/paritytech/polkadot/pull/2351 - assert_eq!( - MessageQueueChain::default() - .extend_downward(&InboundDownwardMessage { - sent_at: 2, - msg: vec![1, 2, 3], - }) - .extend_downward(&InboundDownwardMessage { - sent_at: 3, - msg: vec![4, 5, 6], - }) - .head(), - hex!["88dc00db8cc9d22aa62b87807705831f164387dfa49f80a8600ed1cbe1704b6b"].into(), - ); - assert_eq!( - MessageQueueChain::default() - .extend_hrmp(&InboundHrmpMessage { - sent_at: 2, - data: vec![1, 2, 3], - }) - .extend_hrmp(&InboundHrmpMessage { - sent_at: 3, - data: vec![4, 5, 6], - }) - .head(), - hex!["88dc00db8cc9d22aa62b87807705831f164387dfa49f80a8600ed1cbe1704b6b"].into(), - ); - } - - #[test] - fn receive_dmp() { - lazy_static::lazy_static! { - static ref MSG: InboundDownwardMessage = InboundDownwardMessage { - sent_at: 1, - msg: b"down".to_vec(), - }; - } - - BlockTests::new() - .with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num { - 1 => { - sproof.dmq_mqc_head = - Some(MessageQueueChain::default().extend_downward(&MSG).head()); - } - _ => unreachable!(), - }) - .with_inherent_data(|_, relay_block_num, data| match relay_block_num { - 1 => { - data.downward_messages.push(MSG.clone()); - } - _ => unreachable!(), - }) - .add(1, || { - HANDLED_DMP_MESSAGES.with(|m| { - let mut m = m.borrow_mut(); - assert_eq!(&*m, &[(MSG.sent_at, MSG.msg.clone())]); - m.clear(); - }); - }); - } - - #[test] - fn receive_hrmp() { - lazy_static::lazy_static! { - static ref MSG_1: InboundHrmpMessage = InboundHrmpMessage { - sent_at: 1, - data: b"1".to_vec(), - }; - - static ref MSG_2: InboundHrmpMessage = InboundHrmpMessage { - sent_at: 1, - data: b"2".to_vec(), - }; - - static ref MSG_3: InboundHrmpMessage = InboundHrmpMessage { - sent_at: 2, - data: b"3".to_vec(), - }; - - static ref MSG_4: InboundHrmpMessage = InboundHrmpMessage { - sent_at: 2, - data: b"4".to_vec(), - }; - } - - BlockTests::new() - .with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num { - 1 => { - // 200 - doesn't exist yet - // 300 - one new message - sproof.upsert_inbound_channel(ParaId::from(300)).mqc_head = - Some(MessageQueueChain::default().extend_hrmp(&MSG_1).head()); - } - 2 => { - // 200 - two new messages - // 300 - now present with one message. - sproof.upsert_inbound_channel(ParaId::from(200)).mqc_head = - Some(MessageQueueChain::default().extend_hrmp(&MSG_4).head()); - sproof.upsert_inbound_channel(ParaId::from(300)).mqc_head = Some( - MessageQueueChain::default() - .extend_hrmp(&MSG_1) - .extend_hrmp(&MSG_2) - .extend_hrmp(&MSG_3) - .head(), - ); - } - 3 => { - // 200 - no new messages - // 300 - is gone - sproof.upsert_inbound_channel(ParaId::from(200)).mqc_head = - Some(MessageQueueChain::default().extend_hrmp(&MSG_4).head()); - } - _ => unreachable!(), - }) - .with_inherent_data(|_, relay_block_num, data| match relay_block_num { - 1 => { - data.horizontal_messages - .insert(ParaId::from(300), vec![MSG_1.clone()]); - } - 2 => { - data.horizontal_messages.insert( - ParaId::from(300), - vec![ - // can't be sent at the block 1 actually. However, we cheat here - // because we want to test the case where there are multiple messages - // but the harness at the moment doesn't support block skipping. - MSG_2.clone(), - MSG_3.clone(), - ], - ); - data.horizontal_messages - .insert(ParaId::from(200), vec![MSG_4.clone()]); - } - 3 => {} - _ => unreachable!(), - }) - .add(1, || { - HANDLED_XCMP_MESSAGES.with(|m| { - let mut m = m.borrow_mut(); - assert_eq!(&*m, &[(ParaId::from(300), 1, b"1".to_vec())]); - m.clear(); - }); - }) - .add(2, || { - HANDLED_XCMP_MESSAGES.with(|m| { - let mut m = m.borrow_mut(); - assert_eq!( - &*m, - &[ - (ParaId::from(300), 1, b"2".to_vec()), - (ParaId::from(200), 2, b"4".to_vec()), - (ParaId::from(300), 2, b"3".to_vec()), - ] - ); - m.clear(); - }); - }) - .add(3, || {}); - } - - #[test] - fn receive_hrmp_empty_channel() { - BlockTests::new() - .with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num { - 1 => { - // no channels - } - 2 => { - // one new channel - sproof.upsert_inbound_channel(ParaId::from(300)).mqc_head = - Some(MessageQueueChain::default().head()); - } - _ => unreachable!(), - }) - .add(1, || {}) - .add(2, || {}); - } - - #[test] - fn receive_hrmp_after_pause() { - lazy_static::lazy_static! { - static ref MSG_1: InboundHrmpMessage = InboundHrmpMessage { - sent_at: 1, - data: b"mikhailinvanovich".to_vec(), - }; - - static ref MSG_2: InboundHrmpMessage = InboundHrmpMessage { - sent_at: 3, - data: b"1000000000".to_vec(), - }; - } - - const ALICE: ParaId = ParaId::new(300); - - BlockTests::new() - .with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num { - 1 => { - sproof.upsert_inbound_channel(ALICE).mqc_head = - Some(MessageQueueChain::default().extend_hrmp(&MSG_1).head()); - } - 2 => { - // 300 - no new messages, mqc stayed the same. - sproof.upsert_inbound_channel(ALICE).mqc_head = - Some(MessageQueueChain::default().extend_hrmp(&MSG_1).head()); - } - 3 => { - // 300 - new message. - sproof.upsert_inbound_channel(ALICE).mqc_head = Some( - MessageQueueChain::default() - .extend_hrmp(&MSG_1) - .extend_hrmp(&MSG_2) - .head(), - ); - } - _ => unreachable!(), - }) - .with_inherent_data(|_, relay_block_num, data| match relay_block_num { - 1 => { - data.horizontal_messages.insert(ALICE, vec![MSG_1.clone()]); - } - 2 => { - // no new messages - } - 3 => { - data.horizontal_messages.insert(ALICE, vec![MSG_2.clone()]); - } - _ => unreachable!(), - }) - .add(1, || { - HANDLED_XCMP_MESSAGES.with(|m| { - let mut m = m.borrow_mut(); - assert_eq!(&*m, &[(ALICE, 1, b"mikhailinvanovich".to_vec())]); - m.clear(); - }); - }) - .add(2, || {}) - .add(3, || { - HANDLED_XCMP_MESSAGES.with(|m| { - let mut m = m.borrow_mut(); - assert_eq!(&*m, &[(ALICE, 3, b"1000000000".to_vec())]); - m.clear(); - }); - }); - } -} diff --git a/cumulus/pallets/parachain-system/src/tests.rs b/cumulus/pallets/parachain-system/src/tests.rs new file mode 100755 index 00000000000..57c420f74c5 --- /dev/null +++ b/cumulus/pallets/parachain-system/src/tests.rs @@ -0,0 +1,935 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see <http://www.gnu.org/licenses/>. +use super::*; + +use codec::Encode; +use cumulus_primitives_core::{ + AbridgedHrmpChannel, InboundDownwardMessage, InboundHrmpMessage, PersistedValidationData, + relay_chain::BlockNumber as RelayBlockNumber, +}; +use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder; +use frame_support::{ + assert_ok, + dispatch::UnfilteredDispatchable, + parameter_types, + storage, + traits::{OnFinalize, OnInitialize}, + weights::Weight, + inherent::{InherentData, ProvideInherent}, +}; +use frame_system::{InitKind, RawOrigin}; +use hex_literal::hex; +use relay_chain::v1::HrmpChannelId; +use sp_core::H256; +use sp_runtime::{testing::Header, traits::IdentityLookup}; +use sp_version::RuntimeVersion; +use std::cell::RefCell; + +use crate as parachain_system; + +type UncheckedExtrinsic = frame_system::mocking::MockUncheckedExtrinsic<Test>; +type Block = frame_system::mocking::MockBlock<Test>; + +frame_support::construct_runtime!( + pub enum Test where + Block = Block, + NodeBlock = Block, + UncheckedExtrinsic = UncheckedExtrinsic, + { + System: frame_system::{Pallet, Call, Config, Storage, Event<T>}, + ParachainSystem: parachain_system::{Pallet, Call, Storage, Event<T>}, + } +); + +parameter_types! { + pub const BlockHashCount: u64 = 250; + pub Version: RuntimeVersion = RuntimeVersion { + spec_name: sp_version::create_runtime_str!("test"), + impl_name: sp_version::create_runtime_str!("system-test"), + authoring_version: 1, + spec_version: 1, + impl_version: 1, + apis: sp_version::create_apis_vec!([]), + transaction_version: 1, + }; + pub const ParachainId: ParaId = ParaId::new(200); + pub const ReservedXcmpWeight: Weight = 0; + pub const ReservedDmpWeight: Weight = 0; +} +impl frame_system::Config for Test { + type Origin = Origin; + type Call = Call; + type Index = u64; + type BlockNumber = u64; + type Hash = H256; + type Hashing = BlakeTwo256; + type AccountId = u64; + type Lookup = IdentityLookup<Self::AccountId>; + type Header = Header; + type Event = Event; + type BlockHashCount = BlockHashCount; + type BlockLength = (); + type BlockWeights = (); + type Version = Version; + type PalletInfo = PalletInfo; + type AccountData = (); + type OnNewAccount = (); + type OnKilledAccount = (); + type DbWeight = (); + type BaseCallFilter = (); + type SystemWeightInfo = (); + type SS58Prefix = (); + type OnSetCode = ParachainSetCode<Self>; +} +impl Config for Test { + type Event = Event; + type OnValidationData = (); + type SelfParaId = ParachainId; + type OutboundXcmpMessageSource = FromThreadLocal; + type DmpMessageHandler = SaveIntoThreadLocal; + type ReservedDmpWeight = ReservedDmpWeight; + type XcmpMessageHandler = SaveIntoThreadLocal; + type ReservedXcmpWeight = ReservedXcmpWeight; +} + +pub struct FromThreadLocal; +pub struct SaveIntoThreadLocal; + +std::thread_local! { + static HANDLED_DMP_MESSAGES: RefCell<Vec<(relay_chain::BlockNumber, Vec<u8>)>> = RefCell::new(Vec::new()); + static HANDLED_XCMP_MESSAGES: RefCell<Vec<(ParaId, relay_chain::BlockNumber, Vec<u8>)>> = RefCell::new(Vec::new()); + static SENT_MESSAGES: RefCell<Vec<(ParaId, Vec<u8>)>> = RefCell::new(Vec::new()); +} + +fn send_message( + dest: ParaId, + message: Vec<u8>, +) { + SENT_MESSAGES.with(|m| m.borrow_mut().push((dest, message))); +} + +impl XcmpMessageSource for FromThreadLocal { + fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec<u8>)> { + let mut ids = std::collections::BTreeSet::<ParaId>::new(); + let mut taken = 0; + let mut result = Vec::new(); + SENT_MESSAGES.with(|ms| ms.borrow_mut() + .retain(|m| { + let status = <Pallet::<Test> as GetChannelInfo>::get_channel_status(m.0); + let ready = matches!(status, ChannelStatus::Ready(..)); + if ready && !ids.contains(&m.0) && taken < maximum_channels { + ids.insert(m.0); + taken += 1; + result.push(m.clone()); + false + } else { + true + } + }) + ); + result + } +} + +impl DmpMessageHandler for SaveIntoThreadLocal { + fn handle_dmp_messages( + iter: impl Iterator<Item=(RelayBlockNumber, Vec<u8>)>, + _max_weight: Weight, + ) -> Weight { + HANDLED_DMP_MESSAGES.with(|m| { + for i in iter { + m.borrow_mut().push(i); + } + 0 + }) + } +} + +impl XcmpMessageHandler for SaveIntoThreadLocal { + fn handle_xcmp_messages<'a, I: Iterator<Item=(ParaId, RelayBlockNumber, &'a [u8])>>( + iter: I, + _max_weight: Weight, + ) -> Weight { + HANDLED_XCMP_MESSAGES.with(|m| { + for (sender, sent_at, message) in iter { + m.borrow_mut().push((sender, sent_at, message.to_vec())); + } + 0 + }) + } +} + +// This function basically just builds a genesis storage key/value store according to +// our desired mockup. +fn new_test_ext() -> sp_io::TestExternalities { + HANDLED_DMP_MESSAGES.with(|m| m.borrow_mut().clear()); + HANDLED_XCMP_MESSAGES.with(|m| m.borrow_mut().clear()); + + frame_system::GenesisConfig::default() + .build_storage::<Test>() + .unwrap() + .into() +} + +struct CallInWasm(Vec<u8>); + +impl sp_core::traits::CallInWasm for CallInWasm { + fn call_in_wasm( + &self, + _wasm_code: &[u8], + _code_hash: Option<Vec<u8>>, + _method: &str, + _call_data: &[u8], + _ext: &mut dyn sp_externalities::Externalities, + _missing_host_functions: sp_core::traits::MissingHostFunctions, + ) -> Result<Vec<u8>, String> { + Ok(self.0.clone()) + } +} + +fn wasm_ext() -> sp_io::TestExternalities { + let version = RuntimeVersion { + spec_name: "test".into(), + spec_version: 2, + impl_version: 1, + ..Default::default() + }; + let call_in_wasm = CallInWasm(version.encode()); + + let mut ext = new_test_ext(); + ext.register_extension(sp_core::traits::CallInWasmExt::new(call_in_wasm)); + ext +} + +struct BlockTest { + n: <Test as frame_system::Config>::BlockNumber, + within_block: Box<dyn Fn()>, + after_block: Option<Box<dyn Fn()>>, +} + +/// BlockTests exist to test blocks with some setup: we have to assume that +/// `validate_block` will mutate and check storage in certain predictable +/// ways, for example, and we want to always ensure that tests are executed +/// in the context of some particular block number. +#[derive(Default)] +struct BlockTests { + tests: Vec<BlockTest>, + pending_upgrade: Option<RelayChainBlockNumber>, + ran: bool, + relay_sproof_builder_hook: + Option<Box<dyn Fn(&BlockTests, RelayChainBlockNumber, &mut RelayStateSproofBuilder)>>, + persisted_validation_data_hook: + Option<Box<dyn Fn(&BlockTests, RelayChainBlockNumber, &mut PersistedValidationData)>>, + inherent_data_hook: + Option<Box<dyn Fn(&BlockTests, RelayChainBlockNumber, &mut ParachainInherentData)>>, +} + +impl BlockTests { + fn new() -> BlockTests { + Default::default() + } + + fn add_raw(mut self, test: BlockTest) -> Self { + self.tests.push(test); + self + } + + fn add<F>(self, n: <Test as frame_system::Config>::BlockNumber, within_block: F) -> Self + where + F: 'static + Fn(), + { + self.add_raw(BlockTest { + n, + within_block: Box::new(within_block), + after_block: None, + }) + } + + fn add_with_post_test<F1, F2>( + self, + n: <Test as frame_system::Config>::BlockNumber, + within_block: F1, + after_block: F2, + ) -> Self + where + F1: 'static + Fn(), + F2: 'static + Fn(), + { + self.add_raw(BlockTest { + n, + within_block: Box::new(within_block), + after_block: Some(Box::new(after_block)), + }) + } + + fn with_relay_sproof_builder<F>(mut self, f: F) -> Self + where + F: 'static + Fn(&BlockTests, RelayChainBlockNumber, &mut RelayStateSproofBuilder), + { + self.relay_sproof_builder_hook = Some(Box::new(f)); + self + } + + #[allow(dead_code)] // might come in handy in future. If now is future and it still hasn't - feel free. + fn with_validation_data<F>(mut self, f: F) -> Self + where + F: 'static + Fn(&BlockTests, RelayChainBlockNumber, &mut PersistedValidationData), + { + self.persisted_validation_data_hook = Some(Box::new(f)); + self + } + + fn with_inherent_data<F>(mut self, f: F) -> Self + where + F: 'static + Fn(&BlockTests, RelayChainBlockNumber, &mut ParachainInherentData), + { + self.inherent_data_hook = Some(Box::new(f)); + self + } + + fn run(&mut self) { + self.ran = true; + wasm_ext().execute_with(|| { + for BlockTest { + n, + within_block, + after_block, + } in self.tests.iter() + { + // clear pending updates, as applicable + if let Some(upgrade_block) = self.pending_upgrade { + if n >= &upgrade_block.into() { + self.pending_upgrade = None; + } + } + + // begin initialization + System::initialize( + &n, + &Default::default(), + &Default::default(), + InitKind::Full, + ); + + // now mess with the storage the way validate_block does + let mut sproof_builder = RelayStateSproofBuilder::default(); + if let Some(ref hook) = self.relay_sproof_builder_hook { + hook(self, *n as RelayChainBlockNumber, &mut sproof_builder); + } + let (relay_parent_storage_root, relay_chain_state) = + sproof_builder.into_state_root_and_proof(); + let mut vfp = PersistedValidationData { + relay_parent_number: *n as RelayChainBlockNumber, + relay_parent_storage_root, + ..Default::default() + }; + if let Some(ref hook) = self.persisted_validation_data_hook { + hook(self, *n as RelayChainBlockNumber, &mut vfp); + } + + <ValidationData<Test>>::put(&vfp); + storage::unhashed::kill(NEW_VALIDATION_CODE); + + // It is insufficient to push the validation function params + // to storage; they must also be included in the inherent data. + let inherent_data = { + let mut inherent_data = InherentData::default(); + let mut system_inherent_data = ParachainInherentData { + validation_data: vfp.clone(), + relay_chain_state, + downward_messages: Default::default(), + horizontal_messages: Default::default(), + }; + if let Some(ref hook) = self.inherent_data_hook { + hook(self, *n as RelayChainBlockNumber, &mut system_inherent_data); + } + inherent_data + .put_data( + cumulus_primitives_parachain_inherent::INHERENT_IDENTIFIER, + &system_inherent_data, + ) + .expect("failed to put VFP inherent"); + inherent_data + }; + + // execute the block + ParachainSystem::on_initialize(*n); + ParachainSystem::create_inherent(&inherent_data) + .expect("got an inherent") + .dispatch_bypass_filter(RawOrigin::None.into()) + .expect("dispatch succeeded"); + within_block(); + ParachainSystem::on_finalize(*n); + + // did block execution set new validation code? + if storage::unhashed::exists(NEW_VALIDATION_CODE) { + if self.pending_upgrade.is_some() { + panic!("attempted to set validation code while upgrade was pending"); + } + } + + // clean up + System::finalize(); + if let Some(after_block) = after_block { + after_block(); + } + } + }); + } +} + +impl Drop for BlockTests { + fn drop(&mut self) { + if !self.ran { + self.run(); + } + } +} + +#[test] +#[should_panic] +fn block_tests_run_on_drop() { + BlockTests::new().add(123, || { + panic!("if this test passes, block tests run properly") + }); +} + +#[test] +fn events() { + BlockTests::new() + .with_relay_sproof_builder(|_, _, builder| { + builder.host_config.validation_upgrade_delay = 1000; + }) + .add_with_post_test( + 123, + || { + assert_ok!(System::set_code( + RawOrigin::Root.into(), + Default::default() + )); + }, + || { + let events = System::events(); + assert_eq!( + events[0].event, + Event::parachain_system(crate::Event::ValidationFunctionStored(1123).into()) + ); + }, + ) + .add_with_post_test( + 1234, + || {}, + || { + let events = System::events(); + assert_eq!( + events[0].event, + Event::parachain_system(crate::Event::ValidationFunctionApplied(1234).into()) + ); + }, + ); +} + +#[test] +fn non_overlapping() { + BlockTests::new() + .with_relay_sproof_builder(|_, _, builder| { + builder.host_config.validation_upgrade_delay = 1000; + }) + .add(123, || { + assert_ok!(System::set_code( + RawOrigin::Root.into(), + Default::default() + )); + }) + .add(234, || { + assert_eq!( + System::set_code(RawOrigin::Root.into(), Default::default()), + Err(Error::<Test>::OverlappingUpgrades.into()), + ) + }); +} + +#[test] +fn manipulates_storage() { + BlockTests::new() + .add(123, || { + assert!( + !<PendingValidationFunction<Test>>::exists(), + "validation function must not exist yet" + ); + assert_ok!(System::set_code( + RawOrigin::Root.into(), + Default::default() + )); + assert!( + <PendingValidationFunction<Test>>::exists(), + "validation function must now exist" + ); + }) + .add_with_post_test( + 1234, + || {}, + || { + assert!( + !<PendingValidationFunction<Test>>::exists(), + "validation function must have been unset" + ); + }, + ); +} + +#[test] +fn checks_size() { + BlockTests::new() + .with_relay_sproof_builder(|_, _, builder| { + builder.host_config.max_code_size = 8; + }) + .add(123, || { + assert_eq!( + System::set_code(RawOrigin::Root.into(), vec![0; 64]), + Err(Error::<Test>::TooBig.into()), + ); + }); +} + +#[test] +fn send_upward_message_num_per_candidate() { + BlockTests::new() + .with_relay_sproof_builder(|_, _, sproof| { + sproof.host_config.max_upward_message_num_per_candidate = 1; + sproof.relay_dispatch_queue_size = None; + }) + .add_with_post_test( + 1, + || { + ParachainSystem::send_upward_message(b"Mr F was here".to_vec()).unwrap(); + ParachainSystem::send_upward_message(b"message 2".to_vec()).unwrap(); + }, + || { + let v: Option<Vec<Vec<u8>>> = + storage::unhashed::get(well_known_keys::UPWARD_MESSAGES); + assert_eq!(v, Some(vec![b"Mr F was here".to_vec()]),); + }, + ) + .add_with_post_test( + 2, + || { /* do nothing within block */ }, + || { + let v: Option<Vec<Vec<u8>>> = + storage::unhashed::get(well_known_keys::UPWARD_MESSAGES); + assert_eq!(v, Some(vec![b"message 2".to_vec()]),); + }, + ); +} + +#[test] +fn send_upward_message_relay_bottleneck() { + BlockTests::new() + .with_relay_sproof_builder(|_, relay_block_num, sproof| { + sproof.host_config.max_upward_message_num_per_candidate = 2; + sproof.host_config.max_upward_queue_count = 5; + + match relay_block_num { + 1 => sproof.relay_dispatch_queue_size = Some((5, 0)), + 2 => sproof.relay_dispatch_queue_size = Some((4, 0)), + _ => unreachable!(), + } + }) + .add_with_post_test( + 1, + || { + ParachainSystem::send_upward_message(vec![0u8; 8]).unwrap(); + }, + || { + // The message won't be sent because there is already one message in queue. + let v: Option<Vec<Vec<u8>>> = + storage::unhashed::get(well_known_keys::UPWARD_MESSAGES); + assert_eq!(v, Some(vec![]),); + }, + ) + .add_with_post_test( + 2, + || { /* do nothing within block */ }, + || { + let v: Option<Vec<Vec<u8>>> = + storage::unhashed::get(well_known_keys::UPWARD_MESSAGES); + assert_eq!(v, Some(vec![vec![0u8; 8]]),); + }, + ); +} + +#[test] +fn send_hrmp_message_buffer_channel_close() { + BlockTests::new() + .with_relay_sproof_builder(|_, relay_block_num, sproof| { + // + // Base case setup + // + sproof.para_id = ParaId::from(200); + sproof.hrmp_egress_channel_index = Some(vec![ParaId::from(300), ParaId::from(400)]); + sproof.hrmp_channels.insert( + HrmpChannelId { + sender: ParaId::from(200), + recipient: ParaId::from(300), + }, + AbridgedHrmpChannel { + max_capacity: 1, + msg_count: 1, // <- 1/1 means the channel is full + max_total_size: 1024, + max_message_size: 8, + total_size: 0, + mqc_head: Default::default(), + }, + ); + sproof.hrmp_channels.insert( + HrmpChannelId { + sender: ParaId::from(200), + recipient: ParaId::from(400), + }, + AbridgedHrmpChannel { + max_capacity: 1, + msg_count: 1, + max_total_size: 1024, + max_message_size: 8, + total_size: 0, + mqc_head: Default::default(), + }, + ); + + // + // Adjustment according to block + // + match relay_block_num { + 1 => {} + 2 => {} + 3 => { + // The channel 200->400 ceases to exist at the relay chain block 3 + sproof + .hrmp_egress_channel_index + .as_mut() + .unwrap() + .retain(|n| n != &ParaId::from(400)); + sproof.hrmp_channels.remove(&HrmpChannelId { + sender: ParaId::from(200), + recipient: ParaId::from(400), + }); + + // We also free up space for a message in the 200->300 channel. + sproof + .hrmp_channels + .get_mut(&HrmpChannelId { + sender: ParaId::from(200), + recipient: ParaId::from(300), + }) + .unwrap() + .msg_count = 0; + } + _ => unreachable!(), + } + }) + .add_with_post_test( + 1, + || { + send_message( + ParaId::from(300), + b"1".to_vec(), + ); + send_message( + ParaId::from(400), + b"2".to_vec(), + ); + }, + || {}, + ) + .add_with_post_test( + 2, + || {}, + || { + // both channels are at capacity so we do not expect any messages. + let v: Option<Vec<OutboundHrmpMessage>> = + storage::unhashed::get(well_known_keys::HRMP_OUTBOUND_MESSAGES); + assert_eq!(v, Some(vec![])); + }, + ) + .add_with_post_test( + 3, + || {}, + || { + let v: Option<Vec<OutboundHrmpMessage>> = + storage::unhashed::get(well_known_keys::HRMP_OUTBOUND_MESSAGES); + assert_eq!( + v, + Some(vec![OutboundHrmpMessage { + recipient: ParaId::from(300), + data: b"1".to_vec(), + }]) + ); + }, + ); +} + +#[test] +fn message_queue_chain() { + assert_eq!(MessageQueueChain::default().head(), H256::zero()); + + // Note that the resulting hashes are the same for HRMP and DMP. That's because even though + // the types are nominally different, they have the same structure and computation of the + // new head doesn't differ. + // + // These cases are taken from https://github.com/paritytech/polkadot/pull/2351 + assert_eq!( + MessageQueueChain::default() + .extend_downward(&InboundDownwardMessage { + sent_at: 2, + msg: vec![1, 2, 3], + }) + .extend_downward(&InboundDownwardMessage { + sent_at: 3, + msg: vec![4, 5, 6], + }) + .head(), + hex!["88dc00db8cc9d22aa62b87807705831f164387dfa49f80a8600ed1cbe1704b6b"].into(), + ); + assert_eq!( + MessageQueueChain::default() + .extend_hrmp(&InboundHrmpMessage { + sent_at: 2, + data: vec![1, 2, 3], + }) + .extend_hrmp(&InboundHrmpMessage { + sent_at: 3, + data: vec![4, 5, 6], + }) + .head(), + hex!["88dc00db8cc9d22aa62b87807705831f164387dfa49f80a8600ed1cbe1704b6b"].into(), + ); +} + +#[test] +fn receive_dmp() { + lazy_static::lazy_static! { + static ref MSG: InboundDownwardMessage = InboundDownwardMessage { + sent_at: 1, + msg: b"down".to_vec(), + }; + } + + BlockTests::new() + .with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num { + 1 => { + sproof.dmq_mqc_head = + Some(MessageQueueChain::default().extend_downward(&MSG).head()); + } + _ => unreachable!(), + }) + .with_inherent_data(|_, relay_block_num, data| match relay_block_num { + 1 => { + data.downward_messages.push(MSG.clone()); + } + _ => unreachable!(), + }) + .add(1, || { + HANDLED_DMP_MESSAGES.with(|m| { + let mut m = m.borrow_mut(); + assert_eq!(&*m, &[(MSG.sent_at, MSG.msg.clone())]); + m.clear(); + }); + }); +} + +#[test] +fn receive_hrmp() { + lazy_static::lazy_static! { + static ref MSG_1: InboundHrmpMessage = InboundHrmpMessage { + sent_at: 1, + data: b"1".to_vec(), + }; + + static ref MSG_2: InboundHrmpMessage = InboundHrmpMessage { + sent_at: 1, + data: b"2".to_vec(), + }; + + static ref MSG_3: InboundHrmpMessage = InboundHrmpMessage { + sent_at: 2, + data: b"3".to_vec(), + }; + + static ref MSG_4: InboundHrmpMessage = InboundHrmpMessage { + sent_at: 2, + data: b"4".to_vec(), + }; + } + + BlockTests::new() + .with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num { + 1 => { + // 200 - doesn't exist yet + // 300 - one new message + sproof.upsert_inbound_channel(ParaId::from(300)).mqc_head = + Some(MessageQueueChain::default().extend_hrmp(&MSG_1).head()); + } + 2 => { + // 200 - two new messages + // 300 - now present with one message. + sproof.upsert_inbound_channel(ParaId::from(200)).mqc_head = + Some(MessageQueueChain::default().extend_hrmp(&MSG_4).head()); + sproof.upsert_inbound_channel(ParaId::from(300)).mqc_head = Some( + MessageQueueChain::default() + .extend_hrmp(&MSG_1) + .extend_hrmp(&MSG_2) + .extend_hrmp(&MSG_3) + .head(), + ); + } + 3 => { + // 200 - no new messages + // 300 - is gone + sproof.upsert_inbound_channel(ParaId::from(200)).mqc_head = + Some(MessageQueueChain::default().extend_hrmp(&MSG_4).head()); + } + _ => unreachable!(), + }) + .with_inherent_data(|_, relay_block_num, data| match relay_block_num { + 1 => { + data.horizontal_messages + .insert(ParaId::from(300), vec![MSG_1.clone()]); + } + 2 => { + data.horizontal_messages.insert( + ParaId::from(300), + vec![ + // can't be sent at the block 1 actually. However, we cheat here + // because we want to test the case where there are multiple messages + // but the harness at the moment doesn't support block skipping. + MSG_2.clone(), + MSG_3.clone(), + ], + ); + data.horizontal_messages + .insert(ParaId::from(200), vec![MSG_4.clone()]); + } + 3 => {} + _ => unreachable!(), + }) + .add(1, || { + HANDLED_XCMP_MESSAGES.with(|m| { + let mut m = m.borrow_mut(); + assert_eq!(&*m, &[(ParaId::from(300), 1, b"1".to_vec())]); + m.clear(); + }); + }) + .add(2, || { + HANDLED_XCMP_MESSAGES.with(|m| { + let mut m = m.borrow_mut(); + assert_eq!( + &*m, + &[ + (ParaId::from(300), 1, b"2".to_vec()), + (ParaId::from(200), 2, b"4".to_vec()), + (ParaId::from(300), 2, b"3".to_vec()), + ] + ); + m.clear(); + }); + }) + .add(3, || {}); +} + +#[test] +fn receive_hrmp_empty_channel() { + BlockTests::new() + .with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num { + 1 => { + // no channels + } + 2 => { + // one new channel + sproof.upsert_inbound_channel(ParaId::from(300)).mqc_head = + Some(MessageQueueChain::default().head()); + } + _ => unreachable!(), + }) + .add(1, || {}) + .add(2, || {}); +} + +#[test] +fn receive_hrmp_after_pause() { + lazy_static::lazy_static! { + static ref MSG_1: InboundHrmpMessage = InboundHrmpMessage { + sent_at: 1, + data: b"mikhailinvanovich".to_vec(), + }; + + static ref MSG_2: InboundHrmpMessage = InboundHrmpMessage { + sent_at: 3, + data: b"1000000000".to_vec(), + }; + } + + const ALICE: ParaId = ParaId::new(300); + + BlockTests::new() + .with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num { + 1 => { + sproof.upsert_inbound_channel(ALICE).mqc_head = + Some(MessageQueueChain::default().extend_hrmp(&MSG_1).head()); + } + 2 => { + // 300 - no new messages, mqc stayed the same. + sproof.upsert_inbound_channel(ALICE).mqc_head = + Some(MessageQueueChain::default().extend_hrmp(&MSG_1).head()); + } + 3 => { + // 300 - new message. + sproof.upsert_inbound_channel(ALICE).mqc_head = Some( + MessageQueueChain::default() + .extend_hrmp(&MSG_1) + .extend_hrmp(&MSG_2) + .head(), + ); + } + _ => unreachable!(), + }) + .with_inherent_data(|_, relay_block_num, data| match relay_block_num { + 1 => { + data.horizontal_messages.insert(ALICE, vec![MSG_1.clone()]); + } + 2 => { + // no new messages + } + 3 => { + data.horizontal_messages.insert(ALICE, vec![MSG_2.clone()]); + } + _ => unreachable!(), + }) + .add(1, || { + HANDLED_XCMP_MESSAGES.with(|m| { + let mut m = m.borrow_mut(); + assert_eq!(&*m, &[(ALICE, 1, b"mikhailinvanovich".to_vec())]); + m.clear(); + }); + }) + .add(2, || {}) + .add(3, || { + HANDLED_XCMP_MESSAGES.with(|m| { + let mut m = m.borrow_mut(); + assert_eq!(&*m, &[(ALICE, 3, b"1000000000".to_vec())]); + m.clear(); + }); + }); +} diff --git a/cumulus/pallets/xcmp-queue/src/lib.rs b/cumulus/pallets/xcmp-queue/src/lib.rs index 82b58e79ed4..bd27542c6af 100644 --- a/cumulus/pallets/xcmp-queue/src/lib.rs +++ b/cumulus/pallets/xcmp-queue/src/lib.rs @@ -25,29 +25,145 @@ #![cfg_attr(not(feature = "std"), no_std)] -use sp_std::{prelude::*, convert::TryFrom}; -use rand_chacha::{rand_core::{RngCore, SeedableRng}, ChaChaRng}; use codec::{Decode, Encode}; -use sp_runtime::{RuntimeDebug, traits::Hash}; -use frame_support::{decl_error, decl_event, decl_module, decl_storage, dispatch::Weight}; -use xcm::{ - VersionedXcm, v0::{ - Error as XcmError, ExecuteXcm, Junction, MultiLocation, SendXcm, Outcome, Xcm, - }, -}; use cumulus_primitives_core::{ - XcmpMessageHandler, ParaId, XcmpMessageSource, ChannelStatus, MessageSendError, GetChannelInfo, - relay_chain::BlockNumber as RelayBlockNumber, + relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError, + ParaId, XcmpMessageHandler, XcmpMessageSource, +}; +use frame_support::weights::Weight; +use rand_chacha::{ + rand_core::{RngCore, SeedableRng}, + ChaChaRng, }; +use sp_runtime::{traits::Hash, RuntimeDebug}; +use sp_std::{convert::TryFrom, prelude::*}; +use xcm::{ + v0::{Error as XcmError, ExecuteXcm, Junction, MultiLocation, Outcome, SendXcm, Xcm}, + VersionedXcm, +}; + +pub use pallet::*; + +#[frame_support::pallet] +pub mod pallet { + use super::*; + use frame_support::pallet_prelude::*; + use frame_system::pallet_prelude::*; + + #[pallet::pallet] + #[pallet::generate_store(pub(super) trait Store)] + pub struct Pallet<T>(_); + + #[pallet::config] + pub trait Config: frame_system::Config { + type Event: From<Event<Self>> + IsType<<Self as frame_system::Config>::Event>; + + /// Something to execute an XCM message. We need this to service the XCMoXCMP queue. + type XcmExecutor: ExecuteXcm<Self::Call>; + + /// Information on the avaialble XCMP channels. + type ChannelInfo: GetChannelInfo; + } -pub trait Config: frame_system::Config { - type Event: From<Event<Self>> + Into<<Self as frame_system::Config>::Event>; + impl Default for QueueConfigData { + fn default() -> Self { + Self { + suspend_threshold: 2, + drop_threshold: 5, + resume_threshold: 1, + threshold_weight: 100_000, + weight_restrict_decay: 2, + } + } + } + + #[pallet::hooks] + impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> { + fn on_idle(_now: T::BlockNumber, max_weight: Weight) -> Weight { + // on_idle processes additional messages with any remaining block weight. + Self::service_xcmp_queue(max_weight) + } + } - /// Something to execute an XCM message. We need this to service the XCMoXCMP queue. - type XcmExecutor: ExecuteXcm<Self::Call>; + #[pallet::call] + impl<T: Config> Pallet<T> {} - /// Information on the avaialble XCMP channels. - type ChannelInfo: GetChannelInfo; + #[pallet::event] + #[pallet::generate_deposit(pub(super) fn deposit_event)] + #[pallet::metadata(Option<T::Hash> = "Option<Hash>")] + pub enum Event<T: Config> { + /// Some XCM was executed ok. + Success(Option<T::Hash>), + /// Some XCM failed. + Fail(Option<T::Hash>, XcmError), + /// Bad XCM version used. + BadVersion(Option<T::Hash>), + /// Bad XCM format used. + BadFormat(Option<T::Hash>), + /// An upward message was sent to the relay chain. + UpwardMessageSent(Option<T::Hash>), + /// An HRMP message was sent to a sibling parachain. + XcmpMessageSent(Option<T::Hash>), + } + + #[pallet::error] + pub enum Error<T> { + /// Failed to send XCM message. + FailedToSend, + /// Bad XCM origin. + BadXcmOrigin, + /// Bad XCM data. + BadXcm, + } + + /// Status of the inbound XCMP channels. + #[pallet::storage] + pub(super) type InboundXcmpStatus<T: Config> = StorageValue< + _, + Vec<( + ParaId, + InboundStatus, + Vec<(RelayBlockNumber, XcmpMessageFormat)>, + )>, + ValueQuery, + >; + + /// Inbound aggregate XCMP messages. It can only be one per ParaId/block. + #[pallet::storage] + pub(super) type InboundXcmpMessages<T: Config> = StorageDoubleMap< + _, + Blake2_128Concat, + ParaId, + Twox64Concat, + RelayBlockNumber, + Vec<u8>, + ValueQuery, + >; + + /// The non-empty XCMP channels in order of becoming non-empty, and the index of the first + /// and last outbound message. If the two indices are equal, then it indicates an empty + /// queue and there must be a non-`Ok` `OutboundStatus`. We assume queues grow no greater + /// than 65535 items. Queue indices for normal messages begin at one; zero is reserved in + /// case of the need to send a high-priority signal message this block. + /// The bool is true if there is a signal message waiting to be sent. + #[pallet::storage] + pub(super) type OutboundXcmpStatus<T: Config> = + StorageValue<_, Vec<(ParaId, OutboundStatus, bool, u16, u16)>, ValueQuery>; + + // The new way of doing it: + /// The messages outbound in a given XCMP channel. + #[pallet::storage] + pub(super) type OutboundXcmpMessages<T: Config> = + StorageDoubleMap<_, Blake2_128Concat, ParaId, Twox64Concat, u16, Vec<u8>, ValueQuery>; + + /// Any signal messages waiting to be sent. + #[pallet::storage] + pub(super) type SignalMessages<T: Config> = + StorageMap<_, Blake2_128Concat, ParaId, Vec<u8>, ValueQuery>; + + /// The configuration which controls the dynamics of the outbound queue. + #[pallet::storage] + pub(super) type QueueConfig<T: Config> = StorageValue<_, QueueConfigData, ValueQuery>; } #[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Encode, Decode, RuntimeDebug)] @@ -80,90 +196,6 @@ pub struct QueueConfigData { weight_restrict_decay: Weight, } -impl Default for QueueConfigData { - fn default() -> Self { - Self { - suspend_threshold: 2, - drop_threshold: 5, - resume_threshold: 1, - threshold_weight: 100_000, - weight_restrict_decay: 2, - } - } -} - -decl_storage! { - trait Store for Module<T: Config> as XcmHandler { - /// Status of the inbound XCMP channels. - InboundXcmpStatus: Vec<(ParaId, InboundStatus, Vec<(RelayBlockNumber, XcmpMessageFormat)>)>; - - /// Inbound aggregate XCMP messages. It can only be one per ParaId/block. - InboundXcmpMessages: double_map hasher(blake2_128_concat) ParaId, - hasher(twox_64_concat) RelayBlockNumber - => Vec<u8>; - - /// The non-empty XCMP channels in order of becoming non-empty, and the index of the first - /// and last outbound message. If the two indices are equal, then it indicates an empty - /// queue and there must be a non-`Ok` `OutboundStatus`. We assume queues grow no greater - /// than 65535 items. Queue indices for normal messages begin at one; zero is reserved in - /// case of the need to send a high-priority signal message this block. - /// The bool is true if there is a signal message waiting to be sent. - OutboundXcmpStatus: Vec<(ParaId, OutboundStatus, bool, u16, u16)>; - - // The new way of doing it: - /// The messages outbound in a given XCMP channel. - OutboundXcmpMessages: double_map hasher(blake2_128_concat) ParaId, - hasher(twox_64_concat) u16 => Vec<u8>; - - /// Any signal messages waiting to be sent. - SignalMessages: map hasher(blake2_128_concat) ParaId => Vec<u8>; - - /// The configuration which controls the dynamics of the outbound queue. - QueueConfig: QueueConfigData; - } -} - -decl_event! { - pub enum Event<T> where Hash = <T as frame_system::Config>::Hash { - /// Some XCM was executed ok. - Success(Option<Hash>), - /// Some XCM failed. - Fail(Option<Hash>, XcmError), - /// Bad XCM version used. - BadVersion(Option<Hash>), - /// Bad XCM format used. - BadFormat(Option<Hash>), - /// An upward message was sent to the relay chain. - UpwardMessageSent(Option<Hash>), - /// An HRMP message was sent to a sibling parachain. - XcmpMessageSent(Option<Hash>), - } -} - -decl_error! { - pub enum Error for Module<T: Config> { - /// Failed to send XCM message. - FailedToSend, - /// Bad XCM origin. - BadXcmOrigin, - /// Bad XCM data. - BadXcm, - } -} - -decl_module! { - pub struct Module<T: Config> for enum Call where origin: T::Origin { - type Error = Error<T>; - - fn deposit_event() = default; - - fn on_idle(_now: T::BlockNumber, max_weight: Weight) -> Weight { - // on_idle processes additional messages with any remaining block weight. - Self::service_xcmp_queue(max_weight) - } - } -} - #[derive(PartialEq, Eq, Copy, Clone, Encode, Decode)] pub enum ChannelSignal { Suspend, @@ -182,7 +214,7 @@ pub enum XcmpMessageFormat { Signals, } -impl<T: Config> Module<T> { +impl<T: Config> Pallet<T> { /// Place a message `fragment` on the outgoing XCMP queue for `recipient`. /// /// Format is the type of aggregate message that the `fragment` may be safely encoded and @@ -213,25 +245,32 @@ impl<T: Config> Module<T> { // Optimization note: `max_message_size` could potentially be stored in // `OutboundXcmpMessages` once known; that way it's only accessed when a new page is needed. - let max_message_size = T::ChannelInfo::get_channel_max(recipient) - .ok_or(MessageSendError::NoChannel)?; + let max_message_size = + T::ChannelInfo::get_channel_max(recipient).ok_or(MessageSendError::NoChannel)?; if data.len() > max_message_size { return Err(MessageSendError::TooBig); } - let mut s = OutboundXcmpStatus::get(); - let index = s.iter().position(|item| item.0 == recipient) + let mut s = <OutboundXcmpStatus<T>>::get(); + let index = s + .iter() + .position(|item| item.0 == recipient) .unwrap_or_else(|| { s.push((recipient, OutboundStatus::Ok, false, 0, 0)); s.len() - 1 }); let have_active = s[index].4 > s[index].3; - let appended = have_active && OutboundXcmpMessages::mutate(recipient, s[index].4 - 1, |s| { - if XcmpMessageFormat::decode(&mut &s[..]) != Ok(format) { return false } - if s.len() + data.len() > max_message_size { return false } - s.extend_from_slice(&data[..]); - return true - }); + let appended = have_active + && <OutboundXcmpMessages<T>>::mutate(recipient, s[index].4 - 1, |s| { + if XcmpMessageFormat::decode(&mut &s[..]) != Ok(format) { + return false; + } + if s.len() + data.len() > max_message_size { + return false; + } + s.extend_from_slice(&data[..]); + return true; + }); if appended { Ok((s[index].4 - s[index].3 - 1) as u32) } else { @@ -240,9 +279,9 @@ impl<T: Config> Module<T> { s[index].4 += 1; let mut new_page = format.encode(); new_page.extend_from_slice(&data[..]); - OutboundXcmpMessages::insert(recipient, page_index, new_page); + <OutboundXcmpMessages<T>>::insert(recipient, page_index, new_page); let r = (s[index].4 - s[index].3 - 1) as u32; - OutboundXcmpStatus::put(s); + <OutboundXcmpStatus<T>>::put(s); Ok(r) } } @@ -250,26 +289,25 @@ impl<T: Config> Module<T> { /// Sends a signal to the `dest` chain over XCMP. This is guaranteed to be dispatched on this /// block. fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), ()> { - let mut s = OutboundXcmpStatus::get(); + let mut s = <OutboundXcmpStatus<T>>::get(); if let Some(index) = s.iter().position(|item| item.0 == dest) { s[index].2 = true; } else { s.push((dest, OutboundStatus::Ok, true, 0, 0)); } - SignalMessages::mutate(dest, |page| if page.is_empty() { - *page = (XcmpMessageFormat::Signals, signal).encode(); - } else { - signal.using_encoded(|s| page.extend_from_slice(s)); + <SignalMessages<T>>::mutate(dest, |page| { + if page.is_empty() { + *page = (XcmpMessageFormat::Signals, signal).encode(); + } else { + signal.using_encoded(|s| page.extend_from_slice(s)); + } }); - OutboundXcmpStatus::put(s); + <OutboundXcmpStatus<T>>::put(s); Ok(()) } - pub fn send_blob_message( - recipient: ParaId, - blob: Vec<u8>, - ) -> Result<u32, MessageSendError> { + pub fn send_blob_message(recipient: ParaId, blob: Vec<u8>) -> Result<u32, MessageSendError> { Self::send_fragment(recipient, XcmpMessageFormat::ConcatenatedEncodedBlob, blob) } @@ -284,8 +322,10 @@ impl<T: Config> Module<T> { // Create a shuffled order for use to iterate through. // Not a great random seed, but good enough for our purposes. let seed = frame_system::Pallet::<T>::parent_hash(); - let seed = <[u8; 32]>::decode(&mut sp_runtime::traits::TrailingZeroInput::new(seed.as_ref())) - .expect("input is padded with zeroes; qed"); + let seed = <[u8; 32]>::decode(&mut sp_runtime::traits::TrailingZeroInput::new( + seed.as_ref(), + )) + .expect("input is padded with zeroes; qed"); let mut rng = ChaChaRng::from_seed(seed); let mut shuffled = (0..len).collect::<Vec<_>>(); for i in 0..len { @@ -297,7 +337,12 @@ impl<T: Config> Module<T> { shuffled } - fn handle_blob_message(_sender: ParaId, _sent_at: RelayBlockNumber, _blob: Vec<u8>, _weight_limit: Weight) -> Result<Weight, bool> { + fn handle_blob_message( + _sender: ParaId, + _sent_at: RelayBlockNumber, + _blob: Vec<u8>, + _weight_limit: Weight, + ) -> Result<Weight, bool> { debug_assert!(false, "Blob messages not handled."); Err(false) } @@ -312,23 +357,19 @@ impl<T: Config> Module<T> { log::debug!("Processing XCMP-XCM: {:?}", &hash); let (result, event) = match Xcm::<T::Call>::try_from(xcm) { Ok(xcm) => { - let location = ( - Junction::Parent, - Junction::Parachain(sender.into()), - ); - match T::XcmExecutor::execute_xcm( - location.into(), - xcm, - max_weight, - ) { - Outcome::Error(e) => (Err(e.clone()), RawEvent::Fail(Some(hash), e)), - Outcome::Complete(w) => (Ok(w), RawEvent::Success(Some(hash))), + let location = (Junction::Parent, Junction::Parachain(sender.into())); + match T::XcmExecutor::execute_xcm(location.into(), xcm, max_weight) { + Outcome::Error(e) => (Err(e.clone()), Event::Fail(Some(hash), e)), + Outcome::Complete(w) => (Ok(w), Event::Success(Some(hash))), // As far as the caller is concerned, this was dispatched without error, so // we just report the weight used. - Outcome::Incomplete(w, e) => (Ok(w), RawEvent::Fail(Some(hash), e)), + Outcome::Incomplete(w, e) => (Ok(w), Event::Fail(Some(hash), e)), } } - Err(()) => (Err(XcmError::UnhandledXcmVersion), RawEvent::BadVersion(Some(hash))), + Err(()) => ( + Err(XcmError::UnhandledXcmVersion), + Event::BadVersion(Some(hash)), + ), }; Self::deposit_event(event); result @@ -339,7 +380,7 @@ impl<T: Config> Module<T> { (sent_at, format): (RelayBlockNumber, XcmpMessageFormat), max_weight: Weight, ) -> (Weight, bool) { - let data = InboundXcmpMessages::get(sender, sent_at); + let data = <InboundXcmpMessages<T>>::get(sender, sent_at); let mut last_remaining_fragments; let mut remaining_fragments = &data[..]; let mut weight_used = 0; @@ -397,9 +438,9 @@ impl<T: Config> Module<T> { } let is_empty = remaining_fragments.is_empty(); if is_empty { - InboundXcmpMessages::remove(sender, sent_at); + <InboundXcmpMessages<T>>::remove(sender, sent_at); } else { - InboundXcmpMessages::insert(sender, sent_at, remaining_fragments); + <InboundXcmpMessages<T>>::insert(sender, sent_at, remaining_fragments); } (weight_used, is_empty) } @@ -432,9 +473,9 @@ impl<T: Config> Module<T> { /// for the second &c. though empirical and or practical factors may give rise to adjusting it /// further. fn service_xcmp_queue(max_weight: Weight) -> Weight { - let mut status = InboundXcmpStatus::get(); // <- sorted. + let mut status = <InboundXcmpStatus<T>>::get(); // <- sorted. if status.len() == 0 { - return 0 + return 0; } let QueueConfigData { @@ -442,7 +483,7 @@ impl<T: Config> Module<T> { threshold_weight, weight_restrict_decay, .. - } = QueueConfig::get(); + } = <QueueConfig<T>>::get(); let mut shuffled = Self::create_shuffle(status.len()); let mut weight_used = 0; @@ -457,7 +498,9 @@ impl<T: Config> Module<T> { // send more, heavier messages. let mut shuffle_index = 0; - while shuffle_index < shuffled.len() && max_weight.saturating_sub(weight_used) >= threshold_weight { + while shuffle_index < shuffled.len() + && max_weight.saturating_sub(weight_used) >= threshold_weight + { let index = shuffled[shuffle_index]; let sender = status[index].0; @@ -466,7 +509,8 @@ impl<T: Config> Module<T> { // first round. For the second round we unlock all weight. If we come close enough // on the first round to unlocking everything, then we do so. if shuffle_index < status.len() { - weight_available += (max_weight - weight_available) / (weight_restrict_decay + 1); + weight_available += + (max_weight - weight_available) / (weight_restrict_decay + 1); if weight_available + threshold_weight > max_weight { weight_available = max_weight; } @@ -476,16 +520,16 @@ impl<T: Config> Module<T> { } let weight_processed = if status[index].2.is_empty() { - debug_assert!(false, "channel exists in status; there must be messages; qed"); + debug_assert!( + false, + "channel exists in status; there must be messages; qed" + ); 0 } else { // Process up to one block's worth for now. let weight_remaining = weight_available.saturating_sub(weight_used); - let (weight_processed, is_empty) = Self::process_xcmp_message( - sender, - status[index].2[0], - weight_remaining, - ); + let (weight_processed, is_empty) = + Self::process_xcmp_message(sender, status[index].2[0], weight_remaining); if is_empty { status[index].2.remove(0); } @@ -493,20 +537,26 @@ impl<T: Config> Module<T> { }; weight_used += weight_processed; - if status[index].2.len() as u32 <= resume_threshold && status[index].1 == InboundStatus::Suspended { + if status[index].2.len() as u32 <= resume_threshold + && status[index].1 == InboundStatus::Suspended + { // Resume let r = Self::send_signal(sender, ChannelSignal::Resume); - debug_assert!(r.is_ok(), "WARNING: Failed sending resume into suspended channel"); + debug_assert!( + r.is_ok(), + "WARNING: Failed sending resume into suspended channel" + ); status[index].1 = InboundStatus::Ok; } // If there are more and we're making progress, we process them after we've given the // other channels a look in. If we've still not unlocked all weight, then we set them // up for processing a second time anyway. - if !status[index].2.is_empty() && weight_processed > 0 || weight_available != max_weight { + if !status[index].2.is_empty() && weight_processed > 0 || weight_available != max_weight + { if shuffle_index + 1 == shuffled.len() { // Only this queue left. Just run around this loop once more. - continue + continue; } shuffled.push(index); } @@ -516,12 +566,12 @@ impl<T: Config> Module<T> { // Only retain the senders that have non-empty queues. status.retain(|item| !item.2.is_empty()); - InboundXcmpStatus::put(status); + <InboundXcmpStatus<T>>::put(status); weight_used } fn suspend_channel(target: ParaId) { - OutboundXcmpStatus::mutate(|s| { + <OutboundXcmpStatus<T>>::mutate(|s| { if let Some(index) = s.iter().position(|item| item.0 == target) { let ok = s[index].1 == OutboundStatus::Ok; debug_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok."); @@ -533,41 +583,53 @@ impl<T: Config> Module<T> { } fn resume_channel(target: ParaId) { - OutboundXcmpStatus::mutate(|s| { + <OutboundXcmpStatus<T>>::mutate(|s| { if let Some(index) = s.iter().position(|item| item.0 == target) { let suspended = s[index].1 == OutboundStatus::Suspended; - debug_assert!(suspended, "WARNING: Attempt to resume channel that was not suspended."); + debug_assert!( + suspended, + "WARNING: Attempt to resume channel that was not suspended." + ); if s[index].3 == s[index].4 { s.remove(index); } else { s[index].1 = OutboundStatus::Ok; } } else { - debug_assert!(false, "WARNING: Attempt to resume channel that was not suspended."); + debug_assert!( + false, + "WARNING: Attempt to resume channel that was not suspended." + ); } }); } } -impl<T: Config> XcmpMessageHandler for Module<T> { - fn handle_xcmp_messages<'a, I: Iterator<Item=(ParaId, RelayBlockNumber, &'a [u8])>>( +impl<T: Config> XcmpMessageHandler for Pallet<T> { + fn handle_xcmp_messages<'a, I: Iterator<Item = (ParaId, RelayBlockNumber, &'a [u8])>>( iter: I, max_weight: Weight, ) -> Weight { - let mut status = InboundXcmpStatus::get(); + let mut status = <InboundXcmpStatus<T>>::get(); - let QueueConfigData { suspend_threshold, drop_threshold, .. } = QueueConfig::get(); + let QueueConfigData { + suspend_threshold, + drop_threshold, + .. + } = <QueueConfig<T>>::get(); for (sender, sent_at, data) in iter { - // Figure out the message format. let mut data_ref = data; let format = match XcmpMessageFormat::decode(&mut data_ref) { Ok(f) => f, Err(_) => { - debug_assert!(false, "Unknown XCMP message format. Silently dropping message"); - continue - }, + debug_assert!( + false, + "Unknown XCMP message format. Silently dropping message" + ); + continue; + } }; if format == XcmpMessageFormat::Signals { while !data_ref.is_empty() { @@ -587,34 +649,39 @@ impl<T: Config> XcmpMessageHandler for Module<T> { status[i].1 = InboundStatus::Suspended; let r = Self::send_signal(sender, ChannelSignal::Suspend); if r.is_err() { - log::warn!("Attempt to suspend channel failed. Messages may be dropped."); + log::warn!( + "Attempt to suspend channel failed. Messages may be dropped." + ); } } if (count as u32) < drop_threshold { status[i].2.push((sent_at, format)); } else { - debug_assert!(false, "XCMP channel queue full. Silently dropping message"); + debug_assert!( + false, + "XCMP channel queue full. Silently dropping message" + ); } - }, + } Err(_) => status.push((sender, InboundStatus::Ok, vec![(sent_at, format)])), } // Queue the payload for later execution. - InboundXcmpMessages::insert(sender, sent_at, data_ref); + <InboundXcmpMessages<T>>::insert(sender, sent_at, data_ref); } // Optimization note; it would make sense to execute messages immediately if // `status.is_empty()` here. } status.sort(); - InboundXcmpStatus::put(status); + <InboundXcmpStatus<T>>::put(status); Self::service_xcmp_queue(max_weight) } } -impl<T: Config> XcmpMessageSource for Module<T> { +impl<T: Config> XcmpMessageSource for Pallet<T> { fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec<u8>)> { - let mut statuses = OutboundXcmpStatus::get(); + let mut statuses = <OutboundXcmpStatus<T>>::get(); let old_statuses_len = statuses.len(); let max_message_count = statuses.len().min(maximum_channels); let mut result = Vec::with_capacity(max_message_count); @@ -628,42 +695,42 @@ impl<T: Config> XcmpMessageSource for Module<T> { break; } if outbound_status == OutboundStatus::Suspended { - continue + continue; } let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(para_id) { ChannelStatus::Closed => { // This means that there is no such channel anymore. Nothing to be done but // swallow the messages and discard the status. for i in begin..end { - OutboundXcmpMessages::remove(para_id, i); + <OutboundXcmpMessages<T>>::remove(para_id, i); } if signalling { - SignalMessages::remove(para_id); + <SignalMessages<T>>::remove(para_id); } *status = (para_id, OutboundStatus::Ok, false, 0, 0); - continue + continue; } ChannelStatus::Full => continue, ChannelStatus::Ready(n, e) => (n, e), }; let page = if signalling { - let page = SignalMessages::get(para_id); + let page = <SignalMessages<T>>::get(para_id); if page.len() < max_size_now { - SignalMessages::remove(para_id); + <SignalMessages<T>>::remove(para_id); signalling = false; page } else { - continue + continue; } } else if end > begin { - let page = OutboundXcmpMessages::get(para_id, begin); + let page = <OutboundXcmpMessages<T>>::get(para_id, begin); if page.len() < max_size_now { - OutboundXcmpMessages::remove(para_id, begin); + <OutboundXcmpMessages<T>>::remove(para_id, begin); begin += 1; page } else { - continue + continue; } } else { continue; @@ -705,23 +772,27 @@ impl<T: Config> XcmpMessageSource for Module<T> { // be no less than the pruned channels. statuses.rotate_left(result.len() - pruned); - OutboundXcmpStatus::put(statuses); + <OutboundXcmpStatus<T>>::put(statuses); result } } /// Xcm sender for sending to a sibling parachain. -impl<T: Config> SendXcm for Module<T> { +impl<T: Config> SendXcm for Pallet<T> { fn send_xcm(dest: MultiLocation, msg: Xcm<()>) -> Result<(), XcmError> { match &dest { // An HRMP message for a sibling parachain. MultiLocation::X2(Junction::Parent, Junction::Parachain(id)) => { let msg = VersionedXcm::<()>::from(msg); let hash = T::Hashing::hash_of(&msg); - Self::send_fragment((*id).into(), XcmpMessageFormat::ConcatenatedVersionedXcm, msg) - .map_err(|e| XcmError::SendFailed(<&'static str>::from(e)))?; - Self::deposit_event(RawEvent::XcmpMessageSent(Some(hash))); + Self::send_fragment( + (*id).into(), + XcmpMessageFormat::ConcatenatedVersionedXcm, + msg, + ) + .map_err(|e| XcmError::SendFailed(<&'static str>::from(e)))?; + Self::deposit_event(Event::XcmpMessageSent(Some(hash))); Ok(()) } // Anything else is unhandled. This includes a message this is meant for us. diff --git a/cumulus/rococo-parachains/pallets/parachain-info/src/lib.rs b/cumulus/rococo-parachains/pallets/parachain-info/src/lib.rs index 4a189dee671..0d83b0c66d3 100644 --- a/cumulus/rococo-parachains/pallets/parachain-info/src/lib.rs +++ b/cumulus/rococo-parachains/pallets/parachain-info/src/lib.rs @@ -18,25 +18,58 @@ #![cfg_attr(not(feature = "std"), no_std)] -use frame_support::{decl_module, decl_storage, traits::Get}; +pub use pallet::*; -use cumulus_primitives_core::ParaId; +#[frame_support::pallet] +pub mod pallet { + use frame_support::pallet_prelude::*; + use frame_system::pallet_prelude::*; + use cumulus_primitives_core::ParaId; -/// Configuration trait of this pallet. -pub trait Config: frame_system::Config {} + #[pallet::pallet] + #[pallet::generate_store(pub(super) trait Store)] + pub struct Pallet<T>(_); -impl<T: Config> Get<ParaId> for Module<T> { - fn get() -> ParaId { - Self::parachain_id() + #[pallet::config] + pub trait Config: frame_system::Config {} + + #[pallet::hooks] + impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {} + + #[pallet::call] + impl<T: Config> Pallet<T> {} + + #[pallet::genesis_config] + pub struct GenesisConfig { + pub parachain_id: ParaId, } -} -decl_storage! { - trait Store for Module<T: Config> as ParachainInfo { - ParachainId get(fn parachain_id) config(): ParaId = 100.into(); + #[cfg(feature = "std")] + impl Default for GenesisConfig { + fn default() -> Self { + Self { + parachain_id: 100.into() + } + } } -} -decl_module! { - pub struct Module<T: Config> for enum Call where origin: T::Origin {} + #[pallet::genesis_build] + impl<T: Config> GenesisBuild<T> for GenesisConfig { + fn build(&self) { + <ParachainId<T>>::put(&self.parachain_id); + } + } + + #[pallet::type_value] + pub(super) fn DefaultForParachainId() -> ParaId { 100.into() } + + #[pallet::storage] + #[pallet::getter(fn parachain_id)] + pub(super) type ParachainId<T: Config> = StorageValue<_, ParaId, ValueQuery, DefaultForParachainId>; + + impl<T: Config> Get<ParaId> for Pallet<T> { + fn get() -> ParaId { + Self::parachain_id() + } + } } diff --git a/cumulus/rococo-parachains/runtime/src/lib.rs b/cumulus/rococo-parachains/runtime/src/lib.rs index ea36553f91f..9487c7313b0 100644 --- a/cumulus/rococo-parachains/runtime/src/lib.rs +++ b/cumulus/rococo-parachains/runtime/src/lib.rs @@ -238,7 +238,7 @@ parameter_types! { impl cumulus_pallet_parachain_system::Config for Runtime { type Event = Event; type OnValidationData = (); - type SelfParaId = parachain_info::Module<Runtime>; + type SelfParaId = parachain_info::Pallet<Runtime>; type OutboundXcmpMessageSource = XcmpQueue; type DmpMessageHandler = DmpQueue; type ReservedDmpWeight = ReservedDmpWeight; diff --git a/cumulus/rococo-parachains/shell-runtime/src/lib.rs b/cumulus/rococo-parachains/shell-runtime/src/lib.rs index 4d7a4a8deab..58d7a42200e 100644 --- a/cumulus/rococo-parachains/shell-runtime/src/lib.rs +++ b/cumulus/rococo-parachains/shell-runtime/src/lib.rs @@ -161,7 +161,7 @@ parameter_types! { impl cumulus_pallet_parachain_system::Config for Runtime { type Event = Event; type OnValidationData = (); - type SelfParaId = parachain_info::Module<Runtime>; + type SelfParaId = parachain_info::Pallet<Runtime>; type OutboundXcmpMessageSource = (); type DmpMessageHandler = cumulus_pallet_xcm::UnlimitedDmpExecution<Runtime>; type ReservedDmpWeight = ReservedDmpWeight; -- GitLab