tests.rs 61.5 KiB
Newer Older
/// Sweeping the only queue that `build_ring` set up leaves its book without ready neighbours.
fn sweep_queue_wraps_works() {
	use MessageOrigin::*;
	build_and_execute::<Test>(|| {
		// `Here` is the sole queue handed to `build_ring`.
		let ring = [Here];
		build_ring::<Test>(&ring);

		MessageQueue::sweep_queue(Here);

		// The stored book must no longer point at any neighbours.
		assert!(BookStateFor::<Test>::get(Here).ready_neighbours.is_none());
	});
}

/// Sweeping a queue that was never set up must not touch storage.
#[test]
fn sweep_queue_invalid_noops() {
	build_and_execute::<Test>(|| {
		// Nothing has been enqueued or inserted for this origin.
		let origin = MessageOrigin::Here;
		assert_storage_noop!(MessageQueue::sweep_queue(origin));
	});
}

/// The footprint of a book built from one full page matches that page's message count and
/// `remaining_size`, and spans exactly one page.
#[test]
fn footprint_works() {
	build_and_execute::<Test>(|| {
		let here = MessageOrigin::Here;
		// Write a book derived from a single full page straight into storage.
		let (page, msg_count) = full_page::<Test>();
		BookStateFor::<Test>::insert(here, book_for::<Test>(&page));

		let footprint = MessageQueue::footprint(here);
		assert_eq!(footprint.storage.count as usize, msg_count);
		assert_eq!(footprint.storage.size, page.remaining_size as u64);
		assert_eq!(footprint.pages, 1);

		// None of the steps above may have recorded a queue change.
		let changes = QueueChanges::take();
		assert!(changes.is_empty());
	})
}

/// Querying the footprint of a queue that does not exist yields the default footprint.
#[test]
fn footprint_invalid_works() {
	build_and_execute::<Test>(|| {
		// No book was ever stored for `Here`.
		let footprint = MessageQueue::footprint(MessageOrigin::Here);
		assert_eq!(footprint, Default::default());
	})
}

/// The footprint of a swept queue is still correct.
///
/// The queue is first placed into the ready ring, then swept; afterwards its footprint must
/// still report the book's message count, size and page count.
#[test]
fn footprint_on_swept_works() {
	use MessageOrigin::*;
	build_and_execute::<Test>(|| {
		// The queue has to exist before it is swept. Without this setup the sweep is a no-op
		// (see `sweep_queue_invalid_noops`) and the footprint is the default one (see
		// `footprint_invalid_works`), so the assertion below could never hold.
		build_ring::<Test>(&[Here]);

		MessageQueue::sweep_queue(Here);
		let fp = MessageQueue::footprint(Here);
		assert_eq!((1, 1, 1), (fp.storage.count, fp.storage.size, fp.pages));
	})
}

/// The number of reported pages takes overweight pages into account.
///
/// Two messages are enqueued, both get marked as overweight, and are then executed manually
/// one by one. The footprint is compared against `fp(..)` after every step.
// NOTE(review): judging by the inline comments below, `fp` presumably takes
// `(pages, ready_pages, count, size)` - confirm against the helper's definition.
#[test]
fn footprint_num_pages_works() {
	use MessageOrigin::*;
	build_and_execute::<Test>(|| {
		MessageQueue::enqueue_message(msg("weight=2"), Here);
		MessageQueue::enqueue_message(msg("weight=3"), Here);

		assert_eq!(MessageQueue::footprint(Here), fp(2, 2, 2, 16));

		// Mark the messages as overweight.
		// A limit of 1 is below both required weights (2 and 3), so nothing is consumed.
		assert_eq!(MessageQueue::service_queues(1.into_weight()), 0.into_weight());
		// Exactly two events are expected, one per message.
		assert_eq!(System::events().len(), 2);
		// `ready_pages` decreases but `page` count does not.
		assert_eq!(MessageQueue::footprint(Here), fp(2, 0, 2, 16));

		// Now execute the second message.
		assert_eq!(
			<MessageQueue as ServiceQueues>::execute_overweight(3.into_weight(), (Here, 1, 0))
				.unwrap(),
			3.into_weight()
		);
		assert_eq!(MessageQueue::footprint(Here), fp(1, 0, 1, 8));
		// And the first one:
		assert_eq!(
			<MessageQueue as ServiceQueues>::execute_overweight(2.into_weight(), (Here, 0, 0))
				.unwrap(),
			2.into_weight()
		);
		// With both messages executed the footprint is back to the default, i.e. all zeros.
		assert_eq!(MessageQueue::footprint(Here), Default::default());
		assert_eq!(MessageQueue::footprint(Here), fp(0, 0, 0, 0));

		// `ready_pages` and normal `pages` increases again:
		MessageQueue::enqueue_message(msg("weight=3"), Here);
		assert_eq!(MessageQueue::footprint(Here), fp(1, 1, 1, 8));
	})
}

