diff --git a/Cargo.lock b/Cargo.lock
index 9e48887e17a4ccd62255b557fed239c7dc9cc954..5401dc5ecfb09ea3cbbc1e5ce9b2adcbe19fd00f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -17005,9 +17005,9 @@ dependencies = [
 
 [[package]]
 name = "scale-info"
-version = "2.10.0"
+version = "2.11.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7f7d66a1128282b7ef025a8ead62a4a9fcf017382ec53b8ffbf4d7bf77bd3c60"
+checksum = "2ef2175c2907e7c8bc0a9c3f86aeb5ec1f3b275300ad58a44d0c3ae379a5e52e"
 dependencies = [
  "bitvec",
  "cfg-if",
@@ -17019,9 +17019,9 @@ dependencies = [
 
 [[package]]
 name = "scale-info-derive"
-version = "2.10.0"
+version = "2.11.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "abf2c68b89cafb3b8d918dd07b42be0da66ff202cf1155c5739a4e0c1ea0dc19"
+checksum = "634d9b8eb8fd61c5cdd3390d9b2132300a7e7618955b98b8416f118c1b4e144f"
 dependencies = [
  "proc-macro-crate 1.3.1",
  "proc-macro2",
diff --git a/polkadot/primitives/src/lib.rs b/polkadot/primitives/src/lib.rs
index 2ddd9b58dfe45d6560e9dc5c62f3d047ad8d9850..745195ce092ab1e4678b54e594901275260dc7da 100644
--- a/polkadot/primitives/src/lib.rs
+++ b/polkadot/primitives/src/lib.rs
@@ -58,7 +58,8 @@ pub use v6::{
 	ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation,
 	ValidityError, ASSIGNMENT_KEY_TYPE_ID, LEGACY_MIN_BACKING_VOTES, LOWEST_PUBLIC_ID,
 	MAX_CODE_SIZE, MAX_HEAD_DATA_SIZE, MAX_POV_SIZE, MIN_CODE_SIZE,
-	ON_DEMAND_DEFAULT_QUEUE_MAX_SIZE, PARACHAINS_INHERENT_IDENTIFIER, PARACHAIN_KEY_TYPE_ID,
+	ON_DEMAND_DEFAULT_QUEUE_MAX_SIZE, ON_DEMAND_MAX_QUEUE_MAX_SIZE, PARACHAINS_INHERENT_IDENTIFIER,
+	PARACHAIN_KEY_TYPE_ID,
 };
 
 #[cfg(feature = "std")]
diff --git a/polkadot/primitives/src/v6/mod.rs b/polkadot/primitives/src/v6/mod.rs
index 742dbed1cd87b88b284238fd255a4c7c0bbe28aa..9e7f910314cc3ae5db5c260ca5ba4a9f873a4d47 100644
--- a/polkadot/primitives/src/v6/mod.rs
+++ b/polkadot/primitives/src/v6/mod.rs
@@ -399,6 +399,13 @@ pub const MAX_POV_SIZE: u32 = 5 * 1024 * 1024;
 /// Can be adjusted in configuration.
 pub const ON_DEMAND_DEFAULT_QUEUE_MAX_SIZE: u32 = 10_000;
 
+/// Maximum for maximum queue size.
+///
+/// Setting `on_demand_queue_max_size` to a value higher than this is unsound. This is more a
+/// theoretical limit, just below enough what the target type supports, so comparisons are possible
+/// even with indices that are overflowing the underyling type.
+pub const ON_DEMAND_MAX_QUEUE_MAX_SIZE: u32 = 1_000_000_000;
+
 /// Backing votes threshold used from the host prior to runtime API version 6 and from the runtime
 /// prior to v9 configuration migration.
 pub const LEGACY_MIN_BACKING_VOTES: u32 = 2;
diff --git a/polkadot/runtime/parachains/Cargo.toml b/polkadot/runtime/parachains/Cargo.toml
index 6104014547638dbf13c5b080a280e7cf369293c9..6e693b83ae138d3b9a0a3cb580485f3560b2d50c 100644
--- a/polkadot/runtime/parachains/Cargo.toml
+++ b/polkadot/runtime/parachains/Cargo.toml
@@ -15,7 +15,7 @@ bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] }
 parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive", "max-encoded-len"] }
 log = { workspace = true }
 rustc-hex = { version = "2.1.0", default-features = false }
-scale-info = { version = "2.10.0", default-features = false, features = ["derive"] }
+scale-info = { version = "2.11.0", default-features = false, features = ["derive"] }
 serde = { features = ["alloc", "derive"], workspace = true }
 derive_more = "0.99.17"
 bitflags = "1.3.2"
diff --git a/polkadot/runtime/parachains/src/assigner_on_demand/benchmarking.rs b/polkadot/runtime/parachains/src/assigner_on_demand/benchmarking.rs
index 8360e7a78d0a5a155f5b9d63ffae05bb85e44f32..779d6f04e39638d5dfe2552c983de4715ee089d7 100644
--- a/polkadot/runtime/parachains/src/assigner_on_demand/benchmarking.rs
+++ b/polkadot/runtime/parachains/src/assigner_on_demand/benchmarking.rs
@@ -70,11 +70,7 @@ mod benchmarks {
 		let para_id = ParaId::from(111u32);
 		init_parathread::<T>(para_id);
 		T::Currency::make_free_balance_be(&caller, BalanceOf::<T>::max_value());
-		let order = EnqueuedOrder::new(para_id);
-
-		for _ in 0..s {
-			Pallet::<T>::add_on_demand_order(order.clone(), QueuePushDirection::Back).unwrap();
-		}
+		Pallet::<T>::populate_queue(para_id, s);
 
 		#[extrinsic_call]
 		_(RawOrigin::Signed(caller.into()), BalanceOf::<T>::max_value(), para_id)
@@ -87,11 +83,8 @@ mod benchmarks {
 		let para_id = ParaId::from(111u32);
 		init_parathread::<T>(para_id);
 		T::Currency::make_free_balance_be(&caller, BalanceOf::<T>::max_value());
-		let order = EnqueuedOrder::new(para_id);
 
-		for _ in 0..s {
-			Pallet::<T>::add_on_demand_order(order.clone(), QueuePushDirection::Back).unwrap();
-		}
+		Pallet::<T>::populate_queue(para_id, s);
 
 		#[extrinsic_call]
 		_(RawOrigin::Signed(caller.into()), BalanceOf::<T>::max_value(), para_id)
diff --git a/polkadot/runtime/parachains/src/assigner_on_demand/migration.rs b/polkadot/runtime/parachains/src/assigner_on_demand/migration.rs
new file mode 100644
index 0000000000000000000000000000000000000000..5071653377d49cb80d9af73375ead355e81ade01
--- /dev/null
+++ b/polkadot/runtime/parachains/src/assigner_on_demand/migration.rs
@@ -0,0 +1,181 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! A module that is responsible for migration of storage.
+use super::*;
+use frame_support::{
+	migrations::VersionedMigration, pallet_prelude::ValueQuery, storage_alias,
+	traits::OnRuntimeUpgrade, weights::Weight,
+};
+
+mod v0 {
+	use super::*;
+	use sp_std::collections::vec_deque::VecDeque;
+
+	#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone)]
+	pub(super) struct EnqueuedOrder {
+		pub para_id: ParaId,
+	}
+
+	/// Keeps track of the multiplier used to calculate the current spot price for the on demand
+	/// assigner.
+	/// NOTE: Ignoring the `OnEmpty` field for the migration.
+	#[storage_alias]
+	pub(super) type SpotTraffic<T: Config> = StorageValue<Pallet<T>, FixedU128, ValueQuery>;
+
+	/// The order storage entry. Uses a VecDeque to be able to push to the front of the
+	/// queue from the scheduler on session boundaries.
+	/// NOTE: Ignoring the `OnEmpty` field for the migration.
+	#[storage_alias]
+	pub(super) type OnDemandQueue<T: Config> =
+		StorageValue<Pallet<T>, VecDeque<EnqueuedOrder>, ValueQuery>;
+}
+
+mod v1 {
+	use super::*;
+
+	use crate::assigner_on_demand::LOG_TARGET;
+
+	/// Migration to V1
+	pub struct UncheckedMigrateToV1<T>(sp_std::marker::PhantomData<T>);
+	impl<T: Config> OnRuntimeUpgrade for UncheckedMigrateToV1<T> {
+		fn on_runtime_upgrade() -> Weight {
+			let mut weight: Weight = Weight::zero();
+
+			// Migrate the current traffic value
+			let config = <configuration::Pallet<T>>::config();
+			QueueStatus::<T>::mutate(|mut queue_status| {
+				Pallet::<T>::update_spot_traffic(&config, &mut queue_status);
+
+				let v0_queue = v0::OnDemandQueue::<T>::take();
+				// Process the v0 queue into v1.
+				v0_queue.into_iter().for_each(|enqueued_order| {
+					// Readding the old orders will use the new systems.
+					Pallet::<T>::add_on_demand_order(
+						queue_status,
+						enqueued_order.para_id,
+						QueuePushDirection::Back,
+					);
+				});
+			});
+
+			// Remove the old storage.
+			v0::OnDemandQueue::<T>::kill(); // 1 write
+			v0::SpotTraffic::<T>::kill(); // 1 write
+
+			// Config read
+			weight.saturating_accrue(T::DbWeight::get().reads(1));
+			// QueueStatus read write (update_spot_traffic)
+			weight.saturating_accrue(T::DbWeight::get().reads_writes(1, 1));
+			// Kill x 2
+			weight.saturating_accrue(T::DbWeight::get().writes(2));
+
+			log::info!(target: LOG_TARGET, "Migrated on demand assigner storage to v1");
+			weight
+		}
+
+		#[cfg(feature = "try-runtime")]
+		fn pre_upgrade() -> Result<Vec<u8>, sp_runtime::TryRuntimeError> {
+			let n: u32 = v0::OnDemandQueue::<T>::get().len() as u32;
+
+			log::info!(
+				target: LOG_TARGET,
+				"Number of orders waiting in the queue before: {n}",
+			);
+
+			Ok(n.encode())
+		}
+
+		#[cfg(feature = "try-runtime")]
+		fn post_upgrade(state: Vec<u8>) -> Result<(), sp_runtime::TryRuntimeError> {
+			log::info!(target: LOG_TARGET, "Running post_upgrade()");
+
+			ensure!(
+				v0::OnDemandQueue::<T>::get().is_empty(),
+				"OnDemandQueue should be empty after the migration"
+			);
+
+			let expected_len = u32::decode(&mut &state[..]).unwrap();
+			let queue_status_size = QueueStatus::<T>::get().size();
+			ensure!(
+				expected_len == queue_status_size,
+				"Number of orders should be the same before and after migration"
+			);
+
+			let n_affinity_entries: u32 =
+				AffinityEntries::<T>::iter().map(|(_index, heap)| heap.len() as u32).sum();
+			let n_para_id_affinity: u32 = ParaIdAffinity::<T>::iter()
+				.map(|(_para_id, affinity)| affinity.count as u32)
+				.sum();
+			ensure!(
+				n_para_id_affinity == n_affinity_entries,
+				"Number of affinity entries should be the same as the counts in ParaIdAffinity"
+			);
+
+			Ok(())
+		}
+	}
+}
+
+/// Migrate `V0` to `V1` of the storage format.
+pub type MigrateV0ToV1<T> = VersionedMigration<
+	0,
+	1,
+	v1::UncheckedMigrateToV1<T>,
+	Pallet<T>,
+	<T as frame_system::Config>::DbWeight,
+>;
+
+#[cfg(test)]
+mod tests {
+	use super::{v0, v1, OnRuntimeUpgrade, Weight};
+	use crate::mock::{new_test_ext, MockGenesisConfig, OnDemandAssigner, Test};
+	use primitives::Id as ParaId;
+
+	#[test]
+	fn migration_to_v1_preserves_queue_ordering() {
+		new_test_ext(MockGenesisConfig::default()).execute_with(|| {
+			// Place orders for paraids 1..5
+			for i in 1..=5 {
+				v0::OnDemandQueue::<Test>::mutate(|queue| {
+					queue.push_back(v0::EnqueuedOrder { para_id: ParaId::new(i) })
+				});
+			}
+
+			// Queue has 5 orders
+			let old_queue = v0::OnDemandQueue::<Test>::get();
+			assert_eq!(old_queue.len(), 5);
+			// New queue has 0 orders
+			assert_eq!(OnDemandAssigner::get_queue_status().size(), 0);
+
+			// For tests, db weight is zero.
+			assert_eq!(
+				<v1::UncheckedMigrateToV1<Test> as OnRuntimeUpgrade>::on_runtime_upgrade(),
+				Weight::zero()
+			);
+
+			// New queue has 5 orders
+			assert_eq!(OnDemandAssigner::get_queue_status().size(), 5);
+
+			// Compare each entry from the old queue with the entry in the new queue.
+			old_queue.iter().zip(OnDemandAssigner::get_free_entries().iter()).for_each(
+				|(old_enq, new_enq)| {
+					assert_eq!(old_enq.para_id, new_enq.para_id);
+				},
+			);
+		});
+	}
+}
diff --git a/polkadot/runtime/parachains/src/assigner_on_demand/mod.rs b/polkadot/runtime/parachains/src/assigner_on_demand/mod.rs
index bc450dc78129ad7404627e49b9882e2723c09f08..c47c8745e654a616eda92e06ffb127ddfe40cd22 100644
--- a/polkadot/runtime/parachains/src/assigner_on_demand/mod.rs
+++ b/polkadot/runtime/parachains/src/assigner_on_demand/mod.rs
@@ -16,22 +16,32 @@
 
 //! The parachain on demand assignment module.
 //!
-//! Implements a mechanism for taking in orders for pay as you go (PAYG) or on demand
-//! parachain (previously parathreads) assignments. This module is not handled by the
-//! initializer but is instead instantiated in the `construct_runtime` macro.
+//! Implements a mechanism for taking in orders for on-demand parachain (previously parathreads)
+//! assignments. This module is not handled by the initializer but is instead instantiated in the
+//! `construct_runtime` macro.
 //!
 //! The module currently limits parallel execution of blocks from the same `ParaId` via
 //! a core affinity mechanism. As long as there exists an affinity for a `CoreIndex` for
 //! a specific `ParaId`, orders for blockspace for that `ParaId` will only be assigned to
-//! that `CoreIndex`. This affinity mechanism can be removed if it can be shown that parallel
-//! execution is valid.
+//! that `CoreIndex`.
+//!
+//! NOTE: Once we have elastic scaling implemented we might want to extend this module to support
+//! ignoring core affinity up to a certain extend. This should be opt-in though as the parachain
+//! needs to support multiple cores in the same block. If we want to enable a single parachain
+//! occupying multiple cores in on-demand, we will likely add a separate order type, where the
+//! intent can be made explicit.
 
 mod benchmarking;
+pub mod migration;
 mod mock_helpers;
 
+extern crate alloc;
+
 #[cfg(test)]
 mod tests;
 
+use core::mem::take;
+
 use crate::{configuration, paras, scheduler::common::Assignment};
 
 use frame_support::{
@@ -43,13 +53,17 @@ use frame_support::{
 	},
 };
 use frame_system::pallet_prelude::*;
-use primitives::{CoreIndex, Id as ParaId};
+use primitives::{CoreIndex, Id as ParaId, ON_DEMAND_MAX_QUEUE_MAX_SIZE};
 use sp_runtime::{
 	traits::{One, SaturatedConversion},
 	FixedPointNumber, FixedPointOperand, FixedU128, Perbill, Saturating,
 };
 
-use sp_std::{collections::vec_deque::VecDeque, prelude::*};
+use alloc::collections::BinaryHeap;
+use sp_std::{
+	cmp::{Ord, Ordering, PartialOrd},
+	prelude::*,
+};
 
 const LOG_TARGET: &str = "runtime::parachains::assigner-on-demand";
 
@@ -73,17 +87,116 @@ impl WeightInfo for TestWeightInfo {
 	}
 }
 
+/// Meta data for full queue.
+///
+/// This includes elements with affinity and free entries.
+///
+/// The actual queue is implemented via multiple priority queues. One for each core, for entries
+/// which currently have a core affinity and one free queue, with entries without any affinity yet.
+///
+/// The design aims to have most queue accessess be O(1) or O(log(N)). Absolute worst case is O(N).
+/// Importantly this includes all accessess that happen in a single block. Even with 50 cores, the
+/// total complexity of all operations in the block should maintain above complexities. In
+/// particular O(N) stays O(N), it should never be O(N*cores).
+///
+/// More concrete rundown on complexity:
+///
+///  - insert: O(1) for placing an order, O(log(N)) for push backs.
+///  - pop_assignment_for_core: O(log(N)), O(N) worst case: Can only happen for one core, next core
+///  is already less work.
+///  - report_processed & push back: If affinity dropped to 0, then O(N) in the worst case. Again
+///  this divides per core.
+///
+///  Reads still exist, also improved slightly, but worst case we fetch all entries.
+#[derive(Encode, Decode, TypeInfo)]
+struct QueueStatusType {
+	/// Last calculated traffic value.
+	traffic: FixedU128,
+	/// The next index to use.
+	next_index: QueueIndex,
+	/// Smallest index still in use.
+	///
+	/// In case of a completely empty queue (free + affinity queues), `next_index - smallest_index
+	/// == 0`.
+	smallest_index: QueueIndex,
+	/// Indices that have been freed already.
+	///
+	/// But have a hole to `smallest_index`, so we can not yet bump `smallest_index`. This binary
+	/// heap is roughly bounded in the number of on demand cores:
+	///
+	/// For a single core, elements will always be processed in order. With each core added, a
+	/// level of out of order execution is added.
+	freed_indices: BinaryHeap<ReverseQueueIndex>,
+}
+
+impl Default for QueueStatusType {
+	fn default() -> QueueStatusType {
+		QueueStatusType {
+			traffic: FixedU128::default(),
+			next_index: QueueIndex(0),
+			smallest_index: QueueIndex(0),
+			freed_indices: BinaryHeap::new(),
+		}
+	}
+}
+
+impl QueueStatusType {
+	/// How many orders are queued in total?
+	///
+	/// This includes entries which have core affinity.
+	fn size(&self) -> u32 {
+		self.next_index
+			.0
+			.overflowing_sub(self.smallest_index.0)
+			.0
+			.saturating_sub(self.freed_indices.len() as u32)
+	}
+
+	/// Get current next index
+	///
+	/// to use for an element newly pushed to the back of the queue.
+	fn push_back(&mut self) -> QueueIndex {
+		let QueueIndex(next_index) = self.next_index;
+		self.next_index = QueueIndex(next_index.overflowing_add(1).0);
+		QueueIndex(next_index)
+	}
+
+	/// Push something to the front of the queue
+	fn push_front(&mut self) -> QueueIndex {
+		self.smallest_index = QueueIndex(self.smallest_index.0.overflowing_sub(1).0);
+		self.smallest_index
+	}
+
+	/// The given index is no longer part of the queue.
+	///
+	/// This updates `smallest_index` if need be.
+	fn consume_index(&mut self, removed_index: QueueIndex) {
+		if removed_index != self.smallest_index {
+			self.freed_indices.push(removed_index.reverse());
+			return
+		}
+		let mut index = self.smallest_index.0.overflowing_add(1).0;
+		// Even more to advance?
+		while self.freed_indices.peek() == Some(&ReverseQueueIndex(index)) {
+			index = index.overflowing_add(1).0;
+			self.freed_indices.pop();
+		}
+		self.smallest_index = QueueIndex(index);
+	}
+}
+
 /// Keeps track of how many assignments a scheduler currently has at a specific `CoreIndex` for a
 /// specific `ParaId`.
 #[derive(Encode, Decode, Default, Clone, Copy, TypeInfo)]
 #[cfg_attr(test, derive(PartialEq, RuntimeDebug))]
-pub struct CoreAffinityCount {
-	core_idx: CoreIndex,
+struct CoreAffinityCount {
+	core_index: CoreIndex,
 	count: u32,
 }
 
 /// An indicator as to which end of the `OnDemandQueue` an assignment will be placed.
-pub enum QueuePushDirection {
+#[cfg_attr(test, derive(RuntimeDebug))]
+enum QueuePushDirection {
 	Back,
 	Front,
 }
@@ -93,9 +206,8 @@ type BalanceOf<T> =
 	<<T as Config>::Currency as Currency<<T as frame_system::Config>::AccountId>>::Balance;
 
 /// Errors that can happen during spot traffic calculation.
-#[derive(PartialEq)]
-#[cfg_attr(feature = "std", derive(Debug))]
-pub enum SpotTrafficCalculationErr {
+#[derive(PartialEq, RuntimeDebug)]
+enum SpotTrafficCalculationErr {
 	/// The order queue capacity is at 0.
 	QueueCapacityIsZero,
 	/// The queue size is larger than the queue capacity.
@@ -104,15 +216,85 @@ pub enum SpotTrafficCalculationErr {
 	Division,
 }
 
+/// Type used for priority indices.
+//  NOTE: The `Ord` implementation for this type is unsound in the general case.
+//        Do not use it for anything but it's intended purpose.
+#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone, Eq, Copy)]
+struct QueueIndex(u32);
+
+/// QueueIndex with reverse ordering.
+///
+/// Same as `Reverse(QueueIndex)`, but with all the needed traits implemented.
+#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone, Eq, Copy)]
+struct ReverseQueueIndex(u32);
+
+impl QueueIndex {
+	fn reverse(self) -> ReverseQueueIndex {
+		ReverseQueueIndex(self.0)
+	}
+}
+
+impl Ord for QueueIndex {
+	fn cmp(&self, other: &Self) -> Ordering {
+		let diff = self.0.overflowing_sub(other.0).0;
+		if diff == 0 {
+			Ordering::Equal
+		} else if diff <= ON_DEMAND_MAX_QUEUE_MAX_SIZE {
+			Ordering::Greater
+		} else {
+			Ordering::Less
+		}
+	}
+}
+
+impl PartialOrd for QueueIndex {
+	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+		Some(self.cmp(other))
+	}
+}
+
+impl Ord for ReverseQueueIndex {
+	fn cmp(&self, other: &Self) -> Ordering {
+		QueueIndex(other.0).cmp(&QueueIndex(self.0))
+	}
+}
+impl PartialOrd for ReverseQueueIndex {
+	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+		Some(self.cmp(&other))
+	}
+}
+
 /// Internal representation of an order after it has been enqueued already.
-#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone)]
-pub(super) struct EnqueuedOrder {
-	pub para_id: ParaId,
+///
+/// This data structure is provided for a min BinaryHeap (Ord compares in reverse order with regards
+/// to its elements)
+#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone, Eq)]
+struct EnqueuedOrder {
+	para_id: ParaId,
+	idx: QueueIndex,
 }
 
 impl EnqueuedOrder {
-	pub fn new(para_id: ParaId) -> Self {
-		Self { para_id }
+	fn new(idx: QueueIndex, para_id: ParaId) -> Self {
+		Self { idx, para_id }
+	}
+}
+
+impl PartialOrd for EnqueuedOrder {
+	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+		match other.idx.partial_cmp(&self.idx) {
+			Some(Ordering::Equal) => other.para_id.partial_cmp(&self.para_id),
+			o => o,
+		}
+	}
+}
+
+impl Ord for EnqueuedOrder {
+	fn cmp(&self, other: &Self) -> Ordering {
+		match other.idx.cmp(&self.idx) {
+			Ordering::Equal => other.para_id.cmp(&self.para_id),
+			o => o,
+		}
 	}
 }
 
@@ -121,8 +303,11 @@ pub mod pallet {
 
 	use super::*;
 
+	const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);
+
 	#[pallet::pallet]
 	#[pallet::without_storage_info]
+	#[pallet::storage_version(STORAGE_VERSION)]
 	pub struct Pallet<T>(_);
 
 	#[pallet::config]
@@ -141,36 +326,44 @@ pub mod pallet {
 		type TrafficDefaultValue: Get<FixedU128>;
 	}
 
-	/// Creates an empty spot traffic value if one isn't present in storage already.
+	/// Creates an empty queue status for an empty queue with initial traffic value.
 	#[pallet::type_value]
-	pub fn SpotTrafficOnEmpty<T: Config>() -> FixedU128 {
-		T::TrafficDefaultValue::get()
+	pub(super) fn QueueStatusOnEmpty<T: Config>() -> QueueStatusType {
+		QueueStatusType { traffic: T::TrafficDefaultValue::get(), ..Default::default() }
 	}
 
-	/// Creates an empty on demand queue if one isn't present in storage already.
 	#[pallet::type_value]
-	pub(super) fn OnDemandQueueOnEmpty<T: Config>() -> VecDeque<EnqueuedOrder> {
-		VecDeque::new()
+	pub(super) fn EntriesOnEmpty<T: Config>() -> BinaryHeap<EnqueuedOrder> {
+		BinaryHeap::new()
 	}
 
-	/// Keeps track of the multiplier used to calculate the current spot price for the on demand
-	/// assigner.
-	#[pallet::storage]
-	pub(super) type SpotTraffic<T: Config> =
-		StorageValue<_, FixedU128, ValueQuery, SpotTrafficOnEmpty<T>>;
-
-	/// The order storage entry. Uses a VecDeque to be able to push to the front of the
-	/// queue from the scheduler on session boundaries.
-	#[pallet::storage]
-	pub(super) type OnDemandQueue<T: Config> =
-		StorageValue<_, VecDeque<EnqueuedOrder>, ValueQuery, OnDemandQueueOnEmpty<T>>;
-
 	/// Maps a `ParaId` to `CoreIndex` and keeps track of how many assignments the scheduler has in
 	/// it's lookahead. Keeping track of this affinity prevents parallel execution of the same
 	/// `ParaId` on two or more `CoreIndex`es.
 	#[pallet::storage]
 	pub(super) type ParaIdAffinity<T: Config> =
-		StorageMap<_, Twox256, ParaId, CoreAffinityCount, OptionQuery>;
+		StorageMap<_, Twox64Concat, ParaId, CoreAffinityCount, OptionQuery>;
+
+	/// Overall status of queue (both free + affinity entries)
+	#[pallet::storage]
+	pub(super) type QueueStatus<T: Config> =
+		StorageValue<_, QueueStatusType, ValueQuery, QueueStatusOnEmpty<T>>;
+
+	/// Priority queue for all orders which don't yet (or not any more) have any core affinity.
+	#[pallet::storage]
+	pub(super) type FreeEntries<T: Config> =
+		StorageValue<_, BinaryHeap<EnqueuedOrder>, ValueQuery, EntriesOnEmpty<T>>;
+
+	/// Queue entries that are currently bound to a particular core due to core affinity.
+	#[pallet::storage]
+	pub(super) type AffinityEntries<T: Config> = StorageMap<
+		_,
+		Twox64Concat,
+		CoreIndex,
+		BinaryHeap<EnqueuedOrder>,
+		ValueQuery,
+		EntriesOnEmpty<T>,
+	>;
 
 	#[pallet::event]
 	#[pallet::generate_deposit(pub(super) fn deposit_event)]
@@ -183,9 +376,6 @@ pub mod pallet {
 
 	#[pallet::error]
 	pub enum Error<T> {
-		/// The `ParaId` supplied to the `place_order` call is not a valid `ParaThread`, making the
-		/// call is invalid.
-		InvalidParaId,
 		/// The order queue is full, `place_order` will not continue.
 		QueueFull,
 		/// The current spot price is higher than the max amount specified in the `place_order`
@@ -197,45 +387,14 @@ pub mod pallet {
 	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
 		fn on_initialize(_now: BlockNumberFor<T>) -> Weight {
 			let config = <configuration::Pallet<T>>::config();
-			// Calculate spot price multiplier and store it.
-			let old_traffic = SpotTraffic::<T>::get();
-			match Self::calculate_spot_traffic(
-				old_traffic,
-				config.scheduler_params.on_demand_queue_max_size,
-				Self::queue_size(),
-				config.scheduler_params.on_demand_target_queue_utilization,
-				config.scheduler_params.on_demand_fee_variability,
-			) {
-				Ok(new_traffic) => {
-					// Only update storage on change
-					if new_traffic != old_traffic {
-						SpotTraffic::<T>::set(new_traffic);
-						Pallet::<T>::deposit_event(Event::<T>::SpotTrafficSet {
-							traffic: new_traffic,
-						});
-						return T::DbWeight::get().reads_writes(2, 1)
-					}
-				},
-				Err(SpotTrafficCalculationErr::QueueCapacityIsZero) => {
-					log::debug!(
-						target: LOG_TARGET,
-						"Error calculating spot traffic: The order queue capacity is at 0."
-					);
-				},
-				Err(SpotTrafficCalculationErr::QueueSizeLargerThanCapacity) => {
-					log::debug!(
-						target: LOG_TARGET,
-						"Error calculating spot traffic: The queue size is larger than the queue capacity."
-					);
-				},
-				Err(SpotTrafficCalculationErr::Division) => {
-					log::debug!(
-						target: LOG_TARGET,
-						"Error calculating spot traffic: Arithmetic error during division, either division by 0 or over/underflow."
-					);
-				},
-			};
-			T::DbWeight::get().reads_writes(2, 0)
+			// We need to update the spot traffic on block initialize in order to account for idle
+			// blocks.
+			QueueStatus::<T>::mutate(|queue_status| {
+				Self::update_spot_traffic(&config, queue_status);
+			});
+
+			// 2 reads in config and queuestatus, at maximum 1 write to queuestatus.
+			T::DbWeight::get().reads_writes(2, 1)
 		}
 	}
 
@@ -258,7 +417,7 @@ pub mod pallet {
 		/// Events:
 		/// - `SpotOrderPlaced`
 		#[pallet::call_index(0)]
-		#[pallet::weight(<T as Config>::WeightInfo::place_order_allow_death(OnDemandQueue::<T>::get().len() as u32))]
+		#[pallet::weight(<T as Config>::WeightInfo::place_order_allow_death(QueueStatus::<T>::get().size()))]
 		pub fn place_order_allow_death(
 			origin: OriginFor<T>,
 			max_amount: BalanceOf<T>,
@@ -285,7 +444,7 @@ pub mod pallet {
 		/// Events:
 		/// - `SpotOrderPlaced`
 		#[pallet::call_index(1)]
-		#[pallet::weight(<T as Config>::WeightInfo::place_order_keep_alive(OnDemandQueue::<T>::get().len() as u32))]
+		#[pallet::weight(<T as Config>::WeightInfo::place_order_keep_alive(QueueStatus::<T>::get().size()))]
 		pub fn place_order_keep_alive(
 			origin: OriginFor<T>,
 			max_amount: BalanceOf<T>,
@@ -297,10 +456,78 @@ pub mod pallet {
 	}
 }
 
+// Internal functions and interface to scheduler/wrapping assignment provider.
 impl<T: Config> Pallet<T>
 where
 	BalanceOf<T>: FixedPointOperand,
 {
+	/// Take the next queued entry that is available for a given core index.
+	///
+	/// Parameters:
+	/// - `core_index`: The core index
+	pub fn pop_assignment_for_core(core_index: CoreIndex) -> Option<Assignment> {
+		let entry: Result<EnqueuedOrder, ()> = QueueStatus::<T>::try_mutate(|queue_status| {
+			AffinityEntries::<T>::try_mutate(core_index, |affinity_entries| {
+				let free_entry = FreeEntries::<T>::try_mutate(|free_entries| {
+					let affinity_next = affinity_entries.peek();
+					let free_next = free_entries.peek();
+					let pick_free = match (affinity_next, free_next) {
+						(None, _) => true,
+						(Some(_), None) => false,
+						(Some(a), Some(f)) => f < a,
+					};
+					if pick_free {
+						let entry = free_entries.pop().ok_or(())?;
+						let (mut affinities, free): (BinaryHeap<_>, BinaryHeap<_>) =
+							take(free_entries)
+								.into_iter()
+								.partition(|e| e.para_id == entry.para_id);
+						affinity_entries.append(&mut affinities);
+						*free_entries = free;
+						Ok(entry)
+					} else {
+						Err(())
+					}
+				});
+				let entry = free_entry.or_else(|()| affinity_entries.pop().ok_or(()))?;
+				queue_status.consume_index(entry.idx);
+				Ok(entry)
+			})
+		});
+
+		let assignment = entry.map(|e| Assignment::Pool { para_id: e.para_id, core_index }).ok()?;
+
+		Pallet::<T>::increase_affinity(assignment.para_id(), core_index);
+		Some(assignment)
+	}
+
+	/// Report that the `para_id` & `core_index` combination was processed.
+	///
+	/// This should be called once it is clear that the assignment won't get pushed back anymore.
+	///
+	/// In other words for each `pop_assignment_for_core` a call to this function or
+	/// `push_back_assignment` must follow, but only one.
+	pub fn report_processed(para_id: ParaId, core_index: CoreIndex) {
+		Pallet::<T>::decrease_affinity_update_queue(para_id, core_index);
+	}
+
+	/// Push an assignment back to the front of the queue.
+	///
+	/// The assignment has not been processed yet. Typically used on session boundaries.
+	///
+	/// NOTE: We are not checking queue size here. So due to push backs it is possible that we
+	/// exceed the maximum queue size slightly.
+	///
+	/// Parameters:
+	/// - `para_id`: The para that did not make it.
+	/// - `core_index`: The core the para was scheduled on.
+	pub fn push_back_assignment(para_id: ParaId, core_index: CoreIndex) {
+		Pallet::<T>::decrease_affinity_update_queue(para_id, core_index);
+		QueueStatus::<T>::mutate(|queue_status| {
+			Pallet::<T>::add_on_demand_order(queue_status, para_id, QueuePushDirection::Front);
+		});
+	}
+
 	/// Helper function for `place_order_*` calls. Used to differentiate between placing orders
 	/// with a keep alive check or to allow the account to be reaped.
 	///
@@ -326,34 +553,62 @@ where
 	) -> DispatchResult {
 		let config = <configuration::Pallet<T>>::config();
 
-		// Traffic always falls back to 1.0
-		let traffic = SpotTraffic::<T>::get();
-
-		// Calculate spot price
-		let spot_price: BalanceOf<T> = traffic.saturating_mul_int(
-			config.scheduler_params.on_demand_base_fee.saturated_into::<BalanceOf<T>>(),
-		);
-
-		// Is the current price higher than `max_amount`
-		ensure!(spot_price.le(&max_amount), Error::<T>::SpotPriceHigherThanMaxAmount);
+		QueueStatus::<T>::mutate(|queue_status| {
+			Self::update_spot_traffic(&config, queue_status);
+			let traffic = queue_status.traffic;
 
-		// Charge the sending account the spot price
-		let _ = T::Currency::withdraw(
-			&sender,
-			spot_price,
-			WithdrawReasons::FEE,
-			existence_requirement,
-		)?;
+			// Calculate spot price
+			let spot_price: BalanceOf<T> = traffic.saturating_mul_int(
+				config.scheduler_params.on_demand_base_fee.saturated_into::<BalanceOf<T>>(),
+			);
 
-		let order = EnqueuedOrder::new(para_id);
+			// Is the current price higher than `max_amount`
+			ensure!(spot_price.le(&max_amount), Error::<T>::SpotPriceHigherThanMaxAmount);
 
-		let res = Pallet::<T>::add_on_demand_order(order, QueuePushDirection::Back);
+			// Charge the sending account the spot price
+			let _ = T::Currency::withdraw(
+				&sender,
+				spot_price,
+				WithdrawReasons::FEE,
+				existence_requirement,
+			)?;
 
-		if res.is_ok() {
-			Pallet::<T>::deposit_event(Event::<T>::OnDemandOrderPlaced { para_id, spot_price });
-		}
+			ensure!(
+				queue_status.size() < config.scheduler_params.on_demand_queue_max_size,
+				Error::<T>::QueueFull
+			);
+			Pallet::<T>::add_on_demand_order(queue_status, para_id, QueuePushDirection::Back);
+			Ok(())
+		})
+	}
 
-		res
+	/// Calculate and update spot traffic.
+	fn update_spot_traffic(
+		config: &configuration::HostConfiguration<BlockNumberFor<T>>,
+		queue_status: &mut QueueStatusType,
+	) {
+		let old_traffic = queue_status.traffic;
+		match Self::calculate_spot_traffic(
+			old_traffic,
+			config.scheduler_params.on_demand_queue_max_size,
+			queue_status.size(),
+			config.scheduler_params.on_demand_target_queue_utilization,
+			config.scheduler_params.on_demand_fee_variability,
+		) {
+			Ok(new_traffic) => {
+				// Only update storage on change
+				if new_traffic != old_traffic {
+					queue_status.traffic = new_traffic;
+					Pallet::<T>::deposit_event(Event::<T>::SpotTrafficSet { traffic: new_traffic });
+				}
+			},
+			Err(err) => {
+				log::debug!(
+					target: LOG_TARGET,
+					"Error calculating spot traffic: {:?}", err
+				);
+			},
+		};
 	}
 
 	/// The spot price multiplier. This is based on the transaction fee calculations defined in:
@@ -378,7 +633,7 @@ where
 	/// - `SpotTrafficCalculationErr::QueueCapacityIsZero`
 	/// - `SpotTrafficCalculationErr::QueueSizeLargerThanCapacity`
 	/// - `SpotTrafficCalculationErr::Division`
-	pub(crate) fn calculate_spot_traffic(
+	fn calculate_spot_traffic(
 		traffic: FixedU128,
 		queue_capacity: u32,
 		queue_size: u32,
@@ -430,175 +685,140 @@ where
 	/// Adds an order to the on demand queue.
 	///
 	/// Paramenters:
-	/// - `order`: The `EnqueuedOrder` to add to the queue.
 	/// - `location`: Whether to push this entry to the back or the front of the queue. Pushing an
 	///   entry to the front of the queue is only used when the scheduler wants to push back an
 	///   entry it has already popped.
-	/// Returns:
-	/// - The unit type on success.
-	///
-	/// Errors:
-	/// - `InvalidParaId`
-	/// - `QueueFull`
 	fn add_on_demand_order(
-		order: EnqueuedOrder,
+		queue_status: &mut QueueStatusType,
+		para_id: ParaId,
 		location: QueuePushDirection,
-	) -> Result<(), DispatchError> {
-		// Only parathreads are valid paraids for on the go parachains.
-		ensure!(<paras::Pallet<T>>::is_parathread(order.para_id), Error::<T>::InvalidParaId);
-
-		let config = <configuration::Pallet<T>>::config();
-
-		OnDemandQueue::<T>::try_mutate(|queue| {
-			// Abort transaction if queue is too large
-			ensure!(
-				Self::queue_size() < config.scheduler_params.on_demand_queue_max_size,
-				Error::<T>::QueueFull
-			);
-			match location {
-				QueuePushDirection::Back => queue.push_back(order),
-				QueuePushDirection::Front => queue.push_front(order),
-			};
-			Ok(())
-		})
+	) {
+		let idx = match location {
+			QueuePushDirection::Back => queue_status.push_back(),
+			QueuePushDirection::Front => queue_status.push_front(),
+		};
+
+		let affinity = ParaIdAffinity::<T>::get(para_id);
+		let order = EnqueuedOrder::new(idx, para_id);
+		#[cfg(test)]
+		log::debug!(target: LOG_TARGET, "add_on_demand_order, order: {:?}, affinity: {:?}, direction: {:?}", order, affinity, location);
+
+		match affinity {
+			None => FreeEntries::<T>::mutate(|entries| entries.push(order)),
+			Some(affinity) =>
+				AffinityEntries::<T>::mutate(affinity.core_index, |entries| entries.push(order)),
+		}
 	}
 
-	/// Get the size of the on demand queue.
+	/// Decrease core affinity for para and update queue
 	///
-	/// Returns:
-	/// - The size of the on demand queue.
-	fn queue_size() -> u32 {
-		let config = <configuration::Pallet<T>>::config();
-		match OnDemandQueue::<T>::get().len().try_into() {
-			Ok(size) => return size,
-			Err(_) => {
-				log::debug!(
-					target: LOG_TARGET,
-					"Failed to fetch the on demand queue size, returning the max size."
-				);
-				return config.scheduler_params.on_demand_queue_max_size
-			},
+	/// if affinity dropped to 0, moving entries back to `FreeEntries`.
+	fn decrease_affinity_update_queue(para_id: ParaId, core_index: CoreIndex) {
+		let affinity = Pallet::<T>::decrease_affinity(para_id, core_index);
+		#[cfg(not(test))]
+		debug_assert_ne!(
+			affinity, None,
+			"Decreased affinity for a para that has not been served on a core?"
+		);
+		if affinity != Some(0) {
+			return
 		}
-	}
-
-	/// Getter for the order queue.
-	#[cfg(test)]
-	fn get_queue() -> VecDeque<EnqueuedOrder> {
-		OnDemandQueue::<T>::get()
-	}
-
-	/// Getter for the affinity tracker.
-	pub fn get_affinity_map(para_id: ParaId) -> Option<CoreAffinityCount> {
-		ParaIdAffinity::<T>::get(para_id)
+		// No affinity more for entries on this core, free any entries:
+		//
+		// This is necessary to ensure them being served as the core might no longer exist at all.
+		AffinityEntries::<T>::mutate(core_index, |affinity_entries| {
+			FreeEntries::<T>::mutate(|free_entries| {
+				let (mut freed, affinities): (BinaryHeap<_>, BinaryHeap<_>) =
+					take(affinity_entries).into_iter().partition(|e| e.para_id == para_id);
+				free_entries.append(&mut freed);
+				*affinity_entries = affinities;
+			})
+		});
 	}
 
 	/// Decreases the affinity of a `ParaId` to a specified `CoreIndex`.
-	/// Subtracts from the count of the `CoreAffinityCount` if an entry is found and the core_idx
+	///
+	/// Subtracts from the count of the `CoreAffinityCount` if an entry is found and the core_index
 	/// matches. When the count reaches 0, the entry is removed.
 	/// A non-existant entry is a no-op.
-	fn decrease_affinity(para_id: ParaId, core_idx: CoreIndex) {
+	///
+	/// Returns: The new affinity of the para on that core. `None` if there is no affinity on this
+	/// core.
+	fn decrease_affinity(para_id: ParaId, core_index: CoreIndex) -> Option<u32> {
 		ParaIdAffinity::<T>::mutate(para_id, |maybe_affinity| {
-			if let Some(affinity) = maybe_affinity {
-				if affinity.core_idx == core_idx {
-					let new_count = affinity.count.saturating_sub(1);
-					if new_count > 0 {
-						*maybe_affinity = Some(CoreAffinityCount { core_idx, count: new_count });
-					} else {
-						*maybe_affinity = None;
-					}
+			let affinity = maybe_affinity.as_mut()?;
+			if affinity.core_index == core_index {
+				let new_count = affinity.count.saturating_sub(1);
+				if new_count > 0 {
+					*maybe_affinity = Some(CoreAffinityCount { core_index, count: new_count });
+				} else {
+					*maybe_affinity = None;
 				}
+				return Some(new_count)
+			} else {
+				None
 			}
-		});
+		})
 	}
 
 	/// Increases the affinity of a `ParaId` to a specified `CoreIndex`.
-	/// Adds to the count of the `CoreAffinityCount` if an entry is found and the core_idx matches.
-	/// A non-existant entry will be initialized with a count of 1 and uses the  supplied
+	/// Adds to the count of the `CoreAffinityCount` if an entry is found and the core_index
+	/// matches. A non-existant entry will be initialized with a count of 1 and uses the  supplied
 	/// `CoreIndex`.
-	fn increase_affinity(para_id: ParaId, core_idx: CoreIndex) {
+	fn increase_affinity(para_id: ParaId, core_index: CoreIndex) {
 		ParaIdAffinity::<T>::mutate(para_id, |maybe_affinity| match maybe_affinity {
 			Some(affinity) =>
-				if affinity.core_idx == core_idx {
+				if affinity.core_index == core_index {
 					*maybe_affinity = Some(CoreAffinityCount {
-						core_idx,
+						core_index,
 						count: affinity.count.saturating_add(1),
 					});
 				},
 			None => {
-				*maybe_affinity = Some(CoreAffinityCount { core_idx, count: 1 });
+				*maybe_affinity = Some(CoreAffinityCount { core_index, count: 1 });
 			},
 		})
 	}
-}
 
-impl<T: Config> Pallet<T> {
-	/// Take the next queued entry that is available for a given core index.
-	/// Invalidates and removes orders with a `para_id` that is not `ParaLifecycle::Parathread`
-	/// but only in [0..P] range slice of the order queue, where P is the element that is
-	/// removed from the order queue.
-	///
-	/// Parameters:
-	/// - `core_idx`: The core index
-	pub fn pop_assignment_for_core(core_idx: CoreIndex) -> Option<Assignment> {
-		let mut queue: VecDeque<EnqueuedOrder> = OnDemandQueue::<T>::get();
-
-		let mut invalidated_para_id_indexes: Vec<usize> = vec![];
-
-		// Get the position of the next `ParaId`. Select either a valid `ParaId` that has an
-		// affinity to the same `CoreIndex` as the scheduler asks for or a valid `ParaId` with no
-		// affinity at all.
-		let pos = queue.iter().enumerate().position(|(index, assignment)| {
-			if <paras::Pallet<T>>::is_parathread(assignment.para_id) {
-				match ParaIdAffinity::<T>::get(&assignment.para_id) {
-					Some(affinity) => return affinity.core_idx == core_idx,
-					None => return true,
-				}
-			}
-			// Record no longer valid para_ids.
-			invalidated_para_id_indexes.push(index);
-			return false
-		});
+	/// Getter for the affinity tracker.
+	#[cfg(test)]
+	fn get_affinity_map(para_id: ParaId) -> Option<CoreAffinityCount> {
+		ParaIdAffinity::<T>::get(para_id)
+	}
 
-		// Collect the popped value.
-		let popped = pos.and_then(|p: usize| {
-			if let Some(assignment) = queue.remove(p) {
-				Pallet::<T>::increase_affinity(assignment.para_id, core_idx);
-				return Some(assignment)
-			};
-			None
-		});
+	/// Getter for the affinity entries.
+	#[cfg(test)]
+	fn get_affinity_entries(core_index: CoreIndex) -> BinaryHeap<EnqueuedOrder> {
+		AffinityEntries::<T>::get(core_index)
+	}
 
-		// Only remove the invalid indexes *after* using the index.
-		// Removed in reverse order so that the indexes don't shift.
-		invalidated_para_id_indexes.iter().rev().for_each(|idx| {
-			queue.remove(*idx);
-		});
+	/// Getter for the free entries.
+	#[cfg(test)]
+	fn get_free_entries() -> BinaryHeap<EnqueuedOrder> {
+		FreeEntries::<T>::get()
+	}
 
-		// Write changes to storage.
-		OnDemandQueue::<T>::set(queue);
+	#[cfg(feature = "runtime-benchmarks")]
+	pub fn populate_queue(para_id: ParaId, num: u32) {
+		QueueStatus::<T>::mutate(|queue_status| {
+			for _ in 0..num {
+				Pallet::<T>::add_on_demand_order(queue_status, para_id, QueuePushDirection::Back);
+			}
+		});
+	}
 
-		popped.map(|p| Assignment::Pool { para_id: p.para_id, core_index: core_idx })
+	#[cfg(test)]
+	fn set_queue_status(new_status: QueueStatusType) {
+		QueueStatus::<T>::set(new_status);
 	}
 
-	/// Report that the `para_id` & `core_index` combination was processed.
-	pub fn report_processed(para_id: ParaId, core_index: CoreIndex) {
-		Pallet::<T>::decrease_affinity(para_id, core_index)
+	#[cfg(test)]
+	fn get_queue_status() -> QueueStatusType {
+		QueueStatus::<T>::get()
 	}
 
-	/// Push an assignment back to the front of the queue.
-	///
-	/// The assignment has not been processed yet. Typically used on session boundaries.
-	/// Parameters:
-	/// - `assignment`: The on demand assignment.
-	pub fn push_back_assignment(para_id: ParaId, core_index: CoreIndex) {
-		Pallet::<T>::decrease_affinity(para_id, core_index);
-		// Skip the queue on push backs from scheduler
-		match Pallet::<T>::add_on_demand_order(
-			EnqueuedOrder::new(para_id),
-			QueuePushDirection::Front,
-		) {
-			Ok(_) => {},
-			Err(_) => {},
-		}
+	#[cfg(test)]
+	fn get_traffic_default_value() -> FixedU128 {
+		<T as Config>::TrafficDefaultValue::get()
 	}
 }
diff --git a/polkadot/runtime/parachains/src/assigner_on_demand/tests.rs b/polkadot/runtime/parachains/src/assigner_on_demand/tests.rs
index 8404700780c84e493d6436c5f3174f814c1082ef..982efe77b939cf64d355e1111606c3856c4a0aea 100644
--- a/polkadot/runtime/parachains/src/assigner_on_demand/tests.rs
+++ b/polkadot/runtime/parachains/src/assigner_on_demand/tests.rs
@@ -73,11 +73,24 @@ fn run_to_block(
 		Paras::initializer_initialize(b + 1);
 		Scheduler::initializer_initialize(b + 1);
 
+		// We need to update the spot traffic on every block.
+		OnDemandAssigner::on_initialize(b + 1);
+
 		// In the real runtime this is expected to be called by the `InclusionInherent` pallet.
 		Scheduler::free_cores_and_fill_claimqueue(BTreeMap::new(), b + 1);
 	}
 }
 
+fn place_order(para_id: ParaId) {
+	let alice = 100u64;
+	let amt = 10_000_000u128;
+
+	Balances::make_free_balance_be(&alice, amt);
+
+	run_to_block(101, |n| if n == 101 { Some(Default::default()) } else { None });
+	OnDemandAssigner::place_order_allow_death(RuntimeOrigin::signed(alice), amt, para_id).unwrap()
+}
+
 #[test]
 fn spot_traffic_capacity_zero_returns_none() {
 	match OnDemandAssigner::calculate_spot_traffic(
@@ -201,6 +214,42 @@ fn spot_traffic_decreases_over_time() {
 	assert_eq!(traffic, FixedU128::from_inner(3_125_000_000_000_000_000u128))
 }
 
+#[test]
+fn spot_traffic_decreases_between_idle_blocks() {
+	// Testing spot traffic assumptions, but using the mock runtime and default on demand
+	// configuration values. Ensuring that blocks with no on demand activity (idle)
+	// decrease traffic.
+
+	let para_id = ParaId::from(111);
+
+	new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| {
+		// Initialize the parathread and wait for it to be ready.
+		schedule_blank_para(para_id, ParaKind::Parathread);
+		assert!(!Paras::is_parathread(para_id));
+		run_to_block(100, |n| if n == 100 { Some(Default::default()) } else { None });
+		assert!(Paras::is_parathread(para_id));
+
+		// Set the spot traffic to a large number
+		OnDemandAssigner::set_queue_status(QueueStatusType {
+			traffic: FixedU128::from_u32(10),
+			..Default::default()
+		});
+
+		assert_eq!(OnDemandAssigner::get_queue_status().traffic, FixedU128::from_u32(10));
+
+		// Run to block 101 and ensure that the traffic decreases.
+		run_to_block(101, |n| if n == 100 { Some(Default::default()) } else { None });
+		assert!(OnDemandAssigner::get_queue_status().traffic < FixedU128::from_u32(10));
+
+		// Run to block 102 and observe that we've hit the default traffic value.
+		run_to_block(102, |n| if n == 100 { Some(Default::default()) } else { None });
+		assert_eq!(
+			OnDemandAssigner::get_queue_status().traffic,
+			OnDemandAssigner::get_traffic_default_value()
+		);
+	})
+}
+
 #[test]
 fn place_order_works() {
 	let alice = 1u64;
@@ -278,74 +327,6 @@ fn place_order_keep_alive_keeps_alive() {
 	});
 }
 
-#[test]
-fn add_on_demand_order_works() {
-	let para_a = ParaId::from(111);
-	let order = EnqueuedOrder::new(para_a);
-
-	let mut genesis = GenesisConfigBuilder::default();
-	genesis.on_demand_max_queue_size = 1;
-	new_test_ext(genesis.build()).execute_with(|| {
-		// Initialize the parathread and wait for it to be ready.
-		schedule_blank_para(para_a, ParaKind::Parathread);
-
-		// `para_a` is not onboarded as a parathread yet.
-		assert_noop!(
-			OnDemandAssigner::add_on_demand_order(order.clone(), QueuePushDirection::Back),
-			Error::<Test>::InvalidParaId
-		);
-
-		assert!(!Paras::is_parathread(para_a));
-		run_to_block(100, |n| if n == 100 { Some(Default::default()) } else { None });
-		assert!(Paras::is_parathread(para_a));
-
-		// `para_a` is now onboarded as a valid parathread.
-		assert_ok!(OnDemandAssigner::add_on_demand_order(order.clone(), QueuePushDirection::Back));
-
-		// Max queue size is 1, queue should be full.
-		assert_noop!(
-			OnDemandAssigner::add_on_demand_order(order, QueuePushDirection::Back),
-			Error::<Test>::QueueFull
-		);
-	});
-}
-
-#[test]
-fn spotqueue_push_directions() {
-	new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| {
-		let para_a = ParaId::from(111);
-		let para_b = ParaId::from(222);
-		let para_c = ParaId::from(333);
-
-		schedule_blank_para(para_a, ParaKind::Parathread);
-		schedule_blank_para(para_b, ParaKind::Parathread);
-		schedule_blank_para(para_c, ParaKind::Parathread);
-
-		run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None });
-
-		let order_a = EnqueuedOrder::new(para_a);
-		let order_b = EnqueuedOrder::new(para_b);
-		let order_c = EnqueuedOrder::new(para_c);
-
-		assert_ok!(OnDemandAssigner::add_on_demand_order(
-			order_a.clone(),
-			QueuePushDirection::Front
-		));
-		assert_ok!(OnDemandAssigner::add_on_demand_order(
-			order_b.clone(),
-			QueuePushDirection::Front
-		));
-
-		assert_ok!(OnDemandAssigner::add_on_demand_order(
-			order_c.clone(),
-			QueuePushDirection::Back
-		));
-
-		assert_eq!(OnDemandAssigner::queue_size(), 3);
-		assert_eq!(OnDemandAssigner::get_queue(), VecDeque::from(vec![order_b, order_a, order_c]))
-	});
-}
-
 #[test]
 fn pop_assignment_for_core_works() {
 	new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| {
@@ -356,51 +337,32 @@ fn pop_assignment_for_core_works() {
 
 		run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None });
 
-		let order_a = EnqueuedOrder::new(para_a);
-		let order_b = EnqueuedOrder::new(para_b);
-		let assignment_a = Assignment::Pool { para_id: para_a, core_index: CoreIndex(0) };
-		let assignment_b = Assignment::Pool { para_id: para_b, core_index: CoreIndex(1) };
-
 		// Pop should return none with empty queue
 		assert_eq!(OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)), None);
 
 		// Add enough assignments to the order queue.
 		for _ in 0..2 {
-			OnDemandAssigner::add_on_demand_order(order_a.clone(), QueuePushDirection::Back)
-				.expect("Invalid paraid or queue full");
-
-			OnDemandAssigner::add_on_demand_order(order_b.clone(), QueuePushDirection::Back)
-				.expect("Invalid paraid or queue full");
-		}
-
-		// Queue should contain orders a, b, a, b
-		{
-			let queue: Vec<EnqueuedOrder> = OnDemandQueue::<Test>::get().into_iter().collect();
-			assert_eq!(
-				queue,
-				vec![order_a.clone(), order_b.clone(), order_a.clone(), order_b.clone()]
-			);
+			place_order(para_a);
+			place_order(para_b);
 		}
 
 		// Popped assignments should be for the correct paras and cores
 		assert_eq!(
-			OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)),
-			Some(assignment_a.clone())
+			OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)).map(|a| a.para_id()),
+			Some(para_a)
 		);
 		assert_eq!(
-			OnDemandAssigner::pop_assignment_for_core(CoreIndex(1)),
-			Some(assignment_b.clone())
+			OnDemandAssigner::pop_assignment_for_core(CoreIndex(1)).map(|a| a.para_id()),
+			Some(para_b)
 		);
 		assert_eq!(
-			OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)),
-			Some(assignment_a.clone())
+			OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)).map(|a| a.para_id()),
+			Some(para_a)
+		);
+		assert_eq!(
+			OnDemandAssigner::pop_assignment_for_core(CoreIndex(1)).map(|a| a.para_id()),
+			Some(para_b)
 		);
-
-		// Queue should contain one left over order
-		{
-			let queue: Vec<EnqueuedOrder> = OnDemandQueue::<Test>::get().into_iter().collect();
-			assert_eq!(queue, vec![order_b.clone(),]);
-		}
 	});
 }
 
@@ -414,28 +376,19 @@ fn push_back_assignment_works() {
 
 		run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None });
 
-		let order_a = EnqueuedOrder::new(para_a);
-		let order_b = EnqueuedOrder::new(para_b);
-
 		// Add enough assignments to the order queue.
-		OnDemandAssigner::add_on_demand_order(order_a.clone(), QueuePushDirection::Back)
-			.expect("Invalid paraid or queue full");
-
-		OnDemandAssigner::add_on_demand_order(order_b.clone(), QueuePushDirection::Back)
-			.expect("Invalid paraid or queue full");
+		place_order(para_a);
+		place_order(para_b);
 
 		// Pop order a
-		OnDemandAssigner::pop_assignment_for_core(CoreIndex(0));
+		assert_eq!(
+			OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)).unwrap().para_id(),
+			para_a
+		);
 
 		// Para a should have affinity for core 0
 		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 1);
-		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().core_idx, CoreIndex(0));
-
-		// Queue should still contain order b
-		{
-			let queue: Vec<EnqueuedOrder> = OnDemandQueue::<Test>::get().into_iter().collect();
-			assert_eq!(queue, vec![order_b.clone()]);
-		}
+		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().core_index, CoreIndex(0));
 
 		// Push back order a
 		OnDemandAssigner::push_back_assignment(para_a, CoreIndex(0));
@@ -444,10 +397,82 @@ fn push_back_assignment_works() {
 		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).is_none(), true);
 
 		// Queue should contain orders a, b. A in front of b.
-		{
-			let queue: Vec<EnqueuedOrder> = OnDemandQueue::<Test>::get().into_iter().collect();
-			assert_eq!(queue, vec![order_a.clone(), order_b.clone()]);
+		assert_eq!(
+			OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)).unwrap().para_id(),
+			para_a
+		);
+		assert_eq!(
+			OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)).unwrap().para_id(),
+			para_b
+		);
+	});
+}
+
+#[test]
+fn affinity_prohibits_parallel_scheduling() {
+	new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| {
+		let para_a = ParaId::from(111);
+		let para_b = ParaId::from(222);
+
+		schedule_blank_para(para_a, ParaKind::Parathread);
+		schedule_blank_para(para_b, ParaKind::Parathread);
+
+		run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None });
+
+		// There should be no affinity before starting.
+		assert!(OnDemandAssigner::get_affinity_map(para_a).is_none());
+		assert!(OnDemandAssigner::get_affinity_map(para_b).is_none());
+
+		// Add 2 assignments for para_a for every para_b.
+		place_order(para_a);
+		place_order(para_a);
+		place_order(para_b);
+
+		// Approximate having 1 core.
+		for _ in 0..3 {
+			assert!(OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)).is_some());
 		}
+		assert!(OnDemandAssigner::pop_assignment_for_core(CoreIndex(0)).is_none());
+
+		// Affinity on one core is meaningless.
+		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 2);
+		assert_eq!(OnDemandAssigner::get_affinity_map(para_b).unwrap().count, 1);
+		assert_eq!(
+			OnDemandAssigner::get_affinity_map(para_a).unwrap().core_index,
+			OnDemandAssigner::get_affinity_map(para_b).unwrap().core_index,
+		);
+
+		// Clear affinity
+		OnDemandAssigner::report_processed(para_a, 0.into());
+		OnDemandAssigner::report_processed(para_a, 0.into());
+		OnDemandAssigner::report_processed(para_b, 0.into());
+
+		// Add 2 assignments for para_a for every para_b.
+		place_order(para_a);
+		place_order(para_a);
+		place_order(para_b);
+
+		// Approximate having 3 cores. CoreIndex 2 should be unable to obtain an assignment
+		for _ in 0..3 {
+			OnDemandAssigner::pop_assignment_for_core(CoreIndex(0));
+			OnDemandAssigner::pop_assignment_for_core(CoreIndex(1));
+			assert!(OnDemandAssigner::pop_assignment_for_core(CoreIndex(2)).is_none());
+		}
+
+		// Affinity should be the same as before, but on different cores.
+		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 2);
+		assert_eq!(OnDemandAssigner::get_affinity_map(para_b).unwrap().count, 1);
+		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().core_index, CoreIndex(0));
+		assert_eq!(OnDemandAssigner::get_affinity_map(para_b).unwrap().core_index, CoreIndex(1));
+
+		// Clear affinity
+		OnDemandAssigner::report_processed(para_a, CoreIndex(0));
+		OnDemandAssigner::report_processed(para_a, CoreIndex(0));
+		OnDemandAssigner::report_processed(para_b, CoreIndex(1));
+
+		// There should be no affinity after clearing.
+		assert!(OnDemandAssigner::get_affinity_map(para_a).is_none());
+		assert!(OnDemandAssigner::get_affinity_map(para_b).is_none());
 	});
 }
 
@@ -458,7 +483,6 @@ fn affinity_changes_work() {
 		let core_index = CoreIndex(0);
 		schedule_blank_para(para_a, ParaKind::Parathread);
 
-		let order_a = EnqueuedOrder::new(para_a);
 		run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None });
 
 		// There should be no affinity before starting.
@@ -466,8 +490,7 @@ fn affinity_changes_work() {
 
 		// Add enough assignments to the order queue.
 		for _ in 0..10 {
-			OnDemandAssigner::add_on_demand_order(order_a.clone(), QueuePushDirection::Front)
-				.expect("Invalid paraid or queue full");
+			place_order(para_a);
 		}
 
 		// There should be no affinity before the scheduler pops.
@@ -483,7 +506,6 @@ fn affinity_changes_work() {
 
 		// Affinity count is 1 after popping with a previous para.
 		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 1);
-		assert_eq!(OnDemandAssigner::queue_size(), 8);
 
 		for _ in 0..3 {
 			OnDemandAssigner::pop_assignment_for_core(core_index);
@@ -491,147 +513,197 @@ fn affinity_changes_work() {
 
 		// Affinity count is 4 after popping 3 times without a previous para.
 		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 4);
-		assert_eq!(OnDemandAssigner::queue_size(), 5);
 
 		for _ in 0..5 {
 			OnDemandAssigner::report_processed(para_a, 0.into());
-			OnDemandAssigner::pop_assignment_for_core(core_index);
+			assert!(OnDemandAssigner::pop_assignment_for_core(core_index).is_some());
 		}
 
 		// Affinity count should still be 4 but queue should be empty.
+		assert!(OnDemandAssigner::pop_assignment_for_core(core_index).is_none());
 		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 4);
-		assert_eq!(OnDemandAssigner::queue_size(), 0);
 
 		// Pop 4 times and get to exactly 0 (None) affinity.
 		for _ in 0..4 {
 			OnDemandAssigner::report_processed(para_a, 0.into());
-			OnDemandAssigner::pop_assignment_for_core(core_index);
+			assert!(OnDemandAssigner::pop_assignment_for_core(core_index).is_none());
 		}
 		assert!(OnDemandAssigner::get_affinity_map(para_a).is_none());
 
 		// Decreasing affinity beyond 0 should still be None.
 		OnDemandAssigner::report_processed(para_a, 0.into());
-		OnDemandAssigner::pop_assignment_for_core(core_index);
+		assert!(OnDemandAssigner::pop_assignment_for_core(core_index).is_none());
 		assert!(OnDemandAssigner::get_affinity_map(para_a).is_none());
 	});
 }
 
 #[test]
