tests.rs 45.7 KiB
Newer Older
		assert!(QueueChanges::take().is_empty());
	})
}

/// The footprint of an invalid queue is the default footprint.
///
/// This test enqueues nothing for `Here` before querying its footprint.
#[test]
fn footprint_invalid_works() {
	test_closure(|| {
		let origin = MessageOrigin::Here;
		assert_eq!(MessageQueue::footprint(origin), Default::default());
	});
}

/// The footprint of a swept queue is still correct.
///
/// A book with three messages and a total size of ten is written to storage directly, knitted
/// into the ready ring and then swept. The footprint must still report the original numbers.
#[test]
fn footprint_on_swept_works() {
	use MessageOrigin::*;
	test_closure(|| {
		// Craft the book by hand instead of enqueueing real messages.
		let mut book = empty_book::<Test>();
		book.message_count = 3;
		book.size = 10;
		BookStateFor::<Test>::insert(Here, &book);
		knit(&Here);

		MessageQueue::sweep_queue(Here);
		let fp = MessageQueue::footprint(Here);
		assert_eq!(fp.count, 3);
		assert_eq!(fp.size, 10);
	});
}

/// Checks the manual execution of a message that was marked as permanently overweight.
///
/// Covers the three outcomes of `execute_overweight`: `InsufficientWeight` when the limit is too
/// low, success when the limit suffices, and `NotFound` once the message is gone.
#[test]
fn execute_overweight_works() {
	test_closure(|| {
		set_weight("bump_service_head", 1.into_weight());
		set_weight("service_queue_base", 1.into_weight());
		set_weight("service_page_base_completion", 1.into_weight());

		// Enqueue a message
		let origin = MessageOrigin::Here;
		MessageQueue::enqueue_message(msg("weight=6"), origin);
		// Load the current book
		let book = BookStateFor::<Test>::get(origin);
		assert_eq!(book.message_count, 1);
		assert!(Pages::<Test>::contains_key(origin, 0));

		// Mark the message as permanently overweight.
		assert_eq!(MessageQueue::service_queues(4.into_weight()), 4.into_weight());
		assert_eq!(QueueChanges::take(), vec![(origin, 1, 8)]);
		assert_last_event::<Test>(
			Event::OverweightEnqueued {
				id: blake2_256(b"weight=6"),
				origin: MessageOrigin::Here,
				message_index: 0,
				page_index: 0,
			}
			.into(),
		);

		// Now try to execute it with too little weight.
		let consumed =
			<MessageQueue as ServiceQueues>::execute_overweight(5.into_weight(), (origin, 0, 0));
		assert_eq!(consumed, Err(ExecuteOverweightError::InsufficientWeight));

		// Execute it with enough weight.
		assert_eq!(Pages::<Test>::iter().count(), 1);
		// The failed attempt must not have reported a queue change.
		assert!(QueueChanges::take().is_empty());
		let consumed =
			<MessageQueue as ServiceQueues>::execute_overweight(7.into_weight(), (origin, 0, 0))
				.unwrap();
		assert_eq!(consumed, 6.into_weight());
		assert_eq!(QueueChanges::take(), vec![(origin, 0, 0)]);
		// There is no message left in the book.
		let book = BookStateFor::<Test>::get(origin);
		assert_eq!(book.message_count, 0);
		// And no more pages.
		assert_eq!(Pages::<Test>::iter().count(), 0);

		// Doing it again with enough weight will error.
		let consumed =
			<MessageQueue as ServiceQueues>::execute_overweight(70.into_weight(), (origin, 0, 0));
		assert_eq!(consumed, Err(ExecuteOverweightError::NotFound));
		assert!(QueueChanges::take().is_empty());
		assert!(!Pages::<Test>::contains_key(origin, 0), "Page is gone");
		// The book should have been unknit from the ready ring.
		assert!(!ServiceHead::<Test>::exists(), "No ready book");
	});
}

/// A book whose only message is permanently overweight is removed from the ready ring and
/// becomes ready again as soon as another message is enqueued.
#[test]
fn permanently_overweight_book_unknits() {
	use MessageOrigin::*;

	test_closure(|| {
		set_weight("bump_service_head", 1.into_weight());
		set_weight("service_queue_base", 1.into_weight());
		set_weight("service_page_base_completion", 1.into_weight());

		MessageQueue::enqueue_messages([msg("weight=9")].into_iter(), Here);

		// It is the only ready book.
		assert_ring(&[Here]);
		// Mark the message as overweight.
		assert_eq!(MessageQueue::service_queues(8.into_weight()), 4.into_weight());
		assert_last_event::<Test>(
			Event::OverweightEnqueued {
				id: blake2_256(b"weight=9"),
				origin: Here,
				message_index: 0,
				page_index: 0,
			}
			.into(),
		);
		// The book is not ready anymore.
		assert_ring(&[]);
		// Nothing was processed, and the overweight message still counts towards the book.
		assert_eq!(MessagesProcessed::take().len(), 0);
		assert_eq!(BookStateFor::<Test>::get(Here).message_count, 1);
		// Now if we enqueue another message, it will become ready again.
		MessageQueue::enqueue_messages([msg("weight=1")].into_iter(), Here);
		assert_ring(&[Here]);
		assert_eq!(MessageQueue::service_queues(8.into_weight()), 5.into_weight());
		assert_eq!(MessagesProcessed::take().len(), 1);
		assert_ring(&[]);
	});
}

/// Like `permanently_overweight_book_unknits`, but with several overweight messages queued
/// behind a message that can be processed.
#[test]
fn permanently_overweight_book_unknits_multiple() {
	use MessageOrigin::*;

	test_closure(|| {
		set_weight("bump_service_head", 1.into_weight());
		set_weight("service_queue_base", 1.into_weight());
		set_weight("service_page_base_completion", 1.into_weight());

		MessageQueue::enqueue_messages(
			[msg("weight=1"), msg("weight=9"), msg("weight=9")].into_iter(),
			Here,
		);

		assert_ring(&[Here]);
		// Process the first message.
		assert_eq!(MessageQueue::service_queues(4.into_weight()), 4.into_weight());
		assert_eq!(num_overweight_enqueued_events(), 0);
		assert_eq!(MessagesProcessed::take().len(), 1);

		// Book is still ready since it was not marked as overweight yet.
		assert_ring(&[Here]);
		assert_eq!(MessageQueue::service_queues(8.into_weight()), 5.into_weight());
		// Both remaining messages got flagged as overweight; neither was processed.
		assert_eq!(num_overweight_enqueued_events(), 2);
		assert_eq!(MessagesProcessed::take().len(), 0);
		// Now it is overweight.
		assert_ring(&[]);
		// Enqueue another message.
		MessageQueue::enqueue_messages([msg("weight=1")].into_iter(), Here);
		assert_ring(&[Here]);
		assert_eq!(MessageQueue::service_queues(4.into_weight()), 4.into_weight());
		assert_eq!(MessagesProcessed::take().len(), 1);
		assert_ring(&[]);
	});
}

/// We don't want empty books in the ready ring, but if they somehow make their way in there, it
/// should not panic.
///
/// Two empty books are forced into the ready ring; servicing must consume no weight and leave
/// the ring empty.
#[test]
#[cfg(not(debug_assertions))] // Would trigger a defensive failure otherwise.
fn ready_but_empty_does_not_panic() {
	use MessageOrigin::*;

	test_closure(|| {
		BookStateFor::<Test>::insert(Here, empty_book::<Test>());
		BookStateFor::<Test>::insert(There, empty_book::<Test>());

		knit(&Here);
		knit(&There);
		assert_ring(&[Here, There]);

		assert_eq!(MessageQueue::service_queues(Weight::MAX), 0.into_weight());
		assert_ring(&[]);
	});
}

/// We don't want permanently overweight books in the ready ring, but if they somehow make their
/// way in there, it should not panic.
#[test]
#[cfg(not(debug_assertions))] // Would trigger a defensive failure otherwise.
fn ready_but_perm_overweight_does_not_panic() {
	use MessageOrigin::*;

	test_closure(|| {
		MessageQueue::enqueue_message(msg("weight=9"), Here);
		// The weight limit is below what the message needs, so the book drops out of the ring.
		assert_eq!(MessageQueue::service_queues(8.into_weight()), 0.into_weight());
		assert_ring(&[]);
		// Force it back into the ready ring.
		knit(&Here);
		assert_ring(&[Here]);
		assert_eq!(MessageQueue::service_queues(Weight::MAX), 0.into_weight());
		// Unready again.
		assert_ring(&[]);
	});
}

/// Checks that (un)knitting the ready ring works with just one queue.
///
/// This case is interesting since it wraps and a lot of `mutate` now operate on the same object.
#[test]
fn ready_ring_knit_basic_works() {
	use MessageOrigin::*;

	test_closure(|| {
		BookStateFor::<Test>::insert(Here, empty_book::<Test>());

		// Alternate between knitting (even rounds) and unknitting (odd rounds).
		for i in 0..10 {
			if i % 2 == 0 {
				knit(&Here);
				assert_ring(&[Here]);
			} else {
				unknit(&Here);
				assert_ring(&[]);
			}
		}
		// The last round was an unknit, so the ring ends up empty.
		assert_ring(&[]);
	});
}

/// Knits three queues into the ready ring one after another and unknits them again, checking
/// the ring contents after every step.
#[test]
fn ready_ring_knit_and_unknit_works() {
	use MessageOrigin::*;

	test_closure(|| {
		// Place three queues into the storage.
		BookStateFor::<Test>::insert(Here, empty_book::<Test>());
		BookStateFor::<Test>::insert(There, empty_book::<Test>());
		BookStateFor::<Test>::insert(Everywhere(0), empty_book::<Test>());

		// Pausing should make no difference:
		PausedQueues::set(vec![Here, There, Everywhere(0)]);

		// Knit them into the ready ring.
		assert_ring(&[]);
		knit(&Here);
		assert_ring(&[Here]);
		knit(&There);
		assert_ring(&[Here, There]);
		knit(&Everywhere(0));
		assert_ring(&[Here, There, Everywhere(0)]);

		// Now unknit…
		unknit(&Here);
		assert_ring(&[There, Everywhere(0)]);
		unknit(&There);
		assert_ring(&[Everywhere(0)]);
		unknit(&Everywhere(0));
		assert_ring(&[]);
	});
}

/// Enqueues messages one at a time and checks the queue-change notifications, the page count
/// and the resulting book state.
#[test]
fn enqueue_message_works() {
	use MessageOrigin::*;
	// Number of one-byte messages per page: heap size divided by header size plus one byte.
	let max_msg_per_page = <Test as Config>::HeapSize::get() as u64 /
		(ItemHeader::<<Test as Config>::Size>::max_encoded_len() as u64 + 1);

	test_closure(|| {
		// Enqueue messages which should fill three pages.
		let n = max_msg_per_page * 3;
		for i in 1..=n {
			MessageQueue::enqueue_message(msg("a"), Here);
			assert_eq!(QueueChanges::take(), vec![(Here, i, i)], "OnQueueChanged not called");
		}
		assert_eq!(Pages::<Test>::iter().count(), 3);

		// Enqueue one more onto page 4.
		MessageQueue::enqueue_message(msg("abc"), Here);
		assert_eq!(QueueChanges::take(), vec![(Here, n + 1, n + 3)]);
		assert_eq!(Pages::<Test>::iter().count(), 4);

		// Check the state.
		assert_eq!(BookStateFor::<Test>::iter().count(), 1);
		let book = BookStateFor::<Test>::get(Here);
		assert_eq!(book.message_count, n + 1);
		assert_eq!(book.size, n + 3);
		assert_eq!((book.begin, book.end), (0, 4));
		assert_eq!(book.count as usize, Pages::<Test>::iter().count());
	});
}

/// Enqueues a whole batch of messages at once and checks that the queue-change handler is
/// notified a single time, then verifies the page count and the resulting book state.
#[test]
fn enqueue_messages_works() {
	use MessageOrigin::*;
	// Number of one-byte messages per page: heap size divided by header size plus one byte.
	let max_msg_per_page = <Test as Config>::HeapSize::get() as u64 /
		(ItemHeader::<<Test as Config>::Size>::max_encoded_len() as u64 + 1);

	test_closure(|| {
		// Enqueue messages which should fill three pages.
		let n = max_msg_per_page * 3;
		let msgs = vec![msg("a"); n as usize];

		// Now queue all messages at once.
		MessageQueue::enqueue_messages(msgs.into_iter(), Here);
		// The changed handler should only be called once.
		assert_eq!(QueueChanges::take(), vec![(Here, n, n)], "OnQueueChanged not called");
		assert_eq!(Pages::<Test>::iter().count(), 3);

		// Enqueue one more onto page 4.
		MessageQueue::enqueue_message(msg("abc"), Here);
		assert_eq!(QueueChanges::take(), vec![(Here, n + 1, n + 3)]);
		assert_eq!(Pages::<Test>::iter().count(), 4);

		// Check the state.
		assert_eq!(BookStateFor::<Test>::iter().count(), 1);
		let book = BookStateFor::<Test>::get(Here);
		assert_eq!(book.message_count, n + 1);
		assert_eq!(book.size, n + 3);
		assert_eq!((book.begin, book.end), (0, 4));
		assert_eq!(book.count as usize, Pages::<Test>::iter().count());
	});
}

/// Checks that servicing skips paused queues and picks them up again once they are resumed.
///
/// Three messages are enqueued for each of `Here` and `There`. The queues are then paused and
/// resumed in turn while the processed messages and queue-change notifications are verified
/// after every servicing round.
#[test]
fn service_queues_suspend_works() {
	use MessageOrigin::*;
	test_closure(|| {
		// Reports the pause state of `Here` and `There`, in that order.
		let paused = || {
			(
				<Test as Config>::QueuePausedQuery::is_paused(&Here),
				<Test as Config>::QueuePausedQuery::is_paused(&There),
			)
		};
		// Services with a weight limit of one and expects that exactly this much is consumed.
		let service_one =
			|| assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());

		MessageQueue::enqueue_messages(vec![msg("a"), msg("ab"), msg("abc")].into_iter(), Here);
		MessageQueue::enqueue_messages(vec![msg("x"), msg("xy"), msg("xyz")].into_iter(), There);
		assert_eq!(QueueChanges::take(), vec![(Here, 3, 6), (There, 3, 6)]);

		// With `Here` paused, servicing begins at `There`.
		PausedQueues::set(vec![Here]);
		assert_eq!(paused(), (true, false));
		service_one();
		assert_eq!(MessagesProcessed::take(), vec![(vmsg("x"), There)]);
		assert_eq!(QueueChanges::take(), vec![(There, 2, 5)]);

		// Resuming `Here` does not interrupt the work on `There`.
		PausedQueues::take();
		assert_eq!(paused(), (false, false));
		service_one();
		assert_eq!(MessagesProcessed::take(), vec![(vmsg("xy"), There)]);
		assert_eq!(QueueChanges::take(), vec![(There, 1, 3)]);

		// The next round moves on to `Here`.
		service_one();
		assert_eq!(MessagesProcessed::take(), vec![(vmsg("a"), Here)]);
		assert_eq!(QueueChanges::take(), vec![(Here, 2, 5)]);

		// With `There` paused, servicing stays on `Here`.
		PausedQueues::set(vec![There]);
		assert_eq!(paused(), (false, true));
		service_one();
		assert_eq!(MessagesProcessed::take(), vec![(vmsg("ab"), Here)]);
		assert_eq!(QueueChanges::take(), vec![(Here, 1, 3)]);

		// Resume `There` and drain whatever is left in both queues.
		PausedQueues::take();
		assert_eq!(paused(), (false, false));
		assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight());
		assert_eq!(MessagesProcessed::take(), vec![(vmsg("abc"), Here), (vmsg("xyz"), There)]);
		assert_eq!(QueueChanges::take(), vec![(Here, 0, 0), (There, 0, 0)]);
	});
}

/// Tests that manual overweight execution on a paused queue errors with `QueuePaused` and
/// succeeds with the same arguments once the queue is resumed.
#[test]
fn execute_overweight_respects_suspension() {
	test_closure(|| {
		let here = MessageOrigin::Here;
		let id = blake2_256(b"weight=5");
		// Manually executes the message at page 0, index 0 with an unlimited weight budget.
		let execute =
			|| <MessageQueue as ServiceQueues>::execute_overweight(Weight::MAX, (here, 0, 0));

		MessageQueue::enqueue_message(msg("weight=5"), here);
		// Servicing with a limit of four gets the message flagged as overweight.
		MessageQueue::service_queues(4.into_weight());
		assert_last_event::<Test>(
			Event::OverweightEnqueued {
				id,
				origin: here,
				message_index: 0,
				page_index: 0,
			}
			.into(),
		);

		// While the queue is paused, manual execution is refused.
		PausedQueues::set(vec![here]);
		assert!(<Test as Config>::QueuePausedQuery::is_paused(&here));
		assert_eq!(execute(), Err(ExecuteOverweightError::QueuePaused));

		// After resuming, the very same call goes through.
		PausedQueues::take();
		assert!(!<Test as Config>::QueuePausedQuery::is_paused(&here));
		assert_ok!(execute());

		assert_last_event::<Test>(
			Event::Processed {
				id,
				origin: here,
				weight_used: 5.into_weight(),
				success: true,
			}
			.into(),
		);
	});
}

/// A paused queue stays in the ready ring without being executed; once it is resumed, the next
/// servicing round processes its message.
#[test]
fn service_queue_suspension_ready_ring_works() {
	test_closure(|| {
		let here = MessageOrigin::Here;
		// Pause the queue before anything is enqueued into it.
		PausedQueues::set(vec![here]);
		MessageQueue::enqueue_message(msg("weight=5"), here);

		// Servicing emits no events, yet the queue is part of the ready ring.
		MessageQueue::service_queues(Weight::MAX);
		assert!(System::events().is_empty(), "Paused");
		assert_ring(&[here]);

		// Resume the queue; the following servicing round executes the message.
		PausedQueues::take();
		MessageQueue::service_queues(Weight::MAX);
		assert_last_event::<Test>(
			Event::Processed {
				id: blake2_256(b"weight=5"),
				origin: here,
				weight_used: 5.into_weight(),
				success: true,
			}
			.into(),
		);
	});
}