Skip to content
lib.rs 46.4 KiB
Newer Older
		});
	}

	/// Handles a request to resume the outbound channel towards `target`.
	///
	/// Looks `target` up in `OutboundXcmpStatus` by recipient. For a matching entry:
	/// - if its page range is empty (`first_index == last_index`) the entry is removed;
	/// - otherwise its state is set to `OutboundState::Ok`.
	///
	/// Debug builds assert that the matching entry was `Suspended` beforehand.
	///
	/// NOTE(review): the closing braces of this function are not visible in this excerpt, and
	/// the final `debug_assert!(false, ..)` presumably sits in the `else` of the `if let` (no
	/// entry found for `target`) — confirm against the complete file.
	fn resume_channel(target: ParaId) {
		<OutboundXcmpStatus<T>>::mutate(|s| {
			if let Some(index) = s.iter().position(|item| item.recipient == target) {
				// Only checked in debug builds; release builds proceed regardless of the state.
				let suspended = s[index].state == OutboundState::Suspended;
				debug_assert!(
					suspended,
					"WARNING: Attempt to resume channel that was not suspended."
				);
				// An entry with no queued pages carries no information once it is resumed, so it
				// is dropped instead of being kept around in the `Ok` state.
				if s[index].first_index == s[index].last_index {
					s.remove(index);
				} else {
					s[index].state = OutboundState::Ok;
				// Trips unconditionally in debug builds whenever this statement is reached.
				debug_assert!(false, "WARNING: Attempt to resume channel that was not suspended.");

	/// Reports whether the inbound channel from `sender` is currently suspended.
	///
	/// Returns `true` only when `InboundXcmpStatus` holds an entry for `sender` whose state is
	/// `InboundState::Suspended`; returns `false` when there is no entry for `sender`.
	#[cfg(feature = "bridging")]
	fn is_inbound_channel_suspended(sender: ParaId) -> bool {
		let channels = <InboundXcmpStatus<T>>::get();
		match channels.iter().find(|details| details.sender == sender) {
			Some(details) => details.state == InboundState::Suspended,
			None => false,
		}
	}

	/// Looks up the outbound channel towards `target`.
	///
	/// Returns `None` when `OutboundXcmpStatus` has no entry for `target`. Otherwise returns the
	/// channel's `OutboundState` together with the number of queued pages, computed as
	/// `last_index - first_index` (saturating at zero).
	#[cfg(feature = "bridging")]
	fn outbound_channel_state(target: ParaId) -> Option<(OutboundState, u16)> {
		let channels = <OutboundXcmpStatus<T>>::get();
		let details = channels.iter().find(|details| details.recipient == target)?;
		let queued_pages = details.last_index.saturating_sub(details.first_index);
		Some((details.state, queued_pages))
	}
/// Inbound side of the queue: receives raw XCMP blobs, records them and then services the queue.
///
/// NOTE(review): several closing braces inside `handle_xcmp_messages` are not visible in this
/// excerpt (see the inline notes); the comments below describe only the statements that are shown.
impl<T: Config> XcmpMessageHandler for Pallet<T> {
	/// Processes every `(sender, sent_at, data)` item yielded by `iter`.
	///
	/// For each item the leading `XcmpMessageFormat` is decoded from `data`:
	/// - `Signals` blobs are decoded into `ChannelSignal`s and applied immediately via
	///   `suspend_channel` / `resume_channel`;
	/// - any other format is recorded in the in-memory copy of `InboundXcmpStatus` and the
	///   remaining bytes are stored in `InboundXcmpMessages` under `(sender, sent_at)`.
	///
	/// After the loop the status vector is sorted and written back, and the return value is
	/// whatever `service_xcmp_queue(max_weight)` returns.
	fn handle_xcmp_messages<'a, I: Iterator<Item = (ParaId, RelayBlockNumber, &'a [u8])>>(
		iter: I,
		max_weight: Weight,
	) -> Weight {
		let mut status = <InboundXcmpStatus<T>>::get();
		let QueueConfigData { suspend_threshold, drop_threshold, .. } = <QueueConfig<T>>::get();

		for (sender, sent_at, data) in iter {
			// Figure out the message format.
			// Decoding advances `data_ref`, so afterwards it points at the bytes following the
			// format prefix.
			let mut data_ref = data;
			let format = match XcmpMessageFormat::decode_with_depth_limit(
				MAX_XCM_DECODE_DEPTH,
				&mut data_ref,
			) {
				Ok(f) => f,
				Err(_) => {
					// Panics in debug builds; in release builds the item is skipped.
					debug_assert!(false, "Unknown XCMP message format. Silently dropping message");
					continue
				},
			};
			if format == XcmpMessageFormat::Signals {
				// Apply signals one by one until the blob is exhausted or a decode error occurs;
				// signals decoded before the error have already been applied.
				while !data_ref.is_empty() {
					use ChannelSignal::*;
					match ChannelSignal::decode(&mut data_ref) {
						Ok(Suspend) => Self::suspend_channel(sender),
						Ok(Resume) => Self::resume_channel(sender),
						Err(_) => break,
					}
				}
			} else {
				// Record the fact we received it.
				// NOTE(review): `binary_search_by_key` requires `status` to be sorted by sender,
				// but entries pushed in the `Err` arm below are only re-sorted after the loop —
				// confirm that lookups later in the same call cannot miss an existing entry.
				match status.binary_search_by_key(&sender, |item| item.sender) {
					Ok(i) => {
						// Number of messages already recorded for this sender, before this one.
						let count = status[i].message_metadata.len() as u32;
						// Ask the sender to back off once the backlog reaches `suspend_threshold`.
						if count >= suspend_threshold && status[i].state == InboundState::Ok {
							status[i].state = InboundState::Suspended;
							let r = Self::send_signal(sender, ChannelSignal::Suspend);
							if r.is_err() {
								log::warn!(
									"Attempt to suspend channel failed. Messages may be dropped."
								);
						// NOTE(review): the closing braces of the two `if`s above are not visible
						// in this excerpt.
						// Metadata is only recorded while the backlog is below `drop_threshold`.
						if count < drop_threshold {
							status[i].message_metadata.push((sent_at, format));
						} else {
							debug_assert!(
								false,
								"XCMP channel queue full. Silently dropping message"
							);
						// NOTE(review): the closing brace of the `else` above is not visible in
						// this excerpt.
						// Update the delivery fee factor, if applicable.
						// The size component is the payload length in whole KiB (integer division
						// by 1024) multiplied by `MESSAGE_SIZE_FEE_BASE`.
						if count > suspend_threshold {
							let message_size_factor =
								FixedU128::from((data_ref.len() / 1024) as u128)
									.saturating_mul(delivery_fee_constants::MESSAGE_SIZE_FEE_BASE);
							Self::increase_fee_factor(sender, message_size_factor);
						}
					// NOTE(review): the end of the `Ok(i)` arm is not visible in this excerpt.
					// First message seen from this sender: start a fresh entry in the `Ok` state.
					Err(_) => status.push(InboundChannelDetails {
						sender,
						state: InboundState::Ok,
						message_metadata: vec![(sent_at, format)],
					}),
				}
				// Queue the payload for later execution.
				<InboundXcmpMessages<T>>::insert(sender, sent_at, data_ref);
			}

			// Optimization note; it would make sense to execute messages immediately if
			// `status.is_empty()` here.
		}
		// Restore ordering after any pushes above, then persist the updated status.
		status.sort();
		<InboundXcmpStatus<T>>::put(status);

		Self::service_xcmp_queue(max_weight)
	}
}

/// Outbound side of the queue: hands queued pages over for sending.
///
/// NOTE(review): several statements and closing braces inside `take_outbound_messages` are not
/// visible in this excerpt (see the inline notes); the comments below describe only what is shown.
impl<T: Config> XcmpMessageSource for Pallet<T> {
	/// Collects outbound pages, one `(recipient, page)` pair per pushed entry, sorted by
	/// ascending recipient para id.
	///
	/// Walks every entry of `OutboundXcmpStatus`, consults `T::ChannelInfo` for the channel's
	/// current status, takes either the pending signal page or the page at `first_index`, updates
	/// the entry, and finally prunes entries that became empty before writing the statuses back.
	fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec<u8>)> {
		let mut statuses = <OutboundXcmpStatus<T>>::get();
		let old_statuses_len = statuses.len();
		// Upper bound compared against `result.len()` at the top of each iteration.
		let max_message_count = statuses.len().min(maximum_channels);
		let mut result = Vec::with_capacity(max_message_count);

		for status in statuses.iter_mut() {
			// Work on local copies of the fields; they are written back to `*status` at the end
			// of the iteration.
			let OutboundChannelDetails {
				recipient: para_id,
				state: outbound_state,
				mut signals_exist,
				mut first_index,
				mut last_index,
			} = *status;

			if result.len() == max_message_count {
				// We check this condition in the beginning of the loop so that we don't include
				// a message where the limit is 0.
			// NOTE(review): the body and closing brace of the `if` above, and the body of the
			// `Suspended` check below, are not visible in this excerpt.
			if outbound_state == OutboundState::Suspended {
			}
			let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(para_id) {
				ChannelStatus::Closed => {
					// This means that there is no such channel anymore. Nothing to be done but
					// swallow the messages and discard the status.
					for i in first_index..last_index {
						<OutboundXcmpMessages<T>>::remove(para_id, i);
					if signals_exist {
						<SignalMessages<T>>::remove(para_id);
					// Reset the entry to the value produced by `OutboundChannelDetails::new`.
					*status = OutboundChannelDetails::new(para_id);
				// NOTE(review): the closing braces of the `Closed` arm are not visible in this
				// excerpt.
				ChannelStatus::Full => continue,
				ChannelStatus::Ready(n, e) => (n, e),
			};

			// A pending signal page takes priority over queued message pages. A page is only
			// taken when it is strictly smaller than `max_size_now`.
			let page = if signals_exist {
				let page = <SignalMessages<T>>::get(para_id);
				if page.len() < max_size_now {
					<SignalMessages<T>>::remove(para_id);
					signals_exist = false;
					page
				} else {
			// NOTE(review): the `else` bodies of this expression and its final branch are not
			// visible in this excerpt.
			} else if last_index > first_index {
				let page = <OutboundXcmpMessages<T>>::get(para_id, first_index);
				if page.len() < max_size_now {
					<OutboundXcmpMessages<T>>::remove(para_id, first_index);
					first_index += 1;
					page
				} else {
			// Once the page range is empty, rewind both indices to zero.
			if first_index == last_index {
				first_index = 0;
				last_index = 0;
			}

			if page.len() > max_size_ever {
				// TODO: #274 This means that the channel's max message size has changed since
				//   the message was sent. We should parse it and split into smaller messages but
				//   since it's so unlikely then for now we just drop it.
				log::warn!("WARNING: oversize message in queue. silently dropping.");
			} else {
				result.push((para_id, page));
			}

			// Decrease the delivery fee factor once the bytes still queued for this recipient are
			// at or below `max_total_size / THRESHOLD_FACTOR`.
			let max_total_size = match T::ChannelInfo::get_channel_info(para_id) {
				Some(channel_info) => channel_info.max_total_size,
				None => {
					log::warn!("calling `get_channel_info` with no RelevantMessagingState?!");
					MAX_POSSIBLE_ALLOCATION // We use this as a fallback in case the messaging state is not present
				},
			};
			let threshold = max_total_size.saturating_div(delivery_fee_constants::THRESHOLD_FACTOR);
			// NOTE(review): `unwrap()` panics if `decode_len` returns `None` for an index in
			// `first_index..last_index` — presumably every index in range has a stored page;
			// confirm that invariant.
			let remaining_total_size: usize = (first_index..last_index)
				.map(|index| OutboundXcmpMessages::<T>::decode_len(para_id, index).unwrap())
				.sum();
			if remaining_total_size <= threshold as usize {
				Self::decrease_fee_factor(para_id);
			}

			// Persist the locally updated fields back into the status entry.
			*status = OutboundChannelDetails {
				recipient: para_id,
				state: outbound_state,
				signals_exist,
				first_index,
				last_index,
			};
		}

		// Sort the outbound messages by ascending recipient para id to satisfy the acceptance
		// criteria requirement.
		result.sort_by_key(|m| m.0);

		// Prune hrmp channels that became empty. Additionally, because it may so happen that we
		// only gave attention to some channels in `non_empty_hrmp_channels` it's important to
		// change the order. Otherwise, the next `on_finalize` we will again give attention
		// only to those channels that happen to be in the beginning, until they are emptied.
		// This leads to "starvation" of the channels near to the end.
		//
		// To mitigate this we shift all processed elements towards the end of the vector using
		// `rotate_left`. To get intuition how it works see the examples in its rustdoc.
		//
		// An entry is kept while it is suspended, has a pending signal page, or still has queued
		// message pages.
		statuses.retain(|x| {
			x.state == OutboundState::Suspended || x.signals_exist || x.first_index < x.last_index
		});

		// old_status_len must be >= status.len() since we never add anything to status.
		let pruned = old_statuses_len - statuses.len();
		// removing an item from status implies a message being sent, so the result messages must
		// be no less than the pruned channels.
		statuses.rotate_left(result.len().saturating_sub(pruned));
		<OutboundXcmpStatus<T>>::put(statuses);

		result
	}
}

/// Xcm sender for sending to a sibling parachain.
impl<T: Config> SendXcm for Pallet<T> {
	/// Delivery ticket handed from `validate` to `deliver`: the destination para id together
	/// with the version-wrapped message.
	type Ticket = (ParaId, VersionedXcm<()>);

	fn validate(
		dest: &mut Option<MultiLocation>,
		msg: &mut Option<Xcm<()>>,
	) -> SendResult<(ParaId, VersionedXcm<()>)> {
		let d = dest.take().ok_or(SendError::MissingArgument)?;
Gavin Wood's avatar
Gavin Wood committed
		match &d {
			// An HRMP message for a sibling parachain.
			MultiLocation { parents: 1, interior: X1(Parachain(id)) } => {
Gavin Wood's avatar
Gavin Wood committed
				let xcm = msg.take().ok_or(SendError::MissingArgument)?;
				let id = ParaId::from(*id);
				let price = T::PriceForSiblingDelivery::price_for_delivery(id, &xcm);
Gavin Wood's avatar
Gavin Wood committed
				let versioned_xcm = T::VersionWrapper::wrap_version(&d, xcm)
					.map_err(|()| SendError::DestinationUnsupported)?;
Gavin Wood's avatar
Gavin Wood committed
				Ok(((id, versioned_xcm), price))
			},
			_ => {
				// Anything else is unhandled. This includes a message that is not meant for us.
				// We need to make sure that dest/msg is not consumed here.
				*dest = Some(d);
				Err(SendError::NotApplicable)
			},
		}
	}

	/// Sends the ticket's message to the sibling parachain `id`.
	///
	/// The message hash is the `blake2_256` of the encoded `xcm`. The message is handed to
	/// `send_fragment` in the `ConcatenatedVersionedXcm` format; on success an
	/// `Event::XcmpMessageSent` carrying that hash is deposited and the hash is returned. On
	/// failure the error is converted to its `&'static str` form and wrapped in
	/// `SendError::Transport`.
	///
	/// NOTE(review): the `Gavin Wood…` lines below look like page residue rather than Rust, and
	/// the closing braces of the `Ok` arm, the `match`, and this function are not visible in this
	/// excerpt — confirm against the complete file.
	fn deliver((id, xcm): (ParaId, VersionedXcm<()>)) -> Result<XcmHash, SendError> {
		// Hash is computed before `xcm` is moved into `send_fragment`.
		let hash = xcm.using_encoded(sp_io::hashing::blake2_256);

		match Self::send_fragment(id, XcmpMessageFormat::ConcatenatedVersionedXcm, xcm) {
			Ok(_) => {
				Self::deposit_event(Event::XcmpMessageSent { message_hash: hash });
Gavin Wood's avatar
Gavin Wood committed
				Ok(hash)
Gavin Wood's avatar
Gavin Wood committed
			Err(e) => Err(SendError::Transport(<&'static str>::from(e))),

/// Per-destination delivery fee factor, stored in `DeliveryFeeFactor` and keyed by `ParaId`.
impl<T: Config> FeeTracker for Pallet<T> {
	type Id = ParaId;

	/// Reads the fee factor currently stored for `id`.
	fn get_fee_factor(id: Self::Id) -> FixedU128 {
		<DeliveryFeeFactor<T>>::get(id)
	}

	/// Multiplies the stored factor for `id` by `EXPONENTIAL_FEE_BASE + message_size_factor`
	/// (both operations saturating), stores the result and returns it.
	fn increase_fee_factor(id: Self::Id, message_size_factor: FixedU128) -> FixedU128 {
		<DeliveryFeeFactor<T>>::mutate(id, |factor| {
			let multiplier =
				delivery_fee_constants::EXPONENTIAL_FEE_BASE.saturating_add(message_size_factor);
			let raised = factor.saturating_mul(multiplier);
			*factor = raised;
			raised
		})
	}

	/// Divides the stored factor for `id` by `EXPONENTIAL_FEE_BASE`, never letting it fall below
	/// `InitialFactor`, stores the result and returns it.
	fn decrease_fee_factor(id: Self::Id) -> FixedU128 {
		<DeliveryFeeFactor<T>>::mutate(id, |factor| {
			let floor = InitialFactor::get();
			let lowered = floor.max(*factor / delivery_fee_constants::EXPONENTIAL_FEE_BASE);
			*factor = lowered;
			lowered
		})
	}
}