diff --git a/substrate/bin/node/runtime/src/lib.rs b/substrate/bin/node/runtime/src/lib.rs index 2270cd0145353c26d140efbb7041ab6ba6bef776..2f6aed32acb0144fe5f7d01d445c0130ddede2bc 100644 --- a/substrate/bin/node/runtime/src/lib.rs +++ b/substrate/bin/node/runtime/src/lib.rs @@ -1190,6 +1190,7 @@ impl pallet_message_queue::Config for Runtime { type MessageProcessor = pallet_message_queue::mock_helpers::NoopMessageProcessor<u32>; type Size = u32; type QueueChangeHandler = (); + type QueuePausedQuery = (); type HeapSize = ConstU32<{ 64 * 1024 }>; type MaxStale = ConstU32<128>; type ServiceWeight = MessageQueueServiceWeight; diff --git a/substrate/frame/message-queue/src/integration_test.rs b/substrate/frame/message-queue/src/integration_test.rs index 255098b3b141545a848b0f0d6c38bf7c90218737..9371fa7c7c29120dfbf5977ab0c86836e1bec3ec 100644 --- a/substrate/frame/message-queue/src/integration_test.rs +++ b/substrate/frame/message-queue/src/integration_test.rs @@ -23,7 +23,7 @@ use crate::{ mock::{ new_test_ext, CountingMessageProcessor, IntoWeight, MockedWeightInfo, NumMessagesProcessed, - SuspendedQueues, + YieldingQueues, }, mock_helpers::MessageOrigin, *, @@ -96,6 +96,7 @@ impl Config for Test { type MessageProcessor = CountingMessageProcessor; type Size = u32; type QueueChangeHandler = (); + type QueuePausedQuery = (); type HeapSize = HeapSize; type MaxStale = MaxStale; type ServiceWeight = ServiceWeight; @@ -207,7 +208,7 @@ fn stress_test_queue_suspension() { to_resume, per_queue.len() ); - SuspendedQueues::set(suspended.iter().map(|q| MessageOrigin::Everywhere(*q)).collect()); + YieldingQueues::set(suspended.iter().map(|q| MessageOrigin::Everywhere(*q)).collect()); // Pick a fraction of all messages currently in queue and process them. let resumed_messages = @@ -229,7 +230,7 @@ fn stress_test_queue_suspension() { process_all_messages(resumed_messages); msgs_remaining -= resumed_messages; - let resumed = SuspendedQueues::take(); + let resumed = YieldingQueues::take(); log::info!("Resumed all {} suspended queues", resumed.len()); log::info!("Processing all remaining {} messages", msgs_remaining); process_all_messages(msgs_remaining); diff --git a/substrate/frame/message-queue/src/lib.rs b/substrate/frame/message-queue/src/lib.rs index 37fbe85fd56be7a5a6c6a522837ac39b2acc3163..55a41643993aa97b33a1227216dc25782068ac8d 100644 --- a/substrate/frame/message-queue/src/lib.rs +++ b/substrate/frame/message-queue/src/lib.rs @@ -195,7 +195,7 @@ use frame_support::{ pallet_prelude::*, traits::{ DefensiveTruncateFrom, EnqueueMessage, ExecuteOverweightError, Footprint, ProcessMessage, - ProcessMessageError, ServiceQueues, + ProcessMessageError, QueuePausedQuery, ServiceQueues, }, BoundedSlice, CloneNoBound, DefaultNoBound, }; @@ -473,6 +473,13 @@ pub mod pallet { /// removed. type QueueChangeHandler: OnQueueChanged<<Self::MessageProcessor as ProcessMessage>::Origin>; + /// Queried by the pallet to check whether a queue can be serviced. + /// + /// This also applies to manual servicing via `execute_overweight` and `service_queues`. The + /// value of this is only polled once before servicing the queue. This means that changes to + /// it that happen *within* the servicing will not be reflected. + type QueuePausedQuery: QueuePausedQuery<<Self::MessageProcessor as ProcessMessage>::Origin>; + /// The size of the page; this implies the maximum message size which can be sent. /// /// A good value depends on the expected message sizes, their weights, the weight that is @@ -534,6 +541,10 @@ pub mod pallet { /// Such errors are expected, but not guaranteed, to resolve themselves eventually through /// retrying. TemporarilyUnprocessable, + /// The queue is paused and no message can be executed from it. + /// + /// This can change at any time and may resolve in the future by re-trying. + QueuePaused, } /// The index of the first and last (non-empty) pages. @@ -803,6 +814,8 @@ impl<T: Config> Pallet<T> { weight_limit: Weight, ) -> Result<Weight, Error<T>> { let mut book_state = BookStateFor::<T>::get(&origin); + ensure!(!T::QueuePausedQuery::is_paused(&origin), Error::<T>::QueuePaused); + let mut page = Pages::<T>::get(&origin, page_index).ok_or(Error::<T>::NoPage)?; let (pos, is_processed, payload) = page.peek_index(index.into() as usize).ok_or(Error::<T>::NoMessage)?; @@ -943,6 +956,10 @@ impl<T: Config> Pallet<T> { let mut book_state = BookStateFor::<T>::get(&origin); let mut total_processed = 0; + if T::QueuePausedQuery::is_paused(&origin) { + let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone()); + return (false, next_ready) + } while book_state.end > book_state.begin { let (processed, status) = @@ -1284,7 +1301,11 @@ impl<T: Config> ServiceQueues for Pallet<T> { Pallet::<T>::do_execute_overweight(message_origin, page, index, weight.remaining()).map_err( |e| match e { Error::<T>::InsufficientWeight => ExecuteOverweightError::InsufficientWeight, - _ => ExecuteOverweightError::NotFound, + Error::<T>::AlreadyProcessed => ExecuteOverweightError::AlreadyProcessed, + Error::<T>::QueuePaused => ExecuteOverweightError::QueuePaused, + Error::<T>::NoPage | Error::<T>::NoMessage | Error::<T>::Queued => + ExecuteOverweightError::NotFound, + _ => ExecuteOverweightError::Other, }, ) } diff --git a/substrate/frame/message-queue/src/mock.rs b/substrate/frame/message-queue/src/mock.rs index 71f0b0fa20e30489e351da0360c61d25adb1c399..f03ab8130952beba4f93f573e0335af980a40fb6 100644 --- a/substrate/frame/message-queue/src/mock.rs +++ b/substrate/frame/message-queue/src/mock.rs @@ -84,6 +84,7 @@ impl Config for Test { type MessageProcessor = RecordingMessageProcessor; type Size = u32; type QueueChangeHandler = RecordingQueueChangeHandler; + type QueuePausedQuery = MockedQueuePauser; type HeapSize = HeapSize; type MaxStale = MaxStale; type ServiceWeight = ServiceWeight; @@ -154,7 +155,8 @@ impl crate::weights::WeightInfo for MockedWeightInfo { parameter_types! { pub static MessagesProcessed: Vec<(Vec<u8>, MessageOrigin)> = vec![]; - pub static SuspendedQueues: Vec<MessageOrigin> = vec![]; + /// Queues that should return `Yield` upon being processed. + pub static YieldingQueues: Vec<MessageOrigin> = vec![]; } /// A message processor which records all processed messages into [`MessagesProcessed`]. @@ -205,7 +207,7 @@ impl ProcessMessage for RecordingMessageProcessor { /// Processed a mocked message. Messages that end with `badformat`, `corrupt`, `unsupported` or /// `yield` will fail with an error respectively. fn processing_message(msg: &[u8], origin: &MessageOrigin) -> Result<(), ProcessMessageError> { - if SuspendedQueues::get().contains(&origin) { + if YieldingQueues::get().contains(&origin) { return Err(ProcessMessageError::Yield) } @@ -270,6 +272,17 @@ impl OnQueueChanged<MessageOrigin> for RecordingQueueChangeHandler { } } +parameter_types! { + pub static PausedQueues: Vec<MessageOrigin> = vec![]; +} + +pub struct MockedQueuePauser; +impl QueuePausedQuery<MessageOrigin> for MockedQueuePauser { + fn is_paused(id: &MessageOrigin) -> bool { + PausedQueues::get().contains(id) + } +} + /// Create new test externalities. /// /// Is generic since it is used by the unit test, integration tests and benchmarks. @@ -287,6 +300,12 @@ where ext } +/// Run this closure in test externalities. +pub fn test_closure<R>(f: impl FnOnce() -> R) -> R { + let mut ext = new_test_ext::<Test>(); + ext.execute_with(f) +} + /// Set the weight of a specific weight function. pub fn set_weight(name: &str, w: Weight) { MockedWeightInfo::set_weight::<Test>(name, w); diff --git a/substrate/frame/message-queue/src/mock_helpers.rs b/substrate/frame/message-queue/src/mock_helpers.rs index 5a39eb0a598855a04f907be42941f3614a362eb6..65b1e5af9099a7acdf69423ca52d7139f860c646 100644 --- a/substrate/frame/message-queue/src/mock_helpers.rs +++ b/substrate/frame/message-queue/src/mock_helpers.rs @@ -89,11 +89,11 @@ pub fn page<T: Config>(msg: &[u8]) -> PageOf<T> { } pub fn single_page_book<T: Config>() -> BookStateOf<T> { - BookState { begin: 0, end: 1, count: 1, ready_neighbours: None, message_count: 0, size: 0 } + BookState { begin: 0, end: 1, count: 1, ..Default::default() } } pub fn empty_book<T: Config>() -> BookStateOf<T> { - BookState { begin: 0, end: 1, count: 1, ready_neighbours: None, message_count: 0, size: 0 } + BookState { begin: 0, end: 1, count: 1, ..Default::default() } } /// Returns a full page of messages with their index as payload and the number of messages. @@ -118,9 +118,9 @@ pub fn book_for<T: Config>(page: &PageOf<T>) -> BookStateOf<T> { count: 1, begin: 0, end: 1, - ready_neighbours: None, message_count: page.remaining.into() as u64, size: page.remaining_size.into() as u64, + ..Default::default() } } diff --git a/substrate/frame/message-queue/src/tests.rs b/substrate/frame/message-queue/src/tests.rs index 10eff69c926e560deb7eb07398ac290832ea3a93..6587c949bde059e5ad72b02cf9e16519d90ebb5c 100644 --- a/substrate/frame/message-queue/src/tests.rs +++ b/substrate/frame/message-queue/src/tests.rs @@ -27,22 +27,22 @@ use sp_core::blake2_256; #[test] fn mocked_weight_works() { - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { assert!(<Test as Config>::WeightInfo::service_queue_base().is_zero()); }); - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { set_weight("service_queue_base", Weight::MAX); assert_eq!(<Test as Config>::WeightInfo::service_queue_base(), Weight::MAX); }); // The externalities reset it. - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { assert!(<Test as Config>::WeightInfo::service_queue_base().is_zero()); }); } #[test] fn enqueue_within_one_page_works() { - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { use MessageOrigin::*; MessageQueue::enqueue_message(msg("a"), Here); MessageQueue::enqueue_message(msg("b"), Here); @@ -77,7 +77,7 @@ fn enqueue_within_one_page_works() { #[test] fn queue_priority_retains() { - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { use MessageOrigin::*; assert_ring(&[]); MessageQueue::enqueue_message(msg("a"), Everywhere(1)); @@ -108,7 +108,7 @@ fn queue_priority_retains() { #[test] fn queue_priority_reset_once_serviced() { - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { use MessageOrigin::*; MessageQueue::enqueue_message(msg("a"), Everywhere(1)); MessageQueue::enqueue_message(msg("b"), Everywhere(2)); @@ -135,7 +135,7 @@ fn queue_priority_reset_once_serviced() { #[test] fn service_queues_basic_works() { use MessageOrigin::*; - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { MessageQueue::enqueue_messages(vec![msg("a"), msg("ab"), msg("abc")].into_iter(), Here); MessageQueue::enqueue_messages(vec![msg("x"), msg("xy"), msg("xyz")].into_iter(), There); assert_eq!(QueueChanges::take(), vec![(Here, 3, 6), (There, 3, 6)]); @@ -146,13 +146,11 @@ fn service_queues_basic_works() { assert_eq!(QueueChanges::take(), vec![(Here, 2, 5)]); // Service one message from `There`. - ServiceHead::<Test>::set(There.into()); assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); assert_eq!(MessagesProcessed::take(), vec![(vmsg("x"), There)]); assert_eq!(QueueChanges::take(), vec![(There, 2, 5)]); // Service the remaining from `Here`. - ServiceHead::<Test>::set(Here.into()); assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight()); assert_eq!(MessagesProcessed::take(), vec![(vmsg("ab"), Here), (vmsg("abc"), Here)]); assert_eq!(QueueChanges::take(), vec![(Here, 0, 0)]); @@ -167,7 +165,7 @@ fn service_queues_basic_works() { #[test] fn service_queues_failing_messages_works() { use MessageOrigin::*; - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { set_weight("service_page_item", 1.into_weight()); MessageQueue::enqueue_message(msg("badformat"), Here); MessageQueue::enqueue_message(msg("corrupt"), Here); @@ -213,7 +211,7 @@ fn service_queues_failing_messages_works() { #[test] fn service_queues_suspension_works() { use MessageOrigin::*; - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { MessageQueue::enqueue_messages(vec![msg("a"), msg("b"), msg("c")].into_iter(), Here); MessageQueue::enqueue_messages(vec![msg("x"), msg("y"), msg("z")].into_iter(), There); MessageQueue::enqueue_messages( @@ -227,8 +225,8 @@ fn service_queues_suspension_works() { assert_eq!(MessagesProcessed::take(), vec![(vmsg("a"), Here)]); assert_eq!(QueueChanges::take(), vec![(Here, 2, 2)]); - // Pause queue `Here` and `Everywhere(0)`. - SuspendedQueues::set(vec![Here, Everywhere(0)]); + // Make queue `Here` and `Everywhere(0)` yield. + YieldingQueues::set(vec![Here, Everywhere(0)]); // Service one message from `There`. assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); @@ -245,13 +243,13 @@ fn service_queues_suspension_works() { assert_eq!(MessageQueue::service_queues(Weight::MAX), Weight::zero()); // ... until we resume `Here`: - SuspendedQueues::set(vec![Everywhere(0)]); + YieldingQueues::set(vec![Everywhere(0)]); assert_eq!(MessageQueue::service_queues(Weight::MAX), 2.into_weight()); assert_eq!(MessagesProcessed::take(), vec![(vmsg("b"), Here), (vmsg("c"), Here)]); // Everywhere still won't move. assert_eq!(MessageQueue::service_queues(Weight::MAX), Weight::zero()); - SuspendedQueues::take(); + YieldingQueues::take(); // Resume `Everywhere(0)` makes it work. assert_eq!(MessageQueue::service_queues(Weight::MAX), 3.into_weight()); assert_eq!( @@ -268,7 +266,7 @@ fn service_queues_suspension_works() { #[test] fn reap_page_permanent_overweight_works() { use MessageOrigin::*; - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { // Create 10 pages more than the stale limit. let n = (MaxStale::get() + 10) as usize; for _ in 0..n { @@ -308,7 +306,7 @@ fn reaping_overweight_fails_properly() { use MessageOrigin::*; assert_eq!(MaxStale::get(), 2, "The stale limit is two"); - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { // page 0 MessageQueue::enqueue_message(msg("weight=4"), Here); MessageQueue::enqueue_message(msg("a"), Here); @@ -378,7 +376,7 @@ fn reaping_overweight_fails_properly() { #[test] fn service_queue_bails() { // Not enough weight for `service_queue_base`. - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { set_weight("service_queue_base", 2.into_weight()); let mut meter = WeightMeter::from_limit(1.into_weight()); @@ -386,7 +384,7 @@ fn service_queue_bails() { assert!(meter.consumed.is_zero()); }); // Not enough weight for `ready_ring_unknit`. - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { set_weight("ready_ring_unknit", 2.into_weight()); let mut meter = WeightMeter::from_limit(1.into_weight()); @@ -394,7 +392,7 @@ fn service_queue_bails() { assert!(meter.consumed.is_zero()); }); // Not enough weight for `service_queue_base` and `ready_ring_unknit`. - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { set_weight("service_queue_base", 2.into_weight()); set_weight("ready_ring_unknit", 2.into_weight()); @@ -409,7 +407,7 @@ fn service_page_works() { use super::integration_test::Test; // Run with larger page size. use MessageOrigin::*; use PageExecutionStatus::*; - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { set_weight("service_page_base_completion", 2.into_weight()); set_weight("service_page_item", 3.into_weight()); @@ -446,7 +444,7 @@ fn service_page_works() { #[test] fn service_page_bails() { // Not enough weight for `service_page_base_completion`. - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { set_weight("service_page_base_completion", 2.into_weight()); let mut meter = WeightMeter::from_limit(1.into_weight()); @@ -463,7 +461,7 @@ fn service_page_bails() { assert!(meter.consumed.is_zero()); }); // Not enough weight for `service_page_base_no_completion`. - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { set_weight("service_page_base_no_completion", 2.into_weight()); let mut meter = WeightMeter::from_limit(1.into_weight()); @@ -483,7 +481,7 @@ fn service_page_bails() { #[test] fn service_page_item_bails() { - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { let _guard = StorageNoopGuard::default(); let (mut page, _) = full_page::<Test>(); let mut weight = WeightMeter::from_limit(10.into_weight()); @@ -510,7 +508,7 @@ fn service_page_suspension_works() { use MessageOrigin::*; use PageExecutionStatus::*; - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { let (page, mut msgs) = full_page::<Test>(); assert!(msgs >= 10, "pre-condition: need at least 10 msgs per page"); let mut book = book_for::<Test>(&page); @@ -527,7 +525,7 @@ fn service_page_suspension_works() { msgs -= 5; // Then we pause the queue. - SuspendedQueues::set(vec![Here]); + YieldingQueues::set(vec![Here]); // Noting happens... for _ in 0..5 { let (_, status) = crate::Pallet::<Test>::service_page( @@ -541,7 +539,7 @@ fn service_page_suspension_works() { } // Resume and process all remaining. - SuspendedQueues::take(); + YieldingQueues::take(); let (_, status) = crate::Pallet::<Test>::service_page( &Here, &mut book, @@ -558,7 +556,7 @@ fn service_page_suspension_works() { #[test] fn bump_service_head_works() { use MessageOrigin::*; - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { // Create a ready ring with three queues. BookStateFor::<Test>::insert(Here, empty_book::<Test>()); knit(&Here); @@ -581,7 +579,7 @@ fn bump_service_head_works() { /// `bump_service_head` does nothing when called with an insufficient weight limit. #[test] fn bump_service_head_bails() { - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { set_weight("bump_service_head", 2.into_weight()); setup_bump_service_head::<Test>(0.into(), 10.into()); @@ -594,7 +592,7 @@ fn bump_service_head_bails() { #[test] fn bump_service_head_trivial_works() { - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { set_weight("bump_service_head", 2.into_weight()); let mut meter = WeightMeter::max_limit(); @@ -615,7 +613,7 @@ fn bump_service_head_trivial_works() { #[test] fn bump_service_head_no_head_noops() { use MessageOrigin::*; - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { // Create a ready ring with three queues. BookStateFor::<Test>::insert(Here, empty_book::<Test>()); knit(&Here); @@ -634,7 +632,7 @@ fn bump_service_head_no_head_noops() { #[test] fn service_page_item_consumes_correct_weight() { - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { let mut page = page::<Test>(b"weight=3"); let mut weight = WeightMeter::from_limit(10.into_weight()); let overweight_limit = 0.into_weight(); @@ -658,7 +656,7 @@ fn service_page_item_consumes_correct_weight() { /// `service_page_item` skips a permanently `Overweight` message and marks it as `unprocessed`. #[test] fn service_page_item_skips_perm_overweight_message() { - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { let mut page = page::<Test>(b"TooMuch"); let mut weight = WeightMeter::from_limit(2.into_weight()); let overweight_limit = 0.into_weight(); @@ -697,7 +695,7 @@ fn service_page_item_skips_perm_overweight_message() { #[test] fn peek_index_works() { use super::integration_test::Test; // Run with larger page size. - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { // Fill a page with messages. let (mut page, msgs) = full_page::<Test>(); let msg_enc_len = ItemHeader::<<Test as Config>::Size>::max_encoded_len() + 4; @@ -718,7 +716,7 @@ fn peek_index_works() { #[test] fn peek_first_and_skip_first_works() { use super::integration_test::Test; // Run with larger page size. - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { // Fill a page with messages. let (mut page, msgs) = full_page::<Test>(); @@ -741,7 +739,7 @@ fn peek_first_and_skip_first_works() { #[test] fn note_processed_at_pos_works() { use super::integration_test::Test; // Run with larger page size. - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { let (mut page, msgs) = full_page::<Test>(); for i in 0..msgs { @@ -777,7 +775,7 @@ fn note_processed_at_pos_idempotent() { #[test] fn is_complete_works() { use super::integration_test::Test; // Run with larger page size. - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { let (mut page, msgs) = full_page::<Test>(); assert!(msgs > 3, "Boring"); let msg_enc_len = ItemHeader::<<Test as Config>::Size>::max_encoded_len() + 4; @@ -933,7 +931,7 @@ fn page_from_message_max_len_works() { #[test] fn sweep_queue_works() { use MessageOrigin::*; - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { build_triple_ring(); let book = BookStateFor::<Test>::get(Here); @@ -969,7 +967,7 @@ fn sweep_queue_works() { #[test] fn sweep_queue_wraps_works() { use MessageOrigin::*; - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { BookStateFor::<Test>::insert(Here, empty_book::<Test>()); knit(&Here); @@ -982,14 +980,14 @@ fn sweep_queue_wraps_works() { #[test] fn sweep_queue_invalid_noops() { use MessageOrigin::*; - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { assert_storage_noop!(MessageQueue::sweep_queue(Here)); }); } #[test] fn footprint_works() { - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { let origin = MessageOrigin::Here; let (page, msgs) = full_page::<Test>(); let book = book_for::<Test>(&page); @@ -1007,7 +1005,7 @@ fn footprint_works() { /// The footprint of an invalid queue is the default footprint. #[test] fn footprint_invalid_works() { - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { let origin = MessageOrigin::Here; assert_eq!(MessageQueue::footprint(origin), Default::default()); }) @@ -1017,7 +1015,7 @@ fn footprint_invalid_works() { #[test] fn footprint_on_swept_works() { use MessageOrigin::*; - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { let mut book = empty_book::<Test>(); book.message_count = 3; book.size = 10; @@ -1033,7 +1031,7 @@ fn footprint_on_swept_works() { #[test] fn execute_overweight_works() { - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { set_weight("bump_service_head", 1.into_weight()); set_weight("service_queue_base", 1.into_weight()); set_weight("service_page_base_completion", 1.into_weight()); @@ -1093,7 +1091,7 @@ fn execute_overweight_works() { fn permanently_overweight_book_unknits() { use MessageOrigin::*; - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { set_weight("bump_service_head", 1.into_weight()); set_weight("service_queue_base", 1.into_weight()); set_weight("service_page_base_completion", 1.into_weight()); @@ -1130,7 +1128,7 @@ fn permanently_overweight_book_unknits() { fn permanently_overweight_book_unknits_multiple() { use MessageOrigin::*; - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { set_weight("bump_service_head", 1.into_weight()); set_weight("service_queue_base", 1.into_weight()); set_weight("service_page_base_completion", 1.into_weight()); @@ -1169,7 +1167,7 @@ fn permanently_overweight_book_unknits_multiple() { fn ready_but_empty_does_not_panic() { use MessageOrigin::*; - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { BookStateFor::<Test>::insert(Here, empty_book::<Test>()); BookStateFor::<Test>::insert(There, empty_book::<Test>()); @@ -1189,7 +1187,7 @@ fn ready_but_empty_does_not_panic() { fn ready_but_perm_overweight_does_not_panic() { use MessageOrigin::*; - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { MessageQueue::enqueue_message(msg("weight=9"), Here); assert_eq!(MessageQueue::service_queues(8.into_weight()), 0.into_weight()); assert_ring(&[]); @@ -1209,7 +1207,7 @@ fn ready_but_perm_overweight_does_not_panic() { fn ready_ring_knit_basic_works() { use MessageOrigin::*; - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { BookStateFor::<Test>::insert(Here, empty_book::<Test>()); for i in 0..10 { @@ -1229,12 +1227,15 @@ fn ready_ring_knit_basic_works() { fn ready_ring_knit_and_unknit_works() { use MessageOrigin::*; - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { // Place three queues into the storage. BookStateFor::<Test>::insert(Here, empty_book::<Test>()); BookStateFor::<Test>::insert(There, empty_book::<Test>()); BookStateFor::<Test>::insert(Everywhere(0), empty_book::<Test>()); + // Pausing should make no difference: + PausedQueues::set(vec![Here, There, Everywhere(0)]); + // Knit them into the ready ring. assert_ring(&[]); knit(&Here); @@ -1260,7 +1261,7 @@ fn enqueue_message_works() { let max_msg_per_page = <Test as Config>::HeapSize::get() as u64 / (ItemHeader::<<Test as Config>::Size>::max_encoded_len() as u64 + 1); - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { // Enqueue messages which should fill three pages. let n = max_msg_per_page * 3; for i in 1..=n { @@ -1290,7 +1291,7 @@ fn enqueue_messages_works() { let max_msg_per_page = <Test as Config>::HeapSize::get() as u64 / (ItemHeader::<<Test as Config>::Size>::max_encoded_len() as u64 + 1); - new_test_ext::<Test>().execute_with(|| { + test_closure(|| { // Enqueue messages which should fill three pages. let n = max_msg_per_page * 3; let msgs = vec![msg("a"); n as usize]; @@ -1315,3 +1316,144 @@ fn enqueue_messages_works() { assert_eq!(book.count as usize, Pages::<Test>::iter().count()); }); } + +#[test] +fn service_queues_suspend_works() { + use MessageOrigin::*; + test_closure(|| { + MessageQueue::enqueue_messages(vec![msg("a"), msg("ab"), msg("abc")].into_iter(), Here); + MessageQueue::enqueue_messages(vec![msg("x"), msg("xy"), msg("xyz")].into_iter(), There); + assert_eq!(QueueChanges::take(), vec![(Here, 3, 6), (There, 3, 6)]); + + // Pause `Here` - execution starts `There`. + PausedQueues::set(vec![Here]); + assert_eq!( + (true, false), + ( + <Test as Config>::QueuePausedQuery::is_paused(&Here), + <Test as Config>::QueuePausedQuery::is_paused(&There) + ) + ); + assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); + assert_eq!(MessagesProcessed::take(), vec![(vmsg("x"), There)]); + assert_eq!(QueueChanges::take(), vec![(There, 2, 5)]); + + // Unpause `Here` - execution continues `There`. + PausedQueues::take(); + assert_eq!( + (false, false), + ( + <Test as Config>::QueuePausedQuery::is_paused(&Here), + <Test as Config>::QueuePausedQuery::is_paused(&There) + ) + ); + assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); + assert_eq!(MessagesProcessed::take(), vec![(vmsg("xy"), There)]); + assert_eq!(QueueChanges::take(), vec![(There, 1, 3)]); + + // Now it swaps to `Here`. + assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); + assert_eq!(MessagesProcessed::take(), vec![(vmsg("a"), Here)]); + assert_eq!(QueueChanges::take(), vec![(Here, 2, 5)]); + + // Pause `There` - execution continues `Here`. + PausedQueues::set(vec![There]); + assert_eq!( + (false, true), + ( + <Test as Config>::QueuePausedQuery::is_paused(&Here), + <Test as Config>::QueuePausedQuery::is_paused(&There) + ) + ); + assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); + assert_eq!(MessagesProcessed::take(), vec![(vmsg("ab"), Here)]); + assert_eq!(QueueChanges::take(), vec![(Here, 1, 3)]); + + // Unpause `There` and service all remaining messages. + PausedQueues::take(); + assert_eq!( + (false, false), + ( + <Test as Config>::QueuePausedQuery::is_paused(&Here), + <Test as Config>::QueuePausedQuery::is_paused(&There) + ) + ); + assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight()); + assert_eq!(MessagesProcessed::take(), vec![(vmsg("abc"), Here), (vmsg("xyz"), There)]); + assert_eq!(QueueChanges::take(), vec![(Here, 0, 0), (There, 0, 0)]); + }); +} + +/// Tests that manual overweight execution on a suspended queue errors with `QueueSuspended`. +#[test] +fn execute_overweight_respects_suspension() { + test_closure(|| { + let origin = MessageOrigin::Here; + MessageQueue::enqueue_message(msg("weight=5"), origin); + // Mark the message as permanently overweight. + MessageQueue::service_queues(4.into_weight()); + assert_last_event::<Test>( + Event::OverweightEnqueued { + id: blake2_256(b"weight=5"), + origin, + message_index: 0, + page_index: 0, + } + .into(), + ); + PausedQueues::set(vec![origin]); + assert!(<Test as Config>::QueuePausedQuery::is_paused(&origin)); + + // Execution should fail. + assert_eq!( + <MessageQueue as ServiceQueues>::execute_overweight(Weight::MAX, (origin, 0, 0)), + Err(ExecuteOverweightError::QueuePaused) + ); + + PausedQueues::take(); + assert!(!<Test as Config>::QueuePausedQuery::is_paused(&origin)); + + // Execution should work again with same args. + assert_ok!(<MessageQueue as ServiceQueues>::execute_overweight( + Weight::MAX, + (origin, 0, 0) + )); + + assert_last_event::<Test>( + Event::Processed { + id: blake2_256(b"weight=5"), + origin, + weight_used: 5.into_weight(), + success: true, + } + .into(), + ); + }); +} + +#[test] +fn service_queue_suspension_ready_ring_works() { + test_closure(|| { + let origin = MessageOrigin::Here; + PausedQueues::set(vec![origin]); + MessageQueue::enqueue_message(msg("weight=5"), origin); + + MessageQueue::service_queues(Weight::MAX); + // It did not execute but is in the ready ring. + assert!(System::events().is_empty(), "Paused"); + assert_ring(&[origin]); + + // Now when we un-pause, it will execute. + PausedQueues::take(); + MessageQueue::service_queues(Weight::MAX); + assert_last_event::<Test>( + Event::Processed { + id: blake2_256(b"weight=5"), + origin, + weight_used: 5.into_weight(), + success: true, + } + .into(), + ); + }); +} diff --git a/substrate/frame/support/src/traits.rs b/substrate/frame/support/src/traits.rs index e4fd04750441a7781e4352fe2b5ed8ecb9b246c6..5b34a77df448a47337b4842f30d64b347765a19d 100644 --- a/substrate/frame/support/src/traits.rs +++ b/substrate/frame/support/src/traits.rs @@ -113,7 +113,7 @@ pub use preimages::{Bounded, BoundedInline, FetchResult, Hash, QueryPreimage, St mod messages; pub use messages::{ EnqueueMessage, ExecuteOverweightError, Footprint, NoopServiceQueues, ProcessMessage, - ProcessMessageError, ServiceQueues, TransformOrigin, + ProcessMessageError, QueuePausedQuery, ServiceQueues, TransformOrigin, }; #[cfg(feature = "try-runtime")] diff --git a/substrate/frame/support/src/traits/messages.rs b/substrate/frame/support/src/traits/messages.rs index fe907b0c6d63e95ab9a6c22f90c1f3f161d3a6d9..36fa7957dff7c3891053eddc1f33b29da20366de 100644 --- a/substrate/frame/support/src/traits/messages.rs +++ b/substrate/frame/support/src/traits/messages.rs @@ -69,8 +69,18 @@ pub trait ProcessMessage { pub enum ExecuteOverweightError { /// The referenced message was not found. NotFound, + /// The message was already processed. + /// + /// This can be treated as success condition. + AlreadyProcessed, /// The available weight was insufficient to execute the message. InsufficientWeight, + /// The queue is paused and no message can be executed from it. + /// + /// This can change at any time and may resolve in the future by re-trying. + QueuePaused, + /// An unspecified error. + Other, } /// Can service queues and execute overweight messages. @@ -220,3 +230,15 @@ where E::footprint(O::get()) } } + +/// Provides information on paused queues. +pub trait QueuePausedQuery<Origin> { + /// Whether this queue is paused. + fn is_paused(origin: &Origin) -> bool; +} + +impl<Origin> QueuePausedQuery<Origin> for () { + fn is_paused(_: &Origin) -> bool { + false + } +}