Unverified Commit d376fe66 authored by Shawn Tabrizi's avatar Shawn Tabrizi Committed by GitHub
Browse files

Initializer + Paras Clean Up Messages When Offboarding (#2413)

* initial hack in

* finish up

* use notification to pass outgoing paras

* move outgoing paras from notifications

* missing comma

* update guides

* clean up
parent 7faacf94
Pipeline #124205 passed with stages
in 34 minutes and 47 seconds
......@@ -4,14 +4,6 @@ A module responsible for Downward Message Processing (DMP). See [Messaging Overv
## Storage
General storage entries
```rust
/// Paras that are to be cleaned up at the end of the session.
/// The entries are sorted ascending by the para id.
OutgoingParas: Vec<ParaId>;
```
Storage layout required for implementation of DMP.
```rust
......@@ -54,6 +46,6 @@ Utility routines.
## Session Change
1. Drain `OutgoingParas`. For each `P` happened to be in the list:
1. For each `P` in `outgoing_paras` (generated by `Paras::on_new_session`):
1. Remove all `DownwardMessageQueues` of `P`.
1. Remove `DownwardMessageQueueHeads` for `P`.
......@@ -4,14 +4,6 @@ A module responsible for Horizontally Relay-routed Message Passing (HRMP). See [
## Storage
General storage entries
```rust
/// Paras that are to be cleaned up at the end of the session.
/// The entries are sorted ascending by the para id.
OutgoingParas: Vec<ParaId>;
```
HRMP related structs:
```rust
......@@ -171,11 +163,6 @@ Candidate Enactment:
> If that becomes a problem consider introducing an extra dictionary which says at what block the given sender
> sent a message to the recipient.
The following routine is intended to be called in the same time when `Paras::schedule_para_cleanup` is called.
`schedule_para_cleanup(ParaId)`:
1. Add the para into the `OutgoingParas` vector maintaining the sorted order.
## Entry-points
The following entry-points are meant to be used for HRMP channel management.
......@@ -241,7 +228,7 @@ the parachain executed the message.
## Session Change
1. Drain `OutgoingParas`. For each `P` happened to be in the list:
1. For each `P` in `outgoing_paras` (generated by `Paras::on_new_session`):
1. Remove all inbound channels of `P`, i.e. `(_, P)`,
1. Remove all outbound channels of `P`, i.e. `(P, _)`,
1. Remove `HrmpOpenChannelRequestCount` for `P`
......
......@@ -168,6 +168,7 @@ UpcomingDowngrades: Vec<ParaId>;
`ParaLifecycle`.
1. Downgrade all parachains that should become parathreads, updating the `Parachains` list and
`ParaLifecycle`.
1. Return list of outgoing paras to the initializer for use by other modules.
## Initialization
......
......@@ -4,14 +4,6 @@ A module responsible for Upward Message Passing (UMP). See [Messaging Overview](
## Storage
General storage entries
```rust
/// Paras that are to be cleaned up at the end of the session.
/// The entries are sorted ascending by the para id.
OutgoingParas: Vec<ParaId>;
```
Storage related to UMP
```rust
......@@ -70,11 +62,6 @@ Candidate Enactment:
1. Increment the size and the count in `RelayDispatchQueueSize` for `P`.
1. Ensure that `P` is present in `NeedsDispatch`.
The following routine is intended to be called in the same time when `Paras::schedule_para_cleanup` is called.
`schedule_para_cleanup(ParaId)`:
1. Add the para into the `OutgoingParas` vector maintaining the sorted order.
The following routine is meant to execute pending entries in upward message queues. This function doesn't fail, even if
dispatcing any of individual upward messages returns an error.
......@@ -92,7 +79,7 @@ dispatcing any of individual upward messages returns an error.
## Session Change
1. Drain `OutgoingParas`. For each `P` happened to be in the list:.
1. For each `P` in `outgoing_paras` (generated by `Paras::on_new_session`):
1. Remove `RelayDispatchQueueSize` of `P`.
1. Remove `RelayDispatchQueues` of `P`.
1. Remove `P` if it exists in `NeedsDispatch`.
......
......@@ -66,10 +66,6 @@ pub trait Config: frame_system::Config + configuration::Config {}
decl_storage! {
trait Store for Module<T: Config> as Dmp {
/// Paras that are to be cleaned up at the end of the session.
/// The entries are sorted ascending by the para id.
OutgoingParas: Vec<ParaId>;
/// The downward messages addressed for a certain para.
DownwardMessageQueues: map hasher(twox_64_concat) ParaId => Vec<InboundDownwardMessage<T::BlockNumber>>;
/// A mapping that stores the downward message queue MQC head for each para.
......@@ -101,31 +97,23 @@ impl<T: Config> Module<T> {
/// Called by the initializer to note that a new session has started.
pub(crate) fn initializer_on_new_session(
_notification: &initializer::SessionChangeNotification<T::BlockNumber>,
outgoing_paras: &[ParaId],
) {
Self::perform_outgoing_para_cleanup();
Self::perform_outgoing_para_cleanup(outgoing_paras);
}
/// Iterate over all paras that were registered for offboarding and remove all the data
/// Iterate over all paras that were noted for offboarding and remove all the data
/// associated with them.
fn perform_outgoing_para_cleanup() {
let outgoing = OutgoingParas::take();
fn perform_outgoing_para_cleanup(outgoing: &[ParaId]) {
for outgoing_para in outgoing {
Self::clean_dmp_after_outgoing(outgoing_para);
}
}
fn clean_dmp_after_outgoing(outgoing_para: ParaId) {
<Self as Store>::DownwardMessageQueues::remove(&outgoing_para);
<Self as Store>::DownwardMessageQueueHeads::remove(&outgoing_para);
}
/// Schedule a para to be cleaned up at the start of the next session.
pub(crate) fn schedule_para_cleanup(id: ParaId) {
OutgoingParas::mutate(|v| {
if let Err(i) = v.binary_search(&id) {
v.insert(i, id);
}
});
/// Remove all relevant storage items for an outgoing parachain.
fn clean_dmp_after_outgoing(outgoing_para: &ParaId) {
<Self as Store>::DownwardMessageQueues::remove(outgoing_para);
<Self as Store>::DownwardMessageQueueHeads::remove(outgoing_para);
}
/// Enqueue a downward message to a specific recipient para.
......@@ -229,23 +217,24 @@ mod tests {
use super::*;
use hex_literal::hex;
use primitives::v1::BlockNumber;
use frame_support::StorageValue;
use frame_support::traits::{OnFinalize, OnInitialize};
use parity_scale_codec::Encode;
use crate::mock::{Configuration, new_test_ext, System, Dmp, MockGenesisConfig};
use crate::mock::{Configuration, new_test_ext, System, Dmp, MockGenesisConfig, Paras};
pub(crate) fn run_to_block(to: BlockNumber, new_session: Option<Vec<BlockNumber>>) {
while System::block_number() < to {
let b = System::block_number();
Paras::initializer_finalize();
Dmp::initializer_finalize();
if new_session.as_ref().map_or(false, |v| v.contains(&(b + 1))) {
Dmp::initializer_on_new_session(&Default::default());
Dmp::initializer_on_new_session(&Default::default(), &Vec::new());
}
System::on_finalize(b);
System::on_initialize(b + 1);
System::set_block_number(b + 1);
Paras::initializer_finalize();
Dmp::initializer_initialize(b + 1);
}
}
......@@ -270,39 +259,24 @@ mod tests {
}
#[test]
fn scheduled_cleanup_performed() {
fn clean_dmp_works() {
let a = ParaId::from(1312);
let b = ParaId::from(228);
let c = ParaId::from(123);
new_test_ext(default_genesis_config()).execute_with(|| {
run_to_block(1, None);
// enqueue downward messages to A, B and C.
queue_downward_message(a, vec![1, 2, 3]).unwrap();
queue_downward_message(b, vec![4, 5, 6]).unwrap();
queue_downward_message(c, vec![7, 8, 9]).unwrap();
Dmp::schedule_para_cleanup(a);
// run to block without session change.
run_to_block(2, None);
assert!(!<Dmp as Store>::DownwardMessageQueues::get(&a).is_empty());
assert!(!<Dmp as Store>::DownwardMessageQueues::get(&b).is_empty());
assert!(!<Dmp as Store>::DownwardMessageQueues::get(&c).is_empty());
Dmp::schedule_para_cleanup(b);
// run to block changing the session.
run_to_block(3, Some(vec![3]));
let notification = crate::initializer::SessionChangeNotification::default();
let outgoing_paras = vec![a, b];
Dmp::initializer_on_new_session(&notification, &outgoing_paras);
assert!(<Dmp as Store>::DownwardMessageQueues::get(&a).is_empty());
assert!(<Dmp as Store>::DownwardMessageQueues::get(&b).is_empty());
assert!(!<Dmp as Store>::DownwardMessageQueues::get(&c).is_empty());
// verify that the outgoing paras are emptied.
assert!(OutgoingParas::get().is_empty())
});
}
......
......@@ -229,10 +229,6 @@ pub trait Config: frame_system::Config + configuration::Config + paras::Config +
decl_storage! {
trait Store for Module<T: Config> as Hrmp {
/// Paras that are to be cleaned up at the end of the session.
/// The entries are sorted ascending by the para id.
OutgoingParas: Vec<ParaId>;
/// The set of pending HRMP open channel requests.
///
/// The set is accompanied by a list for iteration.
......@@ -404,42 +400,33 @@ impl<T: Config> Module<T> {
/// Called by the initializer to note that a new session has started.
pub(crate) fn initializer_on_new_session(
notification: &initializer::SessionChangeNotification<T::BlockNumber>,
outgoing_paras: &[ParaId],
) {
Self::perform_outgoing_para_cleanup();
Self::perform_outgoing_para_cleanup(outgoing_paras);
Self::process_hrmp_open_channel_requests(&notification.prev_config);
Self::process_hrmp_close_channel_requests();
}
/// Iterate over all paras that were registered for offboarding and remove all the data
/// Iterate over all paras that were noted for offboarding and remove all the data
/// associated with them.
fn perform_outgoing_para_cleanup() {
let outgoing = OutgoingParas::take();
fn perform_outgoing_para_cleanup(outgoing: &[ParaId]) {
for outgoing_para in outgoing {
Self::clean_hrmp_after_outgoing(outgoing_para);
}
}
/// Schedule a para to be cleaned up at the start of the next session.
pub(crate) fn schedule_para_cleanup(id: ParaId) {
OutgoingParas::mutate(|v| {
if let Err(i) = v.binary_search(&id) {
v.insert(i, id);
}
});
}
/// Remove all storage entries associated with the given para.
fn clean_hrmp_after_outgoing(outgoing_para: ParaId) {
<Self as Store>::HrmpOpenChannelRequestCount::remove(&outgoing_para);
<Self as Store>::HrmpAcceptedChannelRequestCount::remove(&outgoing_para);
fn clean_hrmp_after_outgoing(outgoing_para: &ParaId) {
<Self as Store>::HrmpOpenChannelRequestCount::remove(outgoing_para);
<Self as Store>::HrmpAcceptedChannelRequestCount::remove(outgoing_para);
let ingress = <Self as Store>::HrmpIngressChannelsIndex::take(&outgoing_para)
let ingress = <Self as Store>::HrmpIngressChannelsIndex::take(outgoing_para)
.into_iter()
.map(|sender| HrmpChannelId {
sender,
recipient: outgoing_para.clone(),
});
let egress = <Self as Store>::HrmpEgressChannelsIndex::take(&outgoing_para)
let egress = <Self as Store>::HrmpEgressChannelsIndex::take(outgoing_para)
.into_iter()
.map(|recipient| HrmpChannelId {
sender: outgoing_para.clone(),
......@@ -1149,8 +1136,8 @@ mod tests {
};
// NOTE: this is in initialization order.
Paras::initializer_on_new_session(&notification);
Hrmp::initializer_on_new_session(&notification);
let outgoing_paras = Paras::initializer_on_new_session(&notification);
Hrmp::initializer_on_new_session(&notification, &outgoing_paras);
}
System::on_finalize(b);
......@@ -1247,7 +1234,6 @@ mod tests {
fn deregister_parachain(id: ParaId) {
Paras::schedule_para_cleanup(id);
Hrmp::schedule_para_cleanup(id);
}
fn channel_exists(sender: ParaId, recipient: ParaId) -> bool {
......
......@@ -199,13 +199,13 @@ impl<T: Config> Module<T> {
session_index,
};
paras::Module::<T>::initializer_on_new_session(&notification);
let outgoing_paras = paras::Module::<T>::initializer_on_new_session(&notification);
scheduler::Module::<T>::initializer_on_new_session(&notification);
inclusion::Module::<T>::initializer_on_new_session(&notification);
session_info::Module::<T>::initializer_on_new_session(&notification);
dmp::Module::<T>::initializer_on_new_session(&notification);
ump::Module::<T>::initializer_on_new_session(&notification);
hrmp::Module::<T>::initializer_on_new_session(&notification);
dmp::Module::<T>::initializer_on_new_session(&notification, &outgoing_paras);
ump::Module::<T>::initializer_on_new_session(&notification, &outgoing_paras);
hrmp::Module::<T>::initializer_on_new_session(&notification, &outgoing_paras);
}
/// Should be called when a new session occurs. Buffers the session notification to be applied
......@@ -259,9 +259,16 @@ impl<T: pallet_session::Config + Config> OneSessionHandler<T::AccountId> for Mod
#[cfg(test)]
mod tests {
use super::*;
use crate::mock::{new_test_ext, Initializer, System};
use primitives::v1::{Id as ParaId};
use crate::mock::{
new_test_ext,
Initializer, System, Dmp, Paras, Configuration, MockGenesisConfig,
};
use frame_support::traits::{OnFinalize, OnInitialize};
use frame_support::{
assert_ok,
traits::{OnFinalize, OnInitialize},
};
#[test]
fn session_change_before_initialize_is_still_buffered_after() {
......@@ -316,4 +323,52 @@ mod tests {
assert!(HasInitialized::get().is_none());
})
}
#[test]
fn scheduled_cleanup_performed() {
let a = ParaId::from(1312);
let b = ParaId::from(228);
let c = ParaId::from(123);
let mock_genesis = crate::paras::ParaGenesisArgs {
parachain: true,
genesis_head: Default::default(),
validation_code: Default::default(),
};
new_test_ext(
MockGenesisConfig {
configuration: crate::configuration::GenesisConfig {
config: crate::configuration::HostConfiguration {
max_downward_message_size: 1024,
..Default::default()
},
},
paras: crate::paras::GenesisConfig {
paras: vec![
(a, mock_genesis.clone()),
(b, mock_genesis.clone()),
(c, mock_genesis.clone()),
],
..Default::default()
},
..Default::default()
}
).execute_with(|| {
// enqueue downward messages to A, B and C.
assert_ok!(Dmp::queue_downward_message(&Configuration::config(), a, vec![1, 2, 3]));
assert_ok!(Dmp::queue_downward_message(&Configuration::config(), b, vec![4, 5, 6]));
assert_ok!(Dmp::queue_downward_message(&Configuration::config(), c, vec![7, 8, 9]));
Paras::schedule_para_cleanup(a);
Paras::schedule_para_cleanup(b);
Initializer::apply_new_session(1, vec![], vec![]);
assert!(Dmp::dmq_contents(a).is_empty());
assert!(Dmp::dmq_contents(b).is_empty());
assert!(!Dmp::dmq_contents(c).is_empty());
});
}
}
......@@ -54,15 +54,6 @@ pub fn schedule_para_initialize<T: paras::Config>(
}
/// Schedule a para to be cleaned up at the start of the next session.
pub fn schedule_para_cleanup<T>(id: primitives::v1::Id)
where
T: paras::Config
+ dmp::Config
+ ump::Config
+ hrmp::Config,
{
pub fn schedule_para_cleanup<T: paras::Config>(id: primitives::v1::Id) {
<paras::Module<T>>::schedule_para_cleanup(id);
<dmp::Module<T>>::schedule_para_cleanup(id);
<ump::Module<T>>::schedule_para_cleanup(id);
<hrmp::Module<T>>::schedule_para_cleanup(id);
}
......@@ -317,21 +317,27 @@ impl<T: Config> Module<T> {
pub(crate) fn initializer_finalize() { }
/// Called by the initializer to note that a new session has started.
pub(crate) fn initializer_on_new_session(_notification: &SessionChangeNotification<T::BlockNumber>) {
///
/// Returns the list of outgoing parachains for this session.
pub(crate) fn initializer_on_new_session(_notification: &SessionChangeNotification<T::BlockNumber>)
-> Vec<ParaId>
{
let now = <frame_system::Module<T>>::block_number();
let mut parachains = Self::clean_up_outgoing(now);
let (mut parachains, outgoing) = Self::clean_up_outgoing(now);
Self::apply_incoming(&mut parachains);
Self::apply_upgrades(&mut parachains);
Self::apply_downgrades(&mut parachains);
<Self as Store>::Parachains::set(parachains);
outgoing
}
/// Cleans up all outgoing paras. Returns the new set of parachains
fn clean_up_outgoing(now: T::BlockNumber) -> Vec<ParaId> {
/// Cleans up all outgoing paras. Returns the new set of parachains and any outgoing parachains.
fn clean_up_outgoing(now: T::BlockNumber) -> (Vec<ParaId>, Vec<ParaId>) {
let mut parachains = <Self as Store>::Parachains::get();
let outgoing = <Self as Store>::OutgoingParas::take();
for outgoing_para in outgoing {
for outgoing_para in &outgoing {
// Warn if there is a state error... but still perform the offboarding to be defensive.
if let Some(state) = ParaLifecycles::get(&outgoing_para) {
if !state.is_outgoing() {
......@@ -353,11 +359,11 @@ impl<T: Config> Module<T> {
let removed_code = <Self as Store>::CurrentCode::take(&outgoing_para);
if let Some(removed_code) = removed_code {
Self::note_past_code(outgoing_para, now, now, removed_code);
Self::note_past_code(*outgoing_para, now, now, removed_code);
}
}
parachains
(parachains, outgoing)
}
/// Applies all incoming paras, updating the parachains list for those that are parachains.
......
......@@ -148,10 +148,6 @@ pub trait Config: frame_system::Config + configuration::Config {
decl_storage! {
trait Store for Module<T: Config> as Ump {
/// Paras that are to be cleaned up at the end of the session.
/// The entries are sorted ascending by the para id.
OutgoingParas: Vec<ParaId>;
/// The messages waiting to be handled by the relay-chain originating from a certain parachain.
///
/// Note that some upward messages might have been already processed by the inclusion logic. E.g.
......@@ -207,31 +203,23 @@ impl<T: Config> Module<T> {
/// Called by the initializer to note that a new session has started.
pub(crate) fn initializer_on_new_session(
_notification: &initializer::SessionChangeNotification<T::BlockNumber>,
outgoing_paras: &[ParaId],
) {
Self::perform_outgoing_para_cleanup();
Self::perform_outgoing_para_cleanup(outgoing_paras);
}
/// Iterate over all paras that were registered for offboarding and remove all the data
/// Iterate over all paras that were noted for offboarding and remove all the data
/// associated with them.
fn perform_outgoing_para_cleanup() {
let outgoing = OutgoingParas::take();
fn perform_outgoing_para_cleanup(outgoing: &[ParaId]) {
for outgoing_para in outgoing {
Self::clean_ump_after_outgoing(outgoing_para);
}
}
/// Schedule a para to be cleaned up at the start of the next session.
pub(crate) fn schedule_para_cleanup(id: ParaId) {
OutgoingParas::mutate(|v| {
if let Err(i) = v.binary_search(&id) {
v.insert(i, id);
}
});
}
fn clean_ump_after_outgoing(outgoing_para: ParaId) {
<Self as Store>::RelayDispatchQueueSize::remove(&outgoing_para);
<Self as Store>::RelayDispatchQueues::remove(&outgoing_para);
/// Remove all relevant storage items for an outgoing parachain.
fn clean_ump_after_outgoing(outgoing_para: &ParaId) {
<Self as Store>::RelayDispatchQueueSize::remove(outgoing_para);
<Self as Store>::RelayDispatchQueues::remove(outgoing_para);
// Remove the outgoing para from the `NeedsDispatch` list and from
// `NextDispatchRoundStartWith`.
......@@ -239,12 +227,12 @@ impl<T: Config> Module<T> {
// That's needed for maintaining invariant that `NextDispatchRoundStartWith` points to an
// existing item in `NeedsDispatch`.
<Self as Store>::NeedsDispatch::mutate(|v| {
if let Ok(i) = v.binary_search(&outgoing_para) {
if let Ok(i) = v.binary_search(outgoing_para) {
v.remove(i);
}
});
<Self as Store>::NextDispatchRoundStartWith::mutate(|v| {
*v = v.filter(|p| *p == outgoing_para)
*v = v.filter(|p| p == outgoing_para)
});
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment