Skip to content
Snippets Groups Projects
Unverified Commit 5d7181cd authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

pallet-message-queue: Fix max message size calculation (#6205)


The max size of a message should not depend on the weight left in a
given execution context. Instead the max message size depends on the
service weights configured for the pallet. A message that may does not
fit into `on_idle` is not automatically overweight, because it may can
be executed successfully in `on_initialize` or in another block in
`on_idle` when there is more weight left.

---------

Co-authored-by: default avatarGitHub Action <action@github.com>
parent 0596928e
No related merge requests found
Pipeline #502787 waiting for manual action with stages
in 29 minutes and 32 seconds
title: 'pallet-message-queue: Fix max message size calculation'
doc:
- audience: Runtime Dev
description: |-
The max size of a message should not depend on the weight left in a given execution context. Instead the max message size depends on the service weights configured for the pallet. A message that may does not fit into `on_idle` is not automatically overweight, because it may can be executed successfully in `on_initialize` or in another block in `on_idle` when there is more weight left.
crates:
- name: pallet-message-queue
bump: patch
......@@ -868,13 +868,26 @@ impl<T: Config> Pallet<T> {
}
}
/// The maximal weight that a single message can consume.
/// The maximal weight that a single message ever can consume.
///
/// Any message using more than this will be marked as permanently overweight and not
/// automatically re-attempted. Returns `None` if the servicing of a message cannot begin.
/// `Some(0)` means that only messages with no weight may be served.
fn max_message_weight(limit: Weight) -> Option<Weight> {
limit.checked_sub(&Self::single_msg_overhead())
let service_weight = T::ServiceWeight::get().unwrap_or_default();
let on_idle_weight = T::IdleMaxServiceWeight::get().unwrap_or_default();
// Whatever weight is set, the one with the biggest one is used as the maximum weight. If a
// message is tried in one context and fails, it will be retried in the other context later.
let max_message_weight =
if service_weight.any_gt(on_idle_weight) { service_weight } else { on_idle_weight };
if max_message_weight.is_zero() {
// If no service weight is set, we need to use the given limit as max message weight.
limit.checked_sub(&Self::single_msg_overhead())
} else {
max_message_weight.checked_sub(&Self::single_msg_overhead())
}
}
/// The overhead of servicing a single message.
......@@ -896,6 +909,8 @@ impl<T: Config> Pallet<T> {
fn do_integrity_test() -> Result<(), String> {
ensure!(!MaxMessageLenOf::<T>::get().is_zero(), "HeapSize too low");
let max_block = T::BlockWeights::get().max_block;
if let Some(service) = T::ServiceWeight::get() {
if Self::max_message_weight(service).is_none() {
return Err(format!(
......@@ -904,6 +919,31 @@ impl<T: Config> Pallet<T> {
Self::single_msg_overhead(),
))
}
if service.any_gt(max_block) {
return Err(format!(
"ServiceWeight {service} is bigger than max block weight {max_block}"
))
}
}
if let Some(on_idle) = T::IdleMaxServiceWeight::get() {
if on_idle.any_gt(max_block) {
return Err(format!(
"IdleMaxServiceWeight {on_idle} is bigger than max block weight {max_block}"
))
}
}
if let (Some(service_weight), Some(on_idle)) =
(T::ServiceWeight::get(), T::IdleMaxServiceWeight::get())
{
if !(service_weight.all_gt(on_idle) ||
on_idle.all_gt(service_weight) ||
service_weight == on_idle)
{
return Err("One of `ServiceWeight` or `IdleMaxServiceWeight` needs to be `all_gt` or both need to be equal.".into())
}
}
Ok(())
......@@ -1531,7 +1571,7 @@ impl<T: Config> Pallet<T> {
let mut weight = WeightMeter::with_limit(weight_limit);
// Get the maximum weight that processing a single message may take:
let max_weight = Self::max_message_weight(weight_limit).unwrap_or_else(|| {
let overweight_limit = Self::max_message_weight(weight_limit).unwrap_or_else(|| {
if matches!(context, ServiceQueuesContext::OnInitialize) {
defensive!("Not enough weight to service a single message.");
}
......@@ -1549,7 +1589,8 @@ impl<T: Config> Pallet<T> {
let mut last_no_progress = None;
loop {
let (progressed, n) = Self::service_queue(next.clone(), &mut weight, max_weight);
let (progressed, n) =
Self::service_queue(next.clone(), &mut weight, overweight_limit);
next = match n {
Some(n) =>
if !progressed {
......
......@@ -42,7 +42,7 @@ impl frame_system::Config for Test {
type Block = Block;
}
parameter_types! {
pub const HeapSize: u32 = 24;
pub const HeapSize: u32 = 40;
pub const MaxStale: u32 = 2;
pub const ServiceWeight: Option<Weight> = Some(Weight::from_parts(100, 100));
}
......
......@@ -177,7 +177,7 @@ fn service_queues_failing_messages_works() {
MessageQueue::enqueue_message(msg("stacklimitreached"), Here);
MessageQueue::enqueue_message(msg("yield"), Here);
// Starts with four pages.
assert_pages(&[0, 1, 2, 3, 4]);
assert_pages(&[0, 1, 2]);
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
assert_last_event::<Test>(
......@@ -209,7 +209,7 @@ fn service_queues_failing_messages_works() {
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
assert_eq!(System::events().len(), 4);
// Last page with the `yield` stays in.
assert_pages(&[4]);
assert_pages(&[2]);
});
}
......@@ -313,7 +313,7 @@ fn reap_page_permanent_overweight_works() {
// Create 10 pages more than the stale limit.
let n = (MaxStale::get() + 10) as usize;
for _ in 0..n {
MessageQueue::enqueue_message(msg("weight=2"), Here);
MessageQueue::enqueue_message(msg("weight=200 datadatadata"), Here);
}
assert_eq!(Pages::<Test>::iter().count(), n);
assert_eq!(MessageQueue::footprint(Here).pages, n as u32);
......@@ -334,7 +334,7 @@ fn reap_page_permanent_overweight_works() {
break
}
assert_ok!(MessageQueue::do_reap_page(&Here, i));
assert_eq!(QueueChanges::take(), vec![(Here, b.message_count - 1, b.size - 8)]);
assert_eq!(QueueChanges::take(), vec![(Here, b.message_count - 1, b.size - 23)]);
}
// Cannot reap any more pages.
......@@ -353,20 +353,20 @@ fn reaping_overweight_fails_properly() {
build_and_execute::<Test>(|| {
// page 0
MessageQueue::enqueue_message(msg("weight=4"), Here);
MessageQueue::enqueue_message(msg("weight=200 datadata"), Here);
MessageQueue::enqueue_message(msg("a"), Here);
// page 1
MessageQueue::enqueue_message(msg("weight=4"), Here);
MessageQueue::enqueue_message(msg("weight=200 datadata"), Here);
MessageQueue::enqueue_message(msg("b"), Here);
// page 2
MessageQueue::enqueue_message(msg("weight=4"), Here);
MessageQueue::enqueue_message(msg("weight=200 datadata"), Here);
MessageQueue::enqueue_message(msg("c"), Here);
// page 3
MessageQueue::enqueue_message(msg("bigbig 1"), Here);
MessageQueue::enqueue_message(msg("bigbig 1 datadata"), Here);
// page 4
MessageQueue::enqueue_message(msg("bigbig 2"), Here);
MessageQueue::enqueue_message(msg("bigbig 2 datadata"), Here);
// page 5
MessageQueue::enqueue_message(msg("bigbig 3"), Here);
MessageQueue::enqueue_message(msg("bigbig 3 datadata"), Here);
// Double-check that exactly these pages exist.
assert_pages(&[0, 1, 2, 3, 4, 5]);
......@@ -385,7 +385,7 @@ fn reaping_overweight_fails_properly() {
// 3 stale now: can take something 4 pages in history.
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(vmsg("bigbig 1"), Here)]);
assert_eq!(MessagesProcessed::take(), vec![(vmsg("bigbig 1 datadata"), Here)]);
// Nothing reapable yet, because we haven't hit the stale limit.
for (o, i, _) in Pages::<Test>::iter() {
......@@ -394,7 +394,7 @@ fn reaping_overweight_fails_properly() {
assert_pages(&[0, 1, 2, 4, 5]);
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(vmsg("bigbig 2"), Here)]);
assert_eq!(MessagesProcessed::take(), vec![(vmsg("bigbig 2 datadata"), Here)]);
assert_pages(&[0, 1, 2, 5]);
// First is now reapable as it is too far behind the first ready page (5).
......@@ -406,7 +406,7 @@ fn reaping_overweight_fails_properly() {
assert_pages(&[1, 2, 5]);
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(vmsg("bigbig 3"), Here)]);
assert_eq!(MessagesProcessed::take(), vec![(vmsg("bigbig 3 datadata"), Here)]);
assert_noop!(MessageQueue::do_reap_page(&Here, 0), Error::<Test>::NoPage);
assert_noop!(MessageQueue::do_reap_page(&Here, 3), Error::<Test>::NoPage);
......@@ -1062,29 +1062,29 @@ fn footprint_on_swept_works() {
fn footprint_num_pages_works() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
MessageQueue::enqueue_message(msg("weight=2"), Here);
MessageQueue::enqueue_message(msg("weight=3"), Here);
MessageQueue::enqueue_message(msg("weight=200"), Here);
MessageQueue::enqueue_message(msg("weight=300"), Here);
assert_eq!(MessageQueue::footprint(Here), fp(2, 2, 2, 16));
assert_eq!(MessageQueue::footprint(Here), fp(1, 1, 2, 20));
// Mark the messages as overweight.
assert_eq!(MessageQueue::service_queues(1.into_weight()), 0.into_weight());
assert_eq!(System::events().len(), 2);
// `ready_pages` decreases but `page` count does not.
assert_eq!(MessageQueue::footprint(Here), fp(2, 0, 2, 16));
assert_eq!(MessageQueue::footprint(Here), fp(1, 0, 2, 20));
// Now execute the second message.
assert_eq!(
<MessageQueue as ServiceQueues>::execute_overweight(3.into_weight(), (Here, 1, 0))
<MessageQueue as ServiceQueues>::execute_overweight(300.into_weight(), (Here, 0, 1))
.unwrap(),
3.into_weight()
300.into_weight()
);
assert_eq!(MessageQueue::footprint(Here), fp(1, 0, 1, 8));
assert_eq!(MessageQueue::footprint(Here), fp(1, 0, 1, 10));
// And the first one:
assert_eq!(
<MessageQueue as ServiceQueues>::execute_overweight(2.into_weight(), (Here, 0, 0))
<MessageQueue as ServiceQueues>::execute_overweight(200.into_weight(), (Here, 0, 0))
.unwrap(),
2.into_weight()
200.into_weight()
);
assert_eq!(MessageQueue::footprint(Here), Default::default());
assert_eq!(MessageQueue::footprint(Here), fp(0, 0, 0, 0));
......@@ -1104,7 +1104,7 @@ fn execute_overweight_works() {
// Enqueue a message
let origin = MessageOrigin::Here;
MessageQueue::enqueue_message(msg("weight=6"), origin);
MessageQueue::enqueue_message(msg("weight=200"), origin);
// Load the current book
let book = BookStateFor::<Test>::get(origin);
assert_eq!(book.message_count, 1);
......@@ -1112,10 +1112,10 @@ fn execute_overweight_works() {
// Mark the message as permanently overweight.
assert_eq!(MessageQueue::service_queues(4.into_weight()), 4.into_weight());
assert_eq!(QueueChanges::take(), vec![(origin, 1, 8)]);
assert_eq!(QueueChanges::take(), vec![(origin, 1, 10)]);
assert_last_event::<Test>(
Event::OverweightEnqueued {
id: blake2_256(b"weight=6"),
id: blake2_256(b"weight=200"),
origin: MessageOrigin::Here,
message_index: 0,
page_index: 0,
......@@ -1132,9 +1132,9 @@ fn execute_overweight_works() {
assert_eq!(Pages::<Test>::iter().count(), 1);
assert!(QueueChanges::take().is_empty());
let consumed =
<MessageQueue as ServiceQueues>::execute_overweight(7.into_weight(), (origin, 0, 0))
<MessageQueue as ServiceQueues>::execute_overweight(200.into_weight(), (origin, 0, 0))
.unwrap();
assert_eq!(consumed, 6.into_weight());
assert_eq!(consumed, 200.into_weight());
assert_eq!(QueueChanges::take(), vec![(origin, 0, 0)]);
// There is no message left in the book.
let book = BookStateFor::<Test>::get(origin);
......@@ -1162,7 +1162,7 @@ fn permanently_overweight_book_unknits() {
set_weight("service_queue_base", 1.into_weight());
set_weight("service_page_base_completion", 1.into_weight());
MessageQueue::enqueue_messages([msg("weight=9")].into_iter(), Here);
MessageQueue::enqueue_messages([msg("weight=200")].into_iter(), Here);
// It is the only ready book.
assert_ring(&[Here]);
......@@ -1170,7 +1170,7 @@ fn permanently_overweight_book_unknits() {
assert_eq!(MessageQueue::service_queues(8.into_weight()), 4.into_weight());
assert_last_event::<Test>(
Event::OverweightEnqueued {
id: blake2_256(b"weight=9"),
id: blake2_256(b"weight=200"),
origin: Here,
message_index: 0,
page_index: 0,
......@@ -1201,19 +1201,19 @@ fn permanently_overweight_book_unknits_multiple() {
set_weight("service_page_base_completion", 1.into_weight());
MessageQueue::enqueue_messages(
[msg("weight=1"), msg("weight=9"), msg("weight=9")].into_iter(),
[msg("weight=1"), msg("weight=200"), msg("weight=200")].into_iter(),
Here,
);
assert_ring(&[Here]);
// Process the first message.
assert_eq!(MessageQueue::service_queues(4.into_weight()), 4.into_weight());
assert_eq!(num_overweight_enqueued_events(), 0);
assert_eq!(num_overweight_enqueued_events(), 1);
assert_eq!(MessagesProcessed::take().len(), 1);
// Book is still ready since it was not marked as overweight yet.
assert_ring(&[Here]);
assert_eq!(MessageQueue::service_queues(8.into_weight()), 5.into_weight());
assert_eq!(MessageQueue::service_queues(8.into_weight()), 4.into_weight());
assert_eq!(num_overweight_enqueued_events(), 2);
assert_eq!(MessagesProcessed::take().len(), 0);
// Now it is overweight.
......@@ -1566,12 +1566,12 @@ fn service_queues_suspend_works() {
fn execute_overweight_respects_suspension() {
build_and_execute::<Test>(|| {
let origin = MessageOrigin::Here;
MessageQueue::enqueue_message(msg("weight=5"), origin);
MessageQueue::enqueue_message(msg("weight=200"), origin);
// Mark the message as permanently overweight.
MessageQueue::service_queues(4.into_weight());
assert_last_event::<Test>(
Event::OverweightEnqueued {
id: blake2_256(b"weight=5"),
id: blake2_256(b"weight=200"),
origin,
message_index: 0,
page_index: 0,
......@@ -1598,9 +1598,9 @@ fn execute_overweight_respects_suspension() {
assert_last_event::<Test>(
Event::Processed {
id: blake2_256(b"weight=5").into(),
id: blake2_256(b"weight=200").into(),
origin,
weight_used: 5.into_weight(),
weight_used: 200.into_weight(),
success: true,
}
.into(),
......@@ -1768,7 +1768,7 @@ fn recursive_overweight_while_service_is_forbidden() {
// Check that the message was permanently overweight.
assert_last_event::<Test>(
Event::OverweightEnqueued {
id: blake2_256(b"weight=10"),
id: blake2_256(b"weight=200"),
origin: There,
message_index: 0,
page_index: 0,
......@@ -1786,13 +1786,13 @@ fn recursive_overweight_while_service_is_forbidden() {
Ok(())
}));
MessageQueue::enqueue_message(msg("weight=10"), There);
MessageQueue::enqueue_message(msg("weight=200"), There);
MessageQueue::enqueue_message(msg("callback=0"), Here);
// Mark it as permanently overweight.
MessageQueue::service_queues(5.into_weight());
assert_ok!(<MessageQueue as ServiceQueues>::execute_overweight(
10.into_weight(),
200.into_weight(),
(There, 0, 0)
));
});
......@@ -1812,7 +1812,7 @@ fn recursive_reap_page_is_forbidden() {
// Create 10 pages more than the stale limit.
let n = (MaxStale::get() + 10) as usize;
for _ in 0..n {
MessageQueue::enqueue_message(msg("weight=2"), Here);
MessageQueue::enqueue_message(msg("weight=200"), Here);
}
// Mark all pages as stale since their message is permanently overweight.
......@@ -1886,6 +1886,11 @@ fn process_enqueued_on_idle_requires_enough_weight() {
// Not enough weight to process on idle.
Pallet::<Test>::on_idle(1, Weight::from_parts(0, 0));
assert_eq!(MessagesProcessed::take(), vec![]);
assert!(!System::events().into_iter().any(|e| matches!(
e.event,
RuntimeEvent::MessageQueue(Event::<Test>::OverweightEnqueued { .. })
)));
})
}
......@@ -1923,12 +1928,12 @@ fn execute_overweight_keeps_stack_ov_message() {
// We need to create a mocked message that first reports insufficient weight, and then
// `StackLimitReached`:
IgnoreStackOvError::set(true);
MessageQueue::enqueue_message(msg("stacklimitreached"), Here);
MessageQueue::enqueue_message(msg("weight=200 stacklimitreached"), Here);
MessageQueue::service_queues(0.into_weight());
assert_last_event::<Test>(
Event::OverweightEnqueued {
id: blake2_256(b"stacklimitreached"),
id: blake2_256(b"weight=200 stacklimitreached"),
origin: MessageOrigin::Here,
message_index: 0,
page_index: 0,
......@@ -1952,7 +1957,7 @@ fn execute_overweight_keeps_stack_ov_message() {
);
assert_last_event::<Test>(
Event::ProcessingFailed {
id: blake2_256(b"stacklimitreached").into(),
id: blake2_256(b"weight=200 stacklimitreached").into(),
origin: MessageOrigin::Here,
error: ProcessMessageError::StackLimitReached,
}
......@@ -1964,16 +1969,16 @@ fn execute_overweight_keeps_stack_ov_message() {
// Now let's process it normally:
IgnoreStackOvError::set(true);
assert_eq!(
<MessageQueue as ServiceQueues>::execute_overweight(1.into_weight(), (Here, 0, 0))
<MessageQueue as ServiceQueues>::execute_overweight(200.into_weight(), (Here, 0, 0))
.unwrap(),
1.into_weight()
200.into_weight()
);
assert_last_event::<Test>(
Event::Processed {
id: blake2_256(b"stacklimitreached").into(),
id: blake2_256(b"weight=200 stacklimitreached").into(),
origin: MessageOrigin::Here,
weight_used: 1.into_weight(),
weight_used: 200.into_weight(),
success: true,
}
.into(),
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment