Commit 9115d427 authored by asynchronous rob, committed by GitHub

scheduler: handle re-scheduling around finalization correctly (#2257)



* scheduler: handle re-scheduling around finalization correctly

* also make sure parathreads get cleaned

* run scheduling in finalization

* Remove stray println!

* Update the schedule call site in inclusion inherent

* Clarify subtlety around SessionStartBlock

* Remove double semi-colon

* reschedule prior to `availability_cores` and in on-initialize

* improve docs

* fix line

* more doc reformat

* remove unneeded call

* avoid unnecessary scheduling on initialize

* split `clear` and `schedule`

* Update runtime/parachains/src/scheduler.rs

Co-authored-by: Sergei Shulepov <sergei@parity.io>
parent 2fd345b8
Pipeline #120159 passed with stages
in 30 minutes and 23 seconds
...@@ -822,14 +822,14 @@ sp_api::decl_runtime_apis! { ...@@ -822,14 +822,14 @@ sp_api::decl_runtime_apis! {
#[skip_initialize_block] #[skip_initialize_block]
fn validators() -> Vec<ValidatorId>; fn validators() -> Vec<ValidatorId>;
/// Returns the validator groups and rotation info localized based on the block whose state /// Returns the validator groups and rotation info localized based on the hypothetical child
/// this is invoked on. Note that `now` in the `GroupRotationInfo` should be the successor of /// of a block whose state this is invoked on. Note that `now` in the `GroupRotationInfo`
/// the number of the block. /// should be the successor of the number of the block.
#[skip_initialize_block] #[skip_initialize_block]
fn validator_groups() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo<N>); fn validator_groups() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo<N>);
/// Yields information on all availability cores. Cores are either free or occupied. Free /// Yields information on all availability cores as relevant to the child block.
/// cores can have paras assigned to them. /// Cores are either free or occupied. Free cores can have paras assigned to them.
#[skip_initialize_block] #[skip_initialize_block]
fn availability_cores() -> Vec<CoreState<H, N>>; fn availability_cores() -> Vec<CoreState<H, N>>;
......
# Availability Cores # Availability Cores
Yields information on all availability cores. Cores are either free or occupied. Free cores can have paras assigned to them. Occupied cores don't, but they can become available part-way through a block due to bitfields and then have something scheduled on them. To allow optimistic validation of candidates, the occupied cores are accompanied by information on what is upcoming. This information can be leveraged when validators perceive that there is a high likelihood of a core becoming available based on bitfields seen, and then optimistically validate something that would become scheduled based on that, although there is no guarantee on what the block producer will actually include in the block. Yields information on all availability cores. Cores are either free or occupied. Free cores can have paras assigned to them. Occupied cores don't, but they can become available part-way through a block due to bitfields and then have something scheduled on them. To allow optimistic validation of candidates, the occupied cores are accompanied by information on what is upcoming. This information can be leveraged when validators perceive that there is a high likelihood of a core becoming available based on bitfields seen, and then optimistically validate something that would become scheduled based on that, although there is no guarantee on what the block producer will actually include in the block.
See also the [Scheduler Module](../runtime/scheduler.md) for a high-level description of what an availability core is and why it exists. See also the [Scheduler Module](../runtime/scheduler.md) for a high-level description of what an availability core is and why it exists.
...@@ -8,7 +8,7 @@ See also the [Scheduler Module](../runtime/scheduler.md) for a high-level descri ...@@ -8,7 +8,7 @@ See also the [Scheduler Module](../runtime/scheduler.md) for a high-level descri
fn availability_cores(at: Block) -> Vec<CoreState>; fn availability_cores(at: Block) -> Vec<CoreState>;
``` ```
This is all the information that a validator needs about scheduling for the current block. It includes all information on [Scheduler](../runtime/scheduler.md) core-assignments and [Inclusion](../runtime/inclusion.md) state of blocks occupying availability cores. It includes data necessary to determine not only which paras are assigned now, but which cores are likely to become freed after processing bitfields, and exactly which bitfields would be necessary to make them so. This is all the information that a validator needs about scheduling for the current block. It includes all information on [Scheduler](../runtime/scheduler.md) core-assignments and [Inclusion](../runtime/inclusion.md) state of blocks occupying availability cores. It includes data necessary to determine not only which paras are assigned now, but which cores are likely to become freed after processing bitfields, and exactly which bitfields would be necessary to make them so. The implementation of this runtime API should invoke `Scheduler::clear` and `Scheduler::schedule(Vec::new(), current_block_number + 1)` to ensure that scheduling is accurate.
```rust ```rust
struct OccupiedCore { struct OccupiedCore {
......
# Validator Groups # Validator Groups
Yields the validator groups used during the current session. The validators in the groups are referred to by their index into the validator-set. Yields the validator groups used during the current session. The validators in the groups are referred to by their index into the validator-set and this is assumed to be as-of the child of the block whose state is being queried.
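A minimal sketch of why `now` is taken to be the child of the queried block. It assumes the number of rotations is `(now - session_start_block) / group_rotation_frequency` and that group `i` serves core `(i + k) % n`, as the scheduler notes in this change describe. `RotationInfo`, `rotations`, `core_for_group` and `rotation_info_for_child` are hypothetical names used only for illustration, not the runtime's actual API.

```rust
/// Toy stand-in for the rotation helper described above (hypothetical type).
#[derive(Debug, Clone, Copy)]
struct RotationInfo {
    session_start_block: u32,
    group_rotation_frequency: u32,
    /// The block the answer is computed for: the child of the queried block.
    now: u32,
}

impl RotationInfo {
    /// Number of rotations `k` since the session started (assumed formula).
    fn rotations(&self) -> u32 {
        if self.group_rotation_frequency == 0 {
            return 0;
        }
        self.now.saturating_sub(self.session_start_block) / self.group_rotation_frequency
    }

    /// Core served by validator group `group` under the `(i + k) % n` rule.
    /// Returns `None` when there are no cores.
    fn core_for_group(&self, group: u32, n_cores: u32) -> Option<u32> {
        if n_cores == 0 {
            return None;
        }
        // Widen to u64 so the addition cannot overflow.
        let shifted = u64::from(group) + u64::from(self.rotations());
        Some((shifted % u64::from(n_cores)) as u32)
    }
}

/// Builds the info the way a runtime API would when invoked on the state of
/// `queried_block`: `now` is the successor of that block.
fn rotation_info_for_child(
    queried_block: u32,
    session_start_block: u32,
    group_rotation_frequency: u32,
) -> RotationInfo {
    RotationInfo {
        session_start_block,
        group_rotation_frequency,
        now: queried_block.saturating_add(1),
    }
}
```

With a session starting at block 1 and a rotation frequency of 10, querying the state of block 10 already reports one rotation, because the answer describes block 11.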
```rust ```rust
/// A helper data-type for tracking validator-group rotations. /// A helper data-type for tracking validator-group rotations.
struct GroupRotationInfo { struct GroupRotationInfo {
session_start_block: BlockNumber, session_start_block: BlockNumber,
group_rotation_frequency: BlockNumber, group_rotation_frequency: BlockNumber,
now: BlockNumber, now: BlockNumber, // The successor of the block in whose state this runtime API is queried.
} }
impl GroupRotationInfo { impl GroupRotationInfo {
......
...@@ -20,7 +20,8 @@ Included: Option<()>, ...@@ -20,7 +20,8 @@ Included: Option<()>,
1. Hash the parent header and make sure that it corresponds to the block hash of the parent (tracked by the `frame_system` FRAME module), 1. Hash the parent header and make sure that it corresponds to the block hash of the parent (tracked by the `frame_system` FRAME module),
1. The `Bitfields` are first forwarded to the `Inclusion::process_bitfields` routine, returning a set of freed cores. Provide a `Scheduler::core_para` as a core-lookup to the `process_bitfields` routine. Annotate each of these freed cores with `FreedReason::Concluded`. 1. The `Bitfields` are first forwarded to the `Inclusion::process_bitfields` routine, returning a set of freed cores. Provide a `Scheduler::core_para` as a core-lookup to the `process_bitfields` routine. Annotate each of these freed cores with `FreedReason::Concluded`.
1. If `Scheduler::availability_timeout_predicate` is `Some`, invoke `Inclusion::collect_pending` using it, and add timed-out cores to the free cores, annotated with `FreedReason::TimedOut`. 1. If `Scheduler::availability_timeout_predicate` is `Some`, invoke `Inclusion::collect_pending` using it, and add timed-out cores to the free cores, annotated with `FreedReason::TimedOut`.
1. Invoke `Scheduler::schedule(freed)` 1. Invoke `Scheduler::clear`
1. Invoke `Scheduler::schedule(freed, System::current_block())`
1. Extract `parent_storage_root` from the parent header, 1. Extract `parent_storage_root` from the parent header,
1. Invoke the `Inclusion::process_candidates` routine with the parameters `(parent_storage_root, backed_candidates, Scheduler::scheduled(), Scheduler::group_validators)`. 1. Invoke the `Inclusion::process_candidates` routine with the parameters `(parent_storage_root, backed_candidates, Scheduler::scheduled(), Scheduler::group_validators)`.
1. Call `Scheduler::occupied` using the return value of the `Inclusion::process_candidates` call above, first sorting the list of assigned core indices. 1. Call `Scheduler::occupied` using the return value of the `Inclusion::process_candidates` call above, first sorting the list of assigned core indices.
......
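The ordering of the steps above (annotate freed cores, then `clear`, then `schedule` with the current block number) can be sketched with a toy in-memory scheduler. Everything here is an assumption made for illustration: `ToyScheduler`, `annotate_freed` and `enter_inclusion_inherent` are hypothetical names, cores are plain `u32` indices, and the sketch sorts the freed list itself because `schedule` is documented to expect it in ascending core order.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FreedReason {
    Concluded,
    TimedOut,
}

/// Hypothetical in-memory scheduler, not the runtime module.
#[derive(Debug, Default)]
struct ToyScheduler {
    /// Cores scheduled for the current block, ascending by index.
    scheduled: Vec<u32>,
    /// Cores occupied by a candidate pending availability.
    occupied: Vec<u32>,
    n_cores: u32,
    /// Block number passed to the most recent `schedule` call.
    last_scheduled_at: Option<u32>,
}

impl ToyScheduler {
    /// Drop whatever was scheduled earlier (for example by a runtime API call).
    fn clear(&mut self) {
        self.scheduled.clear();
    }

    /// Release the freed cores, then schedule every core that is not occupied.
    fn schedule(&mut self, freed: Vec<(u32, FreedReason)>, now: u32) {
        for (core, _reason) in freed {
            self.occupied.retain(|c| *c != core);
        }
        self.scheduled = (0..self.n_cores)
            .filter(|c| !self.occupied.contains(c))
            .collect();
        self.last_scheduled_at = Some(now);
    }
}

/// Tag cores freed by bitfields and by timeouts, sorted by core index.
fn annotate_freed(concluded: Vec<u32>, timed_out: Vec<u32>) -> Vec<(u32, FreedReason)> {
    let mut freed: Vec<(u32, FreedReason)> = concluded
        .into_iter()
        .map(|c| (c, FreedReason::Concluded))
        .chain(timed_out.into_iter().map(|c| (c, FreedReason::TimedOut)))
        .collect();
    freed.sort_by_key(|(core, _)| *core);
    freed
}

/// The clear-then-schedule sequence from the list above.
fn enter_inclusion_inherent(
    scheduler: &mut ToyScheduler,
    concluded: Vec<u32>,
    timed_out: Vec<u32>,
    current_block: u32,
) {
    let freed = annotate_freed(concluded, timed_out);
    scheduler.clear();
    scheduler.schedule(freed, current_block);
}
```

Calling `clear` first means any stale assignment left over from an earlier scheduling pass cannot leak into the block being built.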
...@@ -163,6 +163,9 @@ ParathreadClaimIndex: Vec<ParaId>; ...@@ -163,6 +163,9 @@ ParathreadClaimIndex: Vec<ParaId>;
/// The block number where the session start occurred. Used to track how many group rotations have occurred. /// The block number where the session start occurred. Used to track how many group rotations have occurred.
SessionStartBlock: BlockNumber; SessionStartBlock: BlockNumber;
/// Currently scheduled cores - free but up to be occupied. /// Currently scheduled cores - free but up to be occupied.
/// The value contained here will not be valid after the end of a block.
/// Runtime APIs should be used to determine scheduled cores
/// for the upcoming block.
Scheduled: Vec<CoreAssignment>, // sorted ascending by CoreIndex. Scheduled: Vec<CoreAssignment>, // sorted ascending by CoreIndex.
``` ```
...@@ -172,7 +175,7 @@ Session changes are the only time that configuration can change, and the [Config ...@@ -172,7 +175,7 @@ Session changes are the only time that configuration can change, and the [Config
Actions: Actions:
1. Set `SessionStartBlock` to current block number. 1. Set `SessionStartBlock` to current block number + 1, as session changes are applied at the end of the block.
1. Clear all `Some` members of `AvailabilityCores`. Return all parathread claims to queue with retries un-incremented. 1. Clear all `Some` members of `AvailabilityCores`. Return all parathread claims to queue with retries un-incremented.
1. Set `configuration = Configuration::configuration()` (see [`HostConfiguration`](../types/runtime.md#host-configuration)) 1. Set `configuration = Configuration::configuration()` (see [`HostConfiguration`](../types/runtime.md#host-configuration))
1. Determine the number of cores & validator groups as `n_cores`. This is the maximum of 1. Determine the number of cores & validator groups as `n_cores`. This is the maximum of
...@@ -187,12 +190,11 @@ Actions: ...@@ -187,12 +190,11 @@ Actions:
- Also prune all parathread claims corresponding to de-registered parathreads. - Also prune all parathread claims corresponding to de-registered parathreads.
- all pruned claims should have their entry removed from the parathread index. - all pruned claims should have their entry removed from the parathread index.
- assign all non-pruned claims to new cores if the number of parathread cores has changed between the `new_config` and `old_config` of the `SessionChangeNotification`. - assign all non-pruned claims to new cores if the number of parathread cores has changed between the `new_config` and `old_config` of the `SessionChangeNotification`.
- Assign claims in equal balance across all cores if rebalancing, and set the `next_core` of the `ParathreadQueue` by incrementing the relative index of the last assigned core and taking it modulo the number of parathread cores. - Assign claims in equal balance across all cores if rebalancing, and set the `next_core` of the `ParathreadQueue` by incrementing the relative index of the last assigned core and taking it modulo the number of parathread cores.
## Initialization ## Initialization
1. Free all scheduled cores and return parathread claims to queue, with retries incremented. No initialization routine runs for this module.
1. Schedule free cores using the `schedule(Vec::new())`.
## Finalization ## Finalization
...@@ -206,12 +208,12 @@ No finalization routine runs for this module. ...@@ -206,12 +208,12 @@ No finalization routine runs for this module.
- The core used for the parathread claim is the `next_core` field of the `ParathreadQueue` and adding `Paras::parachains().len()` to it. - The core used for the parathread claim is the `next_core` field of the `ParathreadQueue` and adding `Paras::parachains().len()` to it.
- `next_core` is then updated by adding 1 and taking it modulo `config.parathread_cores`. - `next_core` is then updated by adding 1 and taking it modulo `config.parathread_cores`.
- The claim is then added to the claim index. - The claim is then added to the claim index.
- `schedule(Vec<(CoreIndex, FreedReason)>)`: schedule new core assignments, with a parameter indicating previously-occupied cores which are to be considered returned and why they are being returned. - `schedule(Vec<(CoreIndex, FreedReason)>, now: BlockNumber)`: schedule new core assignments, with a parameter indicating previously-occupied cores which are to be considered returned and why they are being returned.
- All freed parachain cores should be assigned to their respective parachain - All freed parachain cores should be assigned to their respective parachain
- All freed parathread cores whose reason for freeing was `FreedReason::Concluded` should have the claim removed from the claim index. - All freed parathread cores whose reason for freeing was `FreedReason::Concluded` should have the claim removed from the claim index.
- All freed parathread cores whose reason for freeing was `FreedReason::TimedOut` should have the claim added to the parathread queue again without retries incremented - All freed parathread cores whose reason for freeing was `FreedReason::TimedOut` should have the claim added to the parathread queue again without retries incremented
- All freed parathread cores should take the next parathread entry from the queue. - All freed parathread cores should take the next parathread entry from the queue.
- The i'th validator group will be assigned to the `(i+k)%n`'th core at any point in time, where `k` is the number of rotations that have occurred in the session, and `n` is the total number of cores. This makes upcoming rotations within the same session predictable. - The i'th validator group will be assigned to the `(i+k)%n`'th core at any point in time, where `k` is the number of rotations that have occurred in the session, and `n` is the total number of cores. This makes upcoming rotations within the same session predictable. Rotations are based off of `now`.
- `scheduled() -> Vec<CoreAssignment>`: Get currently scheduled core assignments. - `scheduled() -> Vec<CoreAssignment>`: Get currently scheduled core assignments.
- `occupied(Vec<CoreIndex>)`. Note that the given cores have become occupied. - `occupied(Vec<CoreIndex>)`. Note that the given cores have become occupied.
- Behavior undefined if any given cores were not scheduled. - Behavior undefined if any given cores were not scheduled.
...@@ -221,6 +223,8 @@ No finalization routine runs for this module. ...@@ -221,6 +223,8 @@ No finalization routine runs for this module.
- `core_para(CoreIndex) -> ParaId`: return the currently-scheduled or occupied ParaId for the given core. - `core_para(CoreIndex) -> ParaId`: return the currently-scheduled or occupied ParaId for the given core.
- `group_validators(GroupIndex) -> Option<Vec<ValidatorIndex>>`: return all validators in a given group, if the group index is valid for this session. - `group_validators(GroupIndex) -> Option<Vec<ValidatorIndex>>`: return all validators in a given group, if the group index is valid for this session.
- `availability_timeout_predicate() -> Option<impl Fn(CoreIndex, BlockNumber) -> bool>`: returns an optional predicate that should be used for timing out occupied cores. if `None`, no timing-out should be done. The predicate accepts the index of the core, and the block number since which it has been occupied. The predicate should be implemented based on the time since the last validator group rotation, and the respective parachain and parathread timeouts, i.e. only within `max(config.chain_availability_period, config.thread_availability_period)` of the last rotation would this return `Some`. - `availability_timeout_predicate() -> Option<impl Fn(CoreIndex, BlockNumber) -> bool>`: returns an optional predicate that should be used for timing out occupied cores. if `None`, no timing-out should be done. The predicate accepts the index of the core, and the block number since which it has been occupied. The predicate should be implemented based on the time since the last validator group rotation, and the respective parachain and parathread timeouts, i.e. only within `max(config.chain_availability_period, config.thread_availability_period)` of the last rotation would this return `Some`.
- `group_rotation_info() -> GroupRotationInfo`: Returns a helper for determining group rotation. - `group_rotation_info(now: BlockNumber) -> GroupRotationInfo`: Returns a helper for determining group rotation.
- `next_up_on_available(CoreIndex) -> Option<ScheduledCore>`: Return the next thing that will be scheduled on this core assuming it is currently occupied and the candidate occupying it became available. Returns in `ScheduledCore` format (todo: link to Runtime APIs page; linkcheck doesn't allow this right now). For parachains, this is always the ID of the parachain and no specified collator. For parathreads, this is based on the next item in the `ParathreadQueue` assigned to that core, and is `None` if there isn't one. - `next_up_on_available(CoreIndex) -> Option<ScheduledCore>`: Return the next thing that will be scheduled on this core assuming it is currently occupied and the candidate occupying it became available. Returns in `ScheduledCore` format (todo: link to Runtime APIs page; linkcheck doesn't allow this right now). For parachains, this is always the ID of the parachain and no specified collator. For parathreads, this is based on the next item in the `ParathreadQueue` assigned to that core, and is `None` if there isn't one.
- `next_up_on_time_out(CoreIndex) -> Option<ScheduledCore>`: Return the next thing that will be scheduled on this core assuming it is currently occupied and the candidate occupying it timed out. Returns in `ScheduledCore` format (todo: link to Runtime APIs page; linkcheck doesn't allow this right now). For parachains, this is always the ID of the parachain and no specified collator. For parathreads, this is based on the next item in the `ParathreadQueue` assigned to that core, or if there isn't one, the claim that is currently occupying the core. Otherwise `None`. - `next_up_on_time_out(CoreIndex) -> Option<ScheduledCore>`: Return the next thing that will be scheduled on this core assuming it is currently occupied and the candidate occupying it timed out. Returns in `ScheduledCore` format (todo: link to Runtime APIs page; linkcheck doesn't allow this right now). For parachains, this is always the ID of the parachain and no specified collator. For parathreads, this is based on the next item in the `ParathreadQueue` assigned to that core, or if there isn't one, the claim that is currently occupying the core. Otherwise `None`.
- `clear()`:
- Free all scheduled cores and return parathread claims to queue, with retries incremented. Skip parathreads which no longer exist under paras.
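The `clear()` routine described above can be sketched over plain collections. This is a simplified model under stated assumptions, not the module's storage layout: `Assignment`, `QueuedClaim`, the `live_parathreads` set and the `max_retries` parameter are hypothetical stand-ins for `CoreAssignment`, `ParathreadEntry`, the paras lookup and `config.parathread_retries`.

```rust
use std::collections::{BTreeSet, VecDeque};

/// Simplified core assignment (hypothetical type).
#[derive(Debug, Clone, PartialEq, Eq)]
enum Assignment {
    Parachain { para_id: u32 },
    Parathread { para_id: u32, retries: u32 },
}

/// Simplified parathread queue entry (hypothetical type).
#[derive(Debug, Clone, PartialEq, Eq)]
struct QueuedClaim {
    para_id: u32,
    retries: u32,
}

/// Free all scheduled cores and return parathread claims to the queue with
/// retries incremented. Claims for parathreads that no longer exist, or that
/// would exceed `max_retries`, are dropped.
fn clear(
    scheduled: &mut Vec<Assignment>,
    queue: &mut VecDeque<QueuedClaim>,
    live_parathreads: &BTreeSet<u32>,
    max_retries: u32,
) {
    for assignment in scheduled.drain(..) {
        if let Assignment::Parathread { para_id, retries } = assignment {
            // Skip parathreads that were removed in the meantime.
            if !live_parathreads.contains(&para_id) {
                continue;
            }
            let retries = retries + 1;
            if retries <= max_retries {
                queue.push_back(QueuedClaim { para_id, retries });
            }
        }
        // Parachain assignments need no bookkeeping: the core is simply freed.
    }
}
```

After `clear` runs, the scheduled list is empty, so a following `schedule(Vec::new(), now)` call starts from a clean slate.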
...@@ -124,7 +124,11 @@ decl_module! { ...@@ -124,7 +124,11 @@ decl_module! {
let freed = freed_concluded.into_iter().map(|c| (c, FreedReason::Concluded)) let freed = freed_concluded.into_iter().map(|c| (c, FreedReason::Concluded))
.chain(freed_timeout.into_iter().map(|c| (c, FreedReason::TimedOut))); .chain(freed_timeout.into_iter().map(|c| (c, FreedReason::TimedOut)));
<scheduler::Module<T>>::schedule(freed); <scheduler::Module<T>>::clear();
<scheduler::Module<T>>::schedule(
freed,
<frame_system::Module<T>>::block_number(),
);
let backed_candidates = limit_backed_candidates::<T>(backed_candidates); let backed_candidates = limit_backed_candidates::<T>(backed_candidates);
let backed_candidates_len = backed_candidates.len() as Weight; let backed_candidates_len = backed_candidates.len() as Weight;
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
use sp_std::prelude::*; use sp_std::prelude::*;
use sp_std::collections::btree_map::BTreeMap; use sp_std::collections::btree_map::BTreeMap;
use sp_runtime::traits::One;
use primitives::v1::{ use primitives::v1::{
ValidatorId, ValidatorIndex, GroupRotationInfo, CoreState, ValidationData, ValidatorId, ValidatorIndex, GroupRotationInfo, CoreState, ValidationData,
Id as ParaId, OccupiedCoreAssumption, SessionIndex, ValidationCode, Id as ParaId, OccupiedCoreAssumption, SessionIndex, ValidationCode,
...@@ -39,8 +40,10 @@ pub fn validator_groups<T: initializer::Config>() -> ( ...@@ -39,8 +40,10 @@ pub fn validator_groups<T: initializer::Config>() -> (
Vec<Vec<ValidatorIndex>>, Vec<Vec<ValidatorIndex>>,
GroupRotationInfo<T::BlockNumber>, GroupRotationInfo<T::BlockNumber>,
) { ) {
let now = <frame_system::Module<T>>::block_number() + One::one();
let groups = <scheduler::Module<T>>::validator_groups(); let groups = <scheduler::Module<T>>::validator_groups();
let rotation_info = <scheduler::Module<T>>::group_rotation_info(); let rotation_info = <scheduler::Module<T>>::group_rotation_info(now);
(groups, rotation_info) (groups, rotation_info)
} }
...@@ -51,7 +54,11 @@ pub fn availability_cores<T: initializer::Config>() -> Vec<CoreState<T::Hash, T: ...@@ -51,7 +54,11 @@ pub fn availability_cores<T: initializer::Config>() -> Vec<CoreState<T::Hash, T:
let parachains = <paras::Module<T>>::parachains(); let parachains = <paras::Module<T>>::parachains();
let config = <configuration::Module<T>>::config(); let config = <configuration::Module<T>>::config();
let rotation_info = <scheduler::Module<T>>::group_rotation_info(); let now = <frame_system::Module<T>>::block_number() + One::one();
<scheduler::Module<T>>::clear();
<scheduler::Module<T>>::schedule(Vec::new(), now);
let rotation_info = <scheduler::Module<T>>::group_rotation_info(now);
let time_out_at = |backed_in_number, availability_period| { let time_out_at = |backed_in_number, availability_period| {
let time_out_at = backed_in_number + availability_period; let time_out_at = backed_in_number + availability_period;
......
...@@ -46,7 +46,7 @@ use frame_support::{ ...@@ -46,7 +46,7 @@ use frame_support::{
weights::Weight, weights::Weight,
}; };
use parity_scale_codec::{Encode, Decode}; use parity_scale_codec::{Encode, Decode};
use sp_runtime::traits::Saturating; use sp_runtime::traits::{One, Saturating};
use rand::{SeedableRng, seq::SliceRandom}; use rand::{SeedableRng, seq::SliceRandom};
use rand_chacha::ChaCha20Rng; use rand_chacha::ChaCha20Rng;
...@@ -183,10 +183,18 @@ decl_storage! { ...@@ -183,10 +183,18 @@ decl_storage! {
/// Bounded by the number of parathread cores and scheduling lookahead. Reasonably, 10 * 50 = 500. /// Bounded by the number of parathread cores and scheduling lookahead. Reasonably, 10 * 50 = 500.
ParathreadClaimIndex: Vec<ParaId>; ParathreadClaimIndex: Vec<ParaId>;
/// The block number where the session start occurred. Used to track how many group rotations have occurred. /// The block number where the session start occurred. Used to track how many group rotations have occurred.
///
/// Note that in the context of parachains modules the session change is signalled during
/// the block and enacted at the end of the block (at the finalization stage, to be exact).
/// Thus for all intents and purposes the effect of the session change is observed at the
/// block following the session change, block number of which we save in this storage value.
SessionStartBlock get(fn session_start_block): T::BlockNumber; SessionStartBlock get(fn session_start_block): T::BlockNumber;
/// Currently scheduled cores - free but up to be occupied. /// Currently scheduled cores - free but up to be occupied.
/// ///
/// Bounded by the number of cores: one for each parachain and parathread multiplexer. /// Bounded by the number of cores: one for each parachain and parathread multiplexer.
///
/// The value contained here will not be valid after the end of a block. Runtime APIs should be used to determine scheduled cores
/// for the upcoming block.
Scheduled get(fn scheduled): Vec<CoreAssignment>; // sorted ascending by CoreIndex. Scheduled get(fn scheduled): Vec<CoreAssignment>; // sorted ascending by CoreIndex.
} }
} }
...@@ -205,30 +213,11 @@ decl_module! { ...@@ -205,30 +213,11 @@ decl_module! {
impl<T: Config> Module<T> { impl<T: Config> Module<T> {
/// Called by the initializer to initialize the scheduler module. /// Called by the initializer to initialize the scheduler module.
pub(crate) fn initializer_initialize(_now: T::BlockNumber) -> Weight { pub(crate) fn initializer_initialize(_now: T::BlockNumber) -> Weight {
// Free all scheduled cores and return parathread claims to queue, with retries incremented.
let config = <configuration::Module<T>>::config();
ParathreadQueue::mutate(|queue| {
for core_assignment in Scheduled::take() {
if let AssignmentKind::Parathread(collator, retries) = core_assignment.kind {
let entry = ParathreadEntry {
claim: ParathreadClaim(core_assignment.para_id, collator),
retries: retries + 1,
};
if entry.retries <= config.parathread_retries {
queue.enqueue_entry(entry, config.parathread_cores);
}
}
}
});
Self::schedule(Vec::new());
0 0
} }
/// Called by the initializer to finalize the scheduler module. /// Called by the initializer to finalize the scheduler module.
pub(crate) fn initializer_finalize() {} pub(crate) fn initializer_finalize() { }
/// Called by the initializer to note that a new session has started. /// Called by the initializer to note that a new session has started.
pub(crate) fn initializer_on_new_session(notification: &SessionChangeNotification<T::BlockNumber>) { pub(crate) fn initializer_on_new_session(notification: &SessionChangeNotification<T::BlockNumber>) {
...@@ -250,7 +239,6 @@ impl<T: Config> Module<T> { ...@@ -250,7 +239,6 @@ impl<T: Config> Module<T> {
}, },
); );
<SessionStartBlock<T>>::set(<frame_system::Module<T>>::block_number());
AvailabilityCores::mutate(|cores| { AvailabilityCores::mutate(|cores| {
// clear all occupied cores. // clear all occupied cores.
for maybe_occupied in cores.iter_mut() { for maybe_occupied in cores.iter_mut() {
...@@ -337,6 +325,9 @@ impl<T: Config> Module<T> { ...@@ -337,6 +325,9 @@ impl<T: Config> Module<T> {
} }
}); });
ParathreadQueue::set(thread_queue); ParathreadQueue::set(thread_queue);
let now = <frame_system::Module<T>>::block_number() + One::one();
<SessionStartBlock<T>>::set(now);
} }
/// Add a parathread claim to the queue. If there is a competing claim in the queue or currently /// Add a parathread claim to the queue. If there is a competing claim in the queue or currently
...@@ -375,7 +366,10 @@ impl<T: Config> Module<T> { ...@@ -375,7 +366,10 @@ impl<T: Config> Module<T> {
/// Schedule all unassigned cores, where possible. Provide a list of cores that should be considered /// Schedule all unassigned cores, where possible. Provide a list of cores that should be considered
/// newly-freed along with the reason for them being freed. The list is assumed to be sorted in /// newly-freed along with the reason for them being freed. The list is assumed to be sorted in
/// ascending order by core index. /// ascending order by core index.
pub(crate) fn schedule(just_freed_cores: impl IntoIterator<Item = (CoreIndex, FreedReason)>) { pub(crate) fn schedule(
just_freed_cores: impl IntoIterator<Item = (CoreIndex, FreedReason)>,
now: T::BlockNumber,
) {
let mut cores = AvailabilityCores::get(); let mut cores = AvailabilityCores::get();
let config = <configuration::Module<T>>::config(); let config = <configuration::Module<T>>::config();
...@@ -411,7 +405,6 @@ impl<T: Config> Module<T> { ...@@ -411,7 +405,6 @@ impl<T: Config> Module<T> {
let parachains = <paras::Module<T>>::parachains(); let parachains = <paras::Module<T>>::parachains();
let mut scheduled = Scheduled::get(); let mut scheduled = Scheduled::get();
let mut parathread_queue = ParathreadQueue::get(); let mut parathread_queue = ParathreadQueue::get();
let now = <frame_system::Module<T>>::block_number();
if ValidatorGroups::get().is_empty() { return } if ValidatorGroups::get().is_empty() { return }
...@@ -638,9 +631,8 @@ impl<T: Config> Module<T> { ...@@ -638,9 +631,8 @@ impl<T: Config> Module<T> {
} }
/// Returns a helper for determining group rotation. /// Returns a helper for determining group rotation.
pub(crate) fn group_rotation_info() -> GroupRotationInfo<T::BlockNumber> { pub(crate) fn group_rotation_info(now: T::BlockNumber) -> GroupRotationInfo<T::BlockNumber> {
let session_start_block = Self::session_start_block(); let session_start_block = Self::session_start_block();
let now = <frame_system::Module<T>>::block_number();
let group_rotation_frequency = <configuration::Module<T>>::config() let group_rotation_frequency = <configuration::Module<T>>::config()
.group_rotation_frequency; .group_rotation_frequency;
...@@ -716,6 +708,27 @@ impl<T: Config> Module<T> { ...@@ -716,6 +708,27 @@ impl<T: Config> Module<T> {
}) })
} }
} }
// Free all scheduled cores and return parathread claims to queue, with retries incremented.
pub(crate) fn clear() {
let config = <configuration::Module<T>>::config();
ParathreadQueue::mutate(|queue| {
for core_assignment in Scheduled::take() {
if let AssignmentKind::Parathread(collator, retries) = core_assignment.kind {
if !<paras::Module<T>>::is_parathread(core_assignment.para_id) { continue }
let entry = ParathreadEntry {
claim: ParathreadClaim(core_assignment.para_id, collator),
retries: retries + 1,
};
if entry.retries <= config.parathread_retries {
queue.enqueue_entry(entry, config.parathread_cores);
}
}
}
});
}
} }
#[cfg(test)] #[cfg(test)]
...@@ -741,21 +754,42 @@ mod tests { ...@@ -741,21 +754,42 @@ mod tests {
Scheduler::initializer_finalize(); Scheduler::initializer_finalize();
Paras::initializer_finalize(); Paras::initializer_finalize();
System::on_finalize(b);
System::on_initialize(b + 1);
System::set_block_number(b + 1);
if let Some(notification) = new_session(b + 1) { if let Some(notification) = new_session(b + 1) {
Paras::initializer_on_new_session(&notification); Paras::initializer_on_new_session(&notification);
Scheduler::initializer_on_new_session(&notification); Scheduler::initializer_on_new_session(&notification);
} }
System::on_finalize(b);
System::on_initialize(b + 1);
System::set_block_number(b + 1);
Paras::initializer_initialize(b + 1); Paras::initializer_initialize(b + 1);
Scheduler::initializer_initialize(b + 1); Scheduler::initializer_initialize(b + 1);
// In the real runtime this is expected to be called by the `InclusionInherent` module.
Scheduler::clear();
Scheduler::schedule(Vec::new(), b + 1);
} }
} }
fn run_to_end_of_block(
to: BlockNumber,
new_session: impl Fn(BlockNumber) -> Option<SessionChangeNotification<BlockNumber>>,
) {
run_to_block(to, &new_session);
Scheduler::initializer_finalize();
Paras::initializer_finalize();
if let Some(notification) = new_session(to + 1) {
Paras::initializer_on_new_session(&notification);
Scheduler::initializer_on_new_session(&notification);
}
System::on_finalize(to);
}
fn default_config() -> HostConfiguration<BlockNumber> { fn default_config() -> HostConfiguration<BlockNumber> {
HostConfiguration { HostConfiguration {
parathread_cores: 3, parathread_cores: 3,
...@@ -1334,11 +1368,14 @@ mod tests { ...@@ -1334,11 +1368,14 @@ mod tests {
} }
// now note that cores 0, 2, and 3 were freed. // now note that cores 0, 2, and 3 were freed.
Scheduler::schedule(vec![ Scheduler::schedule(
(CoreIndex(0), FreedReason::Concluded), vec![
(CoreIndex(2), FreedReason::Concluded), (CoreIndex(0), FreedReason::Concluded),
(CoreIndex(3), FreedReason::TimedOut), // should go back on queue. (CoreIndex(2), FreedReason::Concluded),
]); (CoreIndex(3), FreedReason::TimedOut), // should go back on queue.
],
3
);
{ {
let scheduled = Scheduler::scheduled(); let scheduled = Scheduler::scheduled();
...@@ -1455,10 +1492,13 @@ mod tests { ...@@ -1455,10 +1492,13 @@ mod tests {
run_to_block(3, |_| None); run_to_block(3, |_| None);
// now note that cores 0 and 2 were freed. // now note that cores 0 and 2 were freed.
Scheduler::schedule(vec![ Scheduler::schedule(
(CoreIndex(0), FreedReason::Concluded), vec![
(CoreIndex(2), FreedReason::Concluded), (CoreIndex(0), FreedReason::Concluded),
]); (CoreIndex(2), FreedReason::Concluded),
],
3,
);
{ {
let scheduled = Scheduler::scheduled(); let scheduled = Scheduler::scheduled();
...@@ -1557,8 +1597,6 @@ mod tests { ...@@ -1557,8 +1597,6 @@ mod tests {
// one block before first rotation. // one block before first rotation.
run_to_block(rotation_frequency, |_| None); run_to_block(rotation_frequency, |_| None);
let rotations_since_session_start = (rotation_frequency - session_start_block) / rotation_frequency;
assert_eq!(rotations_since_session_start, 0);
assert_groups_rotated(0); assert_groups_rotated(0);
// first rotation. // first rotation.
...@@ -2038,4 +2076,157 @@ mod tests { ...@@ -2038,4 +2076,157 @@ mod tests {
} }
}); });
} }
#[test]
fn session_change_requires_reschedule_dropping_removed_paras() {
let genesis_config = MockGenesisConfig {
configuration: crate::configuration::GenesisConfig {
config: default_config(),
..Default::default()
},
..Default::default()
};
assert_eq!(default_config().parathread_cores, 3);
new_test_ext(genesis_config).execute_with(|| {
let chain_a = ParaId::from(1);
let chain_b = ParaId::from(2);
// ensure that we have 5 groups by registering 2 parachains.
Paras::schedule_para_initialize(chain_a, ParaGenesisArgs {
genesis_head: Vec::new().into(),
validation_code: Vec::new().into(),
parachain: true,
});
Paras::schedule_para_initialize(chain_b, ParaGenesisArgs {
genesis_head: Vec::new().into(),
validation_code: Vec::new().into(),
parachain: true,
});
run_to_block(1, |number| match number {
1 => Some(SessionChangeNotification {
new_config: default_config(),
validators: vec![
ValidatorId::from(Sr25519Keyring::Alice.public()),
ValidatorId::from(Sr25519Keyring::Bob.public()),
ValidatorId::from(Sr25519Keyring::Charlie.public()),
ValidatorId::from(Sr25519Keyring::Dave.public()),
ValidatorId::from(Sr25519Keyring::Eve.public()),
ValidatorId::from(Sr25519Keyring::Ferdie.public()),
ValidatorId::from(Sr25519Keyring::One.public()),
],