lib.rs 53.6 KiB
Newer Older
		}
		(total_processed > 0, next_ready)
	}

	/// Service as many messages of a page as possible.
	///
	/// Returns how many messages were processed and the page's status.
	fn service_page(
		origin: &MessageOriginOf<T>,
		book_state: &mut BookStateOf<T>,
		weight: &mut WeightMeter,
		overweight_limit: Weight,
	) -> (u32, PageExecutionStatus) {
		use PageExecutionStatus::*;
		if weight
			.try_consume(
				T::WeightInfo::service_page_base_completion()
					.max(T::WeightInfo::service_page_base_no_completion()),
			)
			.is_err()
		{
			return (0, Bailed)
		}

		let page_index = book_state.begin;
		let mut page = match Pages::<T>::get(origin, page_index) {
			Some(p) => p,
			None => {
				defensive!("message-queue: referenced page not found");
				return (0, NoMore)
			},
		};

		let mut total_processed = 0;

		// Execute as many messages as possible.
		let status = loop {
			use ItemExecutionStatus::*;
			match Self::service_page_item(
				origin,
				page_index,
				book_state,
				&mut page,
				weight,
				overweight_limit,
			) {
				Bailed => break PageExecutionStatus::Bailed,
				NoItem => break PageExecutionStatus::NoMore,
				NoProgress => break PageExecutionStatus::NoProgress,
				// Keep going as long as we make progress...
				Executed(true) => total_processed.saturating_inc(),
				Executed(false) => (),
			}
		};

		if page.is_complete() {
			debug_assert!(status != Bailed, "we never bail if a page became complete");
			Pages::<T>::remove(origin, page_index);
			debug_assert!(book_state.count > 0, "completing a page implies there are pages");
			book_state.count.saturating_dec();
		} else {
			Pages::<T>::insert(origin, page_index, page);
		}
		(total_processed, status)
	}

	/// Execute the next message of a page.
	pub(crate) fn service_page_item(
		origin: &MessageOriginOf<T>,
		page_index: PageIndex,
		book_state: &mut BookStateOf<T>,
		page: &mut PageOf<T>,
		weight: &mut WeightMeter,
		overweight_limit: Weight,
	) -> ItemExecutionStatus {
		// This ugly pre-checking is needed for the invariant
		// "we never bail if a page became complete".
		if page.is_complete() {
			return ItemExecutionStatus::NoItem
		}
		if weight.try_consume(T::WeightInfo::service_page_item()).is_err() {
			return ItemExecutionStatus::Bailed
		}

		let payload = &match page.peek_first() {
			Some(m) => m,
			None => return ItemExecutionStatus::NoItem,
		}[..];

		use MessageExecutionStatus::*;
		let is_processed = match Self::process_message_payload(
			origin.clone(),
			page_index,
			page.first_index,
			payload.deref(),
			weight,
			overweight_limit,
		) {
			InsufficientWeight => return ItemExecutionStatus::Bailed,
			Unprocessable { permanent: false } => return ItemExecutionStatus::NoProgress,
			Processed | Unprocessable { permanent: true } => true,
			Overweight => false,
		};

		if is_processed {
			book_state.message_count.saturating_dec();
			book_state.size.saturating_reduce(payload.len() as u64);
		}
		page.skip_first(is_processed);
		ItemExecutionStatus::Executed(is_processed)
	}

	/// Ensure the correctness of state of this pallet.
	///
	/// # Assumptions
	///
	/// If `ServiceHead` points to a ready Queue, then BookState of that Queue has:
	///
	/// * `message_count` > 0
	/// * `size` > 0
	/// * `end` > `begin`
	/// * Some(ready_neighbours)
	/// * If `ready_neighbours.next` == self.origin, then `ready_neighbours.prev` == self.origin
	///   (only queue in ring)
	///
	/// For Pages(begin to end-1) in BookState:
	///
	/// * `remaining` > 0
	/// * `remaining_size` > 0
	/// * `first` <= `last`
	/// * Every page can be decoded into peek_* functions
	///
	/// Of the page-level assumptions, this function verifies that each page exists, that
	/// `remaining` > 0 and that `remaining` equals the number of unprocessed messages found via
	/// `peek_index`.
	///
	/// # Errors
	///
	/// Returns an error describing the first violated check. A missing page or missing
	/// `ready_neighbours` is reported as an error as well instead of causing a panic.
	#[cfg(any(test, feature = "try-runtime"))]
	pub fn do_try_state() -> Result<(), sp_runtime::TryRuntimeError> {
		// Checking memory corruption for BookStateFor
		ensure!(
			BookStateFor::<T>::iter_keys().count() == BookStateFor::<T>::iter_values().count(),
			"Memory Corruption in BookStateFor"
		);
		// Checking memory corruption for Pages
		ensure!(
			Pages::<T>::iter_keys().count() == Pages::<T>::iter_values().count(),
			"Memory Corruption in Pages"
		);

		// Without a service head there is no further state to check. Otherwise remember the
		// origin the walk starts at, so that it can stop once the ring leads back to it.
		let starting_origin = match ServiceHead::<T>::get() {
			Some(origin) => origin,
			None => return Ok(()),
		};

		while let Some(head) = Self::bump_service_head(&mut WeightMeter::max_limit()) {
			ensure!(
				BookStateFor::<T>::contains_key(&head),
				"Service head must point to an existing book"
			);

			let head_book_state = BookStateFor::<T>::get(&head);
			ensure!(
				head_book_state.message_count > 0,
				"There must be some messages if in ReadyRing"
			);
			ensure!(head_book_state.size > 0, "There must be some message size if in ReadyRing");
			ensure!(
				head_book_state.end > head_book_state.begin,
				"End > Begin if unprocessed messages exists"
			);
			// Bind the neighbours once; their absence is a failed check, not a panic.
			let neighbours = head_book_state
				.ready_neighbours
				.as_ref()
				.ok_or("There must be neighbours if in ReadyRing")?;

			if neighbours.next == head {
				ensure!(neighbours.prev == head, "Can only happen if only queue in ReadyRing");
			}

			for page_index in head_book_state.begin..head_book_state.end {
				let page = Pages::<T>::get(&head, page_index)
					.ok_or("Every page between begin and end of a ready queue must exist")?;
				let remaining_messages = page.remaining;
				// The integer type of this counter is inferred from the `.into()` comparison
				// below, which is why it stays a plain counter.
				let mut counted_remaining_messages = 0;
				ensure!(
					remaining_messages > 0.into(),
					"These must be some messages that have not been processed yet!"
				);

				for i in 0..u32::MAX {
					if let Some((_, processed, _)) = page.peek_index(i as usize) {
						if !processed {
							counted_remaining_messages += 1;
						}
					} else {
						break
					}
				}

				ensure!(
					remaining_messages == counted_remaining_messages.into(),
					"Memory Corruption"
				);
			}

			// Stop once the ring leads back to where the walk started.
			if neighbours.next == starting_origin {
				break
			}
		}
		Ok(())
	}

	/// Render the pages of every queue, and the messages of every page, into a `String`.
	///
	/// Each queue starts with a `queue <origin>:` line, followed by one line per page in
	/// ascending page index order. The page that `begin` currently points at is prefixed with a
	/// `>`, and processed messages are prefixed with a `*`. Message bytes are shown as lossily
	/// decoded UTF-8.
	///
	/// # Example output
	///
	/// Abbreviated: each page line additionally shows the page's `first`, `last` and
	/// `remaining` values.
	///
	/// ```text
	/// queue Here:
	///   page 0: []
	/// > page 1: []
	///   page 2: ["\0weight=4", "\0c", ]
	///   page 3: ["\0bigbig 1", ]
	///   page 4: ["\0bigbig 2", ]
	///   page 5: ["\0bigbig 3", ]
	/// ```
	#[cfg(feature = "std")]
	pub fn debug_info() -> String {
		let mut info = String::new();
		for (origin, book_state) in BookStateFor::<T>::iter() {
			info.push_str(&format!("queue {:?}:\n", &origin));

			// Storage iteration order is not the page order, so sort by page index first.
			let mut pages: Vec<_> = Pages::<T>::iter_prefix(&origin).collect();
			pages.sort_by(|(lhs, _), (rhs, _)| lhs.cmp(rhs));

			for (page_index, mut page) in pages {
				let marker = if book_state.begin == page_index { ">" } else { " " };
				info.push_str(&format!(
					"{} page {} ({:?} first, {:?} last, {:?} remain): [ ",
					marker, page_index, page.first, page.last, page.remaining
				));

				for i in 0..u32::MAX {
					let (processed, message) =
						match page.peek_index(i.try_into().expect("std-only code")) {
							Some((_, processed, message)) => (processed, message),
							None => break,
						};
					let text = String::from_utf8_lossy(message.deref());
					if processed {
						info.push('*');
					}
					info.push_str(&format!("{:?}, ", text));
					page.skip_first(true);
				}
				info.push_str("]\n");
			}
		}
		info
	}

	/// Process a single message.
	///
	/// The base weight of this function needs to be accounted for by the caller. `weight` is the
	/// remaining weight to process the message. `overweight_limit` is the maximum weight that a
	/// message can ever consume. Messages above this limit are marked as permanently overweight.
	fn process_message_payload(
		origin: MessageOriginOf<T>,
		page_index: PageIndex,
		message_index: T::Size,
		message: &[u8],
		meter: &mut WeightMeter,
		overweight_limit: Weight,
	) -> MessageExecutionStatus {
		let hash = sp_io::hashing::blake2_256(message);
		use ProcessMessageError::*;
		let prev_consumed = meter.consumed();
		match T::MessageProcessor::process_message(message, origin.clone(), meter, &mut id) {
			Err(Overweight(w)) if w.any_gt(overweight_limit) => {
				// Permanently overweight.
				Self::deposit_event(Event::<T>::OverweightEnqueued {
					origin,
					page_index,
					message_index,
				});
				MessageExecutionStatus::Overweight
			},
			Err(Overweight(_)) => {
				// Temporarily overweight - save progress and stop processing this
				// queue.
				MessageExecutionStatus::InsufficientWeight
			},
			Err(Yield) => {
				// Processing should be reattempted later.
				MessageExecutionStatus::Unprocessable { permanent: false }
			},
			Err(error @ BadFormat | error @ Corrupt | error @ Unsupported) => {
				// Permanent error - drop
				Self::deposit_event(Event::<T>::ProcessingFailed { id, origin, error });
				MessageExecutionStatus::Unprocessable { permanent: true }
				// Success
				let weight_used = meter.consumed().saturating_sub(prev_consumed);
				Self::deposit_event(Event::<T>::Processed { id, origin, weight_used, success });
				MessageExecutionStatus::Processed
			},
		}
	}
}

/// Provides a [`sp_core::Get`] to access the `MEL` of a [`codec::MaxEncodedLen`] type.
///
/// The length is reported as a `u32`. A length that does not fit into a `u32` is clamped to
/// `u32::MAX` instead of silently wrapping around.
pub struct MaxEncodedLenOf<T>(sp_std::marker::PhantomData<T>);
impl<T: MaxEncodedLen> Get<u32> for MaxEncodedLenOf<T> {
	fn get() -> u32 {
		// Clamp before casting so that the cast itself can never truncate.
		T::max_encoded_len().min(u32::MAX as usize) as u32
	}
}

/// Calculates the maximum message length and exposes it through the [`sp_core::Get`] trait.
///
/// The result is `HeapSize` converted to a `u32`, minus the maximal encoded length of an
/// [`ItemHeader`], saturating at zero. A header length that does not fit into a `u32` is
/// clamped to `u32::MAX` instead of silently wrapping around.
pub struct MaxMessageLen<Origin, Size, HeapSize>(
	sp_std::marker::PhantomData<(Origin, Size, HeapSize)>,
);
impl<Origin: MaxEncodedLen, Size: MaxEncodedLen + Into<u32>, HeapSize: Get<Size>> Get<u32>
	for MaxMessageLen<Origin, Size, HeapSize>
{
	fn get() -> u32 {
		let heap_size: u32 = HeapSize::get().into();
		// Clamp before casting so that the cast itself can never truncate.
		let header_len = ItemHeader::<Size>::max_encoded_len().min(u32::MAX as usize) as u32;
		heap_size.saturating_sub(header_len)
	}
}

/// The maximal message length.
///
/// This is [`MaxMessageLen`] instantiated with the pallet's message origin, `Size` and
/// `HeapSize`.
pub type MaxMessageLenOf<T> =
	MaxMessageLen<MessageOriginOf<T>, <T as Config>::Size, <T as Config>::HeapSize>;
/// The maximal encoded origin length.
///
/// This is [`MaxEncodedLenOf`] applied to the pallet's message origin.
pub type MaxOriginLenOf<T> = MaxEncodedLenOf<MessageOriginOf<T>>;
/// The `MessageOrigin` of this pallet.
///
/// It is the `Origin` type of the configured `MessageProcessor`.
pub type MessageOriginOf<T> = <<T as Config>::MessageProcessor as ProcessMessage>::Origin;
/// The maximal heap size of a page.
///
/// This is the configured `HeapSize` exposed as a `u32` through [`IntoU32`].
pub type HeapSizeU32Of<T> = IntoU32<<T as Config>::HeapSize, <T as Config>::Size>;
/// The [`Page`] of this pallet.
///
/// Parameterised with the configured `Size` and `HeapSize`.
pub type PageOf<T> = Page<<T as Config>::Size, <T as Config>::HeapSize>;
/// The [`BookState`] of this pallet.
///
/// Parameterised with the pallet's message origin.
pub type BookStateOf<T> = BookState<MessageOriginOf<T>>;

/// Converts a [`sp_core::Get`] which returns a type that can be cast into a `u32` into a `Get`
/// which returns a `u32`.
///
/// `T` is the wrapped getter and `O` is the type it returns.
pub struct IntoU32<T, O>(sp_std::marker::PhantomData<(T, O)>);
impl<T: Get<O>, O: Into<u32>> Get<u32> for IntoU32<T, O> {
	fn get() -> u32 {
		let inner: O = T::get();
		inner.into()
	}
}

impl<T: Config> ServiceQueues for Pallet<T> {
	type OverweightMessageAddress = (MessageOriginOf<T>, PageIndex, T::Size);

	/// Service the queues, starting at the bumped service head, within `weight_limit`.
	///
	/// `weight_limit` also serves as the overweight limit for a single message. Queues are
	/// serviced one after another until either no next queue is reported or the walk arrives
	/// again at a queue that made no progress without any other queue having made progress in
	/// between.
	///
	/// Returns the weight that was consumed.
	fn service_queues(weight_limit: Weight) -> Weight {
		// The maximum weight that processing a single message may take.
		let overweight_limit = weight_limit;
		let mut weight = WeightMeter::from_limit(weight_limit);

		let mut next = match Self::bump_service_head(&mut weight) {
			Some(h) => h,
			None => return weight.consumed(),
		};
		// The last queue that did not make any progress.
		// The loop aborts as soon as it arrives at this queue again without making any progress
		// on other queues in between.
		let mut last_no_progress = None;

		loop {
			let (progressed, n) = Self::service_queue(next.clone(), &mut weight, overweight_limit);
			next = match n {
				Some(n) =>
					if !progressed {
						if last_no_progress == Some(n.clone()) {
							break
						}
						if last_no_progress.is_none() {
							last_no_progress = Some(next.clone())
						}
						n
					} else {
						last_no_progress = None;
						n
					},
				None => break,
			}
		}
		// Report what was actually consumed; the loop itself evaluates to `()`.
		weight.consumed()
	}

	/// Execute a single overweight message.
	///
	/// The weight limit must be enough for `execute_overweight` and the message execution itself.
	///
	/// The maximum of the two `execute_overweight_*` base weights is charged first; the
	/// remaining weight is passed on for the message execution. Pallet errors are mapped onto
	/// [`ExecuteOverweightError`]: `NoPage`, `NoMessage` and `Queued` become `NotFound`, and
	/// errors without a dedicated variant become `Other`.
	fn execute_overweight(
		weight_limit: Weight,
		(message_origin, page, index): Self::OverweightMessageAddress,
	) -> Result<Weight, ExecuteOverweightError> {
		let mut weight = WeightMeter::from_limit(weight_limit);
		if weight
			.try_consume(
				T::WeightInfo::execute_overweight_page_removed()
					.max(T::WeightInfo::execute_overweight_page_updated()),
			)
			.is_err()
		{
			return Err(ExecuteOverweightError::InsufficientWeight)
		}

		Pallet::<T>::do_execute_overweight(message_origin, page, index, weight.remaining()).map_err(
			|e| match e {
				Error::<T>::InsufficientWeight => ExecuteOverweightError::InsufficientWeight,
				Error::<T>::AlreadyProcessed => ExecuteOverweightError::AlreadyProcessed,
				Error::<T>::QueuePaused => ExecuteOverweightError::QueuePaused,
				Error::<T>::NoPage | Error::<T>::NoMessage | Error::<T>::Queued =>
					ExecuteOverweightError::NotFound,
				_ => ExecuteOverweightError::Other,
			},
		)
	}
}

impl<T: Config> EnqueueMessage<MessageOriginOf<T>> for Pallet<T> {
	type MaxMessageLen =
		MaxMessageLen<<T::MessageProcessor as ProcessMessage>::Origin, T::Size, T::HeapSize>;

	fn enqueue_message(
		message: BoundedSlice<u8, Self::MaxMessageLen>,
		origin: <T::MessageProcessor as ProcessMessage>::Origin,
	) {
		Self::do_enqueue_message(&origin, message);
		let book_state = BookStateFor::<T>::get(&origin);
		T::QueueChangeHandler::on_queue_changed(origin, book_state.message_count, book_state.size);
	}

	fn enqueue_messages<'a>(
		messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
		origin: <T::MessageProcessor as ProcessMessage>::Origin,
	) {
		for message in messages {
			Self::do_enqueue_message(&origin, message);
		}
		let book_state = BookStateFor::<T>::get(&origin);
		T::QueueChangeHandler::on_queue_changed(origin, book_state.message_count, book_state.size);
	}

	fn sweep_queue(origin: MessageOriginOf<T>) {
		if !BookStateFor::<T>::contains_key(&origin) {
			return
		}
		let mut book_state = BookStateFor::<T>::get(&origin);
		book_state.begin = book_state.end;
		if let Some(neighbours) = book_state.ready_neighbours.take() {
			Self::ready_ring_unknit(&origin, neighbours);
		}
		BookStateFor::<T>::insert(&origin, &book_state);
	}

	fn footprint(origin: MessageOriginOf<T>) -> Footprint {
		let book_state = BookStateFor::<T>::get(&origin);
		Footprint { count: book_state.message_count, size: book_state.size }
	}
}