hrmp.rs 53.3 KB
Newer Older
Sergey Pepyakin's avatar
Sergey Pepyakin committed
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

use crate::{
Sergey Pepyakin's avatar
Sergey Pepyakin committed
	configuration::{self, HostConfiguration},
Sergey Pepyakin's avatar
Sergey Pepyakin committed
};
use parity_scale_codec::{Decode, Encode};
use frame_support::{
	decl_storage, decl_module, decl_error, ensure, traits::Get, weights::Weight, StorageMap,
	StorageValue, dispatch::DispatchResult,
};
Sergey Pepyakin's avatar
Sergey Pepyakin committed
use primitives::v1::{
	Balance, Hash, HrmpChannelId, Id as ParaId, InboundHrmpMessage, OutboundHrmpMessage,
	SessionIndex,
};
use sp_runtime::traits::{BlakeTwo256, Hash as HashT};
use sp_std::{
	mem, fmt,
	collections::{btree_map::BTreeMap, btree_set::BTreeSet},
	prelude::*,
};
Sergey Pepyakin's avatar
Sergey Pepyakin committed

/// A description of a request to open an HRMP channel.
#[derive(Encode, Decode)]
pub struct HrmpOpenChannelRequest {
	/// Indicates if this request was confirmed by the recipient.
	pub confirmed: bool,
	/// How many session boundaries ago this request was seen.
	pub age: SessionIndex,
	/// The amount that the sender supplied at the time of creation of this request.
	pub sender_deposit: Balance,
	/// The maximum message size that could be put into the channel.
	pub max_message_size: u32,
	/// The maximum number of messages that can be pending in the channel at once.
	pub max_capacity: u32,
	/// The maximum total size of the messages that can be pending in the channel at once.
	pub max_total_size: u32,
}

/// A metadata of an HRMP channel.
#[derive(Encode, Decode)]
#[cfg_attr(test, derive(Debug))]
pub struct HrmpChannel {
	// NOTE: This structure is used by parachains via merkle proofs. Therefore, this struct requires
	// special treatment.
	//
	// A parachain requested this struct can only depend on the subset of this struct. Specifically,
	// only a first few fields can be depended upon (See `AbridgedHrmpChannel`). These fields cannot
	// be changed without corresponding migration of parachains.

Sergey Pepyakin's avatar
Sergey Pepyakin committed
	/// The maximum number of messages that can be pending in the channel at once.
	pub max_capacity: u32,
	/// The maximum total size of the messages that can be pending in the channel at once.
	pub max_total_size: u32,
	/// The maximum message size that could be put into the channel.
	pub max_message_size: u32,
	/// The current number of messages pending in the channel.
	/// Invariant: should be less or equal to `max_capacity`.s`.
	pub msg_count: u32,
	/// The total size in bytes of all message payloads in the channel.
	/// Invariant: should be less or equal to `max_total_size`.
	pub total_size: u32,
	/// A head of the Message Queue Chain for this channel. Each link in this chain has a form:
	/// `(prev_head, B, H(M))`, where
	/// - `prev_head`: is the previous value of `mqc_head` or zero if none.
	/// - `B`: is the [relay-chain] block number in which a message was appended
	/// - `H(M)`: is the hash of the message being appended.
	/// This value is initialized to a special value that consists of all zeroes which indicates
	/// that no messages were previously added.
	pub mqc_head: Option<Hash>,
	/// The amount that the sender supplied as a deposit when opening this channel.
	pub sender_deposit: Balance,
	/// The amount that the recipient supplied as a deposit when accepting opening this channel.
	pub recipient_deposit: Balance,
/// An error returned by [`check_hrmp_watermark`] that indicates an acceptance criteria check
/// didn't pass.
pub enum HrmpWatermarkAcceptanceErr<BlockNumber> {
	AdvancementRule {
		new_watermark: BlockNumber,
		last_watermark: BlockNumber,
	},
	AheadRelayParent {
		new_watermark: BlockNumber,
		relay_chain_parent_number: BlockNumber,
	},
	LandsOnBlockWithNoMessages {
		new_watermark: BlockNumber,
	},
}

/// An error returned by [`check_outbound_hrmp`] that indicates an acceptance criteria check
/// didn't pass.
pub enum OutboundHrmpAcceptanceErr {
	MoreMessagesThanPermitted {
		sent: u32,
		permitted: u32,
	},
	NotSorted {
		idx: u32,
	},
	NoSuchChannel {
		idx: u32,
		channel_id: HrmpChannelId,
	},
	MaxMessageSizeExceeded {
		idx: u32,
		msg_size: u32,
		max_size: u32,
	},
	TotalSizeExceeded {
		idx: u32,
		total_size: u32,
		limit: u32,
	},
	CapacityExceeded {
		idx: u32,
		count: u32,
		limit: u32,
	},
}

impl<BlockNumber> fmt::Debug for HrmpWatermarkAcceptanceErr<BlockNumber>
where
	BlockNumber: fmt::Debug,
{
	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
		use HrmpWatermarkAcceptanceErr::*;
		match self {
			AdvancementRule {
				new_watermark,
				last_watermark,
			} => write!(
				fmt,
				"the HRMP watermark is not advanced relative to the last watermark ({:?} > {:?})",
				new_watermark, last_watermark,
			),
			AheadRelayParent {
				new_watermark,
				relay_chain_parent_number,
			} => write!(
				fmt,
				"the HRMP watermark is ahead the relay-parent ({:?} > {:?})",
				new_watermark, relay_chain_parent_number
			),
			LandsOnBlockWithNoMessages { new_watermark } => write!(
				fmt,
				"the HRMP watermark ({:?}) doesn't land on a block with messages received",
			),
		}
	}
}

