Unverified Commit c0a3e56b authored by Gavin Wood's avatar Gavin Wood Committed by GitHub
Browse files

Don't drop UMP queue items if weight exhausted (#3784)



* Requeue UMP queue items if weight exhausted

* Reduce complexity and remove Deque

* Formatting

* Formatting

* Avoid needless storage writes

* Test

* Formatting

* Docs and cleanup

* fmt

* Remove now irrelevant comment.

* Simplify `take_processed` by using `mem::take`

* Clean up & fmt: use `upward_message` directly.

* Grumbles

Co-authored-by: Shawn Tabrizi's avatarShawn Tabrizi <shawntabrizi@gmail.com>
Co-authored-by: Sergey Pepyakin's avatarSergei Shulepov <sergei@parity.io>
parent 04735b52
Pipeline #156170 passed with stages
in 42 minutes and 41 seconds
......@@ -18,12 +18,15 @@
use crate::{
configuration, disputes, dmp, hrmp, inclusion, initializer, paras, paras_inherent, scheduler,
session_info, shared, ump,
session_info, shared,
ump::{self, MessageId, UmpSink},
ParaId,
};
use frame_support::{parameter_types, traits::GenesisBuild};
use frame_support::{parameter_types, traits::GenesisBuild, weights::Weight};
use frame_support_test::TestRandomness;
use parity_scale_codec::Decode;
use primitives::v1::{
AuthorityDiscoveryId, Balance, BlockNumber, Header, SessionIndex, ValidatorIndex,
AuthorityDiscoveryId, Balance, BlockNumber, Header, SessionIndex, UpwardMessage, ValidatorIndex,
};
use sp_core::H256;
use sp_io::TestExternalities;
......@@ -128,7 +131,7 @@ parameter_types! {
impl crate::ump::Config for Test {
type Event = Event;
type UmpSink = crate::ump::mock_sink::MockUmpSink;
type UmpSink = TestUmpSink;
type FirstMessageFactorPercent = FirstMessageFactorPercent;
}
......@@ -232,6 +235,41 @@ pub fn availability_rewards() -> HashMap<ValidatorIndex, usize> {
AVAILABILITY_REWARDS.with(|r| r.borrow().clone())
}
std::thread_local! {
static PROCESSED: RefCell<Vec<(ParaId, UpwardMessage)>> = RefCell::new(vec![]);
}
/// Return which messages have been processed by `pocess_upward_message` and clear the buffer.
pub fn take_processed() -> Vec<(ParaId, UpwardMessage)> {
PROCESSED.with(|opt_hook| std::mem::take(&mut *opt_hook.borrow_mut()))
}
/// An implementation of a UMP sink that just records which messages were processed.
///
/// A message's weight is defined by the first 4 bytes of its data, which we decode into a
/// `u32`.
pub struct TestUmpSink;
impl UmpSink for TestUmpSink {
fn process_upward_message(
actual_origin: ParaId,
actual_msg: &[u8],
max_weight: Weight,
) -> Result<Weight, (MessageId, Weight)> {
let weight = match u32::decode(&mut &actual_msg[..]) {
Ok(w) => w as Weight,
Err(_) => return Ok(0), // same as the real `UmpSink`
};
if weight > max_weight {
let id = sp_io::hashing::blake2_256(actual_msg);
return Err((id, weight))
}
PROCESSED.with(|opt_hook| {
opt_hook.borrow_mut().push((actual_origin, actual_msg.to_owned()));
});
Ok(weight)
}
}
pub struct TestRewardValidators;
impl inclusion::RewardValidators for TestRewardValidators {
......
......@@ -21,11 +21,7 @@ use crate::{
use frame_support::pallet_prelude::*;
use primitives::v1::{Id as ParaId, UpwardMessage};
use sp_std::{
collections::{btree_map::BTreeMap, vec_deque::VecDeque},
convert::TryFrom,
fmt,
marker::PhantomData,
prelude::*,
collections::btree_map::BTreeMap, convert::TryFrom, fmt, marker::PhantomData, prelude::*,
};
use xcm::latest::Outcome;
......@@ -211,7 +207,7 @@ pub mod pallet {
/// The messages are processed in FIFO order.
#[pallet::storage]
pub type RelayDispatchQueues<T: Config> =
StorageMap<_, Twox64Concat, ParaId, VecDeque<UpwardMessage>, ValueQuery>;
StorageMap<_, Twox64Concat, ParaId, Vec<UpwardMessage>, ValueQuery>;
/// Size of the dispatch queues. Caches sizes of the queues in `RelayDispatchQueue`.
///
......@@ -407,23 +403,27 @@ impl<T: Config> Pallet<T> {
config.ump_service_total_weight - weight_used
};
// dequeue the next message from the queue of the dispatchee
let (upward_message, became_empty) = queue_cache.dequeue::<T>(dispatchee);
if let Some(upward_message) = upward_message {
match T::UmpSink::process_upward_message(
dispatchee,
&upward_message[..],
max_weight,
) {
Ok(used) => weight_used += used,
// attempt to process the next message from the queue of the dispatchee; if not beyond
// our remaining weight limit, then consume it.
let maybe_next = queue_cache.peek_front::<T>(dispatchee);
let became_empty = if let Some(upward_message) = maybe_next {
match T::UmpSink::process_upward_message(dispatchee, upward_message, max_weight) {
Ok(used) => {
weight_used += used;
queue_cache.consume_front::<T>(dispatchee)
},
Err((id, required)) => {
// we process messages in order and don't drop them if we run out of weight, so need to break
// here.
// we process messages in order and don't drop them if we run out of weight,
// so need to break here without calling `consume_front`.
Self::deposit_event(Event::WeightExhausted(id, max_weight, required));
break
},
}
}
} else {
// this should never happen, since the cursor should never point to an empty queue.
// it is resolved harmlessly here anyway.
true
};
if became_empty {
// the queue is empty now - this para doesn't need attention anymore.
......@@ -442,8 +442,8 @@ impl<T: Config> Pallet<T> {
/// To avoid constant fetching, deserializing and serialization the queues are cached.
///
/// After an item dequeued from a queue for the first time, the queue is stored in this struct rather
/// than being serialized and persisted.
/// After an item dequeued from a queue for the first time, the queue is stored in this struct
/// rather than being serialized and persisted.
///
/// This implementation works best when:
///
......@@ -461,9 +461,10 @@ impl<T: Config> Pallet<T> {
struct QueueCache(BTreeMap<ParaId, QueueCacheEntry>);
struct QueueCacheEntry {
queue: VecDeque<UpwardMessage>,
count: u32,
queue: Vec<UpwardMessage>,
total_size: u32,
consumed_count: usize,
consumed_size: usize,
}
impl QueueCache {
......@@ -471,26 +472,35 @@ impl QueueCache {
Self(BTreeMap::new())
}
/// Dequeues one item from the upward message queue of the given para.
fn ensure_cached<T: Config>(&mut self, para: ParaId) -> &mut QueueCacheEntry {
self.0.entry(para).or_insert_with(|| {
let queue = RelayDispatchQueues::<T>::get(&para);
let (_, total_size) = RelayDispatchQueueSize::<T>::get(&para);
QueueCacheEntry { queue, total_size, consumed_count: 0, consumed_size: 0 }
})
}
/// Returns the message at the front of `para`'s queue, or `None` if the queue is empty.
///
/// Returns `(upward_message, became_empty)`, where
/// Does not mutate the queue.
fn peek_front<T: Config>(&mut self, para: ParaId) -> Option<&UpwardMessage> {
let entry = self.ensure_cached::<T>(para);
entry.queue.get(entry.consumed_count)
}
/// Attempts to remove one message from the front of `para`'s queue. If the queue is empty, then
/// does nothing.
///
/// - `upward_message` a dequeued message or `None` if the queue _was_ empty.
/// - `became_empty` is true if the queue _became_ empty.
fn dequeue<T: Config>(&mut self, para: ParaId) -> (Option<UpwardMessage>, bool) {
let cache_entry = self.0.entry(para).or_insert_with(|| {
let queue = <Pallet<T> as Store>::RelayDispatchQueues::get(&para);
let (count, total_size) = <Pallet<T> as Store>::RelayDispatchQueueSize::get(&para);
QueueCacheEntry { queue, count, total_size }
});
let upward_message = cache_entry.queue.pop_front();
if let Some(ref msg) = upward_message {
cache_entry.count -= 1;
cache_entry.total_size -= msg.len() as u32;
/// Returns `true` iff there are no more messages in the queue after the removal attempt.
fn consume_front<T: Config>(&mut self, para: ParaId) -> bool {
let cache_entry = self.ensure_cached::<T>(para);
let upward_message = cache_entry.queue.get(cache_entry.consumed_count);
if let Some(msg) = upward_message {
cache_entry.consumed_count += 1;
cache_entry.consumed_size += msg.len();
}
let became_empty = cache_entry.queue.is_empty();
(upward_message, became_empty)
cache_entry.consumed_count >= cache_entry.queue.len()
}
/// Flushes the updated queues into the storage.
......@@ -498,14 +508,16 @@ impl QueueCache {
// NOTE we use an explicit method here instead of Drop impl because it has unwanted semantics
// within runtime. It is dangerous to use because of double-panics and flushing on a panic
// is not necessary as well.
for (para, QueueCacheEntry { queue, count, total_size }) in self.0 {
if queue.is_empty() {
for (para, entry) in self.0 {
if entry.consumed_count >= entry.queue.len() {
// remove the entries altogether.
<Pallet<T> as Store>::RelayDispatchQueues::remove(&para);
<Pallet<T> as Store>::RelayDispatchQueueSize::remove(&para);
} else {
<Pallet<T> as Store>::RelayDispatchQueues::insert(&para, queue);
<Pallet<T> as Store>::RelayDispatchQueueSize::insert(&para, (count, total_size));
RelayDispatchQueues::<T>::remove(&para);
RelayDispatchQueueSize::<T>::remove(&para);
} else if entry.consumed_count > 0 {
RelayDispatchQueues::<T>::insert(&para, &entry.queue[entry.consumed_count..]);
let count = (entry.queue.len() - entry.consumed_count) as u32;
let size = entry.total_size.saturating_sub(entry.consumed_size as u32);
RelayDispatchQueueSize::<T>::insert(&para, (count, size));
}
}
}
......@@ -586,137 +598,10 @@ impl NeedsDispatchCursor {
}
#[cfg(test)]
pub(crate) mod mock_sink {
//! An implementation of a mock UMP sink that allows attaching a probe for mocking the weights
//! and checking the sent messages.
//!
//! A default behavior of the UMP sink is to ignore an incoming message and return 0 weight.
//!
//! A probe can be attached to the mock UMP sink. When attached, the mock sink would consult the
//! probe to check whether the received message was expected and what weight it should return.
//!
//! There are two rules on how to use a probe:
//!
//! 1. There can be only one active probe at a time. Creation of another probe while there is
//! already an active one leads to a panic. The probe is scoped to a thread where it was created.
//!
//! 2. All messages expected by the probe must be received by the time of dropping it. Unreceived
//! messages will lead to a panic while dropping a probe.
use super::{MessageId, ParaId, UmpSink, UpwardMessage};
pub(crate) mod tests {
use super::*;
use crate::mock::{new_test_ext, take_processed, Configuration, MockGenesisConfig, Ump};
use frame_support::weights::Weight;
use std::{cell::RefCell, collections::vec_deque::VecDeque};
#[derive(Debug)]
struct UmpExpectation {
expected_origin: ParaId,
expected_msg: UpwardMessage,
mock_weight: Weight,
}
std::thread_local! {
// `Some` here indicates that there is an active probe.
static HOOK: RefCell<Option<VecDeque<UmpExpectation>>> = RefCell::new(None);
}
pub struct MockUmpSink;
impl UmpSink for MockUmpSink {
fn process_upward_message(
actual_origin: ParaId,
actual_msg: &[u8],
_max_weight: Weight,
) -> Result<Weight, (MessageId, Weight)> {
Ok(HOOK
.with(|opt_hook| {
opt_hook.borrow_mut().as_mut().map(|hook| {
let UmpExpectation { expected_origin, expected_msg, mock_weight } =
match hook.pop_front() {
Some(expectation) => expectation,
None => {
panic!(
"The probe is active but didn't expect the message:\n\n\t{:?}.",
actual_msg,
);
},
};
assert_eq!(expected_origin, actual_origin);
assert_eq!(expected_msg, &actual_msg[..]);
mock_weight
})
})
.unwrap_or(0))
}
}
pub struct Probe {
_private: (),
}
impl Probe {
pub fn new() -> Self {
HOOK.with(|opt_hook| {
let prev = opt_hook.borrow_mut().replace(VecDeque::default());
// that can trigger if there were two probes were created during one session which
// is may be a bit strict, but may save time figuring out what's wrong.
// if you land here and you do need the two probes in one session consider
// dropping the the existing probe explicitly.
assert!(prev.is_none());
});
Self { _private: () }
}
/// Add an expected message.
///
/// The enqueued messages are processed in FIFO order.
pub fn assert_msg(
&mut self,
expected_origin: ParaId,
expected_msg: UpwardMessage,
mock_weight: Weight,
) {
HOOK.with(|opt_hook| {
opt_hook.borrow_mut().as_mut().unwrap().push_back(UmpExpectation {
expected_origin,
expected_msg,
mock_weight,
})
});
}
}
impl Drop for Probe {
fn drop(&mut self) {
let _ = HOOK.try_with(|opt_hook| {
let prev = opt_hook.borrow_mut().take().expect(
"this probe was created and hasn't been yet destroyed;
the probe cannot be replaced;
there is only one probe at a time allowed;
thus it cannot be `None`;
qed",
);
if !prev.is_empty() {
// some messages are left unchecked. We should notify the developer about this.
// however, we do so only if the thread doesn't panic already. Otherwise, the
// developer would get a SIGILL or SIGABRT without a meaningful error message.
if !std::thread::panicking() {
panic!(
"the probe is dropped and not all expected messages arrived: {:?}",
prev
);
}
}
});
// an `Err` here signals here that the thread local was already destroyed.
}
}
}
#[cfg(test)]
mod tests {
use super::{mock_sink::Probe, *};
use crate::mock::{new_test_ext, Configuration, MockGenesisConfig, Ump};
use std::collections::HashSet;
struct GenesisConfigBuilder {
......@@ -826,15 +711,12 @@ mod tests {
#[test]
fn dispatch_single_message() {
let a = ParaId::from(228);
let msg = vec![1, 2, 3];
let msg = 1000u32.encode();
new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| {
let mut probe = Probe::new();
probe.assert_msg(a, msg.clone(), 0);
queue_upward_msg(a, msg);
queue_upward_msg(a, msg.clone());
Ump::process_pending_upward_messages();
assert_eq!(take_processed(), vec![(a, msg)]);
assert_storage_consistency_exhaustive();
});
......@@ -846,11 +728,11 @@ mod tests {
let c = ParaId::from(228);
let q = ParaId::from(911);
let a_msg_1 = vec![1, 2, 3];
let a_msg_2 = vec![3, 2, 1];
let c_msg_1 = vec![4, 5, 6];
let c_msg_2 = vec![9, 8, 7];
let q_msg = b"we are Q".to_vec();
let a_msg_1 = (200u32, "a_msg_1").encode();
let a_msg_2 = (100u32, "a_msg_2").encode();
let c_msg_1 = (300u32, "c_msg_1").encode();
let c_msg_2 = (100u32, "c_msg_2").encode();
let q_msg = (500u32, "q_msg").encode();
new_test_ext(
GenesisConfigBuilder { ump_service_total_weight: 500, ..Default::default() }.build(),
......@@ -864,52 +746,60 @@ mod tests {
assert_storage_consistency_exhaustive();
// we expect only two first messages to fit in the first iteration.
{
let mut probe = Probe::new();
probe.assert_msg(a, a_msg_1.clone(), 300);
probe.assert_msg(c, c_msg_1.clone(), 300);
Ump::process_pending_upward_messages();
assert_storage_consistency_exhaustive();
drop(probe);
}
Ump::process_pending_upward_messages();
assert_eq!(take_processed(), vec![(a, a_msg_1), (c, c_msg_1)]);
assert_storage_consistency_exhaustive();
queue_upward_msg(c, c_msg_2.clone());
assert_storage_consistency_exhaustive();
// second iteration should process the second message.
{
let mut probe = Probe::new();
Ump::process_pending_upward_messages();
assert_eq!(take_processed(), vec![(q, q_msg)]);
assert_storage_consistency_exhaustive();
probe.assert_msg(q, q_msg.clone(), 500);
Ump::process_pending_upward_messages();
assert_storage_consistency_exhaustive();
// 3rd iteration.
Ump::process_pending_upward_messages();
assert_eq!(take_processed(), vec![(a, a_msg_2), (c, c_msg_2)]);
assert_storage_consistency_exhaustive();
drop(probe);
}
// finally, make sure that the queue is empty.
Ump::process_pending_upward_messages();
assert_eq!(take_processed(), vec![]);
assert_storage_consistency_exhaustive();
});
}
// 3rd iteration.
{
let mut probe = Probe::new();
#[test]
fn dispatch_keeps_message_after_weight_exhausted() {
let a = ParaId::from(128);
probe.assert_msg(a, a_msg_2.clone(), 100);
probe.assert_msg(c, c_msg_2.clone(), 100);
Ump::process_pending_upward_messages();
assert_storage_consistency_exhaustive();
let a_msg_1 = (300u32, "a_msg_1").encode();
let a_msg_2 = (300u32, "a_msg_2").encode();
drop(probe);
}
new_test_ext(
GenesisConfigBuilder { ump_service_total_weight: 500, ..Default::default() }.build(),
)
.execute_with(|| {
queue_upward_msg(a, a_msg_1.clone());
queue_upward_msg(a, a_msg_2.clone());
// finally, make sure that the queue is empty.
{
let probe = Probe::new();
assert_storage_consistency_exhaustive();
Ump::process_pending_upward_messages();
assert_storage_consistency_exhaustive();
// we expect only one message to fit in the first iteration.
Ump::process_pending_upward_messages();
assert_eq!(take_processed(), vec![(a, a_msg_1)]);
assert_storage_consistency_exhaustive();
drop(probe);
}
// second iteration should process the remaining message.
Ump::process_pending_upward_messages();
assert_eq!(take_processed(), vec![(a, a_msg_2)]);
assert_storage_consistency_exhaustive();
// finally, make sure that the queue is empty.
Ump::process_pending_upward_messages();
assert_eq!(take_processed(), vec![]);
assert_storage_consistency_exhaustive();
});
}
......@@ -918,9 +808,9 @@ mod tests {
let a = ParaId::from(1991);
let b = ParaId::from(1999);
let a_msg_1 = vec![1, 2, 3];
let a_msg_2 = vec![3, 2, 1];
let b_msg_1 = vec![4, 5, 6];
let a_msg_1 = (300u32, "a_msg_1").encode();
let a_msg_2 = (300u32, "a_msg_2").encode();
let b_msg_1 = (300u32, "b_msg_1").encode();
new_test_ext(
GenesisConfigBuilder { ump_service_total_weight: 900, ..Default::default() }.build(),
......@@ -935,18 +825,8 @@ mod tests {
queue_upward_msg(a, a_msg_1.clone());
queue_upward_msg(a, a_msg_2.clone());
queue_upward_msg(b, b_msg_1.clone());
{
let mut probe = Probe::new();
probe.assert_msg(a, a_msg_1.clone(), 300);
probe.assert_msg(b, b_msg_1.clone(), 300);
probe.assert_msg(a, a_msg_2.clone(), 300);
Ump::process_pending_upward_messages();
drop(probe);
}
Ump::process_pending_upward_messages();
assert_eq!(take_processed(), vec![(a, a_msg_1), (b, b_msg_1), (a, a_msg_2)]);
});
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment