Unverified Commit 093af455 authored by Gavin Wood's avatar Gavin Wood Committed by GitHub
Browse files

Fix teleport accounting and add some events (#3167)

* Stuff to help inspect the DMP activity

* Fix teleport accounting

* Fixes

* Fixes

* Fixes

* Fixes
parent b63e411d
Pipeline #140548 passed with stages
in 39 minutes and 59 seconds
...@@ -137,7 +137,7 @@ fn default_parachains_host_configuration() -> polkadot_runtime_parachains::confi ...@@ -137,7 +137,7 @@ fn default_parachains_host_configuration() -> polkadot_runtime_parachains::confi
// Same as `4 * frame_support::weights::WEIGHT_PER_MILLIS`. We don't bother with // Same as `4 * frame_support::weights::WEIGHT_PER_MILLIS`. We don't bother with
// an import since that's a made up number and should be replaced with a constant // an import since that's a made up number and should be replaced with a constant
// obtained by benchmarking anyway. // obtained by benchmarking anyway.
preferred_dispatchable_upward_messages_step_weight: 4 * 1_000_000_000, ump_service_total_weight: 4 * 1_000_000_000,
max_upward_message_size: 1024 * 1024, max_upward_message_size: 1024 * 1024,
max_upward_message_num_per_candidate: 5, max_upward_message_num_per_candidate: 5,
hrmp_open_request_ttl: 5, hrmp_open_request_ttl: 5,
......
...@@ -73,7 +73,7 @@ All failed checks should lead to an unrecoverable error making the block invalid ...@@ -73,7 +73,7 @@ All failed checks should lead to an unrecoverable error making the block invalid
1. If the receipt contains a code upgrade, Call `Paras::schedule_code_upgrade(para_id, code, relay_parent_number + config.validationl_upgrade_delay)`. 1. If the receipt contains a code upgrade, Call `Paras::schedule_code_upgrade(para_id, code, relay_parent_number + config.validationl_upgrade_delay)`.
> TODO: Note that this is safe as long as we never enact candidates where the relay parent is across a session boundary. In that case, which we should be careful to avoid with contextual execution, the configuration might have changed and the para may de-sync from the host's understanding of it. > TODO: Note that this is safe as long as we never enact candidates where the relay parent is across a session boundary. In that case, which we should be careful to avoid with contextual execution, the configuration might have changed and the para may de-sync from the host's understanding of it.
1. Reward all backing validators of each candidate, contained within the `backers` field. 1. Reward all backing validators of each candidate, contained within the `backers` field.
1. call `Ump::enact_upward_messages` for each backed candidate, using the [`UpwardMessage`s](../types/messages.md#upward-message) from the [`CandidateCommitments`](../types/candidate.md#candidate-commitments). 1. call `Ump::receive_upward_messages` for each backed candidate, using the [`UpwardMessage`s](../types/messages.md#upward-message) from the [`CandidateCommitments`](../types/candidate.md#candidate-commitments).
1. call `Dmp::prune_dmq` with the para id of the candidate and the candidate's `processed_downward_messages`. 1. call `Dmp::prune_dmq` with the para id of the candidate and the candidate's `processed_downward_messages`.
1. call `Hrmp::prune_hrmp` with the para id of the candiate and the candidate's `hrmp_watermark`. 1. call `Hrmp::prune_hrmp` with the para id of the candiate and the candidate's `hrmp_watermark`.
1. call `Hrmp::queue_outbound_hrmp` with the para id of the candidate and the list of horizontal messages taken from the commitment, 1. call `Hrmp::queue_outbound_hrmp` with the para id of the candidate and the list of horizontal messages taken from the commitment,
......
...@@ -56,14 +56,14 @@ Candidate Acceptance Function: ...@@ -56,14 +56,14 @@ Candidate Acceptance Function:
Candidate Enactment: Candidate Enactment:
* `enact_upward_messages(P: ParaId, Vec<UpwardMessage>)`: * `receive_upward_messages(P: ParaId, Vec<UpwardMessage>)`:
1. Process each upward message `M` in order: 1. Process each upward message `M` in order:
1. Append the message to `RelayDispatchQueues` for `P` 1. Append the message to `RelayDispatchQueues` for `P`
1. Increment the size and the count in `RelayDispatchQueueSize` for `P`. 1. Increment the size and the count in `RelayDispatchQueueSize` for `P`.
1. Ensure that `P` is present in `NeedsDispatch`. 1. Ensure that `P` is present in `NeedsDispatch`.
The following routine is meant to execute pending entries in upward message queues. This function doesn't fail, even if The following routine is meant to execute pending entries in upward message queues. This function doesn't fail, even if
dispatcing any of individual upward messages returns an error. dispatching any of individual upward messages returns an error.
`process_pending_upward_messages()`: `process_pending_upward_messages()`:
1. Initialize a cumulative weight counter `T` to 0 1. Initialize a cumulative weight counter `T` to 0
...@@ -71,7 +71,7 @@ dispatcing any of individual upward messages returns an error. ...@@ -71,7 +71,7 @@ dispatcing any of individual upward messages returns an error.
1. Dequeue the first upward message `D` from `RelayDispatchQueues` for `P` 1. Dequeue the first upward message `D` from `RelayDispatchQueues` for `P`
1. Decrement the size of the message from `RelayDispatchQueueSize` for `P` 1. Decrement the size of the message from `RelayDispatchQueueSize` for `P`
1. Delegate processing of the message to the runtime. The weight consumed is added to `T`. 1. Delegate processing of the message to the runtime. The weight consumed is added to `T`.
1. If `T >= config.preferred_dispatchable_upward_messages_step_weight`, set `NextDispatchRoundStartWith` to `P` and finish processing. 1. If `T >= config.ump_service_total_weight`, set `NextDispatchRoundStartWith` to `P` and finish processing.
1. If `RelayDispatchQueues` for `P` became empty, remove `P` from `NeedsDispatch`. 1. If `RelayDispatchQueues` for `P` became empty, remove `P` from `NeedsDispatch`.
1. If `NeedsDispatch` became empty then finish processing and set `NextDispatchRoundStartWith` to `None`. 1. If `NeedsDispatch` became empty then finish processing and set `NextDispatchRoundStartWith` to `None`.
> NOTE that in practice we would need to approach the weight calculation more thoroughly, i.e. incorporate all operations > NOTE that in practice we would need to approach the weight calculation more thoroughly, i.e. incorporate all operations
......
...@@ -69,7 +69,7 @@ struct HostConfiguration { ...@@ -69,7 +69,7 @@ struct HostConfiguration {
/// stage. /// stage.
/// ///
/// NOTE that this is a soft limit and could be exceeded. /// NOTE that this is a soft limit and could be exceeded.
pub preferred_dispatchable_upward_messages_step_weight: Weight, pub ump_service_total_weight: Weight,
/// The maximum size of an upward message that can be sent by a candidate. /// The maximum size of an upward message that can be sent by a candidate.
/// ///
/// This parameter affects the upper bound of size of `CandidateCommitments`. /// This parameter affects the upper bound of size of `CandidateCommitments`.
......
...@@ -1069,7 +1069,8 @@ parameter_types! { ...@@ -1069,7 +1069,8 @@ parameter_types! {
} }
impl parachains_ump::Config for Runtime { impl parachains_ump::Config for Runtime {
type UmpSink = crate::parachains_ump::XcmSink<XcmExecutor<XcmConfig>, Call>; type Event = Event;
type UmpSink = crate::parachains_ump::XcmSink<XcmExecutor<XcmConfig>, Runtime>;
type FirstMessageFactorPercent = FirstMessageFactorPercent; type FirstMessageFactorPercent = FirstMessageFactorPercent;
} }
...@@ -1455,7 +1456,7 @@ construct_runtime! { ...@@ -1455,7 +1456,7 @@ construct_runtime! {
Paras: parachains_paras::{Pallet, Call, Storage, Event, Config<T>} = 56, Paras: parachains_paras::{Pallet, Call, Storage, Event, Config<T>} = 56,
ParasInitializer: parachains_initializer::{Pallet, Call, Storage} = 57, ParasInitializer: parachains_initializer::{Pallet, Call, Storage} = 57,
ParasDmp: parachains_dmp::{Pallet, Call, Storage} = 58, ParasDmp: parachains_dmp::{Pallet, Call, Storage} = 58,
ParasUmp: parachains_ump::{Pallet, Call, Storage} = 59, ParasUmp: parachains_ump::{Pallet, Call, Storage, Event} = 59,
ParasHrmp: parachains_hrmp::{Pallet, Call, Storage, Event} = 60, ParasHrmp: parachains_hrmp::{Pallet, Call, Storage, Event} = 60,
ParasSessionInfo: parachains_session_info::{Pallet, Call, Storage} = 61, ParasSessionInfo: parachains_session_info::{Pallet, Call, Storage} = 61,
......
...@@ -90,7 +90,7 @@ pub struct HostConfiguration<BlockNumber> { ...@@ -90,7 +90,7 @@ pub struct HostConfiguration<BlockNumber> {
/// stage. /// stage.
/// ///
/// NOTE that this is a soft limit and could be exceeded. /// NOTE that this is a soft limit and could be exceeded.
pub preferred_dispatchable_upward_messages_step_weight: Weight, pub ump_service_total_weight: Weight,
/// The maximum number of outbound HRMP channels a parachain is allowed to open. /// The maximum number of outbound HRMP channels a parachain is allowed to open.
pub hrmp_max_parachain_outbound_channels: u32, pub hrmp_max_parachain_outbound_channels: u32,
/// The maximum number of outbound HRMP channels a parathread is allowed to open. /// The maximum number of outbound HRMP channels a parathread is allowed to open.
...@@ -203,7 +203,7 @@ impl<BlockNumber: Default + From<u32>> Default for HostConfiguration<BlockNumber ...@@ -203,7 +203,7 @@ impl<BlockNumber: Default + From<u32>> Default for HostConfiguration<BlockNumber
max_upward_queue_count: Default::default(), max_upward_queue_count: Default::default(),
max_upward_queue_size: Default::default(), max_upward_queue_size: Default::default(),
max_downward_message_size: Default::default(), max_downward_message_size: Default::default(),
preferred_dispatchable_upward_messages_step_weight: Default::default(), ump_service_total_weight: Default::default(),
max_upward_message_size: Default::default(), max_upward_message_size: Default::default(),
max_upward_message_num_per_candidate: Default::default(), max_upward_message_num_per_candidate: Default::default(),
hrmp_open_request_ttl: Default::default(), hrmp_open_request_ttl: Default::default(),
...@@ -555,10 +555,10 @@ decl_module! { ...@@ -555,10 +555,10 @@ decl_module! {
/// Sets the soft limit for the phase of dispatching dispatchable upward messages. /// Sets the soft limit for the phase of dispatching dispatchable upward messages.
#[weight = (1_000, DispatchClass::Operational)] #[weight = (1_000, DispatchClass::Operational)]
pub fn set_preferred_dispatchable_upward_messages_step_weight(origin, new: Weight) -> DispatchResult { pub fn set_ump_service_total_weight(origin, new: Weight) -> DispatchResult {
ensure_root(origin)?; ensure_root(origin)?;
Self::update_config_member(|config| { Self::update_config_member(|config| {
sp_std::mem::replace(&mut config.preferred_dispatchable_upward_messages_step_weight, new) != new sp_std::mem::replace(&mut config.ump_service_total_weight, new) != new
}); });
Ok(()) Ok(())
} }
...@@ -806,7 +806,7 @@ mod tests { ...@@ -806,7 +806,7 @@ mod tests {
max_upward_queue_count: 1337, max_upward_queue_count: 1337,
max_upward_queue_size: 228, max_upward_queue_size: 228,
max_downward_message_size: 2048, max_downward_message_size: 2048,
preferred_dispatchable_upward_messages_step_weight: 20000, ump_service_total_weight: 20000,
max_upward_message_size: 448, max_upward_message_size: 448,
max_upward_message_num_per_candidate: 5, max_upward_message_num_per_candidate: 5,
hrmp_open_request_ttl: 1312, hrmp_open_request_ttl: 1312,
...@@ -902,8 +902,8 @@ mod tests { ...@@ -902,8 +902,8 @@ mod tests {
Configuration::set_max_downward_message_size( Configuration::set_max_downward_message_size(
Origin::root(), new_config.max_downward_message_size, Origin::root(), new_config.max_downward_message_size,
).unwrap(); ).unwrap();
Configuration::set_preferred_dispatchable_upward_messages_step_weight( Configuration::set_ump_service_total_weight(
Origin::root(), new_config.preferred_dispatchable_upward_messages_step_weight, Origin::root(), new_config.ump_service_total_weight,
).unwrap(); ).unwrap();
Configuration::set_max_upward_message_size( Configuration::set_max_upward_message_size(
Origin::root(), new_config.max_upward_message_size, Origin::root(), new_config.max_upward_message_size,
......
...@@ -691,7 +691,7 @@ impl<T: Config> Module<T> { ...@@ -691,7 +691,7 @@ impl<T: Config> Module<T> {
receipt.descriptor.para_id, receipt.descriptor.para_id,
commitments.processed_downward_messages, commitments.processed_downward_messages,
); );
weight += <ump::Module<T>>::enact_upward_messages( weight += <ump::Module<T>>::receive_upward_messages(
receipt.descriptor.para_id, receipt.descriptor.para_id,
commitments.upward_messages, commitments.upward_messages,
); );
......
...@@ -49,7 +49,7 @@ frame_support::construct_runtime!( ...@@ -49,7 +49,7 @@ frame_support::construct_runtime!(
Scheduler: scheduler::{Pallet, Call, Storage}, Scheduler: scheduler::{Pallet, Call, Storage},
Initializer: initializer::{Pallet, Call, Storage}, Initializer: initializer::{Pallet, Call, Storage},
Dmp: dmp::{Pallet, Call, Storage}, Dmp: dmp::{Pallet, Call, Storage},
Ump: ump::{Pallet, Call, Storage}, Ump: ump::{Pallet, Call, Storage, Event},
Hrmp: hrmp::{Pallet, Call, Storage, Event}, Hrmp: hrmp::{Pallet, Call, Storage, Event},
SessionInfo: session_info::{Pallet, Call, Storage}, SessionInfo: session_info::{Pallet, Call, Storage},
} }
...@@ -122,6 +122,7 @@ parameter_types! { ...@@ -122,6 +122,7 @@ parameter_types! {
} }
impl crate::ump::Config for Test { impl crate::ump::Config for Test {
type Event = Event;
type UmpSink = crate::ump::mock_sink::MockUmpSink; type UmpSink = crate::ump::mock_sink::MockUmpSink;
type FirstMessageFactorPercent = FirstMessageFactorPercent; type FirstMessageFactorPercent = FirstMessageFactorPercent;
} }
......
...@@ -18,12 +18,11 @@ use crate::{ ...@@ -18,12 +18,11 @@ use crate::{
configuration::{self, HostConfiguration}, configuration::{self, HostConfiguration},
initializer, initializer,
}; };
use sp_std::{prelude::*, fmt, marker::PhantomData}; use sp_std::{prelude::*, fmt, marker::PhantomData, convert::TryFrom};
use sp_std::collections::{btree_map::BTreeMap, vec_deque::VecDeque}; use sp_std::collections::{btree_map::BTreeMap, vec_deque::VecDeque};
use frame_support::{decl_module, decl_storage, StorageMap, StorageValue, weights::Weight, traits::Get}; use frame_support::{decl_module, decl_event, decl_storage, StorageMap, StorageValue, weights::Weight, traits::Get};
use primitives::v1::{Id as ParaId, UpwardMessage}; use primitives::v1::{Id as ParaId, UpwardMessage};
use xcm::v0::Outcome;
const LOG_TARGET: &str = "runtime::ump-sink";
/// All upward messages coming from parachains will be funneled into an implementation of this trait. /// All upward messages coming from parachains will be funneled into an implementation of this trait.
/// ///
...@@ -45,45 +44,56 @@ pub trait UmpSink { ...@@ -45,45 +44,56 @@ pub trait UmpSink {
/// it did not begin processing a message since it would otherwise exceed `max_weight`. /// it did not begin processing a message since it would otherwise exceed `max_weight`.
/// ///
/// See the trait docs for more details. /// See the trait docs for more details.
fn process_upward_message(origin: ParaId, msg: &[u8], max_weight: Weight) -> Option<Weight>; fn process_upward_message(origin: ParaId, msg: &[u8], max_weight: Weight) -> Result<Weight, (MessageId, Weight)>;
} }
/// An implementation of a sink that just swallows the message without consuming any weight. Returns /// An implementation of a sink that just swallows the message without consuming any weight. Returns
/// `Some(0)` indicating that no messages existed for it to process. /// `Some(0)` indicating that no messages existed for it to process.
impl UmpSink for () { impl UmpSink for () {
fn process_upward_message(_: ParaId, _: &[u8], _: Weight) -> Option<Weight> { fn process_upward_message(_: ParaId, _: &[u8], _: Weight) -> Result<Weight, (MessageId, Weight)> {
Some(0) Ok(0)
} }
} }
/// Simple type used to identify messages for the purpose of reporting events. Secure if and only
/// if the message content is unique.
pub type MessageId = [u8; 32];
/// A specific implementation of a UmpSink where messages are in the XCM format /// A specific implementation of a UmpSink where messages are in the XCM format
/// and will be forwarded to the XCM Executor. /// and will be forwarded to the XCM Executor.
pub struct XcmSink<XcmExecutor, Call>(PhantomData<(XcmExecutor, Call)>); pub struct XcmSink<XcmExecutor, Config>(PhantomData<(XcmExecutor, Config)>);
impl<XcmExecutor: xcm::v0::ExecuteXcm<Call>, Call> UmpSink for XcmSink<XcmExecutor, Call> { impl<XcmExecutor: xcm::v0::ExecuteXcm<C::Call>, C: Config> UmpSink for XcmSink<XcmExecutor, C> {
fn process_upward_message(origin: ParaId, mut msg: &[u8], max_weight: Weight) -> Option<Weight> { fn process_upward_message(origin: ParaId, data: &[u8], max_weight: Weight) -> Result<Weight, (MessageId, Weight)> {
use parity_scale_codec::Decode; use parity_scale_codec::Decode;
use xcm::VersionedXcm; use xcm::VersionedXcm;
use xcm::v0::{Junction, MultiLocation, Outcome, Error as XcmError}; use xcm::v0::{Xcm, Junction, MultiLocation, Error as XcmError};
if let Ok(versioned_xcm_message) = VersionedXcm::decode(&mut msg) { let id = sp_io::hashing::blake2_256(&data[..]);
match versioned_xcm_message { let maybe_msg = VersionedXcm::<C::Call>::decode(&mut &data[..])
VersionedXcm::V0(xcm_message) => { .map(Xcm::<C::Call>::try_from);
let xcm_junction: Junction = Junction::Parachain(origin.into()); match maybe_msg {
let xcm_location: MultiLocation = xcm_junction.into(); Err(_) => {
match XcmExecutor::execute_xcm(xcm_location, xcm_message, max_weight) { Module::<C>::deposit_event(Event::InvalidFormat(id));
Outcome::Complete(w) | Outcome::Incomplete(w, _) => Some(w), Ok(0)
Outcome::Error(XcmError::WeightLimitReached(..)) => None, },
Outcome::Error(_) => Some(0), Ok(Err(())) => {
Module::<C>::deposit_event(Event::UnsupportedVersion(id));
Ok(0)
},
Ok(Ok(xcm_message)) => {
let xcm_junction: Junction = Junction::Parachain(origin.into());
let xcm_location: MultiLocation = xcm_junction.into();
let outcome = XcmExecutor::execute_xcm(xcm_location, xcm_message, max_weight);
match outcome {
Outcome::Error(XcmError::WeightLimitReached(required)) => Err((id, required)),
outcome => {
let weight_used = outcome.weight_used();
Module::<C>::deposit_event(Event::ExecutedUpward(id, outcome));
Ok(weight_used)
} }
} }
} }
} else {
log::error!(
target: LOG_TARGET,
"Failed to decode versioned XCM from upward message.",
);
None
} }
} }
} }
...@@ -142,10 +152,18 @@ impl fmt::Debug for AcceptanceCheckErr { ...@@ -142,10 +152,18 @@ impl fmt::Debug for AcceptanceCheckErr {
} }
pub trait Config: frame_system::Config + configuration::Config { pub trait Config: frame_system::Config + configuration::Config {
/// The aggregate event.
type Event: From<Event> + Into<<Self as frame_system::Config>::Event>;
/// A place where all received upward messages are funneled. /// A place where all received upward messages are funneled.
type UmpSink: UmpSink; type UmpSink: UmpSink;
/// The factor by which the weight limit it multiplied for the first UMP message to execute with. /// The factor by which the weight limit it multiplied for the first UMP message to execute with.
///
/// An amount less than 100 keeps more available weight in the queue for messages after the first, and potentially
/// stalls the queue in doing so. More than 100 will provide additional weight for the first message only.
///
/// Generally you'll want this to be a bit more - 150 or 200 would be good values.
type FirstMessageFactorPercent: Get<Weight>; type FirstMessageFactorPercent: Get<Weight>;
} }
...@@ -187,9 +205,31 @@ decl_storage! { ...@@ -187,9 +205,31 @@ decl_storage! {
} }
} }
decl_event! {
pub enum Event {
/// Upward message is invalid XCM.
/// \[ id \]
InvalidFormat(MessageId),
/// Upward message is unsupported version of XCM.
/// \[ id \]
UnsupportedVersion(MessageId),
/// Upward message executed with the given outcome.
/// \[ id, outcome \]
ExecutedUpward(MessageId, Outcome),
/// The weight limit for handling downward messages was reached.
/// \[ id, remaining, required \]
WeightExhausted(MessageId, Weight, Weight),
/// Some downward messages have been received and will be processed.
/// \[ para, count, size \]
UpwardMessagesReceived(ParaId, u32, u32),
}
}
decl_module! { decl_module! {
/// The UMP module. /// The UMP module.
pub struct Module<T: Config> for enum Call where origin: <T as frame_system::Config>::Origin { pub struct Module<T: Config> for enum Call where origin: <T as frame_system::Config>::Origin {
/// Deposit one of this module's events by using the default implementation.
fn deposit_event() = default;
} }
} }
...@@ -287,15 +327,15 @@ impl<T: Config> Module<T> { ...@@ -287,15 +327,15 @@ impl<T: Config> Module<T> {
Ok(()) Ok(())
} }
/// Enacts all the upward messages sent by a candidate. /// Enqueues `upward_messages` from a `para`'s accepted candidate block.
pub(crate) fn enact_upward_messages( pub(crate) fn receive_upward_messages(
para: ParaId, para: ParaId,
upward_messages: Vec<UpwardMessage>, upward_messages: Vec<UpwardMessage>,
) -> Weight { ) -> Weight {
let mut weight = 0; let mut weight = 0;
if !upward_messages.is_empty() { if !upward_messages.is_empty() {
let (extra_cnt, extra_size) = upward_messages let (extra_count, extra_size) = upward_messages
.iter() .iter()
.fold((0, 0), |(cnt, size), d| (cnt + 1, size + d.len() as u32)); .fold((0, 0), |(cnt, size), d| (cnt + 1, size + d.len() as u32));
...@@ -304,7 +344,7 @@ impl<T: Config> Module<T> { ...@@ -304,7 +344,7 @@ impl<T: Config> Module<T> {
}); });
<Self as Store>::RelayDispatchQueueSize::mutate(&para, |(ref mut cnt, ref mut size)| { <Self as Store>::RelayDispatchQueueSize::mutate(&para, |(ref mut cnt, ref mut size)| {
*cnt += extra_cnt; *cnt += extra_count;
*size += extra_size; *size += extra_size;
}); });
...@@ -314,42 +354,50 @@ impl<T: Config> Module<T> { ...@@ -314,42 +354,50 @@ impl<T: Config> Module<T> {
} }
}); });
// NOTE: The actual computation is not accounted for. It should be benchmarked.
weight += T::DbWeight::get().reads_writes(3, 3); weight += T::DbWeight::get().reads_writes(3, 3);
Self::deposit_event(Event::UpwardMessagesReceived(para, extra_count, extra_size));
} }
weight weight
} }
/// Devote some time into dispatching pending upward messages. /// Devote some time into dispatching pending upward messages.
pub(crate) fn process_pending_upward_messages() { pub(crate) fn process_pending_upward_messages() -> Weight {
let mut used_weight_so_far = 0; let mut weight_used = 0;
let config = <configuration::Module<T>>::config(); let config = <configuration::Module<T>>::config();
let mut cursor = NeedsDispatchCursor::new::<T>(); let mut cursor = NeedsDispatchCursor::new::<T>();
let mut queue_cache = QueueCache::new(); let mut queue_cache = QueueCache::new();
while let Some(dispatchee) = cursor.peek() { while let Some(dispatchee) = cursor.peek() {
if used_weight_so_far >= config.preferred_dispatchable_upward_messages_step_weight { if weight_used >= config.ump_service_total_weight {
// Then check whether we've reached or overshoot the // Then check whether we've reached or overshoot the
// preferred weight for the dispatching stage. // preferred weight for the dispatching stage.
// //
// if so - bail. // if so - bail.
break; break;
} }
let max_weight = if used_weight_so_far == 0 { let max_weight = if weight_used == 0 {
// we increase the amount of weight that we're allowed to use on the first message to try to prevent // we increase the amount of weight that we're allowed to use on the first message to try to prevent
// the possibility of blockage of the queue. // the possibility of blockage of the queue.
config.preferred_dispatchable_upward_messages_step_weight * T::FirstMessageFactorPercent::get() / 100 config.ump_service_total_weight * T::FirstMessageFactorPercent::get() / 100
} else { } else {
config.preferred_dispatchable_upward_messages_step_weight - used_weight_so_far config.ump_service_total_weight - weight_used
}; };
// dequeue the next message from the queue of the dispatchee // dequeue the next message from the queue of the dispatchee
let (upward_message, became_empty) = queue_cache.dequeue::<T>(dispatchee); let (upward_message, became_empty) = queue_cache.dequeue::<T>(dispatchee);
if let Some(upward_message) = upward_message { if let Some(upward_message) = upward_message {
match T::UmpSink::process_upward_message(dispatchee, &upward_message[..], max_weight) { match T::UmpSink::process_upward_message(dispatchee, &upward_message[..], max_weight) {
None => break, Ok(used) => weight_used += used,
Some(used) => used_weight_so_far += used, Err((id, required)) => {
// we process messages in order and don't drop them if we run out of weight, so need to break
// here.
Self::deposit_event(Event::WeightExhausted(id, max_weight, required));
break
},
} }
} }
...@@ -363,6 +411,8 @@ impl<T: Config> Module<T> { ...@@ -363,6 +411,8 @@ impl<T: Config> Module<T> {
cursor.flush::<T>(); cursor.flush::<T>();
queue_cache.flush::<T>(); queue_cache.flush::<T>();
weight_used
} }
} }
...@@ -461,7 +511,7 @@ impl QueueCache { ...@@ -461,7 +511,7 @@ impl QueueCache {
#[derive(Debug)] #[derive(Debug)]
struct NeedsDispatchCursor { struct NeedsDispatchCursor {
needs_dispatch: Vec<ParaId>, needs_dispatch: Vec<ParaId>,
cur_idx: usize, index: usize,
} }
impl NeedsDispatchCursor { impl NeedsDispatchCursor {
...@@ -469,10 +519,10 @@ impl NeedsDispatchCursor { ...@@ -469,10 +519,10 @@ impl NeedsDispatchCursor {
let needs_dispatch: Vec<ParaId> = <Module<T> as Store>::NeedsDispatch::get(); let needs_dispatch: Vec<ParaId> = <Module<T> as Store>::NeedsDispatch::get();
let start_with = <Module<T> as Store>::NextDispatchRoundStartWith::get(); let start_with = <Module<T> as Store>::NextDispatchRoundStartWith::get();
let start_with_idx = match start_with { let initial_index = match start_with {
Some(para) => match needs_dispatch.binary_search(&para) { Some(para) => match needs_dispatch.binary_search(&para) {
Ok(found_idx) => found_idx, Ok(found_index) => found_index,
Err(_supposed_idx) => { Err(_supposed_index) => {
// well that's weird because we maintain an invariant that // well that's weird because we maintain an invariant that
// `NextDispatchRoundStartWith` must point into one of the items in // `NextDispatchRoundStartWith` must point into one of the items in
// `NeedsDispatch`. // `NeedsDispatch`.
...@@ -487,13 +537,13 @@ impl NeedsDispatchCursor { ...@@ -487,13 +537,13 @@ impl NeedsDispatchCursor {
Self { Self {
needs_dispatch, needs_dispatch,
cur_idx: start_with_idx, index: initial_index,
} }
} }
/// Returns the item the cursor points to. /// Returns the item the cursor points to.
fn peek(&self) -> Option<ParaId> { fn peek(&self) -> Option<ParaId> {