Skip to content
lib.rs 60.9 KiB
Newer Older
					debug_assert!(
						page.remaining_size.is_zero(),
						"no messages remaining; no space taken; qed"
					);
					Pages::<T>::remove(&origin, page_index);
					debug_assert!(book_state.count >= 1, "page exists, so book must have pages");
					book_state.count.saturating_dec();
					T::WeightInfo::execute_overweight_page_removed()
				// no need to consider .first or ready ring since processing an overweight page
				// would not alter that state.
				} else {
					Pages::<T>::insert(&origin, page_index, page);
					T::WeightInfo::execute_overweight_page_updated()
				};
				BookStateFor::<T>::insert(&origin, &book_state);
				T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
				Ok(weight_counter.consumed().saturating_add(page_weight))
			},
		}
	}

	/// Remove a stale page or one which has no more messages remaining to be processed.
	fn do_reap_page(origin: &MessageOriginOf<T>, page_index: PageIndex) -> DispatchResult {
		match with_service_mutex(|| Self::do_reap_page_inner(origin, page_index)) {
			Err(()) => Err(Error::<T>::RecursiveDisallowed.into()),
			Ok(x) => x,
		}
	}

	/// Same as `do_reap_page` but must be called while holding the `service_mutex`.
	///
	/// A page can be reaped when it lies before the `begin` of the ready pages and either
	/// has no messages remaining, or has fallen below the dynamic culling watermark derived from
	/// `T::MaxStale`.
	///
	/// # Errors
	///
	/// * `NotReapable` - the page is at or after `begin`, or it is neither empty nor cullable.
	/// * `NoPage` - no page is stored under `(origin, page_index)`.
	fn do_reap_page_inner(origin: &MessageOriginOf<T>, page_index: PageIndex) -> DispatchResult {
		let mut book_state = BookStateFor::<T>::get(origin);
		// definitely not reapable if the page's index is no less than the `begin`ning of ready
		// pages.
		ensure!(page_index < book_state.begin, Error::<T>::NotReapable);

		let page = Pages::<T>::get(origin, page_index).ok_or(Error::<T>::NoPage)?;

		// definitely reapable if the page has no messages in it.
		let reapable = page.remaining.is_zero();

		// also reapable if the page index has dropped below our watermark.
		let cullable = || {
			let total_pages = book_state.count;
			let ready_pages = book_state.end.saturating_sub(book_state.begin).min(total_pages);

			// The number of stale pages - i.e. pages which contain unprocessed overweight messages.
			// We would prefer to keep these around but will restrict how far into history they can
			// extend if we notice that there's too many of them.
			//
			// We don't know *where* in history these pages are so we use a dynamic formula which
			// reduces the historical time horizon as the stale pages pile up and increases it as
			// they reduce.
			//
			// Cannot underflow: `ready_pages` is capped at `total_pages` above.
			let stale_pages = total_pages - ready_pages;

			// The maximum number of stale pages (i.e. of overweight messages) allowed before
			// culling can happen at all. Once there are more stale pages than this, then historical
			// pages may be dropped, even if they contain unprocessed overweight messages.
			let max_stale = T::MaxStale::get();

			// If we are not beyond the maximum then we exit now since no culling is needed.
			// Comparing directly (instead of computing `max_stale + 1`) avoids an overflow when
			// `max_stale` is the maximum value of its type.
			if stale_pages <= max_stale {
				return false
			}
			// The amount beyond the maximum which are being used. Always at least `1` here.
			let overflow = stale_pages - max_stale;

			// The special formula which tells us how deep into index-history we will cull pages.
			// As the overflow is greater (and thus the need to drop items from storage is more
			// urgent) this is reduced, allowing a greater range of pages to be culled.
			// With a minimum `overflow` (`1`), this returns `max_stale ** 2`, indicating we only
			// cull beyond that number of indices deep into history.
			// As this overflow increases, our depth reduces down to a limit of `max_stale`. We
			// never want to reduce below this since this will certainly allow enough pages to be
			// culled in order to bring `overflow` back to zero.
			//
			// The square saturates rather than overflowing for large `max_stale` values.
			let backlog = (max_stale.saturating_mul(max_stale) / overflow).max(max_stale);

			let watermark = book_state.begin.saturating_sub(backlog);
			page_index < watermark
		};
		ensure!(reapable || cullable(), Error::<T>::NotReapable);

		Pages::<T>::remove(origin, page_index);
		debug_assert!(book_state.count > 0, "reaping a page implies there are pages");
		book_state.count.saturating_dec();
		// Any messages still on the page are dropped with it, so remove them from the totals.
		book_state.message_count.saturating_reduce(page.remaining.into() as u64);
		book_state.size.saturating_reduce(page.remaining_size.into() as u64);
		BookStateFor::<T>::insert(origin, &book_state);
		T::QueueChangeHandler::on_queue_changed(origin.clone(), book_state.into());
		Self::deposit_event(Event::PageReaped { origin: origin.clone(), index: page_index });

		Ok(())
	}

	/// Execute any messages remaining to be processed in the queue of `origin`, using up to
	/// `weight_limit` to do so. Any messages which would take more than `overweight_limit` to
	/// execute are deemed overweight and ignored.
	fn service_queue(
		origin: MessageOriginOf<T>,
		weight: &mut WeightMeter,
		overweight_limit: Weight,
	) -> (bool, Option<MessageOriginOf<T>>) {
		use PageExecutionStatus::*;
		if weight
			.try_consume(
				T::WeightInfo::service_queue_base()
					.saturating_add(T::WeightInfo::ready_ring_unknit()),
			)
			.is_err()
		{
			return (false, None)
		}

		let mut book_state = BookStateFor::<T>::get(&origin);
		let mut total_processed = 0;
		if T::QueuePausedQuery::is_paused(&origin) {
			let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone());
			return (false, next_ready)
		}

		while book_state.end > book_state.begin {
			let (processed, status) =
				Self::service_page(&origin, &mut book_state, weight, overweight_limit);
			total_processed.saturating_accrue(processed);
			match status {
				// Store the page progress and do not go to the next one.
				Bailed | NoProgress => break,
				// Go to the next page if this one is at the end.
			};
			book_state.begin.saturating_inc();
		}
		let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone());
		if book_state.begin >= book_state.end {
			// No longer ready - unknit.
			if let Some(neighbours) = book_state.ready_neighbours.take() {
				Self::ready_ring_unknit(&origin, neighbours);
			} else if total_processed > 0 {
				defensive!("Freshly processed queue must have been ready");
			}
		}
		BookStateFor::<T>::insert(&origin, &book_state);
		if total_processed > 0 {
			T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
		}
		(total_processed > 0, next_ready)
	}

	/// Process messages of the page at `book_state.begin` until none are left or processing
	/// has to stop.
	///
	/// Returns the number of messages that were processed together with the status that ended
	/// the page's execution.
	fn service_page(
		origin: &MessageOriginOf<T>,
		book_state: &mut BookStateOf<T>,
		weight: &mut WeightMeter,
		overweight_limit: Weight,
	) -> (u32, PageExecutionStatus) {
		use PageExecutionStatus::*;
		// Charge the larger of the two base weights, since it is not yet known whether the
		// page will be completed.
		let base_weight = T::WeightInfo::service_page_base_completion()
			.max(T::WeightInfo::service_page_base_no_completion());
		if weight.try_consume(base_weight).is_err() {
			return (0, Bailed)
		}

		let page_index = book_state.begin;
		let Some(mut page) = Pages::<T>::get(origin, page_index) else {
			defensive!("message-queue: referenced page not found");
			return (0, NoMore)
		};

		let mut processed_count = 0;

		// Keep executing items until one of them tells us to stop.
		let status = loop {
			use ItemExecutionStatus::*;
			let item_status = Self::service_page_item(
				origin,
				page_index,
				book_state,
				&mut page,
				weight,
				overweight_limit,
			);
			match item_status {
				Executed(true) => processed_count.saturating_inc(),
				Executed(false) => (),
				NoItem => break PageExecutionStatus::NoMore,
				NoProgress => break PageExecutionStatus::NoProgress,
				Bailed => break PageExecutionStatus::Bailed,
			}
		};

		if !page.is_complete() {
			// Persist the progress made on a page that still has work left.
			Pages::<T>::insert(origin, page_index, page);
		} else {
			debug_assert!(status != Bailed, "we never bail if a page became complete");
			Pages::<T>::remove(origin, page_index);
			debug_assert!(book_state.count > 0, "completing a page implies there are pages");
			book_state.count.saturating_dec();
		}
		(processed_count, status)
	}

	/// Execute the next message of a page.
	///
	/// Returns:
	/// * `NoItem` if the page is complete or has no first message,
	/// * `Bailed` if the item weight cannot be consumed or the message had insufficient weight,
	/// * `NoProgress` if the message is temporarily unprocessable,
	/// * `Executed(true)` if the message was processed (including permanent failures), and
	/// * `Executed(false)` if the message was overweight and therefore skipped.
	pub(crate) fn service_page_item(
		origin: &MessageOriginOf<T>,
		page_index: PageIndex,
		book_state: &mut BookStateOf<T>,
		page: &mut PageOf<T>,
		weight: &mut WeightMeter,
		overweight_limit: Weight,
	) -> ItemExecutionStatus {
		use MessageExecutionStatus::*;
		// This ugly pre-checking is needed for the invariant
		// "we never bail if a page became complete".
		if page.is_complete() {
			return ItemExecutionStatus::NoItem
		}
		if weight.try_consume(T::WeightInfo::service_page_item()).is_err() {
			return ItemExecutionStatus::Bailed
		}

		let payload = &match page.peek_first() {
			Some(m) => m,
			None => return ItemExecutionStatus::NoItem,
		}[..];
		// Remember the length now; `page` is overwritten after processing.
		let payload_len = payload.len() as u64;
		// Store these for the case that `process_message_payload` is recursive.
		Pages::<T>::insert(origin, page_index, &*page);
		BookStateFor::<T>::insert(origin, &*book_state);

		let res = Self::process_message_payload(
			origin.clone(),
			page_index,
			page.first_index,
			payload,
			weight,
			overweight_limit,
		);

		// And restore them afterwards to see the changes of a recursive call.
		*book_state = BookStateFor::<T>::get(origin);
		if let Some(new_page) = Pages::<T>::get(origin, page_index) {
			*page = new_page;
		} else {
			defensive!("page must exist since we just inserted it and recursive calls are not allowed to remove anything");
			return ItemExecutionStatus::NoItem
		};

		let is_processed = match res {
			InsufficientWeight => return ItemExecutionStatus::Bailed,
			Unprocessable { permanent: false } => return ItemExecutionStatus::NoProgress,
			Processed | Unprocessable { permanent: true } | StackLimitReached => true,
			Overweight => false,
		};

		if is_processed {
			book_state.message_count.saturating_dec();
			book_state.size.saturating_reduce(payload_len);
		}
		page.skip_first(is_processed);
		ItemExecutionStatus::Executed(is_processed)
	}

	/// Ensure the correctness of state of this pallet.
	///
	/// # Assumptions-
	///
	/// If `serviceHead` points to a ready Queue, then BookState of that Queue has:
	///
	/// * `message_count` > 0
	/// * `size` > 0
	/// * `end` > `begin`
	/// * Some(ready_neighbours)
	/// * If `ready_neighbours.next` == self.origin, then `ready_neighbours.prev` == self.origin
	///   (only queue in ring)
	///
	/// For Pages(begin to end-1) in BookState:
	///
	/// * `remaining` > 0
	/// * `remaining_size` > 0
	/// * `first` <= `last`
	/// * Every page can be decoded into peek_* functions
	///
	/// Returns `Ok(())` if all checks pass, or the first violated check as an error.
	#[cfg(any(test, feature = "try-runtime", feature = "std"))]
	pub fn do_try_state() -> Result<(), sp_runtime::TryRuntimeError> {
		// Checking memory corruption for BookStateFor
		ensure!(
			BookStateFor::<T>::iter_keys().count() == BookStateFor::<T>::iter_values().count(),
			"Memory Corruption in BookStateFor"
		);
		// Checking memory corruption for Pages
		ensure!(
			Pages::<T>::iter_keys().count() == Pages::<T>::iter_values().count(),
			"Memory Corruption in Pages"
		);

		// Basic checks for each book
		for book in BookStateFor::<T>::iter_values() {
			ensure!(book.end >= book.begin, "Invariant");
			ensure!(book.end < 1 << 30, "Likely overflow or corruption");
			ensure!(book.message_count < 1 << 30, "Likely overflow or corruption");
			ensure!(book.size < 1 << 30, "Likely overflow or corruption");
			ensure!(book.count < 1 << 30, "Likely overflow or corruption");

			let fp: QueueFootprint = book.into();
			ensure!(fp.ready_pages <= fp.pages, "There cannot be more ready than total pages");
		}

		// Walk the ready ring once, starting from the current service head. Nothing to check
		// if there is no service head.
		let Some(starting_origin) = ServiceHead::<T>::get() else { return Ok(()) };

		while let Some(head) = Self::bump_service_head(&mut WeightMeter::new()) {
			ensure!(
				BookStateFor::<T>::contains_key(&head),
				"Service head must point to an existing book"
			);

			let head_book_state = BookStateFor::<T>::get(&head);
			ensure!(
				head_book_state.message_count > 0,
				"There must be some messages if in ReadyRing"
			);
			ensure!(head_book_state.size > 0, "There must be some message size if in ReadyRing");
			ensure!(
				head_book_state.end > head_book_state.begin,
				"End > Begin if unprocessed messages exists"
			);
			ensure!(
				head_book_state.ready_neighbours.is_some(),
				"There must be neighbours if in ReadyRing"
			);

			if head_book_state.ready_neighbours.as_ref().unwrap().next == head {
				ensure!(
					head_book_state.ready_neighbours.as_ref().unwrap().prev == head,
					"Can only happen if only queue in ReadyRing"
				);
			}

			for page_index in head_book_state.begin..head_book_state.end {
				let page = Pages::<T>::get(&head, page_index).unwrap();
				let remaining_messages = page.remaining;
				let mut counted_remaining_messages: u32 = 0;
				ensure!(
					remaining_messages > 0.into(),
					"These must be some messages that have not been processed yet!"
				);

				// Recount the unprocessed messages by decoding every item on the page.
				for i in 0..u32::MAX {
					if let Some((_, processed, _)) = page.peek_index(i as usize) {
						if !processed {
							counted_remaining_messages += 1;
						}
					} else {
						break
					}
				}

				ensure!(
					remaining_messages.into() == counted_remaining_messages,
					"Memory Corruption"
				);
			}

			// Stop once the ring has wrapped around to where we started.
			if head_book_state.ready_neighbours.as_ref().unwrap().next == starting_origin {
				break
			}
		}
		Ok(())
	}

	/// Render every queue, its pages and the messages of each page as human-readable text.
	///
	/// Messages that were already processed get a leading `*`; the page that is the current
	/// `begin` of its queue gets a leading `>`.
	///
	/// # Example output
	///
	/// ```text
	/// queue Here:
	///   page 0: []
	/// > page 1: []
	///   page 2: ["\0weight=4", "\0c", ]
	///   page 3: ["\0bigbig 1", ]
	///   page 4: ["\0bigbig 2", ]
	///   page 5: ["\0bigbig 3", ]
	/// ```
	#[cfg(feature = "std")]
	pub fn debug_info() -> String {
		let mut output = String::new();
		for (origin, book_state) in BookStateFor::<T>::iter() {
			let mut queue_text = format!("queue {:?}:\n", &origin);
			// Storage iteration order is not the page order, so sort by index first.
			let mut sorted_pages = Pages::<T>::iter_prefix(&origin).collect::<Vec<_>>();
			sorted_pages.sort_by(|(a, _), (b, _)| a.cmp(b));
			for (page_index, mut page) in sorted_pages {
				let marker = if book_state.begin == page_index { ">" } else { " " };
				let mut line = format!(
					"{} page {} ({:?} first, {:?} last, {:?} remain): [ ",
					marker, page_index, page.first, page.last, page.remaining
				);
				for i in 0..u32::MAX {
					let Some((_, processed, message)) =
						page.peek_index(i.try_into().expect("std-only code"))
					else {
						break
					};
					let text = String::from_utf8_lossy(message);
					if processed {
						line.push('*');
					}
					line.push_str(&format!("{:?}, ", text));
					page.skip_first(true);
				}
				line.push_str("]\n");
				queue_text.push_str(&line);
			}
			output.push_str(&queue_text);
		}
		output
	}

	/// Process a single message.
	///
	/// The base weight of this function needs to be accounted for by the caller. `weight` is the
	/// remaining weight to process the message. `overweight_limit` is the maximum weight that a
	/// message can ever consume. Messages above this limit are marked as permanently overweight.
	fn process_message_payload(
		origin: MessageOriginOf<T>,
		page_index: PageIndex,
		message_index: T::Size,
		message: &[u8],
		meter: &mut WeightMeter,
		overweight_limit: Weight,
	) -> MessageExecutionStatus {
		let mut id = sp_io::hashing::blake2_256(message);
		use ProcessMessageError::*;
		let prev_consumed = meter.consumed();
		match T::MessageProcessor::process_message(message, origin.clone(), meter, &mut id) {
			Err(Overweight(w)) if w.any_gt(overweight_limit) => {
				// Permanently overweight.
				Self::deposit_event(Event::<T>::OverweightEnqueued {
					origin,
					page_index,
					message_index,
				});
				MessageExecutionStatus::Overweight
			},
			Err(Overweight(_)) => {
				// Temporarily overweight - save progress and stop processing this
				// queue.
				MessageExecutionStatus::InsufficientWeight
			},
			Err(Yield) => {
				// Processing should be reattempted later.
				MessageExecutionStatus::Unprocessable { permanent: false }
			},
			Err(error @ BadFormat | error @ Corrupt | error @ Unsupported) => {
				// Permanent error - drop
				Self::deposit_event(Event::<T>::ProcessingFailed { id: id.into(), origin, error });
				MessageExecutionStatus::Unprocessable { permanent: true }
			Err(error @ StackLimitReached) => {
				Self::deposit_event(Event::<T>::ProcessingFailed { id: id.into(), origin, error });
				MessageExecutionStatus::StackLimitReached
			},
				// Success
				let weight_used = meter.consumed().saturating_sub(prev_consumed);
				Self::deposit_event(Event::<T>::Processed {
					id: id.into(),
					origin,
					weight_used,
					success,
				});
				MessageExecutionStatus::Processed
			},
		}
	}
}

/// Run a closure that errors on re-entrance. Meant to be used by anything that services queues.
///
/// Returns `Ok` with the result of `f` when the token could be taken, and `Err(())` when the
/// token was not available.
///
/// NOTE(review): the re-entrance detection relies on a nested `using_once` call seeing the
/// token of the outer call instead of a fresh one - confirm against the `environmental` docs.
pub(crate) fn with_service_mutex<F: FnOnce() -> R, R>(f: F) -> Result<R, ()> {
	// Holds the singleton token instance.
	environmental::environmental!(token: Option<()>);

	token::using_once(&mut Some(()), || {
		// The first `ok_or` should always be `Ok` since we are inside a `using_once`.
		// The second `ok_or` fails when the token has already been taken and not yet put back.
		let hold = token::with(|t| t.take()).ok_or(()).defensive()?.ok_or(())?;

		// Put the token back when we're done.
		defer! {
			token::with(|t| {
				*t = Some(hold);
			});
		}

		Ok(f())
	})
}

/// A [`sp_core::Get`] that yields the `MEL` (maximum encoded length) of a
/// [`codec::MaxEncodedLen`] type as a `u32`.
pub struct MaxEncodedLenOf<T>(sp_std::marker::PhantomData<T>);
impl<T: MaxEncodedLen> Get<u32> for MaxEncodedLenOf<T> {
	fn get() -> u32 {
		let max_len = T::max_encoded_len();
		max_len as u32
	}
}

/// Calculates the maximum message length and exposes it as a `Get<u32>`.
///
/// The value is the heap size minus the maximum encoded length of an `ItemHeader`, saturating
/// at zero.
pub struct MaxMessageLen<Origin, Size, HeapSize>(
	sp_std::marker::PhantomData<(Origin, Size, HeapSize)>,
);
impl<Origin: MaxEncodedLen, Size: MaxEncodedLen + Into<u32>, HeapSize: Get<Size>> Get<u32>
	for MaxMessageLen<Origin, Size, HeapSize>
{
	fn get() -> u32 {
		let heap_size: u32 = HeapSize::get().into();
		let header_len = ItemHeader::<Size>::max_encoded_len() as u32;
		heap_size.saturating_sub(header_len)
	}
}

/// The maximal message length, as a [`MaxMessageLen`] instantiated with this pallet's origin,
/// `Size` and `HeapSize` types.
pub type MaxMessageLenOf<T> =
	MaxMessageLen<MessageOriginOf<T>, <T as Config>::Size, <T as Config>::HeapSize>;
/// The maximal encoded origin length, as a [`MaxEncodedLenOf`] of the message origin.
pub type MaxOriginLenOf<T> = MaxEncodedLenOf<MessageOriginOf<T>>;
/// The `MessageOrigin` of this pallet, i.e. the `Origin` of the configured `MessageProcessor`.
pub type MessageOriginOf<T> = <<T as Config>::MessageProcessor as ProcessMessage>::Origin;
/// The maximal heap size of a page, converted to `u32` through [`IntoU32`].
pub type HeapSizeU32Of<T> = IntoU32<<T as Config>::HeapSize, <T as Config>::Size>;
/// The [`Page`] of this pallet.
pub type PageOf<T> = Page<<T as Config>::Size, <T as Config>::HeapSize>;
/// The [`BookState`] of this pallet.
pub type BookStateOf<T> = BookState<MessageOriginOf<T>>;

/// Adapts a [`sp_core::Get`] whose value can be converted into a `u32` into a `Get` which
/// returns that `u32`.
pub struct IntoU32<T, O>(sp_std::marker::PhantomData<(T, O)>);
impl<T: Get<O>, O: Into<u32>> Get<u32> for IntoU32<T, O> {
	fn get() -> u32 {
		let inner: O = T::get();
		inner.into()
	}
}

impl<T: Config> ServiceQueues for Pallet<T> {
	type OverweightMessageAddress = (MessageOriginOf<T>, PageIndex, T::Size);

	fn service_queues(weight_limit: Weight) -> Weight {
gupnik's avatar
gupnik committed
		let mut weight = WeightMeter::with_limit(weight_limit);
		// Get the maximum weight that processing a single message may take:
		let max_weight = Self::max_message_weight(weight_limit).unwrap_or_else(|| {
			defensive!("Not enough weight to service a single message.");
			Weight::zero()
		});

		match with_service_mutex(|| {
			let mut next = match Self::bump_service_head(&mut weight) {
				Some(h) => h,
				None => return weight.consumed(),
			};
			// The last queue that did not make any progress.
			// The loop aborts as soon as it arrives at this queue again without making any progress
			// on other queues in between.
			let mut last_no_progress = None;

			loop {
				let (progressed, n) = Self::service_queue(next.clone(), &mut weight, max_weight);
				next = match n {
					Some(n) =>
						if !progressed {
							if last_no_progress == Some(n.clone()) {
								break
							}
							if last_no_progress.is_none() {
								last_no_progress = Some(next.clone())
							}
							n
						} else {
							last_no_progress = None;
							n
						},
					None => break,
				}
			weight.consumed()
		}) {
			Err(()) => weight.consumed(),
			Ok(w) => w,
		}
	}

	/// Execute a single overweight message.
	///
	/// The weight limit must be enough for `execute_overweight` and the message execution itself.
	fn execute_overweight(
		weight_limit: Weight,
		(message_origin, page, index): Self::OverweightMessageAddress,
	) -> Result<Weight, ExecuteOverweightError> {
gupnik's avatar
gupnik committed
		let mut weight = WeightMeter::with_limit(weight_limit);
		if weight
			.try_consume(
				T::WeightInfo::execute_overweight_page_removed()
					.max(T::WeightInfo::execute_overweight_page_updated()),
			)
			.is_err()
		{
			return Err(ExecuteOverweightError::InsufficientWeight)
		}

		Pallet::<T>::do_execute_overweight(message_origin, page, index, weight.remaining()).map_err(
			|e| match e {
				Error::<T>::InsufficientWeight => ExecuteOverweightError::InsufficientWeight,
				Error::<T>::AlreadyProcessed => ExecuteOverweightError::AlreadyProcessed,
				Error::<T>::QueuePaused => ExecuteOverweightError::QueuePaused,
				Error::<T>::NoPage | Error::<T>::NoMessage | Error::<T>::Queued =>
					ExecuteOverweightError::NotFound,
				Error::<T>::RecursiveDisallowed => ExecuteOverweightError::RecursiveDisallowed,
				_ => ExecuteOverweightError::Other,
			},
		)
	}
}

impl<T: Config> EnqueueMessage<MessageOriginOf<T>> for Pallet<T> {
	type MaxMessageLen =
		MaxMessageLen<<T::MessageProcessor as ProcessMessage>::Origin, T::Size, T::HeapSize>;

	fn enqueue_message(
		message: BoundedSlice<u8, Self::MaxMessageLen>,
		origin: <T::MessageProcessor as ProcessMessage>::Origin,
	) {
		Self::do_enqueue_message(&origin, message);
		let book_state = BookStateFor::<T>::get(&origin);
		T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
	}

	fn enqueue_messages<'a>(
		messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
		origin: <T::MessageProcessor as ProcessMessage>::Origin,
	) {
		for message in messages {
			Self::do_enqueue_message(&origin, message);
		}
		let book_state = BookStateFor::<T>::get(&origin);
		T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
	}

	/// Mark every page of the queue of `origin` as no longer ready and take the queue out of
	/// the ready ring. Does nothing if there is no book for `origin`.
	fn sweep_queue(origin: MessageOriginOf<T>) {
		if BookStateFor::<T>::contains_key(&origin) {
			let mut book = BookStateFor::<T>::get(&origin);
			// Moving `begin` up to `end` leaves no ready pages.
			book.begin = book.end;
			if let Some(ring_neighbours) = book.ready_neighbours.take() {
				Self::ready_ring_unknit(&origin, ring_neighbours);
			}
			BookStateFor::<T>::insert(&origin, &book);
		}
	}

	fn footprint(origin: MessageOriginOf<T>) -> QueueFootprint {
		BookStateFor::<T>::get(&origin).into()