/// Walks a single message through the overweight life cycle: it is marked as overweight by
/// `service_queues`, rejected by `execute_overweight` when the limit is too low, executed
/// when the limit suffices, and reported as `NotFound` on a repeated attempt.
#[test]
fn execute_overweight_works() {
	build_and_execute::<Test>(|| {
		set_weight("bump_service_head", 1.into_weight());
		set_weight("service_queue_base", 1.into_weight());
		set_weight("service_page_base_completion", 1.into_weight());

		// Enqueue a message
		let origin = MessageOrigin::Here;
		MessageQueue::enqueue_message(msg("weight=6"), origin);
		// Load the current book
		let book = BookStateFor::<Test>::get(origin);
		assert_eq!(book.message_count, 1);
		assert!(Pages::<Test>::contains_key(origin, 0));

		// Mark the message as permanently overweight.
		assert_eq!(MessageQueue::service_queues(4.into_weight()), 4.into_weight());
		// Also drains the change recorded so far, so later checks start from an empty list.
		assert_eq!(QueueChanges::take(), vec![(origin, 1, 8)]);
		assert_last_event::<Test>(
			Event::OverweightEnqueued {
				id: blake2_256(b"weight=6"),
				origin: MessageOrigin::Here,
				message_index: 0,
				page_index: 0,
			}
			.into(),
		);

		// Now try to execute it with too little weight.
		let consumed =
			<MessageQueue as ServiceQueues>::execute_overweight(5.into_weight(), (origin, 0, 0));
		assert_eq!(consumed, Err(ExecuteOverweightError::InsufficientWeight));

		// Execute it with enough weight.
		assert_eq!(Pages::<Test>::iter().count(), 1);
		// The failed attempt above must not have reported a queue change.
		assert!(QueueChanges::take().is_empty());
		let consumed =
			<MessageQueue as ServiceQueues>::execute_overweight(7.into_weight(), (origin, 0, 0))
				.unwrap();
		// Only the weight the message needs is reported, not the full limit of 7.
		assert_eq!(consumed, 6.into_weight());
		assert_eq!(QueueChanges::take(), vec![(origin, 0, 0)]);
		// There is no message left in the book.
		let book = BookStateFor::<Test>::get(origin);
		assert_eq!(book.message_count, 0);
		// And no more pages.
		assert_eq!(Pages::<Test>::iter().count(), 0);

		// Doing it again with enough weight will error.
		let consumed =
			<MessageQueue as ServiceQueues>::execute_overweight(70.into_weight(), (origin, 0, 0));
		assert_eq!(consumed, Err(ExecuteOverweightError::NotFound));
		assert!(QueueChanges::take().is_empty());
		assert!(!Pages::<Test>::contains_key(origin, 0), "Page is gone");
		// The book should have been unknit from the ready ring.
		assert!(!ServiceHead::<Test>::exists(), "No ready book");
	});
}

/// A book whose only message gets marked as overweight drops out of the ready ring and
/// re-joins it once another message is enqueued.
#[test]
fn permanently_overweight_book_unknits() {
	use MessageOrigin::*;

	build_and_execute::<Test>(|| {
		for call in ["bump_service_head", "service_queue_base", "service_page_base_completion"] {
			set_weight(call, 1.into_weight());
		}

		MessageQueue::enqueue_messages([msg("weight=9")].into_iter(), Here);

		// `Here` is the one and only ready book.
		assert_ring(&[Here]);
		// A limit of 8 cannot fit the message, which therefore gets marked as overweight.
		let consumed = MessageQueue::service_queues(8.into_weight());
		assert_eq!(consumed, 4.into_weight());
		let overweight = Event::OverweightEnqueued {
			id: blake2_256(b"weight=9"),
			origin: Here,
			message_index: 0,
			page_index: 0,
		};
		assert_last_event::<Test>(overweight.into());
		// The ring is empty now, yet the message and its page are still accounted for.
		assert_ring(&[]);
		assert_eq!(MessagesProcessed::take().len(), 0);
		assert_eq!(BookStateFor::<Test>::get(Here).message_count, 1);
		assert_eq!(MessageQueue::footprint(Here).pages, 1);
		// Enqueuing a fresh message makes the book ready again.
		MessageQueue::enqueue_messages([msg("weight=1")].into_iter(), Here);
		assert_ring(&[Here]);
		let consumed = MessageQueue::service_queues(8.into_weight());
		assert_eq!(consumed, 5.into_weight());
		assert_eq!(MessagesProcessed::take().len(), 1);
		assert_ring(&[]);
	});
}

/// Like `permanently_overweight_book_unknits`, but with one processable message followed by
/// two messages that end up overweight.
#[test]
fn permanently_overweight_book_unknits_multiple() {
	use MessageOrigin::*;

	build_and_execute::<Test>(|| {
		for call in ["bump_service_head", "service_queue_base", "service_page_base_completion"] {
			set_weight(call, 1.into_weight());
		}

		MessageQueue::enqueue_messages(
			[msg("weight=1"), msg("weight=9"), msg("weight=9")].into_iter(),
			Here,
		);

		assert_ring(&[Here]);
		// The first pass processes only the light message.
		let consumed = MessageQueue::service_queues(4.into_weight());
		assert_eq!(consumed, 4.into_weight());
		assert_eq!(num_overweight_enqueued_events(), 0);
		assert_eq!(MessagesProcessed::take().len(), 1);

		// Nothing has been marked as overweight so far, hence the book is still ready.
		assert_ring(&[Here]);
		let consumed = MessageQueue::service_queues(8.into_weight());
		assert_eq!(consumed, 5.into_weight());
		assert_eq!(num_overweight_enqueued_events(), 2);
		assert_eq!(MessagesProcessed::take().len(), 0);
		// Both remaining messages are overweight now and the book left the ring.
		assert_ring(&[]);
		// One more message brings it back.
		MessageQueue::enqueue_messages([msg("weight=1")].into_iter(), Here);
		assert_ring(&[Here]);
		let consumed = MessageQueue::service_queues(4.into_weight());
		assert_eq!(consumed, 4.into_weight());
		assert_eq!(MessagesProcessed::take().len(), 1);
		assert_ring(&[]);
	});
}

/// For every weight limit in `50..300`, a message needing weight 200 is either marked as
/// overweight (limit below 250) or processed (limit of 250 and above).
#[test]
fn permanently_overweight_limit_is_valid_basic() {
	use MessageOrigin::*;

	for w in 50..300 {
		build_and_execute::<Test>(|| {
			DefaultWeightForCall::set(Weight::MAX);

			set_weight("bump_service_head", 10.into());
			set_weight("service_queue_base", 10.into());
			set_weight("service_page_base_no_completion", 10.into());
			set_weight("service_page_base_completion", 0.into());

			set_weight("service_page_item", 10.into());
			set_weight("ready_ring_unknit", 10.into());

			let m = "weight=200".to_string();

			MessageQueue::enqueue_message(msg(&m), Here);
			MessageQueue::service_queues(w.into());

			let last_event =
				frame_system::Pallet::<Test>::events().into_iter().last().expect("No event");

			// The per-message overhead is configured as 50 and the message itself needs 200,
			// so any limit within `[50, 249]` must mark the message as permanently overweight.
			// From 250 onwards it is processed as normal.
			let expected = if w < 250 {
				RuntimeEvent::MessageQueue(Event::OverweightEnqueued {
					id: blake2_256(m.as_bytes()),
					origin: Here,
					message_index: 0,
					page_index: 0,
				})
			} else {
				RuntimeEvent::MessageQueue(Event::Processed {
					origin: Here,
					weight_used: 200.into(),
					id: blake2_256(m.as_bytes()).into(),
					success: true,
				})
			};
			assert_eq!(last_event.event, expected);
		});
	}
}

#[test]
fn permanently_overweight_limit_is_valid_fuzzy() {
	use MessageOrigin::*;
	let mut rng = rand::rngs::StdRng::seed_from_u64(42);

	for _ in 0..10 {
		// Brainlet code, but works...
		let (s1, s2) = (rng.gen_range(0..=10), rng.gen_range(0..=10));
		let (s3, s4) = (rng.gen_range(0..=10), rng.gen_range(0..=10));
		let s5 = rng.gen_range(0..=10);
		let o = s1 + s2 + s3 + s4 + s5;

		for w in o..=o + 300 {
			build_and_execute::<Test>(|| {
				DefaultWeightForCall::set(Weight::MAX);

				set_weight("bump_service_head", s1.into());
				set_weight("service_queue_base", s2.into());
				// Only the larger one of these two is taken:
				set_weight("service_page_base_no_completion", s3.into());
				set_weight("service_page_base_completion", 0.into());
				set_weight("service_page_item", s4.into());
				set_weight("ready_ring_unknit", s5.into());

				let m = "weight=200".to_string();

				MessageQueue::enqueue_message(msg(&m), Here);
				MessageQueue::service_queues(w.into());

				let last_event =
					frame_system::Pallet::<Test>::events().into_iter().last().expect("No event");

				if w < o + 200 {
					assert_eq!(
						last_event.event,
						RuntimeEvent::MessageQueue(Event::OverweightEnqueued {
							id: blake2_256(m.as_bytes()),
							origin: Here,
							message_index: 0,
							page_index: 0,
						})
					);
				} else {
					assert_eq!(
						last_event.event,
						RuntimeEvent::MessageQueue(Event::Processed {
							origin: Here,
							weight_used: 200.into(),
							id: blake2_256(m.as_bytes()).into(),
/// Empty books do not belong in the ready ring, but should they end up there anyway, servicing
/// the queues must not panic.
#[test]
#[cfg(not(debug_assertions))] // Would trigger a defensive failure otherwise.
fn ready_but_empty_does_not_panic() {
	use MessageOrigin::*;

	build_and_execute::<Test>(|| {
		let origins = [Here, There];
		// Store an empty book for each origin first, then force both into the ring.
		for origin in origins {
			BookStateFor::<Test>::insert(origin, empty_book::<Test>());
		}
		for origin in &origins {
			knit(origin);
		}
		assert_ring(&origins);

		// Servicing consumes no weight and leaves the ring empty.
		let consumed = MessageQueue::service_queues(Weight::MAX);
		assert_eq!(consumed, 0.into_weight());
		assert_ring(&[]);
	});
}

/// Permanently overweight books do not belong in the ready ring, but should they end up there
/// anyway, servicing the queues must not panic.
#[test]
#[cfg(not(debug_assertions))] // Would trigger a defensive failure otherwise.
fn ready_but_perm_overweight_does_not_panic() {
	use MessageOrigin::*;

	build_and_execute::<Test>(|| {
		MessageQueue::enqueue_message(msg("weight=9"), Here);
		// A limit of 8 is below the required 9; nothing is consumed and the book leaves the ring.
		let consumed = MessageQueue::service_queues(8.into_weight());
		assert_eq!(consumed, 0.into_weight());
		assert_ring(&[]);

		// Put the book back into the ready ring by force.
		knit(&Here);
		assert_ring(&[Here]);

		// Even with unlimited weight nothing is consumed, and the book leaves the ring again.
		let consumed = MessageQueue::service_queues(Weight::MAX);
		assert_eq!(consumed, 0.into_weight());
		assert_ring(&[]);
	});
}

/// Checks that (un)knitting the ready ring works with just one queue.
///
/// A single-member ring wraps around onto itself, so the knit and unknit operations touch the
/// same book several times.
#[test]
fn ready_ring_knit_basic_works() {
	use MessageOrigin::*;

	build_and_execute::<Test>(|| {
		BookStateFor::<Test>::insert(Here, empty_book::<Test>());

		// Five rounds of knitting directly followed by unknitting.
		for _ in 0..5 {
			knit(&Here);
			assert_ring(&[Here]);
			unknit(&Here);
			assert_ring(&[]);
		}
		assert_ring(&[]);
	});
}

/// Knits three queues into the ready ring one after another and unknits them in the same
/// order, checking the ring after every step.
#[test]
fn ready_ring_knit_and_unknit_works() {
	use MessageOrigin::*;

	build_and_execute::<Test>(|| {
		let queues = [Here, There, Everywhere(0)];

		// Place three queues into the storage.
		for queue in queues {
			BookStateFor::<Test>::insert(queue, empty_book::<Test>());
		}

		// Pausing all of them should make no difference:
		PausedQueues::set(queues.to_vec());

		// Knitting appends to the ring, so after step `i` it holds the first `i + 1` queues.
		assert_ring(&[]);
		for (i, queue) in queues.iter().enumerate() {
			knit(queue);
			assert_ring(&queues[..=i]);
		}

		// Unknitting in the same order leaves the tail that follows the removed queue.
		for (i, queue) in queues.iter().enumerate() {
			unknit(queue);
			assert_ring(&queues[i + 1..]);
		}
	});
}

/// Enqueuing messages one at a time fills three pages, spills the next message onto a fourth
/// page, and reports a queue change for every single message.
#[test]
fn enqueue_message_works() {
	use MessageOrigin::*;
	let heap_size = <Test as Config>::HeapSize::get() as u64;
	let header_len = ItemHeader::<<Test as Config>::Size>::max_encoded_len() as u64;
	// Each one-byte message takes up its header plus a single payload byte.
	let max_msg_per_page = heap_size / (header_len + 1);

	build_and_execute::<Test>(|| {
		// Enqueue messages which should fill three pages.
		let n = max_msg_per_page * 3;
		(1..=n).for_each(|i| {
			MessageQueue::enqueue_message(msg("a"), Here);
			assert_eq!(QueueChanges::take(), vec![(Here, i, i)], "OnQueueChanged not called");
		});
		assert_eq!(Pages::<Test>::iter().count(), 3);

		// The next message (three bytes) goes onto page 4.
		let (total_msgs, total_size) = (n + 1, n + 3);
		MessageQueue::enqueue_message(msg("abc"), Here);
		assert_eq!(QueueChanges::take(), vec![(Here, total_msgs, total_size)]);
		assert_eq!(Pages::<Test>::iter().count(), 4);

		// The single book must reflect all of the above.
		assert_eq!(BookStateFor::<Test>::iter().count(), 1);
		let book = BookStateFor::<Test>::get(Here);
		assert_eq!(book.message_count, total_msgs);
		assert_eq!(book.size, total_size);
		assert_eq!((book.begin, book.end), (0, 4));
		assert_eq!(book.count as usize, Pages::<Test>::iter().count());
	});
}

/// Enqueuing a whole batch at once fills three pages but reports only a single queue change.
#[test]
fn enqueue_messages_works() {
	use MessageOrigin::*;
	let heap_size = <Test as Config>::HeapSize::get() as u64;
	let header_len = ItemHeader::<<Test as Config>::Size>::max_encoded_len() as u64;
	// Each one-byte message takes up its header plus a single payload byte.
	let max_msg_per_page = heap_size / (header_len + 1);

	build_and_execute::<Test>(|| {
		// Enough one-byte messages to fill three pages.
		let n = max_msg_per_page * 3;
		let batch = vec![msg("a"); n as usize];

		// Hand over the whole batch in a single call.
		MessageQueue::enqueue_messages(batch.into_iter(), Here);
		// The changed handler should only be called once.
		let changes = QueueChanges::take();
		assert_eq!(changes, vec![(Here, n, n)], "OnQueueChanged not called");
		assert_eq!(Pages::<Test>::iter().count(), 3);

		// The next message (three bytes) goes onto page 4.
		MessageQueue::enqueue_message(msg("abc"), Here);
		let changes = QueueChanges::take();
		assert_eq!(changes, vec![(Here, n + 1, n + 3)]);
		assert_eq!(Pages::<Test>::iter().count(), 4);

		// The single book must reflect all of the above.
		assert_eq!(BookStateFor::<Test>::iter().count(), 1);
		let book = BookStateFor::<Test>::get(Here);
		assert_eq!(book.message_count, n + 1);
		assert_eq!(book.size, n + 3);
		assert_eq!((book.begin, book.end), (0, 4));
		assert_eq!(book.count as usize, Pages::<Test>::iter().count());
	});
}

/// Pausing and unpausing queues steers which queue `service_queues` takes messages from.
#[test]
fn service_queues_suspend_works() {
	use MessageOrigin::*;
	build_and_execute::<Test>(|| {
		// Reports the paused state of `Here` and `There`, in that order.
		let paused = || {
			(
				<Test as Config>::QueuePausedQuery::is_paused(&Here),
				<Test as Config>::QueuePausedQuery::is_paused(&There),
			)
		};

		MessageQueue::enqueue_messages(vec![msg("a"), msg("ab"), msg("abc")].into_iter(), Here);
		MessageQueue::enqueue_messages(vec![msg("x"), msg("xy"), msg("xyz")].into_iter(), There);
		assert_eq!(QueueChanges::take(), vec![(Here, 3, 6), (There, 3, 6)]);

		// With `Here` paused, execution starts at `There`.
		PausedQueues::set(vec![Here]);
		assert_eq!((true, false), paused());
		let consumed = MessageQueue::service_queues(1.into_weight());
		assert_eq!(consumed, 1.into_weight());
		assert_eq!(MessagesProcessed::take(), vec![(vmsg("x"), There)]);
		assert_eq!(QueueChanges::take(), vec![(There, 2, 5)]);

		// After unpausing `Here`, execution still continues at `There`.
		PausedQueues::take();
		assert_eq!((false, false), paused());
		let consumed = MessageQueue::service_queues(1.into_weight());
		assert_eq!(consumed, 1.into_weight());
		assert_eq!(MessagesProcessed::take(), vec![(vmsg("xy"), There)]);
		assert_eq!(QueueChanges::take(), vec![(There, 1, 3)]);

		// The next round moves on to `Here`.
		let consumed = MessageQueue::service_queues(1.into_weight());
		assert_eq!(consumed, 1.into_weight());
		assert_eq!(MessagesProcessed::take(), vec![(vmsg("a"), Here)]);
		assert_eq!(QueueChanges::take(), vec![(Here, 2, 5)]);

		// With `There` paused, execution stays at `Here`.
		PausedQueues::set(vec![There]);
		assert_eq!((false, true), paused());
		let consumed = MessageQueue::service_queues(1.into_weight());
		assert_eq!(consumed, 1.into_weight());
		assert_eq!(MessagesProcessed::take(), vec![(vmsg("ab"), Here)]);
		assert_eq!(QueueChanges::take(), vec![(Here, 1, 3)]);

		// Unpause `There` and drain whatever is left in both queues.
		PausedQueues::take();
		assert_eq!((false, false), paused());
		let consumed = MessageQueue::service_queues(2.into_weight());
		assert_eq!(consumed, 2.into_weight());
		assert_eq!(MessagesProcessed::take(), vec![(vmsg("abc"), Here), (vmsg("xyz"), There)]);
		assert_eq!(QueueChanges::take(), vec![(Here, 0, 0), (There, 0, 0)]);
	});
}

/// Tests that manual overweight execution on a paused queue errors with `QueuePaused`, and
/// that the very same call succeeds once the queue is unpaused.
#[test]
fn execute_overweight_respects_suspension() {
	build_and_execute::<Test>(|| {
		let origin = MessageOrigin::Here;
		MessageQueue::enqueue_message(msg("weight=5"), origin);
		// Mark the message as permanently overweight.
		// The limit of 4 is below the 5 that the message requires.
		MessageQueue::service_queues(4.into_weight());
		assert_last_event::<Test>(
			Event::OverweightEnqueued {
				id: blake2_256(b"weight=5"),
				origin,
				message_index: 0,
				page_index: 0,
			}
			.into(),
		);
		// Pause the queue and double-check that the pause query reports it.
		PausedQueues::set(vec![origin]);
		assert!(<Test as Config>::QueuePausedQuery::is_paused(&origin));

		// Execution should fail.
		assert_eq!(
			<MessageQueue as ServiceQueues>::execute_overweight(Weight::MAX, (origin, 0, 0)),
			Err(ExecuteOverweightError::QueuePaused)
		);

		// Clear the paused list again.
		PausedQueues::take();
		assert!(!<Test as Config>::QueuePausedQuery::is_paused(&origin));

		// Execution should work again with same args.
		assert_ok!(<MessageQueue as ServiceQueues>::execute_overweight(
			Weight::MAX,
			(origin, 0, 0)
		));

		// The message was processed with exactly the weight it asked for.
		assert_last_event::<Test>(
			Event::Processed {
				id: blake2_256(b"weight=5").into(),
				origin,
				weight_used: 5.into_weight(),
				success: true,
			}
			.into(),
		);
	});
}

/// A paused queue stays in the ready ring without being executed, and runs once unpaused.
#[test]
fn service_queue_suspension_ready_ring_works() {
	build_and_execute::<Test>(|| {
		let here = MessageOrigin::Here;
		PausedQueues::set(vec![here]);
		MessageQueue::enqueue_message(msg("weight=5"), here);

		MessageQueue::service_queues(Weight::MAX);
		// No event means nothing ran; the queue is nevertheless part of the ready ring.
		let events = System::events();
		assert!(events.is_empty(), "Paused");
		assert_ring(&[here]);

		// After lifting the pause the message gets executed.
		PausedQueues::take();
		MessageQueue::service_queues(Weight::MAX);
		let processed = Event::Processed {
			id: blake2_256(b"weight=5").into(),
			origin: here,
			weight_used: 5.into_weight(),
			success: true,
		};
		assert_last_event::<Test>(processed.into());
	});
}

/// `do_integrity_test` passes while the mocked weights fit into `ServiceWeight` and fails as
/// soon as the default weight, or any single listed weight function, is set to 101.
#[test]
fn integrity_test_checks_service_weight() {
	build_and_execute::<Test>(|| {
		// Preconditions: the mock's service weight is 100 and the integrity test passes.
		assert_eq!(<Test as Config>::ServiceWeight::get(), Some(100.into()), "precond");
		assert!(MessageQueue::do_integrity_test().is_ok(), "precond");

		// Enough for all:
		DefaultWeightForCall::set(20.into());
		assert!(MessageQueue::do_integrity_test().is_ok());

		// Not enough for anything:
		DefaultWeightForCall::set(101.into());
		// NOTE(review): 505 = 5 * 101, which suggests that the overhead sums up five weight
		// functions - confirm against `single_msg_overhead`.
		assert_eq!(MessageQueue::single_msg_overhead(), 505.into());
		assert!(MessageQueue::do_integrity_test().is_err());

		// Not enough for a single function:
		for f in [
			"bump_service_head",
			"service_queue_base",
			"service_page_base_completion",
			"service_page_base_no_completion",
			"service_page_item",
			"ready_ring_unknit",
		] {
			// Start each iteration from a clean slate: drop the overrides of the previous
			// iteration and use a default weight of zero.
			WeightForCall::take();
			DefaultWeightForCall::set(Zero::zero());

			assert!(MessageQueue::do_integrity_test().is_ok());
			// Raising only this one function to 101 must already fail the check.
			set_weight(f, 101.into());
			assert!(MessageQueue::do_integrity_test().is_err());
		}
	});
}

/// Test for <https://github.com/paritytech/polkadot-sdk/issues/2319>.
///
/// A message enqueued for another origin while a queue is being serviced must be processed
/// by the following `service_queues` call.
#[test]
fn regression_issue_2319() {
	use MessageOrigin::*;

	build_and_execute::<Test>(|| {
		// The callback enqueues "anothermessage" for origin `There`.
		Callback::set(Box::new(|_, _| {
			MessageQueue::enqueue_message(mock_helpers::msg("anothermessage"), There);
		}));

		MessageQueue::enqueue_message(msg("callback=0"), Here);

		// While queue `Here` is serviced, "anothermessage" gets enqueued for `There` from
		// within the processing of "callback=0".
		let consumed = MessageQueue::service_queues(1.into_weight());
		assert_eq!(consumed, 1.into_weight());
		assert_eq!(MessagesProcessed::take(), vec![(b"callback=0".to_vec(), Here)]);

		let consumed = MessageQueue::service_queues(1.into_weight());
		assert_eq!(consumed, 1.into_weight());
		// This is the check that used to fail before the fix.
		assert_eq!(MessagesProcessed::take(), vec![(b"anothermessage".to_vec(), There)]);
	});
}

/// Enqueueing a message from within `service_queues` works.
///
/// The callback fans out into further `callback=N` messages. The test then services exactly
/// 402 times with weight 1 each and expects 402 processed messages in total.
// NOTE(review): the arithmetic below presumes the mock invokes the callback with the number
// that follows `callback=` - confirm against the mock's message processor.
#[test]
fn recursive_enqueue_works() {
	build_and_execute::<Test>(|| {
		Callback::set(Box::new(|o, i| match i {
			// Stage 0 enqueues one stage-1 message on the same origin.
			0 => {
				MessageQueue::enqueue_message(msg(&format!("callback={}", 1)), *o);
			},
			// Stage 1 enqueues 100 stage-2 messages on the same origin and one stage-3 message
			// on each of the 100 origins obtained from `i.into()`.
			1 => {
				for _ in 0..100 {
					MessageQueue::enqueue_message(msg(&format!("callback={}", 2)), *o);
				}
				for i in 0..100 {
					MessageQueue::enqueue_message(msg(&format!("callback={}", 3)), i.into());
				}
			},
			// Stages 2 and 3 each enqueue one final stage-4 message.
			2 | 3 => {
				MessageQueue::enqueue_message(msg(&format!("callback={}", 4)), *o);
			},
			// Stage 4 ends the chain.
			4 => (),
			_ => unreachable!(),
		}));

		MessageQueue::enqueue_message(msg("callback=0"), MessageOrigin::Here);

		// 1 + 1 + 100 + 100 + 200 = 402 messages, each consuming a weight of 1.
		for _ in 0..402 {
			assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
		}
		// Nothing is left to do afterwards.
		assert_eq!(MessageQueue::service_queues(Weight::MAX), Weight::zero());

		assert_eq!(MessagesProcessed::take().len(), 402);
	});
}

/// A nested `service_queues` call made while `service_queues` is already running must not
/// change any storage.
#[test]
fn recursive_service_is_forbidden() {
	use MessageOrigin::*;
	build_and_execute::<Test>(|| {
		Callback::set(Box::new(|_, _| {
			MessageQueue::enqueue_message(msg("m1"), There);
			// The recursive call fails, and it must do so without messing up the state.
			assert_storage_noop!(MessageQueue::service_queues(10.into_weight()));
			MessageQueue::enqueue_message(msg("m2"), There);
		}));

		for _ in 0..5 {
			MessageQueue::enqueue_message(msg("callback=0"), Here);
			MessageQueue::service_queues(3.into_weight());

			// The triggering message and both messages enqueued by the callback are processed.
			let expected = vec![
				(b"callback=0".to_vec(), Here),
				(b"m1".to_vec(), There),
				(b"m2".to_vec(), There),
			];
			assert_eq!(MessagesProcessed::take(), expected);
		}
	});
}

/// Calling `execute_overweight` from within `service_queues` is forbidden.
///
/// The nested call must fail with `RecursiveDisallowed` without changing storage, while the
/// same call made from the top level afterwards succeeds.
#[test]
fn recursive_overweight_while_service_is_forbidden() {
	use MessageOrigin::*;
	build_and_execute::<Test>(|| {
		Callback::set(Box::new(|_, _| {
			// Check that the message was permanently overweight.
			assert_last_event::<Test>(
				Event::OverweightEnqueued {
					id: blake2_256(b"weight=10"),
					origin: There,
					message_index: 0,
					page_index: 0,
				}
				.into(),
			);
			// This call will fail since it is recursive. But it will not mess up the state.
			assert_noop!(
				<MessageQueue as ServiceQueues>::execute_overweight(
					10.into_weight(),
					(There, 0, 0)
				),
				ExecuteOverweightError::RecursiveDisallowed
			);
		}));

		// The overweight candidate goes to `There`, the callback trigger to `Here`.
		MessageQueue::enqueue_message(msg("weight=10"), There);
		MessageQueue::enqueue_message(msg("callback=0"), Here);

		// Mark it as permanently overweight.
		MessageQueue::service_queues(5.into_weight());
		// Outside of `service_queues` the manual execution is allowed.
		assert_ok!(<MessageQueue as ServiceQueues>::execute_overweight(
			10.into_weight(),
			(There, 0, 0)
		));
	});
}

/// A `do_reap_page` call made from inside the message callback is rejected with
/// `RecursiveDisallowed`, whereas the same call from the top level succeeds.
#[test]
fn recursive_reap_page_is_forbidden() {
	use MessageOrigin::*;
	build_and_execute::<Test>(|| {
		Callback::set(Box::new(|_, _| {
			// The recursive call fails, and it must do so without messing up the state.
			assert_noop!(MessageQueue::do_reap_page(&Here, 0), Error::<Test>::RecursiveDisallowed);
		}));

		// Enqueue ten messages more than the stale limit, intended to yield that many pages.
		let n = (MaxStale::get() + 10) as usize;
		(0..n).for_each(|_| MessageQueue::enqueue_message(msg("weight=2"), Here));

		// A limit of 1 is too low for any of the messages; they become permanently overweight,
		// which marks their pages as stale.
		MessageQueue::service_queues(1.into_weight());
		assert_ok!(MessageQueue::do_reap_page(&Here, 0));

		let reaped = Event::PageReaped { origin: Here, index: 0 };
		assert_last_event::<Test>(reaped.into());
	});
}

/// `with_service_mutex` runs its closure when called at the top level and returns an error,
/// without running the closure, when called from inside another `with_service_mutex`.
#[test]
fn with_service_mutex_works() {
	let mut called = 0;
	with_service_mutex(|| called = 1).unwrap();
	assert_eq!(called, 1);

	// The outer one is fine but the inner one errors.
	// The inner closures would panic via `unreachable!()` if they were ever executed.
	with_service_mutex(|| with_service_mutex(|| unreachable!()))
		.unwrap()
		.unwrap_err();
	with_service_mutex(|| with_service_mutex(|| unreachable!()).unwrap_err()).unwrap();
	// Repeated nested attempts fail as well, while the outer closure keeps running.
	with_service_mutex(|| {
		with_service_mutex(|| unreachable!()).unwrap_err();
		with_service_mutex(|| unreachable!()).unwrap_err();
		called = 2;
	})
	.unwrap();
	assert_eq!(called, 2);

	// Still works.
	with_service_mutex(|| called = 3).unwrap();
	assert_eq!(called, 3);
}

/// Enqueued messages are processed by `on_initialize`, and by `on_idle` when it is given
/// enough weight.
#[test]
fn process_enqueued_on_idle() {
	use MessageOrigin::*;
	build_and_execute::<Test>(|| {
		// Stand-in for messages that were enqueued during the previous block.
		MessageQueue::enqueue_messages(vec![msg("a"), msg("ab"), msg("abc")].into_iter(), Here);
		assert_eq!(BookStateFor::<Test>::iter().count(), 1);

		// `on_initialize` works through them.
		Pallet::<Test>::on_initialize(1);
		let expected = vec![(b"a".to_vec(), Here), (b"ab".to_vec(), Here), (b"abc".to_vec(), Here)];
		assert_eq!(MessagesProcessed::take(), expected);

		MessageQueue::enqueue_messages(vec![msg("x"), msg("xy"), msg("xyz")].into_iter(), There);
		assert_eq!(BookStateFor::<Test>::iter().count(), 2);

		// The idle hook is handed enough weight to process all three.
		Pallet::<Test>::on_idle(1, Weight::from_parts(100, 100));
		let expected =
			vec![(b"x".to_vec(), There), (b"xy".to_vec(), There), (b"xyz".to_vec(), There)];
		assert_eq!(MessagesProcessed::take(), expected);
	})
}

/// `on_idle` processes nothing when it is handed zero weight.
#[test]
fn process_enqueued_on_idle_requires_enough_weight() {
	use MessageOrigin::*;
	build_and_execute::<Test>(|| {
		Pallet::<Test>::on_initialize(1);

		let batch = vec![msg("x"), msg("xy"), msg("xyz")];
		MessageQueue::enqueue_messages(batch.into_iter(), There);
		assert_eq!(BookStateFor::<Test>::iter().count(), 1);

		// A zero-weight budget is not enough to process anything on idle.
		Pallet::<Test>::on_idle(1, Weight::from_parts(0, 0));
		assert!(MessagesProcessed::take().is_empty());
	})
}

/// When executed from the top level, a message reporting `StackLimitReached` is dropped
/// instead of being moved to the overweight queue.
#[test]
fn process_discards_stack_ov_message() {
	use MessageOrigin::*;
	build_and_execute::<Test>(|| {
		MessageQueue::enqueue_message(msg("stacklimitreached"), Here);

		MessageQueue::service_queues(10.into_weight());

		let failure = Event::ProcessingFailed {
			id: blake2_256(b"stacklimitreached").into(),
			origin: MessageOrigin::Here,
			error: ProcessMessageError::StackLimitReached,
		};
		assert_last_event::<Test>(failure.into());

		let processed = MessagesProcessed::take();
		assert!(processed.is_empty());
		// No page is left behind: the message is gone rather than kept as overweight.
		assert_pages(&[]);
	});
}

/// A message that reports `StackLimitReached` will stay in the overweight queue when it is executed
/// by `execute_overweight`.
///
/// The failed attempt must not change any storage; only a later attempt that no longer hits
/// the stack limit removes the message and its page.
#[test]
fn execute_overweight_keeps_stack_ov_message() {
	use MessageOrigin::*;
	build_and_execute::<Test>(|| {
		// We need to create a mocked message that first reports insufficient weight, and then
		// `StackLimitReached`:
		IgnoreStackOvError::set(true);
		MessageQueue::enqueue_message(msg("stacklimitreached"), Here);
		// A zero weight limit leaves the message unprocessed and marks it as overweight.
		MessageQueue::service_queues(0.into_weight());

		assert_last_event::<Test>(
			Event::OverweightEnqueued {
				id: blake2_256(b"stacklimitreached"),
				origin: MessageOrigin::Here,
				message_index: 0,
				page_index: 0,
			}
			.into(),
		);
		// Does not count as 'processed':
		assert!(MessagesProcessed::take().is_empty());
		assert_pages(&[0]);

		// Now let it return `StackLimitReached`. Note that this case would normally not happen,
		// since we assume that the top-level execution is the one with the most remaining stack
		// depth.
		IgnoreStackOvError::set(false);
		// Ensure that trying to execute the message does not change any state (besides events).
		// Events are reset before the guard is created and again before it is dropped, so the
		// event emitted in between does not count as a change.
		System::reset_events();
		let storage_noop = StorageNoopGuard::new();
		assert_eq!(
			<MessageQueue as ServiceQueues>::execute_overweight(3.into_weight(), (Here, 0, 0)),
			Err(ExecuteOverweightError::Other)
		);
		assert_last_event::<Test>(
			Event::ProcessingFailed {
				id: blake2_256(b"stacklimitreached").into(),
				origin: MessageOrigin::Here,
				error: ProcessMessageError::StackLimitReached,
			}
			.into(),
		);
		System::reset_events();
		drop(storage_noop);

		// Now let's process it normally:
		IgnoreStackOvError::set(true);
		assert_eq!(
			<MessageQueue as ServiceQueues>::execute_overweight(1.into_weight(), (Here, 0, 0))
				.unwrap(),
			1.into_weight()
		);

		assert_last_event::<Test>(
			Event::Processed {
				id: blake2_256(b"stacklimitreached").into(),
				origin: MessageOrigin::Here,
				weight_used: 1.into_weight(),
				success: true,
			}
			.into(),
		);
		// The page is gone once the message has been processed.
		assert_pages(&[]);
		System::reset_events();
	});
}