-fn affinity_prohibits_parallel_scheduling() {
-	new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| {
-		let para_a = ParaId::from(111);
-		let para_b = ParaId::from(222);
+fn new_affinity_for_a_core_must_come_from_free_entries() {
+	// If affinity count for a core was zero before, and is 1 now, then the entry
+	// must have come from free_entries.
+	let parachains =
+		vec![ParaId::from(111), ParaId::from(222), ParaId::from(333), ParaId::from(444)];
+	let core_indices = vec![CoreIndex(0), CoreIndex(1), CoreIndex(2), CoreIndex(3)];
 
-		schedule_blank_para(para_a, ParaKind::Parathread);
-		schedule_blank_para(para_b, ParaKind::Parathread);
+	new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| {
+		parachains.iter().for_each(|chain| {
+			schedule_blank_para(*chain, ParaKind::Parathread);
+		});
 
 		run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None });
 
-		let order_a = EnqueuedOrder::new(para_a);
-		let order_b = EnqueuedOrder::new(para_b);
-
-		// There should be no affinity before starting.
-		assert!(OnDemandAssigner::get_affinity_map(para_a).is_none());
-		assert!(OnDemandAssigner::get_affinity_map(para_b).is_none());
-
-		// Add 2 assignments for para_a for every para_b.
-		OnDemandAssigner::add_on_demand_order(order_a.clone(), QueuePushDirection::Back)
-			.expect("Invalid paraid or queue full");
-
-		OnDemandAssigner::add_on_demand_order(order_a.clone(), QueuePushDirection::Back)
-			.expect("Invalid paraid or queue full");
-
-		OnDemandAssigner::add_on_demand_order(order_b.clone(), QueuePushDirection::Back)
-			.expect("Invalid paraid or queue full");
-
-		assert_eq!(OnDemandAssigner::queue_size(), 3);
+		// Place orders for all chains.
+		parachains.iter().for_each(|chain| {
+			place_order(*chain);
+		});
+
+		// There are 4 entries in free_entries.
+		let start_free_entries = OnDemandAssigner::get_free_entries().len();
+		assert_eq!(start_free_entries, 4);
+
+		// Pop assignments on all cores.
+		core_indices.iter().enumerate().for_each(|(n, core_index)| {
+			// There is no affinity on the core prior to popping.
+			assert!(OnDemandAssigner::get_affinity_entries(*core_index).is_empty());
+
+			// There's always an order to be popped for each core.
+			let free_entries = OnDemandAssigner::get_free_entries();
+			let next_order = free_entries.peek();
+
+			// There is no affinity on the paraid prior to popping.
+			assert!(OnDemandAssigner::get_affinity_map(next_order.unwrap().para_id).is_none());
+
+			match OnDemandAssigner::pop_assignment_for_core(*core_index) {
+				Some(assignment) => {
+					// The popped assignment came from free entries.
+					assert_eq!(
+						start_free_entries - 1 - n,
+						OnDemandAssigner::get_free_entries().len()
+					);
+					// The popped assignment has the same para id as the next order.
+					assert_eq!(assignment.para_id(), next_order.unwrap().para_id);
+				},
+				None => panic!("Should not happen"),
+			}
+		});
 
-		// Approximate having 1 core.
-		for _ in 0..3 {
-			OnDemandAssigner::pop_assignment_for_core(CoreIndex(0));
-		}
+		// All entries have been removed from free_entries.
+		assert!(OnDemandAssigner::get_free_entries().is_empty());
 
-		// Affinity on one core is meaningless.
-		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 2);
-		assert_eq!(OnDemandAssigner::get_affinity_map(para_b).unwrap().count, 1);
-		assert_eq!(
-			OnDemandAssigner::get_affinity_map(para_a).unwrap().core_idx,
-			OnDemandAssigner::get_affinity_map(para_b).unwrap().core_idx
-		);
-
-		// Clear affinity
-		OnDemandAssigner::report_processed(para_a, 0.into());
-		OnDemandAssigner::report_processed(para_a, 0.into());
-		OnDemandAssigner::report_processed(para_b, 0.into());
-
-		// Add 2 assignments for para_a for every para_b.
-		OnDemandAssigner::add_on_demand_order(order_a.clone(), QueuePushDirection::Back)
-			.expect("Invalid paraid or queue full");
+		// All chains have an affinity count of 1.
+		parachains.iter().for_each(|chain| {
+			assert_eq!(OnDemandAssigner::get_affinity_map(*chain).unwrap().count, 1);
+		});
+	});
+}
 
-		OnDemandAssigner::add_on_demand_order(order_a.clone(), QueuePushDirection::Back)
-			.expect("Invalid paraid or queue full");
+#[test]
+#[should_panic]
+fn queue_index_ordering_is_unsound_over_max_size() {
+	// NOTE: Unsoundness proof. If the number goes sufficiently over the max_queue_max_size
+	// the overflow will cause an opposite comparison to what would be expected.
+	let max_num = u32::MAX - ON_DEMAND_MAX_QUEUE_MAX_SIZE;
+	// 0 < some large number.
+	assert_eq!(QueueIndex(0).cmp(&QueueIndex(max_num + 1)), Ordering::Less);
+}
 
-		OnDemandAssigner::add_on_demand_order(order_b.clone(), QueuePushDirection::Back)
-			.expect("Invalid paraid or queue full");
+#[test]
+fn queue_index_ordering_works() {
+	// The largest accepted queue size.
+	let max_num = ON_DEMAND_MAX_QUEUE_MAX_SIZE;
+
+	// 0 == 0
+	assert_eq!(QueueIndex(0).cmp(&QueueIndex(0)), Ordering::Equal);
+	// 0 < 1
+	assert_eq!(QueueIndex(0).cmp(&QueueIndex(1)), Ordering::Less);
+	// 1 > 0
+	assert_eq!(QueueIndex(1).cmp(&QueueIndex(0)), Ordering::Greater);
+	// 0 < max_num
+	assert_eq!(QueueIndex(0).cmp(&QueueIndex(max_num)), Ordering::Less);
+	// 0 > max_num + 1
+	assert_eq!(QueueIndex(0).cmp(&QueueIndex(max_num + 1)), Ordering::Less);
+
+	// Ordering within the bounds of ON_DEMAND_MAX_QUEUE_MAX_SIZE works.
+	let mut v = vec![3, 6, 2, 1, 5, 4];
+	v.sort_by_key(|&num| QueueIndex(num));
+	assert_eq!(v, vec![1, 2, 3, 4, 5, 6]);
+
+	v = vec![max_num, 4, 5, 1, 6];
+	v.sort_by_key(|&num| QueueIndex(num));
+	assert_eq!(v, vec![1, 4, 5, 6, max_num]);
+
+	// Ordering with an element outside of the bounds of the max size also works.
+	v = vec![max_num + 2, 0, 6, 2, 1, 5, 4];
+	v.sort_by_key(|&num| QueueIndex(num));
+	assert_eq!(v, vec![0, 1, 2, 4, 5, 6, max_num + 2]);
+
+	// Numbers way above the max size will overflow
+	v = vec![u32::MAX - 1, u32::MAX, 6, 2, 1, 5, 4];
+	v.sort_by_key(|&num| QueueIndex(num));
+	assert_eq!(v, vec![u32::MAX - 1, u32::MAX, 1, 2, 4, 5, 6]);
+}
 
-		// Approximate having 3 cores. CoreIndex 2 should be unable to obtain an assignment
-		for _ in 0..3 {
-			OnDemandAssigner::pop_assignment_for_core(CoreIndex(0));
-			OnDemandAssigner::pop_assignment_for_core(CoreIndex(1));
-			assert_eq!(None, OnDemandAssigner::pop_assignment_for_core(CoreIndex(2)));
-		}
+#[test]
+fn reverse_queue_index_does_reverse() {
+	let mut v = vec![1, 2, 3, 4, 5, 6];
 
-		// Affinity should be the same as before, but on different cores.
-		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().count, 2);
-		assert_eq!(OnDemandAssigner::get_affinity_map(para_b).unwrap().count, 1);
-		assert_eq!(OnDemandAssigner::get_affinity_map(para_a).unwrap().core_idx, CoreIndex(0));
-		assert_eq!(OnDemandAssigner::get_affinity_map(para_b).unwrap().core_idx, CoreIndex(1));
+	// Basic reversal of a vector.
+	v.sort_by_key(|&num| ReverseQueueIndex(num));
+	assert_eq!(v, vec![6, 5, 4, 3, 2, 1]);
 
-		// Clear affinity
-		OnDemandAssigner::report_processed(para_a, 0.into());
-		OnDemandAssigner::report_processed(para_a, 0.into());
-		OnDemandAssigner::report_processed(para_b, 1.into());
+	// Example from rust docs on `Reverse`. Should work identically.
+	v.sort_by_key(|&num| (num > 3, ReverseQueueIndex(num)));
+	assert_eq!(v, vec![3, 2, 1, 6, 5, 4]);
 
-		// There should be no affinity after clearing.
-		assert!(OnDemandAssigner::get_affinity_map(para_a).is_none());
-		assert!(OnDemandAssigner::get_affinity_map(para_b).is_none());
-	});
+	let mut v2 = vec![1, 2, u32::MAX];
+	v2.sort_by_key(|&num| ReverseQueueIndex(num));
+	assert_eq!(v2, vec![2, 1, u32::MAX]);
 }
 
 #[test]
-fn on_demand_orders_cannot_be_popped_if_lifecycle_changes() {
-	let para_id = ParaId::from(10);
-	let core_index = CoreIndex(0);
-	let order = EnqueuedOrder::new(para_id);
+fn queue_status_size_fn_works() {
+	// Add orders to the on demand queue, and make sure that they are properly represented
+	// by the QueueStatusType::size fn.
+	let parachains = vec![ParaId::from(111), ParaId::from(222), ParaId::from(333)];
+	let core_indices = vec![CoreIndex(0), CoreIndex(1)];
 
 	new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| {
-		// Register the para_id as a parathread
-		schedule_blank_para(para_id, ParaKind::Parathread);
-
-		assert!(!Paras::is_parathread(para_id));
-		run_to_block(10, |n| if n == 10 { Some(Default::default()) } else { None });
-		assert!(Paras::is_parathread(para_id));
+		parachains.iter().for_each(|chain| {
+			schedule_blank_para(*chain, ParaKind::Parathread);
+		});
 
-		// Add two assignments for a para_id with a valid lifecycle.
-		assert_ok!(OnDemandAssigner::add_on_demand_order(order.clone(), QueuePushDirection::Back));
-		assert_ok!(OnDemandAssigner::add_on_demand_order(order.clone(), QueuePushDirection::Back));
+		assert_eq!(OnDemandAssigner::get_queue_status().size(), 0);
 
-		// First pop is fine
-		assert!(
-			OnDemandAssigner::pop_assignment_for_core(core_index) ==
-				Some(Assignment::Pool { para_id, core_index })
-		);
+		run_to_block(11, |n| if n == 11 { Some(Default::default()) } else { None });
 
-		// Deregister para
-		assert_ok!(Paras::schedule_para_cleanup(para_id));
+		// Place orders for all chains.
+		parachains.iter().for_each(|chain| {
+			// 2 per chain for a total of 6
+			place_order(*chain);
+			place_order(*chain);
+		});
 
-		// Run to new session and verify that para_id is no longer a valid parathread.
-		assert!(Paras::is_parathread(para_id));
-		run_to_block(20, |n| if n == 20 { Some(Default::default()) } else { None });
-		assert!(!Paras::is_parathread(para_id));
+		// 6 orders in free entries
+		assert_eq!(OnDemandAssigner::get_free_entries().len(), 6);
+		// 6 orders via queue status size
+		assert_eq!(
+			OnDemandAssigner::get_free_entries().len(),
+			OnDemandAssigner::get_queue_status().size() as usize
+		);
 
-		// Second pop should be None.
-		OnDemandAssigner::report_processed(para_id, core_index);
-		assert_eq!(OnDemandAssigner::pop_assignment_for_core(core_index), None);
+		core_indices.iter().for_each(|core_index| {
+			OnDemandAssigner::pop_assignment_for_core(*core_index);
+		});
+
+		// There should be 2 orders in the scheduler's claimqueue,
+		// 2 in assorted AffinityMaps and 2 in free.
+		// ParaId 111
+		assert_eq!(OnDemandAssigner::get_affinity_entries(core_indices[0]).len(), 1);
+		// ParaId 222
+		assert_eq!(OnDemandAssigner::get_affinity_entries(core_indices[1]).len(), 1);
+		// Free entries are from ParaId 333
+		assert_eq!(OnDemandAssigner::get_free_entries().len(), 2);
+		// For a total size of 4.
+		assert_eq!(OnDemandAssigner::get_queue_status().size(), 4)
 	});
 }
diff --git a/polkadot/runtime/parachains/src/configuration.rs b/polkadot/runtime/parachains/src/configuration.rs
index 364a15215d38c5eae477b3a73987a43d0216278b..b7635dcd7b227993c38475a6ca13fd9a7fe53cb5 100644
--- a/polkadot/runtime/parachains/src/configuration.rs
+++ b/polkadot/runtime/parachains/src/configuration.rs
@@ -29,6 +29,7 @@ use primitives::{
 	vstaging::{ApprovalVotingParams, NodeFeatures},
 	AsyncBackingParams, Balance, ExecutorParamError, ExecutorParams, SessionIndex,
 	LEGACY_MIN_BACKING_VOTES, MAX_CODE_SIZE, MAX_HEAD_DATA_SIZE, MAX_POV_SIZE,
+	ON_DEMAND_MAX_QUEUE_MAX_SIZE,
 };
 use sp_runtime::{traits::Zero, Perbill};
 use sp_std::prelude::*;
@@ -312,6 +313,8 @@ pub enum InconsistentError<BlockNumber> {
 	InconsistentExecutorParams { inner: ExecutorParamError },
 	/// TTL should be bigger than lookahead
 	LookaheadExceedsTTL,
+	/// Passed in queue size for on-demand was too large.
+	OnDemandQueueSizeTooLarge,
 }
 
 impl<BlockNumber> HostConfiguration<BlockNumber>
@@ -405,6 +408,10 @@ where
 			return Err(LookaheadExceedsTTL)
 		}
 
+		if self.scheduler_params.on_demand_queue_max_size > ON_DEMAND_MAX_QUEUE_MAX_SIZE {
+			return Err(OnDemandQueueSizeTooLarge)
+		}
+
 		Ok(())
 	}
 
@@ -630,7 +637,7 @@ pub mod pallet {
 
 		/// Set the number of coretime execution cores.
 		///
-		/// Note that this configuration is managed by the coretime chain. Only manually change
+		/// NOTE: that this configuration is managed by the coretime chain. Only manually change
 		/// this, if you really know what you are doing!
 		#[pallet::call_index(6)]
 		#[pallet::weight((
@@ -1133,6 +1140,7 @@ pub mod pallet {
 				config.scheduler_params.on_demand_queue_max_size = new;
 			})
 		}
+
 		/// Set the on demand (parathreads) fee variability.
 		#[pallet::call_index(50)]
 		#[pallet::weight((
diff --git a/polkadot/runtime/rococo/src/lib.rs b/polkadot/runtime/rococo/src/lib.rs
index f68870c98eaf10dbcb98be0cbd82967b72f122a7..a773eeb5cbdb71b99942f063d3d8d8df3db5ac99 100644
--- a/polkadot/runtime/rococo/src/lib.rs
+++ b/polkadot/runtime/rococo/src/lib.rs
@@ -1659,6 +1659,7 @@ pub mod migrations {
 		// This needs to come after the `parachains_configuration` above as we are reading the configuration.
 		coretime::migration::MigrateToCoretime<Runtime, crate::xcm_config::XcmRouter, GetLegacyLeaseImpl>,
 		parachains_configuration::migration::v12::MigrateToV12<Runtime>,
+		parachains_assigner_on_demand::migration::MigrateV0ToV1<Runtime>,
 
 		// permanent
 		pallet_xcm::migration::MigrateToLatestXcmVersion<Runtime>,
diff --git a/polkadot/runtime/rococo/src/weights/runtime_parachains_assigner_on_demand.rs b/polkadot/runtime/rococo/src/weights/runtime_parachains_assigner_on_demand.rs
index ac0f05301b486dbdbb8c0ca004e195ab47171ff3..dba9e7904c79065c33c9fffc9851171c72c2e66a 100644
--- a/polkadot/runtime/rococo/src/weights/runtime_parachains_assigner_on_demand.rs
+++ b/polkadot/runtime/rococo/src/weights/runtime_parachains_assigner_on_demand.rs
@@ -16,10 +16,10 @@
 
 //! Autogenerated weights for `runtime_parachains::assigner_on_demand`
 //!
-//! THIS FILE WAS AUTO-GENERATED USING THE SUBSTRATE BENCHMARK CLI VERSION 4.0.0-dev
-//! DATE: 2023-08-11, STEPS: `50`, REPEAT: `20`, LOW RANGE: `[]`, HIGH RANGE: `[]`
+//! THIS FILE WAS AUTO-GENERATED USING THE SUBSTRATE BENCHMARK CLI VERSION 32.0.0
+//! DATE: 2024-03-18, STEPS: `50`, REPEAT: `20`, LOW RANGE: `[]`, HIGH RANGE: `[]`
 //! WORST CASE MAP SIZE: `1000000`
-//! HOSTNAME: `runner-fljshgub-project-163-concurrent-0`, CPU: `Intel(R) Xeon(R) CPU @ 2.60GHz`
+//! HOSTNAME: `runner-h2rr8wx7-project-674-concurrent-0`, CPU: `Intel(R) Xeon(R) CPU @ 2.60GHz`
 //! WASM-EXECUTION: `Compiled`, CHAIN: `Some("rococo-dev")`, DB CACHE: 1024
 
 // Executed Command:
@@ -31,11 +31,11 @@
 // --extrinsic=*
 // --wasm-execution=compiled
 // --heap-pages=4096
-// --json-file=/builds/parity/mirrors/polkadot/.git/.artifacts/bench.json
+// --json-file=/builds/parity/mirrors/polkadot-sdk/.git/.artifacts/bench.json
 // --pallet=runtime_parachains::assigner_on_demand
 // --chain=rococo-dev
-// --header=./file_header.txt
-// --output=./runtime/rococo/src/weights/
+// --header=./polkadot/file_header.txt
+// --output=./polkadot/runtime/rococo/src/weights/
 
 #![cfg_attr(rustfmt, rustfmt_skip)]
 #![allow(unused_parens)]
@@ -48,44 +48,44 @@ use core::marker::PhantomData;
 /// Weight functions for `runtime_parachains::assigner_on_demand`.
 pub struct WeightInfo<T>(PhantomData<T>);
 impl<T: frame_system::Config> runtime_parachains::assigner_on_demand::WeightInfo for WeightInfo<T> {
-	/// Storage: `OnDemandAssignmentProvider::SpotTraffic` (r:1 w:0)
-	/// Proof: `OnDemandAssignmentProvider::SpotTraffic` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
-	/// Storage: `Paras::ParaLifecycles` (r:1 w:0)
-	/// Proof: `Paras::ParaLifecycles` (`max_values`: None, `max_size`: None, mode: `Measured`)
-	/// Storage: `OnDemandAssignmentProvider::OnDemandQueue` (r:1 w:1)
-	/// Proof: `OnDemandAssignmentProvider::OnDemandQueue` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::QueueStatus` (r:1 w:1)
+	/// Proof: `OnDemandAssignmentProvider::QueueStatus` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::ParaIdAffinity` (r:1 w:0)
+	/// Proof: `OnDemandAssignmentProvider::ParaIdAffinity` (`max_values`: None, `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::FreeEntries` (r:1 w:1)
+	/// Proof: `OnDemandAssignmentProvider::FreeEntries` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
 	/// The range of component `s` is `[1, 9999]`.
 	fn place_order_keep_alive(s: u32, ) -> Weight {
 		// Proof Size summary in bytes:
-		//  Measured:  `297 + s * (4 ±0)`
-		//  Estimated: `3762 + s * (4 ±0)`
-		// Minimum execution time: 33_522_000 picoseconds.
-		Weight::from_parts(35_436_835, 0)
-			.saturating_add(Weight::from_parts(0, 3762))
-			// Standard Error: 129
-			.saturating_add(Weight::from_parts(14_041, 0).saturating_mul(s.into()))
+		//  Measured:  `218 + s * (8 ±0)`
+		//  Estimated: `3681 + s * (8 ±0)`
+		// Minimum execution time: 21_053_000 picoseconds.
+		Weight::from_parts(17_291_897, 0)
+			.saturating_add(Weight::from_parts(0, 3681))
+			// Standard Error: 104
+			.saturating_add(Weight::from_parts(18_779, 0).saturating_mul(s.into()))
 			.saturating_add(T::DbWeight::get().reads(3))
-			.saturating_add(T::DbWeight::get().writes(1))
-			.saturating_add(Weight::from_parts(0, 4).saturating_mul(s.into()))
+			.saturating_add(T::DbWeight::get().writes(2))
+			.saturating_add(Weight::from_parts(0, 8).saturating_mul(s.into()))
 	}
-	/// Storage: `OnDemandAssignmentProvider::SpotTraffic` (r:1 w:0)
-	/// Proof: `OnDemandAssignmentProvider::SpotTraffic` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
-	/// Storage: `Paras::ParaLifecycles` (r:1 w:0)
-	/// Proof: `Paras::ParaLifecycles` (`max_values`: None, `max_size`: None, mode: `Measured`)
-	/// Storage: `OnDemandAssignmentProvider::OnDemandQueue` (r:1 w:1)
-	/// Proof: `OnDemandAssignmentProvider::OnDemandQueue` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::QueueStatus` (r:1 w:1)
+	/// Proof: `OnDemandAssignmentProvider::QueueStatus` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::ParaIdAffinity` (r:1 w:0)
+	/// Proof: `OnDemandAssignmentProvider::ParaIdAffinity` (`max_values`: None, `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::FreeEntries` (r:1 w:1)
+	/// Proof: `OnDemandAssignmentProvider::FreeEntries` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
 	/// The range of component `s` is `[1, 9999]`.
 	fn place_order_allow_death(s: u32, ) -> Weight {
 		// Proof Size summary in bytes:
-		//  Measured:  `297 + s * (4 ±0)`
-		//  Estimated: `3762 + s * (4 ±0)`
-		// Minimum execution time: 33_488_000 picoseconds.
-		Weight::from_parts(34_848_934, 0)
-			.saturating_add(Weight::from_parts(0, 3762))
-			// Standard Error: 143
-			.saturating_add(Weight::from_parts(14_215, 0).saturating_mul(s.into()))
+		//  Measured:  `218 + s * (8 ±0)`
+		//  Estimated: `3681 + s * (8 ±0)`
+		// Minimum execution time: 20_843_000 picoseconds.
+		Weight::from_parts(16_881_986, 0)
+			.saturating_add(Weight::from_parts(0, 3681))
+			// Standard Error: 104
+			.saturating_add(Weight::from_parts(18_788, 0).saturating_mul(s.into()))
 			.saturating_add(T::DbWeight::get().reads(3))
-			.saturating_add(T::DbWeight::get().writes(1))
-			.saturating_add(Weight::from_parts(0, 4).saturating_mul(s.into()))
+			.saturating_add(T::DbWeight::get().writes(2))
+			.saturating_add(Weight::from_parts(0, 8).saturating_mul(s.into()))
 	}
 }
diff --git a/polkadot/runtime/westend/src/weights/runtime_parachains_assigner_on_demand.rs b/polkadot/runtime/westend/src/weights/runtime_parachains_assigner_on_demand.rs
index ac0f05301b486dbdbb8c0ca004e195ab47171ff3..acd1834f79ed8c35ed83bd56d75c0907b02f7f5d 100644
--- a/polkadot/runtime/westend/src/weights/runtime_parachains_assigner_on_demand.rs
+++ b/polkadot/runtime/westend/src/weights/runtime_parachains_assigner_on_demand.rs
@@ -16,11 +16,11 @@
 
 //! Autogenerated weights for `runtime_parachains::assigner_on_demand`
 //!
-//! THIS FILE WAS AUTO-GENERATED USING THE SUBSTRATE BENCHMARK CLI VERSION 4.0.0-dev
-//! DATE: 2023-08-11, STEPS: `50`, REPEAT: `20`, LOW RANGE: `[]`, HIGH RANGE: `[]`
+//! THIS FILE WAS AUTO-GENERATED USING THE SUBSTRATE BENCHMARK CLI VERSION 32.0.0
+//! DATE: 2024-03-18, STEPS: `50`, REPEAT: `20`, LOW RANGE: `[]`, HIGH RANGE: `[]`
 //! WORST CASE MAP SIZE: `1000000`
-//! HOSTNAME: `runner-fljshgub-project-163-concurrent-0`, CPU: `Intel(R) Xeon(R) CPU @ 2.60GHz`
-//! WASM-EXECUTION: `Compiled`, CHAIN: `Some("rococo-dev")`, DB CACHE: 1024
+//! HOSTNAME: `runner-h2rr8wx7-project-674-concurrent-0`, CPU: `Intel(R) Xeon(R) CPU @ 2.60GHz`
+//! WASM-EXECUTION: `Compiled`, CHAIN: `Some("westend-dev")`, DB CACHE: 1024
 
 // Executed Command:
 // target/production/polkadot
@@ -31,11 +31,11 @@
 // --extrinsic=*
 // --wasm-execution=compiled
 // --heap-pages=4096
-// --json-file=/builds/parity/mirrors/polkadot/.git/.artifacts/bench.json
+// --json-file=/builds/parity/mirrors/polkadot-sdk/.git/.artifacts/bench.json
 // --pallet=runtime_parachains::assigner_on_demand
-// --chain=rococo-dev
-// --header=./file_header.txt
-// --output=./runtime/rococo/src/weights/
+// --chain=westend-dev
+// --header=./polkadot/file_header.txt
+// --output=./polkadot/runtime/westend/src/weights/
 
 #![cfg_attr(rustfmt, rustfmt_skip)]
 #![allow(unused_parens)]
@@ -48,44 +48,44 @@ use core::marker::PhantomData;
 /// Weight functions for `runtime_parachains::assigner_on_demand`.
 pub struct WeightInfo<T>(PhantomData<T>);
 impl<T: frame_system::Config> runtime_parachains::assigner_on_demand::WeightInfo for WeightInfo<T> {
-	/// Storage: `OnDemandAssignmentProvider::SpotTraffic` (r:1 w:0)
-	/// Proof: `OnDemandAssignmentProvider::SpotTraffic` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
-	/// Storage: `Paras::ParaLifecycles` (r:1 w:0)
-	/// Proof: `Paras::ParaLifecycles` (`max_values`: None, `max_size`: None, mode: `Measured`)
-	/// Storage: `OnDemandAssignmentProvider::OnDemandQueue` (r:1 w:1)
-	/// Proof: `OnDemandAssignmentProvider::OnDemandQueue` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::QueueStatus` (r:1 w:1)
+	/// Proof: `OnDemandAssignmentProvider::QueueStatus` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::ParaIdAffinity` (r:1 w:0)
+	/// Proof: `OnDemandAssignmentProvider::ParaIdAffinity` (`max_values`: None, `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::FreeEntries` (r:1 w:1)
+	/// Proof: `OnDemandAssignmentProvider::FreeEntries` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
 	/// The range of component `s` is `[1, 9999]`.
 	fn place_order_keep_alive(s: u32, ) -> Weight {
 		// Proof Size summary in bytes:
-		//  Measured:  `297 + s * (4 ±0)`
-		//  Estimated: `3762 + s * (4 ±0)`
-		// Minimum execution time: 33_522_000 picoseconds.
-		Weight::from_parts(35_436_835, 0)
-			.saturating_add(Weight::from_parts(0, 3762))
-			// Standard Error: 129
-			.saturating_add(Weight::from_parts(14_041, 0).saturating_mul(s.into()))
+		//  Measured:  `218 + s * (8 ±0)`
+		//  Estimated: `3681 + s * (8 ±0)`
+		// Minimum execution time: 21_396_000 picoseconds.
+		Weight::from_parts(20_585_695, 0)
+			.saturating_add(Weight::from_parts(0, 3681))
+			// Standard Error: 127
+			.saturating_add(Weight::from_parts(20_951, 0).saturating_mul(s.into()))
 			.saturating_add(T::DbWeight::get().reads(3))
-			.saturating_add(T::DbWeight::get().writes(1))
-			.saturating_add(Weight::from_parts(0, 4).saturating_mul(s.into()))
+			.saturating_add(T::DbWeight::get().writes(2))
+			.saturating_add(Weight::from_parts(0, 8).saturating_mul(s.into()))
 	}
-	/// Storage: `OnDemandAssignmentProvider::SpotTraffic` (r:1 w:0)
-	/// Proof: `OnDemandAssignmentProvider::SpotTraffic` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
-	/// Storage: `Paras::ParaLifecycles` (r:1 w:0)
-	/// Proof: `Paras::ParaLifecycles` (`max_values`: None, `max_size`: None, mode: `Measured`)
-	/// Storage: `OnDemandAssignmentProvider::OnDemandQueue` (r:1 w:1)
-	/// Proof: `OnDemandAssignmentProvider::OnDemandQueue` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::QueueStatus` (r:1 w:1)
+	/// Proof: `OnDemandAssignmentProvider::QueueStatus` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::ParaIdAffinity` (r:1 w:0)
+	/// Proof: `OnDemandAssignmentProvider::ParaIdAffinity` (`max_values`: None, `max_size`: None, mode: `Measured`)
+	/// Storage: `OnDemandAssignmentProvider::FreeEntries` (r:1 w:1)
+	/// Proof: `OnDemandAssignmentProvider::FreeEntries` (`max_values`: Some(1), `max_size`: None, mode: `Measured`)
 	/// The range of component `s` is `[1, 9999]`.
 	fn place_order_allow_death(s: u32, ) -> Weight {
 		// Proof Size summary in bytes:
-		//  Measured:  `297 + s * (4 ±0)`
-		//  Estimated: `3762 + s * (4 ±0)`
-		// Minimum execution time: 33_488_000 picoseconds.
-		Weight::from_parts(34_848_934, 0)
-			.saturating_add(Weight::from_parts(0, 3762))
-			// Standard Error: 143
-			.saturating_add(Weight::from_parts(14_215, 0).saturating_mul(s.into()))
+		//  Measured:  `218 + s * (8 ±0)`
+		//  Estimated: `3681 + s * (8 ±0)`
+		// Minimum execution time: 21_412_000 picoseconds.
+		Weight::from_parts(19_731_554, 0)
+			.saturating_add(Weight::from_parts(0, 3681))
+			// Standard Error: 128
+			.saturating_add(Weight::from_parts(21_055, 0).saturating_mul(s.into()))
 			.saturating_add(T::DbWeight::get().reads(3))
-			.saturating_add(T::DbWeight::get().writes(1))
-			.saturating_add(Weight::from_parts(0, 4).saturating_mul(s.into()))
+			.saturating_add(T::DbWeight::get().writes(2))
+			.saturating_add(Weight::from_parts(0, 8).saturating_mul(s.into()))
 	}
 }
diff --git a/prdoc/pr_3190.prdoc b/prdoc/pr_3190.prdoc
new file mode 100644
index 0000000000000000000000000000000000000000..2f7a89a0b1aba611b10625f0d7833b3f3dfd4e6e
--- /dev/null
+++ b/prdoc/pr_3190.prdoc
@@ -0,0 +1,17 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: Fix algorithmic complexity of the on-demand scheduler.
+
+doc:
+  - audience: Runtime Dev
+    description: |
+      Improves on demand performance by a significant factor. Previously, having many on-demand cores
+      would cause really poor blocktimes due to the fact that for each core the full order queue was
+      processed. This allows for increasing the max size of the on-demand queue if needed.
+
+      At the same time, the spot price for on-demand is now checked prior to every order, ensuring
+      that economic backpressure will be applied.
+
+crates:
+  - name: polkadot-runtime-parachains