impl fmt::Debug for OutboundHrmpAcceptanceErr {
	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
		use OutboundHrmpAcceptanceErr::*;
		match self {
			MoreMessagesThanPermitted { sent, permitted } => write!(
				fmt,
				"more HRMP messages than permitted by config ({} > {})",
			),
			NotSorted { idx } => write!(
				fmt,
				"the HRMP messages are not sorted (first unsorted is at index {})",
				idx,
			),
			NoSuchChannel { idx, channel_id } => write!(
				fmt,
				"the HRMP message at index {} is sent to a non existent channel {:?}->{:?}",
				idx, channel_id.sender, channel_id.recipient,
			),
			MaxMessageSizeExceeded {
				idx,
				msg_size,
				max_size,
			} => write!(
				fmt,
				"the HRMP message at index {} exceeds the negotiated channel maximum message size ({} > {})",
			),
			TotalSizeExceeded {
				idx,
				total_size,
				limit,
			} => write!(
				fmt,
				"sending the HRMP message at index {} would exceed the neogitiated channel total size  ({} > {})",
			),
			CapacityExceeded { idx, count, limit } => write!(
				fmt,
				"sending the HRMP message at index {} would exceed the neogitiated channel capacity  ({} > {})",
pub trait Config: frame_system::Config + configuration::Config + paras::Config + dmp::Config {
	type Origin: From<crate::Origin>
		+ From<<Self as frame_system::Config>::Origin>
		+ Into<Result<crate::Origin, <Self as Config>::Origin>>;
	trait Store for Module<T: Config> as Hrmp {
		/// Paras that are to be cleaned up at the end of the session.
		/// The entries are sorted ascending by the para id.
		OutgoingParas: Vec<ParaId>;


		/// The set of pending HRMP open channel requests.
		///
		/// The set is accompanied by a list for iteration.
		///
		/// Invariant:
		/// - There are no channels that exists in list but not in the set and vice versa.
		HrmpOpenChannelRequests: map hasher(twox_64_concat) HrmpChannelId => Option<HrmpOpenChannelRequest>;
		HrmpOpenChannelRequestsList: Vec<HrmpChannelId>;

		/// This mapping tracks how many open channel requests are inititated by a given sender para.
		/// Invariant: `HrmpOpenChannelRequests` should contain the same number of items that has `(X, _)`
		/// as the number of `HrmpOpenChannelRequestCount` for `X`.
		HrmpOpenChannelRequestCount: map hasher(twox_64_concat) ParaId => u32;
		/// This mapping tracks how many open channel requests were accepted by a given recipient para.
		/// Invariant: `HrmpOpenChannelRequests` should contain the same number of items `(_, X)` with
		/// `confirmed` set to true, as the number of `HrmpAcceptedChannelRequestCount` for `X`.
		HrmpAcceptedChannelRequestCount: map hasher(twox_64_concat) ParaId => u32;

		/// A set of pending HRMP close channel requests that are going to be closed during the session change.
		/// Used for checking if a given channel is registered for closure.
		///
		/// The set is accompanied by a list for iteration.
		///
		/// Invariant:
		/// - There are no channels that exists in list but not in the set and vice versa.
		HrmpCloseChannelRequests: map hasher(twox_64_concat) HrmpChannelId => Option<()>;
		HrmpCloseChannelRequestsList: Vec<HrmpChannelId>;

		/// The HRMP watermark associated with each para.
		/// Invariant:
		/// - each para `P` used here as a key should satisfy `Paras::is_valid_para(P)` within a session.
		HrmpWatermarks: map hasher(twox_64_concat) ParaId => Option<T::BlockNumber>;
		/// HRMP channel data associated with each para.
		/// Invariant:
		/// - each participant in the channel should satisfy `Paras::is_valid_para(P)` within a session.
		HrmpChannels: map hasher(twox_64_concat) HrmpChannelId => Option<HrmpChannel>;
		/// Ingress/egress indexes allow to find all the senders and receivers given the opposite
		/// side. I.e.
		///
		/// (a) ingress index allows to find all the senders for a given recipient.
		/// (b) egress index allows to find all the recipients for a given sender.
		///
		/// Invariants:
		/// - for each ingress index entry for `P` each item `I` in the index should present in `HrmpChannels`
		///   as `(I, P)`.
		/// - for each egress index entry for `P` each item `E` in the index should present in `HrmpChannels`
		///   as `(P, E)`.
		/// - there should be no other dangling channels in `HrmpChannels`.
		/// - the vectors are sorted.
		HrmpIngressChannelsIndex: map hasher(twox_64_concat) ParaId => Vec<ParaId>;
		// NOTE that this field is used by parachains via merkle storage proofs, therefore changing
		// the format will require migration of parachains.
		HrmpEgressChannelsIndex: map hasher(twox_64_concat) ParaId => Vec<ParaId>;
		/// Storage for the messages for each channel.
		/// Invariant: cannot be non-empty if the corresponding channel in `HrmpChannels` is `None`.
		HrmpChannelContents: map hasher(twox_64_concat) HrmpChannelId => Vec<InboundHrmpMessage<T::BlockNumber>>;
		/// Maintains a mapping that can be used to answer the question:
		/// What paras sent a message at the given block number for a given reciever.
		/// Invariants:
		/// - The inner `Vec<ParaId>` is never empty.
		/// - The inner `Vec<ParaId>` cannot store two same `ParaId`.
		/// - The outer vector is sorted ascending by block number and cannot store two items with the same
		///   block number.
		HrmpChannelDigests: map hasher(twox_64_concat) ParaId => Vec<(T::BlockNumber, Vec<ParaId>)>;
	}
}

decl_error! {
	pub enum Error for Module<T: Config> {
		/// The sender tried to open a channel to themselves.
		OpenHrmpChannelToSelf,
		/// The recipient is not a valid para.
		OpenHrmpChannelInvalidRecipient,
		/// The requested capacity is zero.
		OpenHrmpChannelZeroCapacity,
		/// The requested capacity exceeds the global limit.
		OpenHrmpChannelCapacityExceedsLimit,
		/// The requested maximum message size is 0.
		OpenHrmpChannelZeroMessageSize,
		/// The open request requested the message size that exceeds the global limit.
		OpenHrmpChannelMessageSizeExceedsLimit,
		/// The channel already exists
		OpenHrmpChannelAlreadyExists,
		/// There is already a request to open the same channel.
		OpenHrmpChannelAlreadyRequested,
		/// The sender already has the maximum number of allowed outbound channels.
		OpenHrmpChannelLimitExceeded,
		/// The channel from the sender to the origin doesn't exist.
		AcceptHrmpChannelDoesntExist,
		/// The channel is already confirmed.
		AcceptHrmpChannelAlreadyConfirmed,
		/// The recipient already has the maximum number of allowed inbound channels.
		AcceptHrmpChannelLimitExceeded,
		/// The origin tries to close a channel where it is neither the sender nor the recipient.
		CloseHrmpChannelUnauthorized,
		/// The channel to be closed doesn't exist.
		CloseHrmpChannelDoesntExist,
		/// The channel close request is already requested.
		CloseHrmpChannelAlreadyUnderway,
	 }
}

decl_module! {
	/// The HRMP module.
	pub struct Module<T: Config> for enum Call where origin: <T as frame_system::Config>::Origin {
		/// Initiate opening a channel from a parachain to a given recipient with given channel
		/// parameters.
		///
		/// - `proposed_max_capacity` - specifies how many messages can be in the channel at once.
		/// - `proposed_max_message_size` - specifies the maximum size of any of the messages.
		///
		/// These numbers are a subject to the relay-chain configuration limits.
		///
		/// The channel can be opened only after the recipient confirms it and only on a session
		/// change.
		pub fn hrmp_init_open_channel(
			origin,
			recipient: ParaId,
			proposed_max_capacity: u32,
			proposed_max_message_size: u32,
		) -> DispatchResult {
			let origin = ensure_parachain(<T as Config>::Origin::from(origin))?;
			Self::init_open_channel(
				origin,
				recipient,
				proposed_max_capacity,
				proposed_max_message_size
			)?;
			Ok(())
		}

		/// Accept a pending open channel request from the given sender.
		///
		/// The channel will be opened only on the next session boundary.
		pub fn hrmp_accept_open_channel(origin, sender: ParaId) -> DispatchResult {
			let origin = ensure_parachain(<T as Config>::Origin::from(origin))?;
			Self::accept_open_channel(origin, sender)?;
			Ok(())
		}

		/// Initiate unilateral closing of a channel. The origin must be either the sender or the
		/// recipient in the channel being closed.
		///
		/// The closure can only happen on a session change.
		pub fn hrmp_close_channel(origin, channel_id: HrmpChannelId) -> DispatchResult {
			let origin = ensure_parachain(<T as Config>::Origin::from(origin))?;
			Self::close_channel(origin, channel_id)?;
			Ok(())
		}
	}
}

Sergey Pepyakin's avatar
Sergey Pepyakin committed
/// Routines and getters related to HRMP.
impl<T: Config> Module<T> {
	/// Block initialization logic, called by initializer.
	pub(crate) fn initializer_initialize(_now: T::BlockNumber) -> Weight {
		0
	}

	/// Block finalization logic, called by initializer.
	pub(crate) fn initializer_finalize() {}

	/// Called by the initializer to note that a new session has started.
	pub(crate) fn initializer_on_new_session(
		notification: &initializer::SessionChangeNotification<T::BlockNumber>,
	) {
		Self::perform_outgoing_para_cleanup();
		Self::process_hrmp_open_channel_requests(&notification.prev_config);
		Self::process_hrmp_close_channel_requests();
	}

	/// Iterate over all paras that were registered for offboarding and remove all the data
	/// associated with them.
	fn perform_outgoing_para_cleanup() {
		let outgoing = OutgoingParas::take();
		for outgoing_para in outgoing {
			Self::clean_hrmp_after_outgoing(outgoing_para);
		}
	}

	/// Schedule a para to be cleaned up at the start of the next session.
	pub(crate) fn schedule_para_cleanup(id: ParaId) {
		OutgoingParas::mutate(|v| {
			if let Err(i) = v.binary_search(&id) {
				v.insert(i, id);
			}
		});
	}

Sergey Pepyakin's avatar
Sergey Pepyakin committed
	/// Remove all storage entries associated with the given para.
	pub(super) fn clean_hrmp_after_outgoing(outgoing_para: ParaId) {
		<Self as Store>::HrmpOpenChannelRequestCount::remove(&outgoing_para);
		<Self as Store>::HrmpAcceptedChannelRequestCount::remove(&outgoing_para);

		// close all channels where the outgoing para acts as the recipient.
		for sender in <Self as Store>::HrmpIngressChannelsIndex::take(&outgoing_para) {
			Self::close_hrmp_channel(&HrmpChannelId {
				sender,
				recipient: outgoing_para.clone(),
			});
		}
		// close all channels where the outgoing para acts as the sender.
		for recipient in <Self as Store>::HrmpEgressChannelsIndex::take(&outgoing_para) {
			Self::close_hrmp_channel(&HrmpChannelId {
				sender: outgoing_para.clone(),
				recipient,
			});
		}
	}

	/// Iterate over all open channel requests and:
	///
	/// - prune the stale requests
	/// - enact the confirmed requests
	pub(super) fn process_hrmp_open_channel_requests(config: &HostConfiguration<T::BlockNumber>) {
		let mut open_req_channels = <Self as Store>::HrmpOpenChannelRequestsList::get();
		if open_req_channels.is_empty() {
			return;
		}

		// iterate the vector starting from the end making our way to the beginning. This way we
		// can leverage `swap_remove` to efficiently remove an item during iteration.
		let mut idx = open_req_channels.len();
		loop {
			// bail if we've iterated over all items.
			if idx == 0 {
				break;
			}

			idx -= 1;
			let channel_id = open_req_channels[idx].clone();
			let mut request = <Self as Store>::HrmpOpenChannelRequests::get(&channel_id).expect(
				"can't be `None` due to the invariant that the list contains the same items as the set; qed",
			);
Sergey Pepyakin's avatar
Sergey Pepyakin committed

			if request.confirmed {
				if <paras::Module<T>>::is_valid_para(channel_id.sender)
					&& <paras::Module<T>>::is_valid_para(channel_id.recipient)
				{
					<Self as Store>::HrmpChannels::insert(
						&channel_id,
						HrmpChannel {
							sender_deposit: request.sender_deposit,
							recipient_deposit: config.hrmp_recipient_deposit,
							max_capacity: request.max_capacity,
							max_total_size: request.max_total_size,
							max_message_size: request.max_message_size,
							msg_count: 0,
							total_size: 0,
							mqc_head: None,
						},
					);

					<Self as Store>::HrmpIngressChannelsIndex::mutate(&channel_id.recipient, |v| {
						if let Err(i) = v.binary_search(&channel_id.sender) {
							v.insert(i, channel_id.sender);
						}
					});
					<Self as Store>::HrmpEgressChannelsIndex::mutate(&channel_id.sender, |v| {
						if let Err(i) = v.binary_search(&channel_id.recipient) {
							v.insert(i, channel_id.recipient);
						}
					});
				}

				let new_open_channel_req_cnt =
					<Self as Store>::HrmpOpenChannelRequestCount::get(&channel_id.sender)
						.saturating_sub(1);
				if new_open_channel_req_cnt != 0 {
					<Self as Store>::HrmpOpenChannelRequestCount::insert(
						&channel_id.sender,
						new_open_channel_req_cnt,
					);
				} else {
					<Self as Store>::HrmpOpenChannelRequestCount::remove(&channel_id.sender);
				}

				let new_accepted_channel_req_cnt =
					<Self as Store>::HrmpAcceptedChannelRequestCount::get(&channel_id.recipient)
						.saturating_sub(1);
				if new_accepted_channel_req_cnt != 0 {
					<Self as Store>::HrmpAcceptedChannelRequestCount::insert(
						&channel_id.recipient,
						new_accepted_channel_req_cnt,
					);
				} else {
					<Self as Store>::HrmpAcceptedChannelRequestCount::remove(&channel_id.recipient);
				}

				let _ = open_req_channels.swap_remove(idx);
				<Self as Store>::HrmpOpenChannelRequests::remove(&channel_id);
			} else {
				request.age += 1;
				if request.age == config.hrmp_open_request_ttl {
					// got stale

					<Self as Store>::HrmpOpenChannelRequestCount::mutate(&channel_id.sender, |v| {
						*v -= 1;
					});

					// TODO: return deposit https://github.com/paritytech/polkadot/issues/1907

					let _ = open_req_channels.swap_remove(idx);
					<Self as Store>::HrmpOpenChannelRequests::remove(&channel_id);
				}
			}
		}

		<Self as Store>::HrmpOpenChannelRequestsList::put(open_req_channels);
	}

	/// Iterate over all close channel requests unconditionally closing the channels.
	pub(super) fn process_hrmp_close_channel_requests() {
		let close_reqs = <Self as Store>::HrmpCloseChannelRequestsList::take();
		for condemned_ch_id in close_reqs {
			<Self as Store>::HrmpCloseChannelRequests::remove(&condemned_ch_id);
			Self::close_hrmp_channel(&condemned_ch_id);

			// clean up the indexes.
			<Self as Store>::HrmpEgressChannelsIndex::mutate(&condemned_ch_id.sender, |v| {
				if let Ok(i) = v.binary_search(&condemned_ch_id.recipient) {
					v.remove(i);
				}
			});
			<Self as Store>::HrmpIngressChannelsIndex::mutate(&condemned_ch_id.recipient, |v| {
				if let Ok(i) = v.binary_search(&condemned_ch_id.sender) {
					v.remove(i);
				}
			});
		}
	}

	/// Close and remove the designated HRMP channel.
	///
	/// This includes returning the deposits. However, it doesn't include updating the ingress/egress
	/// indicies.
	pub(super) fn close_hrmp_channel(channel_id: &HrmpChannelId) {
		// TODO: return deposit https://github.com/paritytech/polkadot/issues/1907

		<Self as Store>::HrmpChannels::remove(channel_id);
		<Self as Store>::HrmpChannelContents::remove(channel_id);
	}

	/// Check that the candidate of the given recipient controls the HRMP watermark properly.
	pub(crate) fn check_hrmp_watermark(
		recipient: ParaId,
		relay_chain_parent_number: T::BlockNumber,
		new_hrmp_watermark: T::BlockNumber,
	) -> Result<(), HrmpWatermarkAcceptanceErr<T::BlockNumber>> {
Sergey Pepyakin's avatar
Sergey Pepyakin committed
		// First, check where the watermark CANNOT legally land.
		//
		// (a) For ensuring that messages are eventually, a rule requires each parablock new
		//     watermark should be greater than the last one.
		//
		// (b) However, a parachain cannot read into "the future", therefore the watermark should
		//     not be greater than the relay-chain context block which the parablock refers to.
		if let Some(last_watermark) = <Self as Store>::HrmpWatermarks::get(&recipient) {
			if new_hrmp_watermark <= last_watermark {
				return Err(HrmpWatermarkAcceptanceErr::AdvancementRule {
					new_watermark: new_hrmp_watermark,
Sergey Pepyakin's avatar
Sergey Pepyakin committed
					last_watermark,
Sergey Pepyakin's avatar
Sergey Pepyakin committed
			}
		}
		if new_hrmp_watermark > relay_chain_parent_number {
			return Err(HrmpWatermarkAcceptanceErr::AheadRelayParent {
				new_watermark: new_hrmp_watermark,
Sergey Pepyakin's avatar
Sergey Pepyakin committed
				relay_chain_parent_number,
Sergey Pepyakin's avatar
Sergey Pepyakin committed
		}

		// Second, check where the watermark CAN land. It's one of the following:
		//
		// (a) The relay parent block number.
		// (b) A relay-chain block in which this para received at least one message.
		if new_hrmp_watermark == relay_chain_parent_number {
Sergey Pepyakin's avatar
Sergey Pepyakin committed
		} else {
			let digest = <Self as Store>::HrmpChannelDigests::get(&recipient);
			if !digest
				.binary_search_by_key(&new_hrmp_watermark, |(block_no, _)| *block_no)
				.is_ok()
			{
				return Err(HrmpWatermarkAcceptanceErr::LandsOnBlockWithNoMessages {
					new_watermark: new_hrmp_watermark,
				});
Sergey Pepyakin's avatar
Sergey Pepyakin committed
		}
	}

	pub(crate) fn check_outbound_hrmp(
		config: &HostConfiguration<T::BlockNumber>,
		sender: ParaId,
		out_hrmp_msgs: &[OutboundHrmpMessage<ParaId>],
	) -> Result<(), OutboundHrmpAcceptanceErr> {
Sergey Pepyakin's avatar
Sergey Pepyakin committed
		if out_hrmp_msgs.len() as u32 > config.hrmp_max_message_num_per_candidate {
			return Err(OutboundHrmpAcceptanceErr::MoreMessagesThanPermitted {
				sent: out_hrmp_msgs.len() as u32,
				permitted: config.hrmp_max_message_num_per_candidate,
			});
Sergey Pepyakin's avatar
Sergey Pepyakin committed
		}

		let mut last_recipient = None::<ParaId>;

		for (idx, out_msg) in out_hrmp_msgs
			.iter()
			.enumerate()
			.map(|(idx, out_msg)| (idx as u32, out_msg))
		{
Sergey Pepyakin's avatar
Sergey Pepyakin committed
			match last_recipient {
				// the messages must be sorted in ascending order and there must be no two messages sent
				// to the same recipient. Thus we can check that every recipient is strictly greater than
				// the previous one.
				Some(last_recipient) if out_msg.recipient <= last_recipient => {
					return Err(OutboundHrmpAcceptanceErr::NotSorted { idx });
Sergey Pepyakin's avatar
Sergey Pepyakin committed
				}
				_ => last_recipient = Some(out_msg.recipient),
			}

			let channel_id = HrmpChannelId {
				sender,
				recipient: out_msg.recipient,
			};

			let channel = match <Self as Store>::HrmpChannels::get(&channel_id) {
				Some(channel) => channel,
				None => {
					return Err(OutboundHrmpAcceptanceErr::NoSuchChannel { channel_id, idx });
			let msg_size = out_msg.data.len() as u32;
			if msg_size > channel.max_message_size {
				return Err(OutboundHrmpAcceptanceErr::MaxMessageSizeExceeded {
Sergey Pepyakin's avatar
Sergey Pepyakin committed
					idx,
					msg_size,
					max_size: channel.max_message_size,
				});
Sergey Pepyakin's avatar
Sergey Pepyakin committed
			}

			let new_total_size = channel.total_size + out_msg.data.len() as u32;
			if new_total_size > channel.max_total_size {
				return Err(OutboundHrmpAcceptanceErr::TotalSizeExceeded {
Sergey Pepyakin's avatar
Sergey Pepyakin committed
					idx,
					total_size: new_total_size,
					limit: channel.max_total_size,
				});
Sergey Pepyakin's avatar
Sergey Pepyakin committed
			}

			let new_msg_count = channel.msg_count + 1;
			if new_msg_count > channel.max_capacity {
				return Err(OutboundHrmpAcceptanceErr::CapacityExceeded {
Sergey Pepyakin's avatar
Sergey Pepyakin committed
					idx,
					count: new_msg_count,
					limit: channel.max_capacity,
				});
Sergey Pepyakin's avatar
Sergey Pepyakin committed
	}

	pub(crate) fn prune_hrmp(recipient: ParaId, new_hrmp_watermark: T::BlockNumber) -> Weight {
		let mut weight = 0;

		// sift through the incoming messages digest to collect the paras that sent at least one
		// message to this parachain between the old and new watermarks.
		let senders = <Self as Store>::HrmpChannelDigests::mutate(&recipient, |digest| {
			let mut senders = BTreeSet::new();
			let mut leftover = Vec::with_capacity(digest.len());
			for (block_no, paras_sent_msg) in mem::replace(digest, Vec::new()) {
				if block_no <= new_hrmp_watermark {
					senders.extend(paras_sent_msg);
				} else {
					leftover.push((block_no, paras_sent_msg));
				}
			}
			*digest = leftover;
			senders
		});
		weight += T::DbWeight::get().reads_writes(1, 1);

		// having all senders we can trivially find out the channels which we need to prune.
		let channels_to_prune = senders
			.into_iter()
			.map(|sender| HrmpChannelId { sender, recipient });
		for channel_id in channels_to_prune {
			// prune each channel up to the new watermark keeping track how many messages we removed
			// and what is the total byte size of them.
			let (mut pruned_cnt, mut pruned_size) = (0, 0);

			let contents = <Self as Store>::HrmpChannelContents::get(&channel_id);
			let mut leftover = Vec::with_capacity(contents.len());
			for msg in contents {
				if msg.sent_at <= new_hrmp_watermark {
					pruned_cnt += 1;
					pruned_size += msg.data.len();
				} else {
					leftover.push(msg);
				}
			}
			if !leftover.is_empty() {
				<Self as Store>::HrmpChannelContents::insert(&channel_id, leftover);
			} else {
				<Self as Store>::HrmpChannelContents::remove(&channel_id);
			}

			// update the channel metadata.
			<Self as Store>::HrmpChannels::mutate(&channel_id, |channel| {
				if let Some(ref mut channel) = channel {
					channel.msg_count -= pruned_cnt as u32;
					channel.total_size -= pruned_size as u32;
				}
			});

			weight += T::DbWeight::get().reads_writes(2, 2);
		}

		<Self as Store>::HrmpWatermarks::insert(&recipient, new_hrmp_watermark);
		weight += T::DbWeight::get().reads_writes(0, 1);

		weight
	}

	/// Process the outbound HRMP messages by putting them into the appropriate recipient queues.
	///
	/// Returns the amount of weight consumed.
	pub(crate) fn queue_outbound_hrmp(
		sender: ParaId,
		out_hrmp_msgs: Vec<OutboundHrmpMessage<ParaId>>,
	) -> Weight {
		let mut weight = 0;
		let now = <frame_system::Module<T>>::block_number();

		for out_msg in out_hrmp_msgs {
			let channel_id = HrmpChannelId {
				sender,
				recipient: out_msg.recipient,
			};

			let mut channel = match <Self as Store>::HrmpChannels::get(&channel_id) {
				Some(channel) => channel,
				None => {
					// apparently, that since acceptance of this candidate the recipient was
					// offboarded and the channel no longer exists.
					continue;
				}
			};

			let inbound = InboundHrmpMessage {
				sent_at: now,
				data: out_msg.data,
			};

			// book keeping
			channel.msg_count += 1;
			channel.total_size += inbound.data.len() as u32;

			// compute the new MQC head of the channel
			let prev_head = channel.mqc_head.clone().unwrap_or(Default::default());
			let new_head = BlakeTwo256::hash_of(&(
				prev_head,
				inbound.sent_at,
				T::Hashing::hash_of(&inbound.data),
			));
			channel.mqc_head = Some(new_head);

			<Self as Store>::HrmpChannels::insert(&channel_id, channel);
			<Self as Store>::HrmpChannelContents::append(&channel_id, inbound);

			// The digests are sorted in ascending by block number order. Assuming absence of
			// contextual execution, there are only two possible scenarios here:
			//
			// (a) It's the first time anybody sends a message to this recipient within this block.
			//     In this case, the digest vector would be empty or the block number of the latest
			//     entry  is smaller than the current.
			//
			// (b) Somebody has already sent a message within the current block. That means that
			//     the block number of the latest entry is equal to the current.
			//
			// Note that having the latest entry greater than the current block number is a logical
			// error.
			let mut recipient_digest =
				<Self as Store>::HrmpChannelDigests::get(&channel_id.recipient);
			if let Some(cur_block_digest) = recipient_digest
				.last_mut()
				.filter(|(block_no, _)| *block_no == now)
				.map(|(_, ref mut d)| d)
			{
				cur_block_digest.push(sender);
			} else {
				recipient_digest.push((now, vec![sender]));
			}
			<Self as Store>::HrmpChannelDigests::insert(&channel_id.recipient, recipient_digest);

			weight += T::DbWeight::get().reads_writes(2, 2);
		}

		weight
	}

	/// Initiate opening a channel from a parachain to a given recipient with given channel
	/// parameters.
	///
	/// Basically the same as [`hrmp_init_open_channel`](Module::hrmp_init_open_channel) but intendend for calling directly from
	/// other pallets rather than dispatched.
	pub fn init_open_channel(
Sergey Pepyakin's avatar
Sergey Pepyakin committed
		origin: ParaId,
		recipient: ParaId,
		proposed_max_capacity: u32,
		proposed_max_message_size: u32,
	) -> Result<(), Error<T>> {
		ensure!(origin != recipient, Error::<T>::OpenHrmpChannelToSelf);
Sergey Pepyakin's avatar
Sergey Pepyakin committed
		ensure!(
			<paras::Module<T>>::is_valid_para(recipient),
			Error::<T>::OpenHrmpChannelInvalidRecipient,
Sergey Pepyakin's avatar
Sergey Pepyakin committed
		);

		let config = <configuration::Module<T>>::config();
		ensure!(
			proposed_max_capacity > 0,
			Error::<T>::OpenHrmpChannelZeroCapacity,
Sergey Pepyakin's avatar
Sergey Pepyakin committed
		);
		ensure!(
			proposed_max_capacity <= config.hrmp_channel_max_capacity,
			Error::<T>::OpenHrmpChannelCapacityExceedsLimit,
Sergey Pepyakin's avatar
Sergey Pepyakin committed
		);
		ensure!(
			proposed_max_message_size > 0,
			Error::<T>::OpenHrmpChannelZeroMessageSize,
Sergey Pepyakin's avatar
Sergey Pepyakin committed
		);
		ensure!(
			proposed_max_message_size <= config.hrmp_channel_max_message_size,
			Error::<T>::OpenHrmpChannelMessageSizeExceedsLimit,
Sergey Pepyakin's avatar
Sergey Pepyakin committed
		);

		let channel_id = HrmpChannelId {
			sender: origin,
			recipient,
		};
		ensure!(
			<Self as Store>::HrmpOpenChannelRequests::get(&channel_id).is_none(),
			Error::<T>::OpenHrmpChannelAlreadyExists,
Sergey Pepyakin's avatar
Sergey Pepyakin committed
		);
		ensure!(
			<Self as Store>::HrmpChannels::get(&channel_id).is_none(),
			Error::<T>::OpenHrmpChannelAlreadyRequested,
Sergey Pepyakin's avatar
Sergey Pepyakin committed
		);

		let egress_cnt =
			<Self as Store>::HrmpEgressChannelsIndex::decode_len(&origin).unwrap_or(0) as u32;
		let open_req_cnt = <Self as Store>::HrmpOpenChannelRequestCount::get(&origin);
		let channel_num_limit = if <paras::Module<T>>::is_parathread(origin) {
			config.hrmp_max_parathread_outbound_channels
		} else {
			config.hrmp_max_parachain_outbound_channels
		};
		ensure!(
			egress_cnt + open_req_cnt < channel_num_limit,
			Error::<T>::OpenHrmpChannelLimitExceeded,
Sergey Pepyakin's avatar
Sergey Pepyakin committed
		);

		// TODO: Deposit https://github.com/paritytech/polkadot/issues/1907

		<Self as Store>::HrmpOpenChannelRequestCount::insert(&origin, open_req_cnt + 1);
		<Self as Store>::HrmpOpenChannelRequests::insert(
			&channel_id,
			HrmpOpenChannelRequest {
				confirmed: false,
				age: 0,
				sender_deposit: config.hrmp_sender_deposit,
				max_capacity: proposed_max_capacity,
				max_message_size: proposed_max_message_size,
				max_total_size: config.hrmp_channel_max_total_size,
			},
		);
		<Self as Store>::HrmpOpenChannelRequestsList::append(channel_id);

		let notification_bytes = {
			use xcm::v0::Xcm;
			use parity_scale_codec::Encode as _;
Sergey Pepyakin's avatar
Sergey Pepyakin committed

			Xcm::HrmpNewChannelOpenRequest {
				sender: u32::from(origin),
				max_capacity: proposed_max_capacity,
				max_message_size: proposed_max_message_size,
			}
			.encode()
		};
		if let Err(dmp::QueueDownwardMessageError::ExceedsMaxMessageSize) =
			<dmp::Module<T>>::queue_downward_message(&config, recipient, notification_bytes)
Sergey Pepyakin's avatar
Sergey Pepyakin committed
		{
			// this should never happen unless the max downward message size is configured to an
			// jokingly small number.
			debug_assert!(false);
		}

		Ok(())
	}

	/// Accept a pending open channel request from the given sender.
	///
	/// Basically the same as [`hrmp_accept_open_channel`](Module::hrmp_accept_open_channel) but intendend for calling directly from
	/// other pallets rather than dispatched.
	pub fn accept_open_channel(origin: ParaId, sender: ParaId) -> Result<(), Error<T>> {
Sergey Pepyakin's avatar
Sergey Pepyakin committed
		let channel_id = HrmpChannelId {
			sender,
			recipient: origin,
		};
		let mut channel_req = <Self as Store>::HrmpOpenChannelRequests::get(&channel_id)
			.ok_or(Error::<T>::AcceptHrmpChannelDoesntExist)?;
Sergey Pepyakin's avatar
Sergey Pepyakin committed
		ensure!(
			!channel_req.confirmed,
			Error::<T>::AcceptHrmpChannelAlreadyConfirmed,
Sergey Pepyakin's avatar
Sergey Pepyakin committed
		);

		// check if by accepting this open channel request, this parachain would exceed the
		// number of inbound channels.
		let config = <configuration::Module<T>>::config();
		let channel_num_limit = if <paras::Module<T>>::is_parathread(origin) {
			config.hrmp_max_parathread_inbound_channels
		} else {
			config.hrmp_max_parachain_inbound_channels
		};
		let ingress_cnt =
			<Self as Store>::HrmpIngressChannelsIndex::decode_len(&origin).unwrap_or(0) as u32;
		let accepted_cnt = <Self as Store>::HrmpAcceptedChannelRequestCount::get(&origin);
		ensure!(
			ingress_cnt + accepted_cnt < channel_num_limit,
			Error::<T>::AcceptHrmpChannelLimitExceeded,
Sergey Pepyakin's avatar
Sergey Pepyakin committed
		);

		// TODO: Deposit https://github.com/paritytech/polkadot/issues/1907

		// persist the updated open channel request and then increment the number of accepted
		// channels.
		channel_req.confirmed = true;
		<Self as Store>::HrmpOpenChannelRequests::insert(&channel_id, channel_req);
		<Self as Store>::HrmpAcceptedChannelRequestCount::insert(&origin, accepted_cnt + 1);

		let notification_bytes = {
			use parity_scale_codec::Encode as _;
Sergey Pepyakin's avatar
Sergey Pepyakin committed

			Xcm::HrmpChannelAccepted {
				recipient: u32::from(origin),
			}
			.encode()
		};
		if let Err(dmp::QueueDownwardMessageError::ExceedsMaxMessageSize) =
			<dmp::Module<T>>::queue_downward_message(&config, sender, notification_bytes)
Sergey Pepyakin's avatar
Sergey Pepyakin committed
		{
			// this should never happen unless the max downward message size is configured to an
			// jokingly small number.
			debug_assert!(false);
		}

		Ok(())
	}

	fn close_channel(origin: ParaId, channel_id: HrmpChannelId) -> Result<(), Error<T>> {
Sergey Pepyakin's avatar
Sergey Pepyakin committed
		// check if the origin is allowed to close the channel.
		ensure!(
			origin == channel_id.sender || origin == channel_id.recipient,