From b74353d3e94ca685e953f5138f0676214140dcdd Mon Sep 17 00:00:00 2001 From: eskimor <eskimor@users.noreply.github.com> Date: Wed, 20 Mar 2024 14:53:55 +0100 Subject: [PATCH] Fix algorithmic complexity of on-demand scheduler with regards to number of cores. (#3190) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We witnessed really poor performance on Rococo, where we ended up with 50 on-demand cores. This was due to the fact that for each core the full queue was processed. With this change full queue processing will happen way less often (most of the time complexity is O(1) or O(log(n))) and if it happens then only for one core (in expectation). Also spot price is now updated before each order to ensure economic back pressure. TODO: - [x] Implement - [x] Basic tests - [x] Add more tests (see todos) - [x] Run benchmark to confirm better performance, first results suggest > 100x faster. - [x] Write migrations - [x] Bump scale-info version and remove patch in Cargo.toml - [x] Write PR docs: on-demand performance improved, more on-demand cores are now non problematic anymore. If need by also the max queue size can be increased again. (Maybe not to 10k) Optional: Performance can be improved even more, if we called `pop_assignment_for_core()`, before calling `report_processed` (Avoid needless affinity drops). The effect gets smaller the larger the claim queue and I would only go for it, if it does not add complexity to the scheduler. --------- Co-authored-by: eskimor <eskimor@no-such-url.com> Co-authored-by: antonva <anton.asgeirsson@parity.io> Co-authored-by: command-bot <> Co-authored-by: Anton Vilhelm Ãsgeirsson <antonva@users.noreply.github.com> Co-authored-by: ordian <write@reusable.software> --- Cargo.lock | 8 +- polkadot/primitives/src/lib.rs | 3 +- polkadot/primitives/src/v6/mod.rs | 7 + polkadot/runtime/parachains/Cargo.toml | 2 +- .../src/assigner_on_demand/benchmarking.rs | 11 +- .../src/assigner_on_demand/migration.rs | 181 +++++ .../parachains/src/assigner_on_demand/mod.rs | 690 ++++++++++++------ .../src/assigner_on_demand/tests.rs | 514 +++++++------ .../runtime/parachains/src/configuration.rs | 10 +- polkadot/runtime/rococo/src/lib.rs | 1 + .../runtime_parachains_assigner_on_demand.rs | 72 +- .../runtime_parachains_assigner_on_demand.rs | 76 +- prdoc/pr_3190.prdoc | 17 + 13 files changed, 1046 insertions(+), 546 deletions(-) create mode 100644 polkadot/runtime/parachains/src/assigner_on_demand/migration.rs create mode 100644 prdoc/pr_3190.prdoc diff --git a/Cargo.lock b/Cargo.lock index 9e48887e17a..5401dc5ecfb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17005,9 +17005,9 @@ dependencies = [ [[package]] name = "scale-info" -version = "2.10.0" +version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f7d66a1128282b7ef025a8ead62a4a9fcf017382ec53b8ffbf4d7bf77bd3c60" +checksum = "2ef2175c2907e7c8bc0a9c3f86aeb5ec1f3b275300ad58a44d0c3ae379a5e52e" dependencies = [ "bitvec", "cfg-if", @@ -17019,9 +17019,9 @@ dependencies = [ [[package]] name = "scale-info-derive" -version = "2.10.0" +version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abf2c68b89cafb3b8d918dd07b42be0da66ff202cf1155c5739a4e0c1ea0dc19" +checksum = "634d9b8eb8fd61c5cdd3390d9b2132300a7e7618955b98b8416f118c1b4e144f" dependencies = [ "proc-macro-crate 1.3.1", "proc-macro2", diff --git a/polkadot/primitives/src/lib.rs b/polkadot/primitives/src/lib.rs index 2ddd9b58dfe..745195ce092 100644 --- a/polkadot/primitives/src/lib.rs +++ b/polkadot/primitives/src/lib.rs @@ -58,7 +58,8 @@ pub use v6::{ ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation, ValidityError, ASSIGNMENT_KEY_TYPE_ID, LEGACY_MIN_BACKING_VOTES, LOWEST_PUBLIC_ID, MAX_CODE_SIZE, MAX_HEAD_DATA_SIZE, MAX_POV_SIZE, MIN_CODE_SIZE, - ON_DEMAND_DEFAULT_QUEUE_MAX_SIZE, PARACHAINS_INHERENT_IDENTIFIER, PARACHAIN_KEY_TYPE_ID, + ON_DEMAND_DEFAULT_QUEUE_MAX_SIZE, ON_DEMAND_MAX_QUEUE_MAX_SIZE, PARACHAINS_INHERENT_IDENTIFIER, + PARACHAIN_KEY_TYPE_ID, }; #[cfg(feature = "std")] diff --git a/polkadot/primitives/src/v6/mod.rs b/polkadot/primitives/src/v6/mod.rs index 742dbed1cd8..9e7f910314c 100644 --- a/polkadot/primitives/src/v6/mod.rs +++ b/polkadot/primitives/src/v6/mod.rs @@ -399,6 +399,13 @@ pub const MAX_POV_SIZE: u32 = 5 * 1024 * 1024; /// Can be adjusted in configuration. pub const ON_DEMAND_DEFAULT_QUEUE_MAX_SIZE: u32 = 10_000; +/// Maximum for maximum queue size. +/// +/// Setting `on_demand_queue_max_size` to a value higher than this is unsound. This is more a +/// theoretical limit, just below enough what the target type supports, so comparisons are possible +/// even with indices that are overflowing the underyling type. +pub const ON_DEMAND_MAX_QUEUE_MAX_SIZE: u32 = 1_000_000_000; + /// Backing votes threshold used from the host prior to runtime API version 6 and from the runtime /// prior to v9 configuration migration. pub const LEGACY_MIN_BACKING_VOTES: u32 = 2; diff --git a/polkadot/runtime/parachains/Cargo.toml b/polkadot/runtime/parachains/Cargo.toml index 61040145476..6e693b83ae1 100644 --- a/polkadot/runtime/parachains/Cargo.toml +++ b/polkadot/runtime/parachains/Cargo.toml @@ -15,7 +15,7 @@ bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] } parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive", "max-encoded-len"] } log = { workspace = true } rustc-hex = { version = "2.1.0", default-features = false } -scale-info = { version = "2.10.0", default-features = false, features = ["derive"] } +scale-info = { version = "2.11.0", default-features = false, features = ["derive"] } serde = { features = ["alloc", "derive"], workspace = true } derive_more = "0.99.17" bitflags = "1.3.2" diff --git a/polkadot/runtime/parachains/src/assigner_on_demand/benchmarking.rs b/polkadot/runtime/parachains/src/assigner_on_demand/benchmarking.rs index 8360e7a78d0..779d6f04e39 100644 --- a/polkadot/runtime/parachains/src/assigner_on_demand/benchmarking.rs +++ b/polkadot/runtime/parachains/src/assigner_on_demand/benchmarking.rs @@ -70,11 +70,7 @@ mod benchmarks { let para_id = ParaId::from(111u32); init_parathread::<T>(para_id); T::Currency::make_free_balance_be(&caller, BalanceOf::<T>::max_value()); - let order = EnqueuedOrder::new(para_id); - - for _ in 0..s { - Pallet::<T>::add_on_demand_order(order.clone(), QueuePushDirection::Back).unwrap(); - } + Pallet::<T>::populate_queue(para_id, s); #[extrinsic_call] _(RawOrigin::Signed(caller.into()), BalanceOf::<T>::max_value(), para_id) @@ -87,11 +83,8 @@ mod benchmarks { let para_id = ParaId::from(111u32); init_parathread::<T>(para_id); T::Currency::make_free_balance_be(&caller, BalanceOf::<T>::max_value()); - let order = EnqueuedOrder::new(para_id); - for _ in 0..s { - Pallet::<T>::add_on_demand_order(order.clone(), QueuePushDirection::Back).unwrap(); - } + Pallet::<T>::populate_queue(para_id, s); #[extrinsic_call] _(RawOrigin::Signed(caller.into()), BalanceOf::<T>::max_value(), para_id) diff --git a/polkadot/runtime/parachains/src/assigner_on_demand/migration.rs b/polkadot/runtime/parachains/src/assigner_on_demand/migration.rs new file mode 100644 index 00000000000..5071653377d --- /dev/null +++ b/polkadot/runtime/parachains/src/assigner_on_demand/migration.rs @@ -0,0 +1,181 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +//! A module that is responsible for migration of storage. +use super::*; +use frame_support::{ + migrations::VersionedMigration, pallet_prelude::ValueQuery, storage_alias, + traits::OnRuntimeUpgrade, weights::Weight, +}; + +mod v0 { + use super::*; + use sp_std::collections::vec_deque::VecDeque; + + #[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone)] + pub(super) struct EnqueuedOrder { + pub para_id: ParaId, + } + + /// Keeps track of the multiplier used to calculate the current spot price for the on demand + /// assigner. + /// NOTE: Ignoring the `OnEmpty` field for the migration. + #[storage_alias] + pub(super) type SpotTraffic<T: Config> = StorageValue<Pallet<T>, FixedU128, ValueQuery>; + + /// The order storage entry. Uses a VecDeque to be able to push to the front of the + /// queue from the scheduler on session boundaries. + /// NOTE: Ignoring the `OnEmpty` field for the migration. + #[storage_alias] + pub(super) type OnDemandQueue<T: Config> = + StorageValue<Pallet<T>, VecDeque<EnqueuedOrder>, ValueQuery>; +} + +mod v1 { + use super::*; + + use crate::assigner_on_demand::LOG_TARGET; + + /// Migration to V1 + pub struct UncheckedMigrateToV1<T>(sp_std::marker::PhantomData<T>); + impl<T: Config> OnRuntimeUpgrade for UncheckedMigrateToV1<T> { + fn on_runtime_upgrade() -> Weight { + let mut weight: Weight = Weight::zero(); + + // Migrate the current traffic value + let config = <configuration::Pallet<T>>::config(); + QueueStatus::<T>::mutate(|mut queue_status| { + Pallet::<T>::update_spot_traffic(&config, &mut queue_status); + + let v0_queue = v0::OnDemandQueue::<T>::take(); + // Process the v0 queue into v1. + v0_queue.into_iter().for_each(|enqueued_order| { + // Readding the old orders will use the new systems. + Pallet::<T>::add_on_demand_order( + queue_status, + enqueued_order.para_id, + QueuePushDirection::Back, + ); + }); + }); + + // Remove the old storage. + v0::OnDemandQueue::<T>::kill(); // 1 write + v0::SpotTraffic::<T>::kill(); // 1 write + + // Config read + weight.saturating_accrue(T::DbWeight::get().reads(1)); + // QueueStatus read write (update_spot_traffic) + weight.saturating_accrue(T::DbWeight::get().reads_writes(1, 1)); + // Kill x 2 + weight.saturating_accrue(T::DbWeight::get().writes(2)); + + log::info!(target: LOG_TARGET, "Migrated on demand assigner storage to v1"); + weight + } + + #[cfg(feature = "try-runtime")] + fn pre_upgrade() -> Result<Vec<u8>, sp_runtime::TryRuntimeError> { + let n: u32 = v0::OnDemandQueue::<T>::get().len() as u32; + + log::info!( + target: LOG_TARGET, + "Number of orders waiting in the queue before: {n}", + ); + + Ok(n.encode()) + } + + #[cfg(feature = "try-runtime")] + fn post_upgrade(state: Vec<u8>) -> Result<(), sp_runtime::TryRuntimeError> { + log::info!(target: LOG_TARGET, "Running post_upgrade()"); + + ensure!( + v0::OnDemandQueue::<T>::get().is_empty(), + "OnDemandQueue should be empty after the migration" + ); + + let expected_len = u32::decode(&mut &state[..]).unwrap(); + let queue_status_size = QueueStatus::<T>::get().size(); + ensure!( + expected_len == queue_status_size, + "Number of orders should be the same before and after migration" + ); + + let n_affinity_entries: u32 = + AffinityEntries::<T>::iter().map(|(_index, heap)| heap.len() as u32).sum(); + let n_para_id_affinity: u32 = ParaIdAffinity::<T>::iter() + .map(|(_para_id, affinity)| affinity.count as u32) + .sum(); + ensure!( + n_para_id_affinity == n_affinity_entries, + "Number of affinity entries should be the same as the counts in ParaIdAffinity" + ); + + Ok(()) + } + } +} + +/// Migrate `V0` to `V1` of the storage format. +pub type MigrateV0ToV1<T> = VersionedMigration< + 0, + 1, + v1::UncheckedMigrateToV1<T>, + Pallet<T>, + <T as frame_system::Config>::DbWeight, +>; + +#[cfg(test)] +mod tests { + use super::{v0, v1, OnRuntimeUpgrade, Weight}; + use crate::mock::{new_test_ext, MockGenesisConfig, OnDemandAssigner, Test}; + use primitives::Id as ParaId; + + #[test] + fn migration_to_v1_preserves_queue_ordering() { + new_test_ext(MockGenesisConfig::default()).execute_with(|| { + // Place orders for paraids 1..5 + for i in 1..=5 { + v0::OnDemandQueue::<Test>::mutate(|queue| { + queue.push_back(v0::EnqueuedOrder { para_id: ParaId::new(i) }) + }); + } + + // Queue has 5 orders + let old_queue = v0::OnDemandQueue::<Test>::get(); + assert_eq!(old_queue.len(), 5); + // New queue has 0 orders + assert_eq!(OnDemandAssigner::get_queue_status().size(), 0); + + // For tests, db weight is zero. + assert_eq!( + <v1::UncheckedMigrateToV1<Test> as OnRuntimeUpgrade>::on_runtime_upgrade(), + Weight::zero() + ); + + // New queue has 5 orders + assert_eq!(OnDemandAssigner::get_queue_status().size(), 5); + + // Compare each entry from the old queue with the entry in the new queue. + old_queue.iter().zip(OnDemandAssigner::get_free_entries().iter()).for_each( + |(old_enq, new_enq)| { + assert_eq!(old_enq.para_id, new_enq.para_id); + }, + ); + }); + } +} diff --git a/polkadot/runtime/parachains/src/assigner_on_demand/mod.rs b/polkadot/runtime/parachains/src/assigner_on_demand/mod.rs index bc450dc7812..c47c8745e65 100644 --- a/polkadot/runtime/parachains/src/assigner_on_demand/mod.rs +++ b/polkadot/runtime/parachains/src/assigner_on_demand/mod.rs @@ -16,22 +16,32 @@ //! The parachain on demand assignment module. //! -//! Implements a mechanism for taking in orders for pay as you go (PAYG) or on demand -//! parachain (previously parathreads) assignments. This module is not handled by the -//! initializer but is instead instantiated in the `construct_runtime` macro. +//! Implements a mechanism for taking in orders for on-demand parachain (previously parathreads) +//! assignments. This module is not handled by the initializer but is instead instantiated in the +//! `construct_runtime` macro. //! //! The module currently limits parallel execution of blocks from the same `ParaId` via //! a core affinity mechanism. As long as there exists an affinity for a `CoreIndex` for //! a specific `ParaId`, orders for blockspace for that `ParaId` will only be assigned to -//! that `CoreIndex`. This affinity mechanism can be removed if it can be shown that parallel -//! execution is valid. +//! that `CoreIndex`. +//! +//! NOTE: Once we have elastic scaling implemented we might want to extend this module to support +//! ignoring core affinity up to a certain extend. This should be opt-in though as the parachain +//! needs to support multiple cores in the same block. If we want to enable a single parachain +//! occupying multiple cores in on-demand, we will likely add a separate order type, where the +//! intent can be made explicit. mod benchmarking; +pub mod migration; mod mock_helpers; +extern crate alloc; + #[cfg(test)] mod tests; +use core::mem::take; + use crate::{configuration, paras, scheduler::common::Assignment}; use frame_support::{ @@ -43,13 +53,17 @@ use frame_support::{ }, }; use frame_system::pallet_prelude::*; -use primitives::{CoreIndex, Id as ParaId}; +use primitives::{CoreIndex, Id as ParaId, ON_DEMAND_MAX_QUEUE_MAX_SIZE}; use sp_runtime::{ traits::{One, SaturatedConversion}, FixedPointNumber, FixedPointOperand, FixedU128, Perbill, Saturating, }; -use sp_std::{collections::vec_deque::VecDeque, prelude::*}; +use alloc::collections::BinaryHeap; +use sp_std::{ + cmp::{Ord, Ordering, PartialOrd}, + prelude::*, +}; const LOG_TARGET: &str = "runtime::parachains::assigner-on-demand"; @@ -73,17 +87,116 @@ impl WeightInfo for TestWeightInfo { } } +/// Meta data for full queue. +/// +/// This includes elements with affinity and free entries. +/// +/// The actual queue is implemented via multiple priority queues. One for each core, for entries +/// which currently have a core affinity and one free queue, with entries without any affinity yet. +/// +/// The design aims to have most queue accessess be O(1) or O(log(N)). Absolute worst case is O(N). +/// Importantly this includes all accessess that happen in a single block. Even with 50 cores, the +/// total complexity of all operations in the block should maintain above complexities. In +/// particular O(N) stays O(N), it should never be O(N*cores). +/// +/// More concrete rundown on complexity: +/// +/// - insert: O(1) for placing an order, O(log(N)) for push backs. +/// - pop_assignment_for_core: O(log(N)), O(N) worst case: Can only happen for one core, next core +/// is already less work. +/// - report_processed & push back: If affinity dropped to 0, then O(N) in the worst case. Again +/// this divides per core. +/// +/// Reads still exist, also improved slightly, but worst case we fetch all entries. +#[derive(Encode, Decode, TypeInfo)] +struct QueueStatusType { + /// Last calculated traffic value. + traffic: FixedU128, + /// The next index to use. + next_index: QueueIndex, + /// Smallest index still in use. + /// + /// In case of a completely empty queue (free + affinity queues), `next_index - smallest_index + /// == 0`. + smallest_index: QueueIndex, + /// Indices that have been freed already. + /// + /// But have a hole to `smallest_index`, so we can not yet bump `smallest_index`. This binary + /// heap is roughly bounded in the number of on demand cores: + /// + /// For a single core, elements will always be processed in order. With each core added, a + /// level of out of order execution is added. + freed_indices: BinaryHeap<ReverseQueueIndex>, +} + +impl Default for QueueStatusType { + fn default() -> QueueStatusType { + QueueStatusType { + traffic: FixedU128::default(), + next_index: QueueIndex(0), + smallest_index: QueueIndex(0), + freed_indices: BinaryHeap::new(), + } + } +} + +impl QueueStatusType { + /// How many orders are queued in total? + /// + /// This includes entries which have core affinity. + fn size(&self) -> u32 { + self.next_index + .0 + .overflowing_sub(self.smallest_index.0) + .0 + .saturating_sub(self.freed_indices.len() as u32) + } + + /// Get current next index + /// + /// to use for an element newly pushed to the back of the queue. + fn push_back(&mut self) -> QueueIndex { + let QueueIndex(next_index) = self.next_index; + self.next_index = QueueIndex(next_index.overflowing_add(1).0); + QueueIndex(next_index) + } + + /// Push something to the front of the queue + fn push_front(&mut self) -> QueueIndex { + self.smallest_index = QueueIndex(self.smallest_index.0.overflowing_sub(1).0); + self.smallest_index + } + + /// The given index is no longer part of the queue. + /// + /// This updates `smallest_index` if need be. + fn consume_index(&mut self, removed_index: QueueIndex) { + if removed_index != self.smallest_index { + self.freed_indices.push(removed_index.reverse()); + return + } + let mut index = self.smallest_index.0.overflowing_add(1).0; + // Even more to advance? + while self.freed_indices.peek() == Some(&ReverseQueueIndex(index)) { + index = index.overflowing_add(1).0; + self.freed_indices.pop(); + } + self.smallest_index = QueueIndex(index); + } +} + /// Keeps track of how many assignments a scheduler currently has at a specific `CoreIndex` for a /// specific `ParaId`. #[derive(Encode, Decode, Default, Clone, Copy, TypeInfo)] #[cfg_attr(test, derive(PartialEq, RuntimeDebug))] -pub struct CoreAffinityCount { - core_idx: CoreIndex, +struct CoreAffinityCount { + core_index: CoreIndex, count: u32, } /// An indicator as to which end of the `OnDemandQueue` an assignment will be placed. -pub enum QueuePushDirection { +#[cfg_attr(test, derive(RuntimeDebug))] +enum QueuePushDirection { Back, Front, } @@ -93,9 +206,8 @@ type BalanceOf<T> = <<T as Config>::Currency as Currency<<T as frame_system::Config>::AccountId>>::Balance; /// Errors that can happen during spot traffic calculation. -#[derive(PartialEq)] -#[cfg_attr(feature = "std", derive(Debug))] -pub enum SpotTrafficCalculationErr { +#[derive(PartialEq, RuntimeDebug)] +enum SpotTrafficCalculationErr { /// The order queue capacity is at 0. QueueCapacityIsZero, /// The queue size is larger than the queue capacity. @@ -104,15 +216,85 @@ pub enum SpotTrafficCalculationErr { Division, } +/// Type used for priority indices. +// NOTE: The `Ord` implementation for this type is unsound in the general case. +// Do not use it for anything but it's intended purpose. +#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone, Eq, Copy)] +struct QueueIndex(u32); + +/// QueueIndex with reverse ordering. +/// +/// Same as `Reverse(QueueIndex)`, but with all the needed traits implemented. +#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone, Eq, Copy)] +struct ReverseQueueIndex(u32); + +impl QueueIndex { + fn reverse(self) -> ReverseQueueIndex { + ReverseQueueIndex(self.0) + } +} + +impl Ord for QueueIndex { + fn cmp(&self, other: &Self) -> Ordering { + let diff = self.0.overflowing_sub(other.0).0; + if diff == 0 { + Ordering::Equal + } else if diff <= ON_DEMAND_MAX_QUEUE_MAX_SIZE { + Ordering::Greater + } else { + Ordering::Less + } + } +} + +impl PartialOrd for QueueIndex { + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for ReverseQueueIndex { + fn cmp(&self, other: &Self) -> Ordering { + QueueIndex(other.0).cmp(&QueueIndex(self.0)) + } +} +impl PartialOrd for ReverseQueueIndex { + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + Some(self.cmp(&other)) + } +} + /// Internal representation of an order after it has been enqueued already. -#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone)] -pub(super) struct EnqueuedOrder { - pub para_id: ParaId, +/// +/// This data structure is provided for a min BinaryHeap (Ord compares in reverse order with regards +/// to its elements) +#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone, Eq)] +struct EnqueuedOrder { + para_id: ParaId, + idx: QueueIndex, } impl EnqueuedOrder { - pub fn new(para_id: ParaId) -> Self { - Self { para_id } + fn new(idx: QueueIndex, para_id: ParaId) -> Self { + Self { idx, para_id } + } +} + +impl PartialOrd for EnqueuedOrder { + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + match other.idx.partial_cmp(&self.idx) { + Some(Ordering::Equal) => other.para_id.partial_cmp(&self.para_id), + o => o, + } + } +} + +impl Ord for EnqueuedOrder { + fn cmp(&self, other: &Self) -> Ordering { + match other.idx.cmp(&self.idx) { + Ordering::Equal => other.para_id.cmp(&self.para_id), + o => o, + } } } @@ -121,8 +303,11 @@ pub mod pallet { use super::*; + const STORAGE_VERSION: StorageVersion = StorageVersion::new(1); + #[pallet::pallet] #[pallet::without_storage_info] + #[pallet::storage_version(STORAGE_VERSION)] pub struct Pallet<T>(_); #[pallet::config] @@ -141,36 +326,44 @@ pub mod pallet { type TrafficDefaultValue: Get<FixedU128>; } - /// Creates an empty spot traffic value if one isn't present in storage already. + /// Creates an empty queue status for an empty queue with initial traffic value. #[pallet::type_value] - pub fn SpotTrafficOnEmpty<T: Config>() -> FixedU128 { - T::TrafficDefaultValue::get() + pub(super) fn QueueStatusOnEmpty<T: Config>() -> QueueStatusType { + QueueStatusType { traffic: T::TrafficDefaultValue::get(), ..Default::default() } } - /// Creates an empty on demand queue if one isn't present in storage already. #[pallet::type_value] - pub(super) fn OnDemandQueueOnEmpty<T: Config>() -> VecDeque<EnqueuedOrder> { - VecDeque::new() + pub(super) fn EntriesOnEmpty<T: Config>() -> BinaryHeap<EnqueuedOrder> { + BinaryHeap::new() } - /// Keeps track of the multiplier used to calculate the current spot price for the on demand - /// assigner. - #[pallet::storage] - pub(super) type SpotTraffic<T: Config> = - StorageValue<_, FixedU128, ValueQuery, SpotTrafficOnEmpty<T>>; - - /// The order storage entry. Uses a VecDeque to be able to push to the front of the - /// queue from the scheduler on session boundaries. - #[pallet::storage] - pub(super) type OnDemandQueue<T: Config> = - StorageValue<_, VecDeque<EnqueuedOrder>, ValueQuery, OnDemandQueueOnEmpty<T>>; - /// Maps a `ParaId` to `CoreIndex` and keeps track of how many assignments the scheduler has in /// it's lookahead. Keeping track of this affinity prevents parallel execution of the same /// `ParaId` on two or more `CoreIndex`es. #[pallet::storage] pub(super) type ParaIdAffinity<T: Config> = - StorageMap<_, Twox256, ParaId, CoreAffinityCount, OptionQuery>; + StorageMap<_, Twox64Concat, ParaId, CoreAffinityCount, OptionQuery>; + + /// Overall status of queue (both free + affinity entries) + #[pallet::storage] + pub(super) type QueueStatus<T: Config> = + StorageValue<_, QueueStatusType, ValueQuery, QueueStatusOnEmpty<T>>; + + /// Priority queue for all orders which don't yet (or not any more) have any core affinity. + #[pallet::storage] + pub(super) type FreeEntries<T: Config> = + StorageValue<_, BinaryHeap<EnqueuedOrder>, ValueQuery, EntriesOnEmpty<T>>; + + /// Queue entries that are currently bound to a particular core due to core affinity. + #[pallet::storage] + pub(super) type AffinityEntries<T: Config> = StorageMap< + _, + Twox64Concat, + CoreIndex, + BinaryHeap<EnqueuedOrder>, + ValueQuery, + EntriesOnEmpty<T>, + >; #[pallet::event] #[pallet::generate_deposit(pub(super) fn deposit_event)] @@ -183,9 +376,6 @@ pub mod pallet { #[pallet::error] pub enum Error<T> { - /// The `ParaId` supplied to the `place_order` call is not a valid `ParaThread`, making the - /// call is invalid. - InvalidParaId, /// The order queue is full, `place_order` will not continue. QueueFull, /// The current spot price is higher than the max amount specified in the `place_order` @@ -197,45 +387,14 @@ pub mod pallet { impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> { fn on_initialize(_now: BlockNumberFor<T>) -> Weight { let config = <configuration::Pallet<T>>::config(); - // Calculate spot price multiplier and store it. - let old_traffic = SpotTraffic::<T>::get(); - match Self::calculate_spot_traffic( - old_traffic, - config.scheduler_params.on_demand_queue_max_size, - Self::queue_size(), - config.scheduler_params.on_demand_target_queue_utilization, - config.scheduler_params.on_demand_fee_variability, - ) { - Ok(new_traffic) => { - // Only update storage on change - if new_traffic != old_traffic { - SpotTraffic::<T>::set(new_traffic); - Pallet::<T>::deposit_event(Event::<T>::SpotTrafficSet { - traffic: new_traffic, - }); - return T::DbWeight::get().reads_writes(2, 1) - } - }, - Err(SpotTrafficCalculationErr::QueueCapacityIsZero) => { - log::debug!( - target: LOG_TARGET, - "Error calculating spot traffic: The order queue capacity is at 0." - ); - }, - Err(SpotTrafficCalculationErr::QueueSizeLargerThanCapacity) => { - log::debug!( - target: LOG_TARGET, - "Error calculating spot traffic: The queue size is larger than the queue capacity." - ); - }, - Err(SpotTrafficCalculationErr::Division) => { - log::debug!( - target: LOG_TARGET, - "Error calculating spot traffic: Arithmetic error during division, either division by 0 or over/underflow." - ); - }, - }; - T::DbWeight::get().reads_writes(2, 0) + // We need to update the spot traffic on block initialize in order to account for idle + // blocks. + QueueStatus::<T>::mutate(|queue_status| { + Self::update_spot_traffic(&config, queue_status); + }); + + // 2 reads in config and queuestatus, at maximum 1 write to queuestatus. + T::DbWeight::get().reads_writes(2, 1) } } @@ -258,7 +417,7 @@ pub mod pallet { /// Events: /// - `SpotOrderPlaced` #[pallet::call_index(0)] - #[pallet::weight(<T as Config>::WeightInfo::place_order_allow_death(OnDemandQueue::<T>::get().len() as u32))] + #[pallet::weight(<T as Config>::WeightInfo::place_order_allow_death(QueueStatus::<T>::get().size()))] pub fn place_order_allow_death( origin: OriginFor<T>, max_amount: BalanceOf<T>, @@ -285,7 +444,7 @@ pub mod pallet { /// Events: /// - `SpotOrderPlaced` #[pallet::call_index(1)] - #[pallet::weight(<T as Config>::WeightInfo::place_order_keep_alive(OnDemandQueue::<T>::get().len() as u32))] + #[pallet::weight(<T as Config>::WeightInfo::place_order_keep_alive(QueueStatus::<T>::get().size()))] pub fn place_order_keep_alive( origin: OriginFor<T>, max_amount: BalanceOf<T>, @@ -297,10 +456,78 @@ pub mod pallet { } } +// Internal functions and interface to scheduler/wrapping assignment provider. impl<T: Config> Pallet<T> where BalanceOf<T>: FixedPointOperand, { + /// Take the next queued entry that is available for a given core index. + /// + /// Parameters: + /// - `core_index`: The core index + pub fn pop_assignment_for_core(core_index: CoreIndex) -> Option<Assignment> { + let entry: Result<EnqueuedOrder, ()> = QueueStatus::<T>::try_mutate(|queue_status| { + AffinityEntries::<T>::try_mutate(core_index, |affinity_entries| { + let free_entry = FreeEntries::<T>::try_mutate(|free_entries| { + let affinity_next = affinity_entries.peek(); + let free_next = free_entries.peek(); + let pick_free = match (affinity_next, free_next) { + (None, _) => true, + (Some(_), None) => false, + (Some(a), Some(f)) => f < a, + }; + if pick_free { + let entry = free_entries.pop().ok_or(())?; + let (mut affinities, free): (BinaryHeap<_>, BinaryHeap<_>) = + take(free_entries) + .into_iter() + .partition(|e| e.para_id == entry.para_id); + affinity_entries.append(&mut affinities); + *free_entries = free; + Ok(entry) + } else { + Err(()) + } + }); + let entry = free_entry.or_else(|()| affinity_entries.pop().ok_or(()))?; + queue_status.consume_index(entry.idx); + Ok(entry) + }) + }); + + let assignment = entry.map(|e| Assignment::Pool { para_id: e.para_id, core_index }).ok()?; + + Pallet::<T>::increase_affinity(assignment.para_id(), core_index); + Some(assignment) + } + + /// Report that the `para_id` & `core_index` combination was processed. + /// + /// This should be called once it is clear that the assignment won't get pushed back anymore. + /// + /// In other words for each `pop_assignment_for_core` a call to this function or + /// `push_back_assignment` must follow, but only one. + pub fn report_processed(para_id: ParaId, core_index: CoreIndex) { + Pallet::<T>::decrease_affinity_update_queue(para_id, core_index); + } + + /// Push an assignment back to the front of the queue. + /// + /// The assignment has not been processed yet. Typically used on session boundaries. + /// + /// NOTE: We are not checking queue size here. So due to push backs it is possible that we + /// exceed the maximum queue size slightly. + /// + /// Parameters: + /// - `para_id`: The para that did not make it. + /// - `core_index`: The core the para was scheduled on. + pub fn push_back_assignment(para_id: ParaId, core_index: CoreIndex) { + Pallet::<T>::decrease_affinity_update_queue(para_id, core_index); + QueueStatus::<T>::mutate(|queue_status| { + Pallet::<T>::add_on_demand_order(queue_status, para_id, QueuePushDirection::Front); + }); + } + /// Helper function for `place_order_*` calls. Used to differentiate between placing orders /// with a keep alive check or to allow the account to be reaped. /// @@ -326,34 +553,62 @@ where ) -> DispatchResult { let config = <configuration::Pallet<T>>::config(); - // Traffic always falls back to 1.0 - let traffic = SpotTraffic::<T>::get(); - - // Calculate spot price - let spot_price: BalanceOf<T> = traffic.saturating_mul_int( - config.scheduler_params.on_demand_base_fee.saturated_into::<BalanceOf<T>>(), - ); - - // Is the current price higher than `max_amount` - ensure!(spot_price.le(&max_amount), Error::<T>::SpotPriceHigherThanMaxAmount); + QueueStatus::<T>::mutate(|queue_status| { + Self::update_spot_traffic(&config, queue_status); + let traffic = queue_status.traffic; - // Charge the sending account the spot price - let _ = T::Currency::withdraw( - &sender, - spot_price, - WithdrawReasons::FEE, - existence_requirement, - )?; + // Calculate spot price + let spot_price: BalanceOf<T> = traffic.saturating_mul_int( + config.scheduler_params.on_demand_base_fee.saturated_into::<BalanceOf<T>>(), + ); - let order = EnqueuedOrder::new(para_id); + // Is the current price higher than `max_amount` + ensure!(spot_price.le(&max_amount), Error::<T>::SpotPriceHigherThanMaxAmount); - let res = Pallet::<T>::add_on_demand_order(order, QueuePushDirection::Back); + // Charge the sending account the spot price + let _ = T::Currency::withdraw( + &sender, + spot_price, + WithdrawReasons::FEE, + existence_requirement, + )?; - if res.is_ok() { - Pallet::<T>::deposit_event(Event::<T>::OnDemandOrderPlaced { para_id, spot_price }); - } + ensure!( + queue_status.size() < config.scheduler_params.on_demand_queue_max_size, + Error::<T>::QueueFull + ); + Pallet::<T>::add_on_demand_order(queue_status, para_id, QueuePushDirection::Back); + Ok(()) + }) + } - res + /// Calculate and update spot traffic. + fn update_spot_traffic( + config: &configuration::HostConfiguration<BlockNumberFor<T>>, + queue_status: &mut QueueStatusType, + ) { + let old_traffic = queue_status.traffic; + match Self::calculate_spot_traffic( + old_traffic, + config.scheduler_params.on_demand_queue_max_size, + queue_status.size(), + config.scheduler_params.on_demand_target_queue_utilization, + config.scheduler_params.on_demand_fee_variability, + ) { + Ok(new_traffic) => { + // Only update storage on change + if new_traffic != old_traffic { + queue_status.traffic = new_traffic; + Pallet::<T>::deposit_event(Event::<T>::SpotTrafficSet { traffic: new_traffic }); + } + }, + Err(err) => { + log::debug!( + target: LOG_TARGET, + "Error calculating spot traffic: {:?}", err + ); + }, + }; } /// The spot price multiplier. This is based on the transaction fee calculations defined in: @@ -378,7 +633,7 @@ where /// - `SpotTrafficCalculationErr::QueueCapacityIsZero` /// - `SpotTrafficCalculationErr::QueueSizeLargerThanCapacity` /// - `SpotTrafficCalculationErr::Division` - pub(crate) fn calculate_spot_traffic( + fn calculate_spot_traffic( traffic: FixedU128, queue_capacity: u32, queue_size: u32, @@ -430,175 +685,140 @@ where /// Adds an order to the on demand queue. /// /// Paramenters: - /// - `order`: The `EnqueuedOrder` to add to the queue. /// - `location`: Whether to push this entry to the back or the front of the queue. Pushing an /// entry to the front of the queue is only used when the scheduler wants to push back an /// entry it has already popped. - /// Returns: - /// - The unit type on success. - /// - /// Errors: - /// - `InvalidParaId` - /// - `QueueFull` fn add_on_demand_order( - order: EnqueuedOrder, + queue_status: &mut QueueStatusType, + para_id: ParaId, location: QueuePushDirection, - ) -> Result<(), DispatchError> { - // Only parathreads are valid paraids for on the go parachains. - ensure!(<paras::Pallet<T>>::is_parathread(order.para_id), Error::<T>::InvalidParaId); - - let config = <configuration::Pallet<T>>::config(); - - OnDemandQueue::<T>::try_mutate(|queue| { - // Abort transaction if queue is too large - ensure!( - Self::queue_size() < config.scheduler_params.on_demand_queue_max_size, - Error::<T>::QueueFull - ); - match location { - QueuePushDirection::Back => queue.push_back(order), - QueuePushDirection::Front => queue.push_front(order), - }; - Ok(()) - }) + ) { + let idx = match location { + QueuePushDirection::Back => queue_status.push_back(), + QueuePushDirection::Front => queue_status.push_front(), + }; + + let affinity = ParaIdAffinity::<T>::get(para_id); + let order = EnqueuedOrder::new(idx, para_id); + #[cfg(test)] + log::debug!(target: LOG_TARGET, "add_on_demand_order, order: {:?}, affinity: {:?}, direction: {:?}", order, affinity, location); + + match affinity { + None => FreeEntries::<T>::mutate(|entries| entries.push(order)), + Some(affinity) => + AffinityEntries::<T>::mutate(affinity.core_index, |entries| entries.push(order)), + } } - /// Get the size of the on demand queue. + /// Decrease core affinity for para and update queue /// - /// Returns: - /// - The size of the on demand queue. - fn queue_size() -> u32 { - let config = <configuration::Pallet<T>>::config(); - match OnDemandQueue::<T>::get().len().try_into() { - Ok(size) => return size, - Err(_) => { - log::debug!( - target: LOG_TARGET, - "Failed to fetch the on demand queue size, returning the max size." - ); - return config.scheduler_params.on_demand_queue_max_size - }, + /// if affinity dropped to 0, moving entries back to `FreeEntries`. + fn decrease_affinity_update_queue(para_id: ParaId, core_index: CoreIndex) { + let affinity = Pallet::<T>::decrease_affinity(para_id, core_index); + #[cfg(not(test))] + debug_assert_ne!( + affinity, None, + "Decreased affinity for a para that has not been served on a core?" + ); + if affinity != Some(0) { + return } - } - - /// Getter for the order queue. - #[cfg(test)] - fn get_queue() -> VecDeque<EnqueuedOrder> { - OnDemandQueue::<T>::get() - } - - /// Getter for the affinity tracker. - pub fn get_affinity_map(para_id: ParaId) -> Option<CoreAffinityCount> { - ParaIdAffinity::<T>::get(para_id) + // No affinity more for entries on this core, free any entries: + // + // This is necessary to ensure them being served as the core might no longer exist at all. + AffinityEntries::<T>::mutate(core_index, |affinity_entries| { + FreeEntries::<T>::mutate(|free_entries| { + let (mut freed, affinities): (BinaryHeap<_>, BinaryHeap<_>) = + take(affinity_entries).into_iter().partition(|e| e.para_id == para_id); + free_entries.append(&mut freed); + *affinity_entries = affinities; + }) + }); } /// Decreases the affinity of a `ParaId` to a specified `CoreIndex`. - /// Subtracts from the count of the `CoreAffinityCount` if an entry is found and the core_idx + /// + /// Subtracts from the count of the `CoreAffinityCount` if an entry is found and the core_index /// matches. When the count reaches 0, the entry is removed. /// A non-existant entry is a no-op. - fn decrease_affinity(para_id: ParaId, core_idx: CoreIndex) { + /// + /// Returns: The new affinity of the para on that core. `None` if there is no affinity on this + /// core. + fn decrease_affinity(para_id: ParaId, core_index: CoreIndex) -> Option<u32> { ParaIdAffinity::<T>::mutate(para_id, |maybe_affinity| { - if let Some(affinity) = maybe_affinity { - if affinity.core_idx == core_idx { - let new_count = affinity.count.saturating_sub(1); - if new_count > 0 { - *maybe_affinity = Some(CoreAffinityCount { core_idx, count: new_count }); - } else { - *maybe_affinity = None; - } + let affinity = maybe_affinity.as_mut()?; + if affinity.core_index == core_index { + let new_count = affinity.count.saturating_sub(1); + if new_count > 0 { + *maybe_affinity = Some(CoreAffinityCount { core_index, count: new_count }); + } else { + *maybe_affinity = None; } + return Some(new_count) + } else { + None } - }); + }) } /// Increases the affinity of a `ParaId` to a specified `CoreIndex`. - /// Adds to the count of the `CoreAffinityCount` if an entry is found and the core_idx matches. - /// A non-existant entry will be initialized with a count of 1 and uses the supplied + /// Adds to the count of the `CoreAffinityCount` if an entry is found and the core_index + /// matches. A non-existant entry will be initialized with a count of 1 and uses the supplied /// `CoreIndex`. - fn increase_affinity(para_id: ParaId, core_idx: CoreIndex) { + fn increase_affinity(para_id: ParaId, core_index: CoreIndex) { ParaIdAffinity::<T>::mutate(para_id, |maybe_affinity| match maybe_affinity { Some(affinity) => - if affinity.core_idx == core_idx { + if affinity.core_index == core_index { *maybe_affinity = Some(CoreAffinityCount { - core_idx, + core_index, count: affinity.count.saturating_add(1), }); }, None => { - *maybe_affinity = Some(CoreAffinityCount { core_idx, count: 1 }); + *maybe_affinity = Some(CoreAffinityCount { core_index, count: 1 }); }, }) } -} -impl<T: Config> Pallet<T> { - /// Take the next queued entry that is available for a given core index. - /// Invalidates and removes orders with a `para_id` that is not `ParaLifecycle::Parathread` - /// but only in [0..P] range slice of the order queue, where P is the element that is - /// removed from the order queue. - /// - /// Parameters: - /// - `core_idx`: The core index - pub fn pop_assignment_for_core(core_idx: CoreIndex) -> Option<Assignment> { - let mut queue: VecDeque<EnqueuedOrder> = OnDemandQueue::<T>::get(); - - let mut invalidated_para_id_indexes: Vec<usize> = vec![]; - - // Get the position of the next `ParaId`. Select either a valid `ParaId` that has an - // affinity to the same `CoreIndex` as the scheduler asks for or a valid `ParaId` with no - // affinity at all. - let pos = queue.iter().enumerate().position(|(index, assignment)| { - if <paras::Pallet<T>>::is_parathread(assignment.para_id) { - match ParaIdAffinity::<T>::get(&assignment.para_id) { - Some(affinity) => return affinity.core_idx == core_idx, - None => return true, - } - } - // Record no longer valid para_ids. - invalidated_para_id_indexes.push(index); - return false - }); + /// Getter for the affinity tracker. + #[cfg(test)] + fn get_affinity_map(para_id: ParaId) -> Option<CoreAffinityCount> { + ParaIdAffinity::<T>::get(para_id) + } - // Collect the popped value. - let popped = pos.and_then(|p: usize| { - if let Some(assignment) = queue.remove(p) { - Pallet::<T>::increase_affinity(assignment.para_id, core_idx); - return Some(assignment) - }; - None - }); + /// Getter for the affinity entries. + #[cfg(test)] + fn get_affinity_entries(core_index: CoreIndex) -> BinaryHeap<EnqueuedOrder> { + AffinityEntries::<T>::get(core_index) + } - // Only remove the invalid indexes *after* using the index. - // Removed in reverse order so that the indexes don't shift. - invalidated_para_id_indexes.iter().rev().for_each(|idx| { - queue.remove(*idx); - }); + /// Getter for the free entries. + #[cfg(test)] + fn get_free_entries() -> BinaryHeap<EnqueuedOrder> { + FreeEntries::<T>::get() + } - // Write changes to storage. - OnDemandQueue::<T>::set(queue); + #[cfg(feature = "runtime-benchmarks")] + pub fn populate_queue(para_id: ParaId, num: u32) { + QueueStatus::<T>::mutate(|queue_status| { + for _ in 0..num { + Pallet::<T>::add_on_demand_order(queue_status, para_id, QueuePushDirection::Back); + } + }); + } - popped.map(|p| Assignment::Pool { para_id: p.para_id, core_index: core_idx }) + #[cfg(test)] + fn set_queue_status(new_status: QueueStatusType) { + QueueStatus::<T>::set(new_status); } - /// Report that the `para_id` & `core_index` combination was processed. - pub fn report_processed(para_id: ParaId, core_index: CoreIndex) { - Pallet::<T>::decrease_affinity(para_id, core_index) + #[cfg(test)] + fn get_queue_status() -> QueueStatusType { + QueueStatus::<T>::get() } - /// Push an assignment back to the front of the queue. - /// - /// The assignment has not been processed yet. Typically used on session boundaries. - /// Parameters: - /// - `assignment`: The on demand assignment. - pub fn push_back_assignment(para_id: ParaId, core_index: CoreIndex) { - Pallet::<T>::decrease_affinity(para_id, core_index); - // Skip the queue on push backs from scheduler - match Pallet::<T>::add_on_demand_order( - EnqueuedOrder::new(para_id), - QueuePushDirection::Front, - ) { - Ok(_) => {}, - Err(_) => {}, - } + #[cfg(test)] + fn get_traffic_default_value() -> FixedU128 { + <T as Config>::TrafficDefaultValue::get() } } diff --git a/polkadot/runtime/parachains/src/assigner_on_demand/tests.rs b/polkadot/runtime/parachains/src/assigner_on_demand/tests.rs index 8404700780c..982efe77b93 100644 --- a/polkadot/runtime/parachains/src/assigner_on_demand/tests.rs +++ b/polkadot/runtime/parachains/src/assigner_on_demand/tests.rs @@ -73,11 +73,24 @@ fn run_to_block( Paras::initializer_initialize(b + 1); Scheduler::initializer_initialize(b + 1); + // We need to update the spot traffic on every block. + OnDemandAssigner::on_initialize(b + 1); + // In the real runtime this is expected to be called by the `InclusionInherent` pallet. Scheduler::free_cores_and_fill_claimqueue(BTreeMap::new(), b + 1); } } +fn place_order(para_id: ParaId) { + let alice = 100u64; + let amt = 10_000_000u128; + + Balances::make_free_balance_be(&alice, amt); + + run_to_block(101, |n| if n == 101 { Some(Default::default()) } else { None }); + OnDemandAssigner::place_order_allow_death(RuntimeOrigin::signed(alice), amt, para_id).unwrap() +} + #[test] fn spot_traffic_capacity_zero_returns_none() { match OnDemandAssigner::calculate_spot_traffic( @@ -201,6 +214,42 @@ fn spot_traffic_decreases_over_time() { assert_eq!(traffic, FixedU128::from_inner(3_125_000_000_000_000_000u128)) } +#[test] +fn spot_traffic_decreases_between_idle_blocks() { + // Testing spot traffic assumptions, but using the mock runtime and default on demand + // configuration values. Ensuring that blocks with no on demand activity (idle) + // decrease traffic. + + let para_id = ParaId::from(111); + + new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| { + // Initialize the parathread and wait for it to be ready. + schedule_blank_para(para_id, ParaKind::Parathread); + assert!(!Paras::is_parathread(para_id)); + run_to_block(100, |n| if n == 100 { Some(Default::default()) } else { None }); + assert!(Paras::is_parathread(para_id)); + + // Set the spot traffic to a large number + OnDemandAssigner::set_queue_status(QueueStatusType { + traffic: FixedU128::from_u32(10), + ..Default::default() + }); + + assert_eq!(OnDemandAssigner::get_queue_status().traffic, FixedU128::from_u32(10)); + + // Run to block 101 and ensure that the traffic decreases. + run_to_block(101, |n| if n == 100 { Some(Default::default()) } else { None }); + assert!(OnDemandAssigner::get_queue_status().traffic < FixedU128::from_u32(10)); + + // Run to block 102 and observe that we've hit the default traffic value. + run_to_block(102, |n| if n == 100 { Some(Default::default()) } else { None }); + assert_eq!( + OnDemandAssigner::get_queue_status().traffic, + OnDemandAssigner::get_traffic_default_value() + ); + }) +} + #[test] fn place_order_works() { let alice = 1u64; @@ -278,74 +327,6 @@ fn place_order_keep_alive_keeps_alive() { }); } -#[test] -fn add_on_demand_order_works() { - let para_a = ParaId::from(111); - let order = EnqueuedOrder::new(para_a); - - let mut genesis = GenesisConfigBuilder::default(); - genesis.on_demand_max_queue_size = 1; - new_test_ext(genesis.build()).execute_with(|| { - // Initialize the parathread and wait for it to be ready. - schedule_blank_para(para_a, ParaKind::Parathread); - - // `para_a` is not onboarded as a parathread yet. - assert_noop!( - OnDemandAssigner::add_on_demand_order(order.clone(), QueuePushDirection::Back), - Error::<Test>::InvalidParaId - ); - - assert!(!Paras::is_parathread(para_a)); - run_to_block(100, |n| if n == 100 { Some(Default::default()) } else { None }); - assert!(Paras::is_parathread(para_a)); - - // `para_a` is now onboarded as a valid parathread. - assert_ok!(OnDemandAssigner::add_on_demand_order(order.clone(), QueuePushDirection::Back)); - - // Max queue size is 1, queue should be full. - assert_noop!( - OnDemandAssigner::add_on_demand_order(order, QueuePushDirection::Back), - Error::<Test>::QueueFull - ); - }); -} - -#[test] -fn spotqueue_push_directions() { - new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| { - let para_a = ParaId::from(111); - let para_b = ParaId::from(222); - let para_c = ParaId::from(333); - - schedule_blank_para(para_a, ParaKind::Parathread); - schedule_blank_para(para_b, ParaKind::Parathread); - schedule_blank_para(para_c, ParaKind::Parathread); - - run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None }); - - let order_a = EnqueuedOrder::new(para_a); - let order_b = EnqueuedOrder::new(para_b); - let order_c = EnqueuedOrder::new(para_c); - - assert_ok!(OnDemandAssigner::add_on_demand_order( - order_a.clone(), - QueuePushDirection::Front - )); - assert_ok!(OnDemandAssigner::add_on_demand_order( - order_b.clone(), - QueuePushDirection::Front - )); - - assert_ok!(OnDemandAssigner::add_on_demand_order( - order_c.clone(), - QueuePushDirection::Back - )); - - assert_eq!(OnDemandAssigner::queue_size(), 3); - assert_eq!(OnDemandAssigner::get_queue(), VecDeque::from(vec![order_b, order_a, order_c])) - }); -} - #[test] fn pop_assignment_for_core_works() { new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| { @@ -356,51 +337,32 @@ fn pop_assignment_for_core_works() { run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None }); - let order_a = EnqueuedOrder::new(para_a); - let order_b = EnqueuedOrder::new(para_b); - let assignment_a = Assignment::Pool { para_id: para_a, core_index: CoreIndex(0) }; - let assignment_b = Assignment::Pool { para_id: para_b, core_index: CoreIndex(1) }; - // Pop should return none with empty queue assert_eq!(OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)), None); // Add enough assignments to the order queue. for _ in 0..2 { - OnDemandAssigner::add_on_demand_order(order_a.clone(), QueuePushDirection::Back) - .expect("Invalid paraid or queue full"); - - OnDemandAssigner::add_on_demand_order(order_b.clone(), QueuePushDirection::Back) - .expect("Invalid paraid or queue full"); - } - - // Queue should contain orders a, b, a, b - { - let queue: Vec<EnqueuedOrder> = OnDemandQueue::<Test>::get().into_iter().collect(); - assert_eq!( - queue, - vec![order_a.clone(), order_b.clone(), order_a.clone(), order_b.clone()] - ); + place_order(para_a); + place_order(para_b); } // Popped assignments should be for the correct paras and cores assert_eq!( - OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)), - Some(assignment_a.clone()) + OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)).map(|a| a.para_id()), + Some(para_a) ); assert_eq!( - OnDemandAssigner::pop_assignment_for_core(CoreIndex(1)), - Some(assignment_b.clone()) + OnDemandAssigner::pop_assignment_for_core(CoreIndex(1)).map(|a| a.para_id()), + Some(para_b) ); assert_eq!( - OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)), - Some(assignment_a.clone()) + OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)).map(|a| a.para_id()), + Some(para_a) + ); + assert_eq!( + OnDemandAssigner::pop_assignment_for_core(CoreIndex(1)).map(|a| a.para_id()), + Some(para_b) ); - - // Queue should contain one left over order - { - let queue: Vec<EnqueuedOrder> = OnDemandQueue::<Test>::get().into_iter().collect(); - assert_eq!(queue, vec![order_b.clone(),]); - } }); } @@ -414,28 +376,19 @@ fn push_back_assignment_works() { run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None }); - let order_a = EnqueuedOrder::new(para_a); - let order_b = EnqueuedOrder::new(para_b); - // Add enough assignments to the order queue. - OnDemandAssigner::add_on_demand_order(order_a.clone(), QueuePushDirection::Back) - .expect("Invalid paraid or queue full"); - - OnDemandAssigner::add_on_demand_order(order_b.clone(), QueuePushDirection::Back) - .expect("Invalid paraid or queue full"); + place_order(para_a); + place_order(para_b); // Pop order a - OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)); + assert_eq!( + OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)).unwrap().para_id(), + para_a + ); // Para a should have affinity for core 0 assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 1); - assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().core_idx, CoreIndex(0)); - - // Queue should still contain order b - { - let queue: Vec<EnqueuedOrder> = OnDemandQueue::<Test>::get().into_iter().collect(); - assert_eq!(queue, vec![order_b.clone()]); - } + assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().core_index, CoreIndex(0)); // Push back order a OnDemandAssigner::push_back_assignment(para_a, CoreIndex(0)); @@ -444,10 +397,82 @@ fn push_back_assignment_works() { assert_eq!(OnDemandAssigner::get_affinity_map(para_a).is_none(), true); // Queue should contain orders a, b. A in front of b. - { - let queue: Vec<EnqueuedOrder> = OnDemandQueue::<Test>::get().into_iter().collect(); - assert_eq!(queue, vec![order_a.clone(), order_b.clone()]); + assert_eq!( + OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)).unwrap().para_id(), + para_a + ); + assert_eq!( + OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)).unwrap().para_id(), + para_b + ); + }); +} + +#[test] +fn affinity_prohibits_parallel_scheduling() { + new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| { + let para_a = ParaId::from(111); + let para_b = ParaId::from(222); + + schedule_blank_para(para_a, ParaKind::Parathread); + schedule_blank_para(para_b, ParaKind::Parathread); + + run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None }); + + // There should be no affinity before starting. + assert!(OnDemandAssigner::get_affinity_map(para_a).is_none()); + assert!(OnDemandAssigner::get_affinity_map(para_b).is_none()); + + // Add 2 assignments for para_a for every para_b. + place_order(para_a); + place_order(para_a); + place_order(para_b); + + // Approximate having 1 core. + for _ in 0..3 { + assert!(OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)).is_some()); } + assert!(OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)).is_none()); + + // Affinity on one core is meaningless. + assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 2); + assert_eq!(OnDemandAssigner::get_affinity_map(para_b).unwrap().count, 1); + assert_eq!( + OnDemandAssigner::get_affinity_map(para_a).unwrap().core_index, + OnDemandAssigner::get_affinity_map(para_b).unwrap().core_index, + ); + + // Clear affinity + OnDemandAssigner::report_processed(para_a, 0.into()); + OnDemandAssigner::report_processed(para_a, 0.into()); + OnDemandAssigner::report_processed(para_b, 0.into()); + + // Add 2 assignments for para_a for every para_b. + place_order(para_a); + place_order(para_a); + place_order(para_b); + + // Approximate having 3 cores. CoreIndex 2 should be unable to obtain an assignment + for _ in 0..3 { + OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)); + OnDemandAssigner::pop_assignment_for_core(CoreIndex(1)); + assert!(OnDemandAssigner::pop_assignment_for_core(CoreIndex(2)).is_none()); + } + + // Affinity should be the same as before, but on different cores. + assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 2); + assert_eq!(OnDemandAssigner::get_affinity_map(para_b).unwrap().count, 1); + assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().core_index, CoreIndex(0)); + assert_eq!(OnDemandAssigner::get_affinity_map(para_b).unwrap().core_index, CoreIndex(1)); + + // Clear affinity + OnDemandAssigner::report_processed(para_a, CoreIndex(0)); + OnDemandAssigner::report_processed(para_a, CoreIndex(0)); + OnDemandAssigner::report_processed(para_b, CoreIndex(1)); + + // There should be no affinity after clearing. + assert!(OnDemandAssigner::get_affinity_map(para_a).is_none()); + assert!(OnDemandAssigner::get_affinity_map(para_b).is_none()); }); } @@ -458,7 +483,6 @@ fn affinity_changes_work() { let core_index = CoreIndex(0); schedule_blank_para(para_a, ParaKind::Parathread); - let order_a = EnqueuedOrder::new(para_a); run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None }); // There should be no affinity before starting. @@ -466,8 +490,7 @@ fn affinity_changes_work() { // Add enough assignments to the order queue. for _ in 0..10 { - OnDemandAssigner::add_on_demand_order(order_a.clone(), QueuePushDirection::Front) - .expect("Invalid paraid or queue full"); + place_order(para_a); } // There should be no affinity before the scheduler pops. @@ -483,7 +506,6 @@ fn affinity_changes_work() { // Affinity count is 1 after popping with a previous para. assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 1); - assert_eq!(OnDemandAssigner::queue_size(), 8); for _ in 0..3 { OnDemandAssigner::pop_assignment_for_core(core_index); @@ -491,147 +513,197 @@ fn affinity_changes_work() { // Affinity count is 4 after popping 3 times without a previous para. assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 4); - assert_eq!(OnDemandAssigner::queue_size(), 5); for _ in 0..5 { OnDemandAssigner::report_processed(para_a, 0.into()); - OnDemandAssigner::pop_assignment_for_core(core_index); + assert!(OnDemandAssigner::pop_assignment_for_core(core_index).is_some()); } // Affinity count should still be 4 but queue should be empty. + assert!(OnDemandAssigner::pop_assignment_for_core(core_index).is_none()); assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 4); - assert_eq!(OnDemandAssigner::queue_size(), 0); // Pop 4 times and get to exactly 0 (None) affinity. for _ in 0..4 { OnDemandAssigner::report_processed(para_a, 0.into()); - OnDemandAssigner::pop_assignment_for_core(core_index); + assert!(OnDemandAssigner::pop_assignment_for_core(core_index).is_none()); } assert!(OnDemandAssigner::get_affinity_map(para_a).is_none()); // Decreasing affinity beyond 0 should still be None. OnDemandAssigner::report_processed(para_a, 0.into()); - OnDemandAssigner::pop_assignment_for_core(core_index); + assert!(OnDemandAssigner::pop_assignment_for_core(core_index).is_none()); assert!(OnDemandAssigner::get_affinity_map(para_a).is_none()); }); } #[test] -fn affinity_prohibits_parallel_scheduling() { - new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| { - let para_a = ParaId::from(111); - let para_b = ParaId::from(222); +fn new_affinity_for_a_core_must_come_from_free_entries() { + // If affinity count for a core was zero before, and is 1 now, then the entry + // must have come from free_entries. + let parachains = + vec![ParaId::from(111), ParaId::from(222), ParaId::from(333), ParaId::from(444)]; + let core_indices = vec![CoreIndex(0), CoreIndex(1), CoreIndex(2), CoreIndex(3)]; - schedule_blank_para(para_a, ParaKind::Parathread); - schedule_blank_para(para_b, ParaKind::Parathread); + new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| { + parachains.iter().for_each(|chain| { + schedule_blank_para(*chain, ParaKind::Parathread); + }); run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None }); - let order_a = EnqueuedOrder::new(para_a); - let order_b = EnqueuedOrder::new(para_b); - - // There should be no affinity before starting. - assert!(OnDemandAssigner::get_affinity_map(para_a).is_none()); - assert!(OnDemandAssigner::get_affinity_map(para_b).is_none()); - - // Add 2 assignments for para_a for every para_b. - OnDemandAssigner::add_on_demand_order(order_a.clone(), QueuePushDirection::Back) - .expect("Invalid paraid or queue full"); - - OnDemandAssigner::add_on_demand_order(order_a.clone(), QueuePushDirection::Back) - .expect("Invalid paraid or queue full"); - - OnDemandAssigner::add_on_demand_order(order_b.clone(), QueuePushDirection::Back) - .expect("Invalid paraid or queue full"); - - assert_eq!(OnDemandAssigner::queue_size(), 3); + // Place orders for all chains. + parachains.iter().for_each(|chain| { + place_order(*chain); + }); + + // There are 4 entries in free_entries. + let start_free_entries = OnDemandAssigner::get_free_entries().len(); + assert_eq!(start_free_entries, 4); + + // Pop assignments on all cores. + core_indices.iter().enumerate().for_each(|(n, core_index)| { + // There is no affinity on the core prior to popping. + assert!(OnDemandAssigner::get_affinity_entries(*core_index).is_empty()); + + // There's always an order to be popped for each core. + let free_entries = OnDemandAssigner::get_free_entries(); + let next_order = free_entries.peek(); + + // There is no affinity on the paraid prior to popping. + assert!(OnDemandAssigner::get_affinity_map(next_order.unwrap().para_id).is_none()); + + match OnDemandAssigner::pop_assignment_for_core(*core_index) { + Some(assignment) => { + // The popped assignment came from free entries. + assert_eq!( + start_free_entries - 1 - n, + OnDemandAssigner::get_free_entries().len() + ); + // The popped assignment has the same para id as the next order. + assert_eq!(assignment.para_id(), next_order.unwrap().para_id); + }, + None => panic!("Should not happen"), + } + }); - // Approximate having 1 core. - for _ in 0..3 { - OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)); - } + // All entries have been removed from free_entries. + assert!(OnDemandAssigner::get_free_entries().is_empty()); - // Affinity on one core is meaningless. - assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 2); - assert_eq!(OnDemandAssigner::get_affinity_map(para_b).unwrap().count, 1); - assert_eq!( - OnDemandAssigner::get_affinity_map(para_a).unwrap().core_idx, - OnDemandAssigner::get_affinity_map(para_b).unwrap().core_idx - ); - - // Clear affinity - OnDemandAssigner::report_processed(para_a, 0.into()); - OnDemandAssigner::report_processed(para_a, 0.into()); - OnDemandAssigner::report_processed(para_b, 0.into()); - - // Add 2 assignments for para_a for every para_b. - OnDemandAssigner::add_on_demand_order(order_a.clone(), QueuePushDirection::Back) - .expect("Invalid paraid or queue full"); + // All chains have an affinity count of 1. + parachains.iter().for_each(|chain| { + assert_eq!(OnDemandAssigner::get_affinity_map(*chain).unwrap().count, 1); + }); + }); +} - OnDemandAssigner::add_on_demand_order(order_a.clone(), QueuePushDirection::Back) - .expect("Invalid paraid or queue full"); +#[test] +#[should_panic] +fn queue_index_ordering_is_unsound_over_max_size() { + // NOTE: Unsoundness proof. If the number goes sufficiently over the max_queue_max_size + // the overflow will cause an opposite comparison to what would be expected. + let max_num = u32::MAX - ON_DEMAND_MAX_QUEUE_MAX_SIZE; + // 0 < some large number. + assert_eq!(QueueIndex(0).cmp(&QueueIndex(max_num + 1)), Ordering::Less); +} - OnDemandAssigner::add_on_demand_order(order_b.clone(), QueuePushDirection::Back) - .expect("Invalid paraid or queue full"); +#[test] +fn queue_index_ordering_works() { + // The largest accepted queue size. + let max_num = ON_DEMAND_MAX_QUEUE_MAX_SIZE; + + // 0 == 0 + assert_eq!(QueueIndex(0).cmp(&QueueIndex(0)), Ordering::Equal); + // 0 < 1 + assert_eq!(QueueIndex(0).cmp(&QueueIndex(1)), Ordering::Less); + // 1 > 0 + assert_eq!(QueueIndex(1).cmp(&QueueIndex(0)), Ordering::Greater); + // 0 < max_num + assert_eq!(QueueIndex(0).cmp(&QueueIndex(max_num)), Ordering::Less); + // 0 > max_num + 1 + assert_eq!(QueueIndex(0).cmp(&QueueIndex(max_num + 1)), Ordering::Less); + + // Ordering within the bounds of ON_DEMAND_MAX_QUEUE_MAX_SIZE works. + let mut v = vec![3, 6, 2, 1, 5, 4]; + v.sort_by_key(|&num| QueueIndex(num)); + assert_eq!(v, vec![1, 2, 3, 4, 5, 6]); + + v = vec![max_num, 4, 5, 1, 6]; + v.sort_by_key(|&num| QueueIndex(num)); + assert_eq!(v, vec![1, 4, 5, 6, max_num]); + + // Ordering with an element outside of the bounds of the max size also works. + v = vec![max_num + 2, 0, 6, 2, 1, 5, 4]; + v.sort_by_key(|&num| QueueIndex(num)); + assert_eq!(v, vec![0, 1, 2, 4, 5, 6, max_num + 2]); + + // Numbers way above the max size will overflow + v = vec![u32::MAX - 1, u32::MAX, 6, 2, 1, 5, 4]; + v.sort_by_key(|&num| QueueIndex(num)); + assert_eq!(v, vec![u32::MAX - 1, u32::MAX, 1, 2, 4, 5, 6]); +} - // Approximate having 3 cores. CoreIndex 2 should be unable to obtain an assignment - for _ in 0..3 { - OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)); - OnDemandAssigner::pop_assignment_for_core(CoreIndex(1)); - assert_eq!(None, OnDemandAssigner::pop_assignment_for_core(CoreIndex(2))); - } +#[test] +fn reverse_queue_index_does_reverse() { + let mut v = vec![1, 2, 3, 4, 5, 6]; - // Affinity should be the same as before, but on different cores. - assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 2); - assert_eq!(OnDemandAssigner::get_affinity_map(para_b).unwrap().count, 1); - assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().core_idx, CoreIndex(0)); - assert_eq!(OnDemandAssigner::get_affinity_map(para_b).unwrap().core_idx, CoreIndex(1)); + // Basic reversal of a vector. + v.sort_by_key(|&num| ReverseQueueIndex(num)); + assert_eq!(v, vec![6, 5, 4, 3, 2, 1]); - // Clear affinity - OnDemandAssigner::report_processed(para_a, 0.into()); - OnDemandAssigner::report_processed(para_a, 0.into()); - OnDemandAssigner::report_processed(para_b, 1.into()); + // Example from rust docs on `Reverse`. Should work identically. + v.sort_by_key(|&num| (num > 3, ReverseQueueIndex(num))); + assert_eq!(v, vec![3, 2, 1, 6, 5, 4]); - // There should be no affinity after clearing. - assert!(OnDemandAssigner::get_affinity_map(para_a).is_none()); - assert!(OnDemandAssigner::get_affinity_map(para_b).is_none()); - }); + let mut v2 = vec![1, 2, u32::MAX]; + v2.sort_by_key(|&num| ReverseQueueIndex(num)); + assert_eq!(v2, vec![2, 1, u32::MAX]); } #[test] -fn on_demand_orders_cannot_be_popped_if_lifecycle_changes() { - let para_id = ParaId::from(10); - let core_index = CoreIndex(0); - let order = EnqueuedOrder::new(para_id); +fn queue_status_size_fn_works() { + // Add orders to the on demand queue, and make sure that they are properly represented + // by the QueueStatusType::size fn. + let parachains = vec![ParaId::from(111), ParaId::from(222), ParaId::from(333)]; + let core_indices = vec![CoreIndex(0), CoreIndex(1)]; new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| { - // Register the para_id as a parathread - schedule_blank_para(para_id, ParaKind::Parathread); - - assert!(!Paras::is_parathread(para_id)); - run_to_block(10, |n| if n == 10 { Some(Default::default()) } else { None }); - assert!(Paras::is_parathread(para_id)); + parachains.iter().for_each(|chain| { + schedule_blank_para(*chain, ParaKind::Parathread); + }); - // Add two assignments for a para_id with a valid lifecycle. - assert_ok!(OnDemandAssigner::add_on_demand_order(order.clone(), QueuePushDirection::Back)); - assert_ok!(OnDemandAssigner::add_on_demand_order(order.clone(), QueuePushDirection::Back)); + assert_eq!(OnDemandAssigner::get_queue_status().size(), 0); - // First pop is fine - assert!( - OnDemandAssigner::pop_assignment_for_core(core_index) == - Some(Assignment::Pool { para_id, core_index }) - ); + run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None }); - // Deregister para - assert_ok!(Paras::schedule_para_cleanup(para_id)); + // Place orders for all chains. + parachains.iter().for_each(|chain| { + // 2 per chain for a total of 6 + place_order(*chain); + place_order(*chain); + }); - // Run to new session and verify that para_id is no longer a valid parathread. - assert!(Paras::is_parathread(para_id)); - run_to_block(20, |n| if n == 20 { Some(Default::default()) } else { None }); - assert!(!Paras::is_parathread(para_id)); + // 6 orders in free entries + assert_eq!(OnDemandAssigner::get_free_entries().len(), 6); + // 6 orders via queue status size + assert_eq!( + OnDemandAssigner::get_free_entries().len(), + OnDemandAssigner::get_queue_status().size() as usize + ); - // Second pop should be None. - OnDemandAssigner::report_processed(para_id, core_index); - assert_eq!(OnDemandAssigner::pop_assignment_for_core(core_index), None); + core_indices.iter().for_each(|core_index| { + OnDemandAssigner::pop_assignment_for_core(*core_index); + }); + + // There should be 2 orders in the scheduler's claimqueue, + // 2 in assorted AffinityMaps and 2 in free. + // ParaId 111 + assert_eq!(OnDemandAssigner::get_affinity_entries(core_indices[0]).len(), 1); + // ParaId 222 + assert_eq!(OnDemandAssigner::get_affinity_entries(core_indices[1]).len(), 1); + // Free entries are from ParaId 333 + assert_eq!(OnDemandAssigner::get_free_entries().len(), 2); + // For a total size of 4. + assert_eq!(OnDemandAssigner::get_queue_status().size(), 4) }); } diff --git a/polkadot/runtime/parachains/src/configuration.rs b/polkadot/runtime/parachains/src/configuration.rs index 364a15215d3..b7635dcd7b2 100644 --- a/polkadot/runtime/parachains/src/configuration.rs +++ b/polkadot/runtime/parachains/src/configuration.rs @@ -29,6 +29,7 @@ use primitives::{ vstaging::{ApprovalVotingParams, NodeFeatures}, AsyncBackingParams, Balance, ExecutorParamError, ExecutorParams, SessionIndex, LEGACY_MIN_BACKING_VOTES, MAX_CODE_SIZE, MAX_HEAD_DATA_SIZE, MAX_POV_SIZE, + ON_DEMAND_MAX_QUEUE_MAX_SIZE, }; use sp_runtime::{traits::Zero, Perbill}; use sp_std::prelude::*; @@ -312,6 +313,8 @@ pub enum InconsistentError<BlockNumber> { InconsistentExecutorParams { inner: ExecutorParamError }, /// TTL should be bigger than lookahead LookaheadExceedsTTL, + /// Passed in queue size for on-demand was too large. + OnDemandQueueSizeTooLarge, } impl<BlockNumber> HostConfiguration<BlockNumber> @@ -405,6 +408,10 @@ where return Err(LookaheadExceedsTTL) } + if self.scheduler_params.on_demand_queue_max_size > ON_DEMAND_MAX_QUEUE_MAX_SIZE { + return Err(OnDemandQueueSizeTooLarge) + } + Ok(()) } @@ -630,7 +637,7 @@ pub mod pallet { /// Set the number of coretime execution cores. /// - /// Note that this configuration is managed by the coretime chain. Only manually change + /// NOTE: that this configuration is managed by the coretime chain. Only manually change /// this, if you really know what you are doing! #[pallet::call_index(6)] #[pallet::weight(( @@ -1133,6 +1140,7 @@ pub mod pallet { config.scheduler_params.on_demand_queue_max_size = new; }) } + /// Set the on demand (parathreads) fee variability. #[pallet::call_index(50)] #[pallet::weight(( diff --git a/polkadot/runtime/rococo/src/lib.rs b/polkadot/runtime/rococo/src/lib.rs index f68870c98ea..a773eeb5cbd 100644 --- a/polkadot/runtime/rococo/src/lib.rs +++ b/polkadot/runtime/rococo/src/lib.rs @@ -1659,6 +1659,7 @@ pub mod migrations { // This needs to come after the `parachains_configuration` above as we are reading the configuration. coretime::migration::MigrateToCoretime<Runtime, crate::xcm_config::XcmRouter, GetLegacyLeaseImpl>, parachains_configuration::migration::v12::MigrateToV12<Runtime>, + parachains_assigner_on_demand::migration::MigrateV0ToV1<Runtime>, // permanent pallet_xcm::migration::MigrateToLatestXcmVersion<Runtime>, diff --git a/polkadot/runtime/rococo/src/weights/runtime_parachains_assigner_on_demand.rs b/polkadot/runtime/rococo/src/weights/runtime_parachains_assigner_on_demand.rs index ac0f05301b4..dba9e7904c7 100644 --- a/polkadot/runtime/rococo/src/weights/runtime_parachains_assigner_on_demand.rs +++ b/polkadot/runtime/rococo/src/weights/runtime_parachains_assigner_on_demand.rs @@ -16,10 +16,10 @@ //! Autogenerated weights for `runtime_parachains::assigner_on_demand` //! -//! THIS FILE WAS AUTO-GENERATED USING THE SUBSTRATE BENCHMARK CLI VERSION 4.0.0-dev -//! DATE: 2023-08-11, STEPS: `50`, REPEAT: `20`, LOW RANGE: `[]`, HIGH RANGE: `[]` +//! THIS FILE WAS AUTO-GENERATED USING THE SUBSTRATE BENCHMARK CLI VERSION 32.0.0 +//! DATE: 2024-03-18, STEPS: `50`, REPEAT: `20`, LOW RANGE: `[]`, HIGH RANGE: `[]` //! WORST CASE MAP SIZE: `1000000` -//! HOSTNAME: `runner-fljshgub-project-163-concurrent-0`, CPU: `Intel(R) Xeon(R) CPU @ 2.60GHz` +//! HOSTNAME: `runner-h2rr8wx7-project-674-concurrent-0`, CPU: `Intel(R) Xeon(R) CPU @ 2.60GHz` //! WASM-EXECUTION: `Compiled`, CHAIN: `Some("rococo-dev")`, DB CACHE: 1024 // Executed Command: @@ -31,11 +31,11 @@ // --extrinsic=* // --wasm-execution=compiled // --heap-pages=4096 -// --json-file=/builds/parity/mirrors/polkadot/.git/.artifacts/bench.json +// --json-file=/builds/parity/mirrors/polkadot-sdk/.git/.artifacts/bench.json // --pallet=runtime_parachains::assigner_on_demand // --chain=rococo-dev -// --header=./file_header.txt -// --output=./runtime/rococo/src/weights/ +// --header=./polkadot/file_header.txt +// --output=./polkadot/runtime/rococo/src/weights/ #![cfg_attr(rustfmt, rustfmt_skip)] #![allow(unused_parens)] @@ -48,44 +48,44 @@ use core::marker::PhantomData; /// Weight functions for `runtime_parachains::assigner_on_demand`. pub struct WeightInfo<T>(PhantomData<T>); impl<T: frame_system::Config> runtime_parachains::assigner_on_demand::WeightInfo for WeightInfo<T> { - /// Storage: `OnDemandAssignmentProvider::SpotTraffic` (r:1 w:0) - /// Proof: `OnDemandAssignmentProvider::SpotTraffic` (`max_values`: Some(1), `max_size`: None, mode: `Measured`) - /// Storage: `Paras::ParaLifecycles` (r:1 w:0) - /// Proof: `Paras::ParaLifecycles` (`max_values`: None, `max_size`: None, mode: `Measured`) - /// Storage: `OnDemandAssignmentProvider::OnDemandQueue` (r:1 w:1) - /// Proof: `OnDemandAssignmentProvider::OnDemandQueue` (`max_values`: Some(1), `max_size`: None, mode: `Measured`) + /// Storage: `OnDemandAssignmentProvider::QueueStatus` (r:1 w:1) + /// Proof: `OnDemandAssignmentProvider::QueueStatus` (`max_values`: Some(1), `max_size`: None, mode: `Measured`) + /// Storage: `OnDemandAssignmentProvider::ParaIdAffinity` (r:1 w:0) + /// Proof: `OnDemandAssignmentProvider::ParaIdAffinity` (`max_values`: None, `max_size`: None, mode: `Measured`) + /// Storage: `OnDemandAssignmentProvider::FreeEntries` (r:1 w:1) + /// Proof: `OnDemandAssignmentProvider::FreeEntries` (`max_values`: Some(1), `max_size`: None, mode: `Measured`) /// The range of component `s` is `[1, 9999]`. fn place_order_keep_alive(s: u32, ) -> Weight { // Proof Size summary in bytes: - // Measured: `297 + s * (4 ±0)` - // Estimated: `3762 + s * (4 ±0)` - // Minimum execution time: 33_522_000 picoseconds. - Weight::from_parts(35_436_835, 0) - .saturating_add(Weight::from_parts(0, 3762)) - // Standard Error: 129 - .saturating_add(Weight::from_parts(14_041, 0).saturating_mul(s.into())) + // Measured: `218 + s * (8 ±0)` + // Estimated: `3681 + s * (8 ±0)` + // Minimum execution time: 21_053_000 picoseconds. + Weight::from_parts(17_291_897, 0) + .saturating_add(Weight::from_parts(0, 3681)) + // Standard Error: 104 + .saturating_add(Weight::from_parts(18_779, 0).saturating_mul(s.into())) .saturating_add(T::DbWeight::get().reads(3)) - .saturating_add(T::DbWeight::get().writes(1)) - .saturating_add(Weight::from_parts(0, 4).saturating_mul(s.into())) + .saturating_add(T::DbWeight::get().writes(2)) + .saturating_add(Weight::from_parts(0, 8).saturating_mul(s.into())) } - /// Storage: `OnDemandAssignmentProvider::SpotTraffic` (r:1 w:0) - /// Proof: `OnDemandAssignmentProvider::SpotTraffic` (`max_values`: Some(1), `max_size`: None, mode: `Measured`) - /// Storage: `Paras::ParaLifecycles` (r:1 w:0) - /// Proof: `Paras::ParaLifecycles` (`max_values`: None, `max_size`: None, mode: `Measured`) - /// Storage: `OnDemandAssignmentProvider::OnDemandQueue` (r:1 w:1) - /// Proof: `OnDemandAssignmentProvider::OnDemandQueue` (`max_values`: Some(1), `max_size`: None, mode: `Measured`) + /// Storage: `OnDemandAssignmentProvider::QueueStatus` (r:1 w:1) + /// Proof: `OnDemandAssignmentProvider::QueueStatus` (`max_values`: Some(1), `max_size`: None, mode: `Measured`) + /// Storage: `OnDemandAssignmentProvider::ParaIdAffinity` (r:1 w:0) + /// Proof: `OnDemandAssignmentProvider::ParaIdAffinity` (`max_values`: None, `max_size`: None, mode: `Measured`) + /// Storage: `OnDemandAssignmentProvider::FreeEntries` (r:1 w:1) + /// Proof: `OnDemandAssignmentProvider::FreeEntries` (`max_values`: Some(1), `max_size`: None, mode: `Measured`) /// The range of component `s` is `[1, 9999]`. fn place_order_allow_death(s: u32, ) -> Weight { // Proof Size summary in bytes: - // Measured: `297 + s * (4 ±0)` - // Estimated: `3762 + s * (4 ±0)` - // Minimum execution time: 33_488_000 picoseconds. - Weight::from_parts(34_848_934, 0) - .saturating_add(Weight::from_parts(0, 3762)) - // Standard Error: 143 - .saturating_add(Weight::from_parts(14_215, 0).saturating_mul(s.into())) + // Measured: `218 + s * (8 ±0)` + // Estimated: `3681 + s * (8 ±0)` + // Minimum execution time: 20_843_000 picoseconds. + Weight::from_parts(16_881_986, 0) + .saturating_add(Weight::from_parts(0, 3681)) + // Standard Error: 104 + .saturating_add(Weight::from_parts(18_788, 0).saturating_mul(s.into())) .saturating_add(T::DbWeight::get().reads(3)) - .saturating_add(T::DbWeight::get().writes(1)) - .saturating_add(Weight::from_parts(0, 4).saturating_mul(s.into())) + .saturating_add(T::DbWeight::get().writes(2)) + .saturating_add(Weight::from_parts(0, 8).saturating_mul(s.into())) } } diff --git a/polkadot/runtime/westend/src/weights/runtime_parachains_assigner_on_demand.rs b/polkadot/runtime/westend/src/weights/runtime_parachains_assigner_on_demand.rs index ac0f05301b4..acd1834f79e 100644 --- a/polkadot/runtime/westend/src/weights/runtime_parachains_assigner_on_demand.rs +++ b/polkadot/runtime/westend/src/weights/runtime_parachains_assigner_on_demand.rs @@ -16,11 +16,11 @@ //! Autogenerated weights for `runtime_parachains::assigner_on_demand` //! -//! THIS FILE WAS AUTO-GENERATED USING THE SUBSTRATE BENCHMARK CLI VERSION 4.0.0-dev -//! DATE: 2023-08-11, STEPS: `50`, REPEAT: `20`, LOW RANGE: `[]`, HIGH RANGE: `[]` +//! THIS FILE WAS AUTO-GENERATED USING THE SUBSTRATE BENCHMARK CLI VERSION 32.0.0 +//! DATE: 2024-03-18, STEPS: `50`, REPEAT: `20`, LOW RANGE: `[]`, HIGH RANGE: `[]` //! WORST CASE MAP SIZE: `1000000` -//! HOSTNAME: `runner-fljshgub-project-163-concurrent-0`, CPU: `Intel(R) Xeon(R) CPU @ 2.60GHz` -//! WASM-EXECUTION: `Compiled`, CHAIN: `Some("rococo-dev")`, DB CACHE: 1024 +//! HOSTNAME: `runner-h2rr8wx7-project-674-concurrent-0`, CPU: `Intel(R) Xeon(R) CPU @ 2.60GHz` +//! WASM-EXECUTION: `Compiled`, CHAIN: `Some("westend-dev")`, DB CACHE: 1024 // Executed Command: // target/production/polkadot @@ -31,11 +31,11 @@ // --extrinsic=* // --wasm-execution=compiled // --heap-pages=4096 -// --json-file=/builds/parity/mirrors/polkadot/.git/.artifacts/bench.json +// --json-file=/builds/parity/mirrors/polkadot-sdk/.git/.artifacts/bench.json // --pallet=runtime_parachains::assigner_on_demand -// --chain=rococo-dev -// --header=./file_header.txt -// --output=./runtime/rococo/src/weights/ +// --chain=westend-dev +// --header=./polkadot/file_header.txt +// --output=./polkadot/runtime/westend/src/weights/ #![cfg_attr(rustfmt, rustfmt_skip)] #![allow(unused_parens)] @@ -48,44 +48,44 @@ use core::marker::PhantomData; /// Weight functions for `runtime_parachains::assigner_on_demand`. pub struct WeightInfo<T>(PhantomData<T>); impl<T: frame_system::Config> runtime_parachains::assigner_on_demand::WeightInfo for WeightInfo<T> { - /// Storage: `OnDemandAssignmentProvider::SpotTraffic` (r:1 w:0) - /// Proof: `OnDemandAssignmentProvider::SpotTraffic` (`max_values`: Some(1), `max_size`: None, mode: `Measured`) - /// Storage: `Paras::ParaLifecycles` (r:1 w:0) - /// Proof: `Paras::ParaLifecycles` (`max_values`: None, `max_size`: None, mode: `Measured`) - /// Storage: `OnDemandAssignmentProvider::OnDemandQueue` (r:1 w:1) - /// Proof: `OnDemandAssignmentProvider::OnDemandQueue` (`max_values`: Some(1), `max_size`: None, mode: `Measured`) + /// Storage: `OnDemandAssignmentProvider::QueueStatus` (r:1 w:1) + /// Proof: `OnDemandAssignmentProvider::QueueStatus` (`max_values`: Some(1), `max_size`: None, mode: `Measured`) + /// Storage: `OnDemandAssignmentProvider::ParaIdAffinity` (r:1 w:0) + /// Proof: `OnDemandAssignmentProvider::ParaIdAffinity` (`max_values`: None, `max_size`: None, mode: `Measured`) + /// Storage: `OnDemandAssignmentProvider::FreeEntries` (r:1 w:1) + /// Proof: `OnDemandAssignmentProvider::FreeEntries` (`max_values`: Some(1), `max_size`: None, mode: `Measured`) /// The range of component `s` is `[1, 9999]`. fn place_order_keep_alive(s: u32, ) -> Weight { // Proof Size summary in bytes: - // Measured: `297 + s * (4 ±0)` - // Estimated: `3762 + s * (4 ±0)` - // Minimum execution time: 33_522_000 picoseconds. - Weight::from_parts(35_436_835, 0) - .saturating_add(Weight::from_parts(0, 3762)) - // Standard Error: 129 - .saturating_add(Weight::from_parts(14_041, 0).saturating_mul(s.into())) + // Measured: `218 + s * (8 ±0)` + // Estimated: `3681 + s * (8 ±0)` + // Minimum execution time: 21_396_000 picoseconds. + Weight::from_parts(20_585_695, 0) + .saturating_add(Weight::from_parts(0, 3681)) + // Standard Error: 127 + .saturating_add(Weight::from_parts(20_951, 0).saturating_mul(s.into())) .saturating_add(T::DbWeight::get().reads(3)) - .saturating_add(T::DbWeight::get().writes(1)) - .saturating_add(Weight::from_parts(0, 4).saturating_mul(s.into())) + .saturating_add(T::DbWeight::get().writes(2)) + .saturating_add(Weight::from_parts(0, 8).saturating_mul(s.into())) } - /// Storage: `OnDemandAssignmentProvider::SpotTraffic` (r:1 w:0) - /// Proof: `OnDemandAssignmentProvider::SpotTraffic` (`max_values`: Some(1), `max_size`: None, mode: `Measured`) - /// Storage: `Paras::ParaLifecycles` (r:1 w:0) - /// Proof: `Paras::ParaLifecycles` (`max_values`: None, `max_size`: None, mode: `Measured`) - /// Storage: `OnDemandAssignmentProvider::OnDemandQueue` (r:1 w:1) - /// Proof: `OnDemandAssignmentProvider::OnDemandQueue` (`max_values`: Some(1), `max_size`: None, mode: `Measured`) + /// Storage: `OnDemandAssignmentProvider::QueueStatus` (r:1 w:1) + /// Proof: `OnDemandAssignmentProvider::QueueStatus` (`max_values`: Some(1), `max_size`: None, mode: `Measured`) + /// Storage: `OnDemandAssignmentProvider::ParaIdAffinity` (r:1 w:0) + /// Proof: `OnDemandAssignmentProvider::ParaIdAffinity` (`max_values`: None, `max_size`: None, mode: `Measured`) + /// Storage: `OnDemandAssignmentProvider::FreeEntries` (r:1 w:1) + /// Proof: `OnDemandAssignmentProvider::FreeEntries` (`max_values`: Some(1), `max_size`: None, mode: `Measured`) /// The range of component `s` is `[1, 9999]`. fn place_order_allow_death(s: u32, ) -> Weight { // Proof Size summary in bytes: - // Measured: `297 + s * (4 ±0)` - // Estimated: `3762 + s * (4 ±0)` - // Minimum execution time: 33_488_000 picoseconds. - Weight::from_parts(34_848_934, 0) - .saturating_add(Weight::from_parts(0, 3762)) - // Standard Error: 143 - .saturating_add(Weight::from_parts(14_215, 0).saturating_mul(s.into())) + // Measured: `218 + s * (8 ±0)` + // Estimated: `3681 + s * (8 ±0)` + // Minimum execution time: 21_412_000 picoseconds. + Weight::from_parts(19_731_554, 0) + .saturating_add(Weight::from_parts(0, 3681)) + // Standard Error: 128 + .saturating_add(Weight::from_parts(21_055, 0).saturating_mul(s.into())) .saturating_add(T::DbWeight::get().reads(3)) - .saturating_add(T::DbWeight::get().writes(1)) - .saturating_add(Weight::from_parts(0, 4).saturating_mul(s.into())) + .saturating_add(T::DbWeight::get().writes(2)) + .saturating_add(Weight::from_parts(0, 8).saturating_mul(s.into())) } } diff --git a/prdoc/pr_3190.prdoc b/prdoc/pr_3190.prdoc new file mode 100644 index 00000000000..2f7a89a0b1a --- /dev/null +++ b/prdoc/pr_3190.prdoc @@ -0,0 +1,17 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: Fix algorithmic complexity of the on-demand scheduler. + +doc: + - audience: Runtime Dev + description: | + Improves on demand performance by a significant factor. Previously, having many on-demand cores + would cause really poor blocktimes due to the fact that for each core the full order queue was + processed. This allows for increasing the max size of the on-demand queue if needed. + + At the same time, the spot price for on-demand is now checked prior to every order, ensuring + that economic backpressure will be applied. + +crates: + - name: polkadot-runtime-parachains -- GitLab