From 2431001ec012334226bc181d69a754c49f168328 Mon Sep 17 00:00:00 2001
From: Andrei Sandu <54316454+sandreim@users.noreply.github.com>
Date: Fri, 23 Feb 2024 23:35:48 +0700
Subject: [PATCH] Runtime: allow backing multiple candidates of same parachain
 on different cores  (#3231)

Fixes https://github.com/paritytech/polkadot-sdk/issues/3144

Builds on top of https://github.com/paritytech/polkadot-sdk/pull/3229

### Summary
Some preparations for the runtime to support elastic scaling, guarded by
the `FeatureIndex::ElasticScalingMVP` node features bit. This PR
introduces a per-candidate `CoreIndex`, but does so in a hacky way to
avoid changing the `CandidateCommitments` and `CandidateReceipt`
primitives and the networking protocols.

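As a rough illustration (not part of this diff), flipping the bit looks like
the sketch below, mirroring what the updated backing tests do; it assumes
`NodeFeatures` and `FeatureIndex` are reachable under
`polkadot_primitives::vstaging`, as in this patch's test imports:

```rust
use polkadot_primitives::vstaging::{node_features::FeatureIndex, NodeFeatures};

fn main() {
	// `NodeFeatures` is a plain bitvec: grow it so the `ElasticScalingMVP`
	// bit exists, then set that bit.
	let mut features = NodeFeatures::new();
	features.resize(FeatureIndex::ElasticScalingMVP as usize + 1, false);
	features.set(FeatureIndex::ElasticScalingMVP as usize, true);
	assert!(features[FeatureIndex::ElasticScalingMVP as usize]);
}
```
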
#### Including `CoreIndex` in `BackedCandidate`
If the `ElasticScalingMVP` feature bit is enabled, then
`BackedCandidate::validator_indices` is extended by 8 bits.
The value stored in these bits represents the assumed core index for the
candidate.

This is a temporary solution which works by creating a mapping from
`BackedCandidate` to `CoreIndex`, assuming that the `CoreIndex` can be
discovered by checking which validator group the validator that signed
the statement belongs to.

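To make the bitfield layout concrete, here is a minimal, self-contained sketch
(only the `bitvec` crate is assumed) of the same append/split scheme that
`BackedCandidate::new` and `validator_indices_and_core_index` implement in
this patch:

```rust
use bitvec::{field::BitField, order::Lsb0, slice::BitSlice, vec::BitVec};

/// Append an 8-bit core index to the backing bitfield, as `BackedCandidate::new`
/// does when a core index is supplied.
fn inject_core_index(validator_indices: &mut BitVec<u8, Lsb0>, core_index: u32) {
	let bits: BitVec<u8, Lsb0> = BitVec::from_vec(vec![core_index as u8]);
	validator_indices.extend(bits);
}

/// Split the bitfield back into validator indices and the assumed core index,
/// mirroring `validator_indices_and_core_index`. With the feature disabled, or
/// with 8 bits or fewer, the whole bitfield is treated as validator indices.
fn decode(
	bits: &BitVec<u8, Lsb0>,
	core_index_enabled: bool,
) -> (&BitSlice<u8, Lsb0>, Option<u32>) {
	if core_index_enabled {
		let offset = bits.len().saturating_sub(8);
		if offset > 0 {
			let (validator_indices, core_index) = bits.split_at(offset);
			return (validator_indices, Some(core_index.load::<u8>() as u32));
		}
	}
	(bits.as_bitslice(), None)
}

fn main() {
	let mut indices = bitvec::bitvec![u8, Lsb0; 1, 1, 0, 1];
	inject_core_index(&mut indices, 1);
	let (validator_indices, core_index) = decode(&indices, true);
	assert_eq!(validator_indices.count_ones(), 3);
	assert_eq!(core_index, Some(1));
}
```

Note that with the feature bit on but no core index injected, a bitfield of 8
bits or fewer still decodes cleanly, which is what keeps backing groups of up
to 8 validators working during the transition.
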
TODO:
- [x] fix tests
- [x] add new tests
- [x] Bump runtime API for Kusama, so we have that node features thing!
-> https://github.com/polkadot-fellows/runtimes/pull/194

---------

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
Signed-off-by: alindima <alin@parity.io>
Co-authored-by: alindima <alin@parity.io>
---
 .gitlab/pipeline/zombienet/polkadot.yml       |   8 +
 Cargo.lock                                    |  37 ++
 polkadot/node/core/backing/Cargo.toml         |   1 +
 polkadot/node/core/backing/src/lib.rs         |  19 +-
 polkadot/node/core/backing/src/tests/mod.rs   |  70 +-
 polkadot/node/core/provisioner/src/lib.rs     |   2 +-
 polkadot/node/core/provisioner/src/tests.rs   |  70 +-
 polkadot/primitives/src/v6/mod.rs             | 208 +++++-
 polkadot/runtime/parachains/Cargo.toml        |   1 +
 polkadot/runtime/parachains/src/builder.rs    |   7 +-
 .../runtime/parachains/src/inclusion/mod.rs   |  91 +--
 .../runtime/parachains/src/inclusion/tests.rs | 423 +++++++++---
 .../src/paras_inherent/benchmarking.rs        |   4 +-
 .../parachains/src/paras_inherent/mod.rs      | 288 ++++++---
 .../parachains/src/paras_inherent/tests.rs    | 604 ++++++++++++++++--
 .../parachains/src/paras_inherent/weights.rs  |   4 +-
 .../functional/0012-elastic-scaling-mvp.toml  |  38 ++
 .../functional/0012-elastic-scaling-mvp.zndsl |  28 +
 .../functional/0012-enable-node-feature.js    |  37 ++
 .../functional/0012-register-para.js          |  37 ++
 prdoc/pr_3231.prdoc                           |  11 +
 21 files changed, 1640 insertions(+), 348 deletions(-)
 create mode 100644 polkadot/zombienet_tests/functional/0012-elastic-scaling-mvp.toml
 create mode 100644 polkadot/zombienet_tests/functional/0012-elastic-scaling-mvp.zndsl
 create mode 100644 polkadot/zombienet_tests/functional/0012-enable-node-feature.js
 create mode 100644 polkadot/zombienet_tests/functional/0012-register-para.js
 create mode 100644 prdoc/pr_3231.prdoc

diff --git a/.gitlab/pipeline/zombienet/polkadot.yml b/.gitlab/pipeline/zombienet/polkadot.yml
index 54eb6db48ca..97572f029d0 100644
--- a/.gitlab/pipeline/zombienet/polkadot.yml
+++ b/.gitlab/pipeline/zombienet/polkadot.yml
@@ -158,6 +158,14 @@ zombienet-polkadot-functional-0011-async-backing-6-seconds-rate:
       --local-dir="${LOCAL_DIR}/functional"
       --test="0011-async-backing-6-seconds-rate.zndsl"
 
+zombienet-polkadot-functional-0012-elastic-scaling-mvp:
+  extends:
+    - .zombienet-polkadot-common
+  script:
+    - /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh
+      --local-dir="${LOCAL_DIR}/functional"
+      --test="0012-elastic-scaling-mvp.zndsl"
+
 zombienet-polkadot-smoke-0001-parachains-smoke-test:
   extends:
     - .zombienet-polkadot-common
diff --git a/Cargo.lock b/Cargo.lock
index fbe2729acba..a23be53b34f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -12512,6 +12512,7 @@ dependencies = [
  "polkadot-primitives",
  "polkadot-primitives-test-helpers",
  "polkadot-statement-table",
+ "rstest",
  "sc-keystore",
  "schnellru",
  "sp-application-crypto",
@@ -13350,6 +13351,7 @@ dependencies = [
  "polkadot-runtime-metrics",
  "rand",
  "rand_chacha 0.3.1",
+ "rstest",
  "rustc-hex",
  "sc-keystore",
  "scale-info",
@@ -14790,6 +14792,12 @@ version = "0.8.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
 
+[[package]]
+name = "relative-path"
+version = "1.9.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e898588f33fdd5b9420719948f9f2a32c922a246964576f71ba7f24f80610fbc"
+
 [[package]]
 name = "remote-ext-tests-bags-list"
 version = "1.0.0"
@@ -15172,6 +15180,35 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "rstest"
+version = "0.18.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "97eeab2f3c0a199bc4be135c36c924b6590b88c377d416494288c14f2db30199"
+dependencies = [
+ "futures",
+ "futures-timer",
+ "rstest_macros",
+ "rustc_version 0.4.0",
+]
+
+[[package]]
+name = "rstest_macros"
+version = "0.18.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d428f8247852f894ee1be110b375111b586d4fa431f6c46e64ba5a0dcccbe605"
+dependencies = [
+ "cfg-if",
+ "glob",
+ "proc-macro2",
+ "quote",
+ "regex",
+ "relative-path",
+ "rustc_version 0.4.0",
+ "syn 2.0.50",
+ "unicode-ident",
+]
+
 [[package]]
 name = "rtnetlink"
 version = "0.10.1"
diff --git a/polkadot/node/core/backing/Cargo.toml b/polkadot/node/core/backing/Cargo.toml
index f71b8df80dd..d0c1f9aa483 100644
--- a/polkadot/node/core/backing/Cargo.toml
+++ b/polkadot/node/core/backing/Cargo.toml
@@ -32,5 +32,6 @@ sc-keystore = { path = "../../../../substrate/client/keystore" }
 sp-tracing = { path = "../../../../substrate/primitives/tracing" }
 futures = { version = "0.3.21", features = ["thread-pool"] }
 assert_matches = "1.4.0"
+rstest = "0.18.2"
 polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
 test-helpers = { package = "polkadot-primitives-test-helpers", path = "../../../primitives/test-helpers" }
diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs
index 26f20a6d48e..69bf2e956a0 100644
--- a/polkadot/node/core/backing/src/lib.rs
+++ b/polkadot/node/core/backing/src/lib.rs
@@ -70,7 +70,7 @@ use std::{
 	sync::Arc,
 };
 
-use bitvec::{order::Lsb0 as BitOrderLsb0, vec::BitVec};
+use bitvec::vec::BitVec;
 use futures::{
 	channel::{mpsc, oneshot},
 	future::BoxFuture,
@@ -494,20 +494,15 @@ fn table_attested_to_backed(
 	}
 	vote_positions.sort_by_key(|(_orig, pos_in_group)| *pos_in_group);
 
-	if inject_core_index {
-		let core_index_to_inject: BitVec<u8, BitOrderLsb0> =
-			BitVec::from_vec(vec![core_index.0 as u8]);
-		validator_indices.extend(core_index_to_inject);
-	}
-
-	Some(BackedCandidate {
+	Some(BackedCandidate::new(
 		candidate,
-		validity_votes: vote_positions
+		vote_positions
 			.into_iter()
 			.map(|(pos_in_votes, _pos_in_group)| validity_votes[pos_in_votes].clone())
 			.collect(),
 		validator_indices,
-	})
+		inject_core_index.then_some(core_index),
+	))
 }
 
 async fn store_available_data(
@@ -1775,7 +1770,7 @@ async fn post_import_statement_actions<Context>(
 				&rp_state.table_context,
 				rp_state.inject_core_index,
 			) {
-				let para_id = backed.candidate.descriptor.para_id;
+				let para_id = backed.candidate().descriptor.para_id;
 				gum::debug!(
 					target: LOG_TARGET,
 					candidate_hash = ?candidate_hash,
@@ -1796,7 +1791,7 @@ async fn post_import_statement_actions<Context>(
 					// notify collator protocol.
 					ctx.send_message(CollatorProtocolMessage::Backed {
 						para_id,
-						para_head: backed.candidate.descriptor.para_head,
+						para_head: backed.candidate().descriptor.para_head,
 					})
 					.await;
 					// Notify statement distribution of backed candidate.
diff --git a/polkadot/node/core/backing/src/tests/mod.rs b/polkadot/node/core/backing/src/tests/mod.rs
index 7223f1e1dfb..e3cc5727435 100644
--- a/polkadot/node/core/backing/src/tests/mod.rs
+++ b/polkadot/node/core/backing/src/tests/mod.rs
@@ -33,9 +33,10 @@ use polkadot_node_subsystem::{
 };
 use polkadot_node_subsystem_test_helpers as test_helpers;
 use polkadot_primitives::{
-	CandidateDescriptor, GroupRotationInfo, HeadData, PersistedValidationData, PvfExecKind,
-	ScheduledCore, SessionIndex, LEGACY_MIN_BACKING_VOTES,
+	vstaging::node_features, CandidateDescriptor, GroupRotationInfo, HeadData,
+	PersistedValidationData, PvfExecKind, ScheduledCore, SessionIndex, LEGACY_MIN_BACKING_VOTES,
 };
+use rstest::rstest;
 use sp_application_crypto::AppCrypto;
 use sp_keyring::Sr25519Keyring;
 use sp_keystore::Keystore;
@@ -79,6 +80,7 @@ pub(crate) struct TestState {
 	relay_parent: Hash,
 	minimum_backing_votes: u32,
 	disabled_validators: Vec<ValidatorIndex>,
+	node_features: NodeFeatures,
 }
 
 impl TestState {
@@ -157,6 +159,7 @@ impl Default for TestState {
 			relay_parent,
 			minimum_backing_votes: LEGACY_MIN_BACKING_VOTES,
 			disabled_validators: Vec::new(),
+			node_features: Default::default(),
 		}
 	}
 }
@@ -298,7 +301,7 @@ async fn test_startup(virtual_overseer: &mut VirtualOverseer, test_state: &TestS
 		AllMessages::RuntimeApi(
 			RuntimeApiMessage::Request(_parent, RuntimeApiRequest::NodeFeatures(_session_index, tx))
 		) => {
-			tx.send(Ok(Default::default())).unwrap();
+			tx.send(Ok(test_state.node_features.clone())).unwrap();
 		}
 	);
 
@@ -494,9 +497,20 @@ fn backing_second_works() {
 }
 
 // Test that the candidate reaches quorum successfully.
-#[test]
-fn backing_works() {
-	let test_state = TestState::default();
+#[rstest]
+#[case(true)]
+#[case(false)]
+fn backing_works(#[case] elastic_scaling_mvp: bool) {
+	let mut test_state = TestState::default();
+	if elastic_scaling_mvp {
+		test_state
+			.node_features
+			.resize((node_features::FeatureIndex::ElasticScalingMVP as u8 + 1) as usize, false);
+		test_state
+			.node_features
+			.set(node_features::FeatureIndex::ElasticScalingMVP as u8 as usize, true);
+	}
+
 	test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
 		test_startup(&mut virtual_overseer, &test_state).await;
 
@@ -647,6 +661,31 @@ fn backing_works() {
 
 		virtual_overseer.send(FromOrchestra::Communication { msg: statement }).await;
 
+		let (tx, rx) = oneshot::channel();
+		let msg = CandidateBackingMessage::GetBackedCandidates(
+			vec![(candidate_a_hash, test_state.relay_parent)],
+			tx,
+		);
+
+		virtual_overseer.send(FromOrchestra::Communication { msg }).await;
+
+		let candidates = rx.await.unwrap();
+		assert_eq!(1, candidates.len());
+		assert_eq!(candidates[0].validity_votes().len(), 3);
+
+		let (validator_indices, maybe_core_index) =
+			candidates[0].validator_indices_and_core_index(elastic_scaling_mvp);
+		if elastic_scaling_mvp {
+			assert_eq!(maybe_core_index.unwrap(), CoreIndex(0));
+		} else {
+			assert!(maybe_core_index.is_none());
+		}
+
+		assert_eq!(
+			validator_indices,
+			bitvec::bitvec![u8, bitvec::order::Lsb0; 1, 1, 0, 1].as_bitslice()
+		);
+
 		virtual_overseer
 			.send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(
 				ActiveLeavesUpdate::stop_work(test_state.relay_parent),
@@ -919,20 +958,20 @@ fn backing_works_while_validation_ongoing() {
 
 		let candidates = rx.await.unwrap();
 		assert_eq!(1, candidates.len());
-		assert_eq!(candidates[0].validity_votes.len(), 3);
+		assert_eq!(candidates[0].validity_votes().len(), 3);
 
 		assert!(candidates[0]
-			.validity_votes
+			.validity_votes()
 			.contains(&ValidityAttestation::Implicit(signed_a.signature().clone())));
 		assert!(candidates[0]
-			.validity_votes
+			.validity_votes()
 			.contains(&ValidityAttestation::Explicit(signed_b.signature().clone())));
 		assert!(candidates[0]
-			.validity_votes
+			.validity_votes()
 			.contains(&ValidityAttestation::Explicit(signed_c.signature().clone())));
 		assert_eq!(
-			candidates[0].validator_indices,
-			bitvec::bitvec![u8, bitvec::order::Lsb0; 1, 0, 1, 1],
+			candidates[0].validator_indices_and_core_index(false),
+			(bitvec::bitvec![u8, bitvec::order::Lsb0; 1, 0, 1, 1].as_bitslice(), None)
 		);
 
 		virtual_overseer
@@ -1604,8 +1643,11 @@ fn candidate_backing_reorders_votes() {
 	let expected_attestations =
 		vec![fake_attestation(1).into(), fake_attestation(3).into(), fake_attestation(5).into()];
 
-	assert_eq!(backed.validator_indices, expected_bitvec);
-	assert_eq!(backed.validity_votes, expected_attestations);
+	assert_eq!(
+		backed.validator_indices_and_core_index(false),
+		(expected_bitvec.as_bitslice(), None)
+	);
+	assert_eq!(backed.validity_votes(), expected_attestations);
 }
 
 // Test whether we retry on failed PoV fetching.
diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs
index d98f6ebfe42..a29cf72afb1 100644
--- a/polkadot/node/core/provisioner/src/lib.rs
+++ b/polkadot/node/core/provisioner/src/lib.rs
@@ -766,7 +766,7 @@ async fn select_candidates(
 	// keep only one candidate with validation code.
 	let mut with_validation_code = false;
 	candidates.retain(|c| {
-		if c.candidate.commitments.new_validation_code.is_some() {
+		if c.candidate().commitments.new_validation_code.is_some() {
 			if with_validation_code {
 				return false
 			}
diff --git a/polkadot/node/core/provisioner/src/tests.rs b/polkadot/node/core/provisioner/src/tests.rs
index b26df8ddb91..87c0e7a65d3 100644
--- a/polkadot/node/core/provisioner/src/tests.rs
+++ b/polkadot/node/core/provisioner/src/tests.rs
@@ -460,13 +460,16 @@ mod select_candidates {
 
 		let expected_backed = expected_candidates
 			.iter()
-			.map(|c| BackedCandidate {
-				candidate: CommittedCandidateReceipt {
-					descriptor: c.descriptor.clone(),
-					commitments: Default::default(),
-				},
-				validity_votes: Vec::new(),
-				validator_indices: default_bitvec(MOCK_GROUP_SIZE),
+			.map(|c| {
+				BackedCandidate::new(
+					CommittedCandidateReceipt {
+						descriptor: c.descriptor().clone(),
+						commitments: Default::default(),
+					},
+					Vec::new(),
+					default_bitvec(MOCK_GROUP_SIZE),
+					None,
+				)
 			})
 			.collect();
 
@@ -486,7 +489,7 @@ mod select_candidates {
 
 				result.into_iter().for_each(|c| {
 					assert!(
-						expected_candidates.iter().any(|c2| c.candidate.corresponds_to(c2)),
+						expected_candidates.iter().any(|c2| c.candidate().corresponds_to(c2)),
 						"Failed to find candidate: {:?}",
 						c,
 					)
@@ -532,10 +535,13 @@ mod select_candidates {
 		// Build possible outputs from select_candidates
 		let backed_candidates: Vec<_> = committed_receipts
 			.iter()
-			.map(|committed_receipt| BackedCandidate {
-				candidate: committed_receipt.clone(),
-				validity_votes: Vec::new(),
-				validator_indices: default_bitvec(MOCK_GROUP_SIZE),
+			.map(|committed_receipt| {
+				BackedCandidate::new(
+					committed_receipt.clone(),
+					Vec::new(),
+					default_bitvec(MOCK_GROUP_SIZE),
+					None,
+				)
 			})
 			.collect();
 
@@ -566,7 +572,7 @@ mod select_candidates {
 
 				result.into_iter().for_each(|c| {
 					assert!(
-						expected_backed_filtered.iter().any(|c2| c.candidate.corresponds_to(c2)),
+						expected_backed_filtered.iter().any(|c2| c.candidate().corresponds_to(c2)),
 						"Failed to find candidate: {:?}",
 						c,
 					)
@@ -605,13 +611,16 @@ mod select_candidates {
 
 		let expected_backed = expected_candidates
 			.iter()
-			.map(|c| BackedCandidate {
-				candidate: CommittedCandidateReceipt {
-					descriptor: c.descriptor.clone(),
-					commitments: Default::default(),
-				},
-				validity_votes: Vec::new(),
-				validator_indices: default_bitvec(MOCK_GROUP_SIZE),
+			.map(|c| {
+				BackedCandidate::new(
+					CommittedCandidateReceipt {
+						descriptor: c.descriptor.clone(),
+						commitments: Default::default(),
+					},
+					Vec::new(),
+					default_bitvec(MOCK_GROUP_SIZE),
+					None,
+				)
 			})
 			.collect();
 
@@ -631,7 +640,7 @@ mod select_candidates {
 
 				result.into_iter().for_each(|c| {
 					assert!(
-						expected_candidates.iter().any(|c2| c.candidate.corresponds_to(c2)),
+						expected_candidates.iter().any(|c2| c.candidate().corresponds_to(c2)),
 						"Failed to find candidate: {:?}",
 						c,
 					)
@@ -671,13 +680,16 @@ mod select_candidates {
 
 		let expected_backed = expected_candidates
 			.iter()
-			.map(|c| BackedCandidate {
-				candidate: CommittedCandidateReceipt {
-					descriptor: c.descriptor.clone(),
-					commitments: Default::default(),
-				},
-				validity_votes: Vec::new(),
-				validator_indices: default_bitvec(MOCK_GROUP_SIZE),
+			.map(|c| {
+				BackedCandidate::new(
+					CommittedCandidateReceipt {
+						descriptor: c.descriptor().clone(),
+						commitments: Default::default(),
+					},
+					Vec::new(),
+					default_bitvec(MOCK_GROUP_SIZE),
+					None,
+				)
 			})
 			.collect();
 
@@ -697,7 +709,7 @@ mod select_candidates {
 
 				result.into_iter().for_each(|c| {
 					assert!(
-						expected_candidates.iter().any(|c2| c.candidate.corresponds_to(c2)),
+						expected_candidates.iter().any(|c2| c.candidate().corresponds_to(c2)),
 						"Failed to find candidate: {:?}",
 						c,
 					)
diff --git a/polkadot/primitives/src/v6/mod.rs b/polkadot/primitives/src/v6/mod.rs
index 538eb385584..89431f7801f 100644
--- a/polkadot/primitives/src/v6/mod.rs
+++ b/polkadot/primitives/src/v6/mod.rs
@@ -16,7 +16,7 @@
 
 //! `V6` Primitives.
 
-use bitvec::vec::BitVec;
+use bitvec::{field::BitField, slice::BitSlice, vec::BitVec};
 use parity_scale_codec::{Decode, Encode};
 use scale_info::TypeInfo;
 use sp_std::{
@@ -707,19 +707,50 @@ pub type UncheckedSignedAvailabilityBitfields = Vec<UncheckedSignedAvailabilityB
 #[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebug, TypeInfo)]
 pub struct BackedCandidate<H = Hash> {
 	/// The candidate referred to.
-	pub candidate: CommittedCandidateReceipt<H>,
+	candidate: CommittedCandidateReceipt<H>,
 	/// The validity votes themselves, expressed as signatures.
-	pub validity_votes: Vec<ValidityAttestation>,
-	/// The indices of the validators within the group, expressed as a bitfield.
-	pub validator_indices: BitVec<u8, bitvec::order::Lsb0>,
+	validity_votes: Vec<ValidityAttestation>,
+	/// The indices of the validators within the group, expressed as a bitfield. May be extended
+	/// beyond the backing group size to contain the assigned core index, if ElasticScalingMVP is
+	/// enabled.
+	validator_indices: BitVec<u8, bitvec::order::Lsb0>,
 }
 
 impl<H> BackedCandidate<H> {
-	/// Get a reference to the descriptor of the para.
+	/// Create a new `BackedCandidate`, optionally injecting the given core index
+	/// into `validator_indices`.
+	pub fn new(
+		candidate: CommittedCandidateReceipt<H>,
+		validity_votes: Vec<ValidityAttestation>,
+		validator_indices: BitVec<u8, bitvec::order::Lsb0>,
+		core_index: Option<CoreIndex>,
+	) -> Self {
+		let mut instance = Self { candidate, validity_votes, validator_indices };
+		if let Some(core_index) = core_index {
+			instance.inject_core_index(core_index);
+		}
+		instance
+	}
+
+	/// Get a reference to the descriptor of the candidate.
 	pub fn descriptor(&self) -> &CandidateDescriptor<H> {
 		&self.candidate.descriptor
 	}
 
+	/// Get a reference to the committed candidate receipt of the candidate.
+	pub fn candidate(&self) -> &CommittedCandidateReceipt<H> {
+		&self.candidate
+	}
+
+	/// Get a reference to the validity votes of the candidate.
+	pub fn validity_votes(&self) -> &[ValidityAttestation] {
+		&self.validity_votes
+	}
+
+	/// Get a mutable reference to the validity votes of the candidate.
+	pub fn validity_votes_mut(&mut self) -> &mut Vec<ValidityAttestation> {
+		&mut self.validity_votes
+	}
+
 	/// Compute this candidate's hash.
 	pub fn hash(&self) -> CandidateHash
 	where
@@ -735,6 +766,48 @@ impl<H> BackedCandidate<H> {
 	{
 		self.candidate.to_plain()
 	}
+
+	/// Get a reference to the validator indices, and the assumed core index, if any.
+	pub fn validator_indices_and_core_index(
+		&self,
+		core_index_enabled: bool,
+	) -> (&BitSlice<u8, bitvec::order::Lsb0>, Option<CoreIndex>) {
+		// This flag tells us whether block producers must enable the elastic scaling MVP
+		// hack, which extends `BackedCandidate::validator_indices` with an 8-bit core index.
+		if core_index_enabled {
+			let core_idx_offset = self.validator_indices.len().saturating_sub(8);
+			if core_idx_offset > 0 {
+				let (validator_indices_slice, core_idx_slice) =
+					self.validator_indices.split_at(core_idx_offset);
+				return (
+					validator_indices_slice,
+					Some(CoreIndex(core_idx_slice.load::<u8>() as u32)),
+				);
+			}
+		}
+
+		(&self.validator_indices, None)
+	}
+
+	/// Inject a core index into the `validator_indices` bitvec.
+	fn inject_core_index(&mut self, core_index: CoreIndex) {
+		let core_index_to_inject: BitVec<u8, bitvec::order::Lsb0> =
+			BitVec::from_vec(vec![core_index.0 as u8]);
+		self.validator_indices.extend(core_index_to_inject);
+	}
+
+	/// Update the validator indices and core index in the candidate.
+	pub fn set_validator_indices_and_core_index(
+		&mut self,
+		new_indices: BitVec<u8, bitvec::order::Lsb0>,
+		maybe_core_index: Option<CoreIndex>,
+	) {
+		self.validator_indices = new_indices;
+
+		if let Some(core_index) = maybe_core_index {
+			self.inject_core_index(core_index);
+		}
+	}
 }
 
 /// Verify the backing of the given candidate.
@@ -748,44 +821,42 @@ impl<H> BackedCandidate<H> {
 /// Returns either an error, indicating that one of the signatures was invalid or that the index
 /// was out-of-bounds, or the number of signatures checked.
 pub fn check_candidate_backing<H: AsRef<[u8]> + Clone + Encode + core::fmt::Debug>(
-	backed: &BackedCandidate<H>,
+	candidate_hash: CandidateHash,
+	validity_votes: &[ValidityAttestation],
+	validator_indices: &BitSlice<u8, bitvec::order::Lsb0>,
 	signing_context: &SigningContext<H>,
 	group_len: usize,
 	validator_lookup: impl Fn(usize) -> Option<ValidatorId>,
 ) -> Result<usize, ()> {
-	if backed.validator_indices.len() != group_len {
+	if validator_indices.len() != group_len {
 		log::debug!(
 			target: LOG_TARGET,
 			"Check candidate backing: indices mismatch: group_len = {} , indices_len = {}",
 			group_len,
-			backed.validator_indices.len(),
+			validator_indices.len(),
 		);
 		return Err(())
 	}
 
-	if backed.validity_votes.len() > group_len {
+	if validity_votes.len() > group_len {
 		log::debug!(
 			target: LOG_TARGET,
 			"Check candidate backing: Too many votes, expected: {}, found: {}",
 			group_len,
-			backed.validity_votes.len(),
+			validity_votes.len(),
 		);
 		return Err(())
 	}
 
-	// this is known, even in runtime, to be blake2-256.
-	let hash = backed.candidate.hash();
-
 	let mut signed = 0;
-	for ((val_in_group_idx, _), attestation) in backed
-		.validator_indices
+	for ((val_in_group_idx, _), attestation) in validator_indices
 		.iter()
 		.enumerate()
 		.filter(|(_, signed)| **signed)
-		.zip(backed.validity_votes.iter())
+		.zip(validity_votes.iter())
 	{
 		let validator_id = validator_lookup(val_in_group_idx).ok_or(())?;
-		let payload = attestation.signed_payload(hash, signing_context);
+		let payload = attestation.signed_payload(candidate_hash, signing_context);
 		let sig = attestation.signature();
 
 		if sig.verify(&payload[..], &validator_id) {
@@ -801,11 +872,11 @@ pub fn check_candidate_backing<H: AsRef<[u8]> + Clone + Encode + core::fmt::Debu
 		}
 	}
 
-	if signed != backed.validity_votes.len() {
+	if signed != validity_votes.len() {
 		log::error!(
 			target: LOG_TARGET,
 			"Check candidate backing: Too many signatures, expected = {}, found = {}",
-			backed.validity_votes.len() ,
+			validity_votes.len(),
 			signed,
 		);
 		return Err(())
@@ -1884,6 +1955,34 @@ pub enum PvfExecKind {
 #[cfg(test)]
 mod tests {
 	use super::*;
+	use bitvec::bitvec;
+	use primitives::sr25519;
+
+	pub fn dummy_committed_candidate_receipt() -> CommittedCandidateReceipt {
+		let zeros = Hash::zero();
+
+		CommittedCandidateReceipt {
+			descriptor: CandidateDescriptor {
+				para_id: 0.into(),
+				relay_parent: zeros,
+				collator: CollatorId::from(sr25519::Public::from_raw([0; 32])),
+				persisted_validation_data_hash: zeros,
+				pov_hash: zeros,
+				erasure_root: zeros,
+				signature: CollatorSignature::from(sr25519::Signature([0u8; 64])),
+				para_head: zeros,
+				validation_code_hash: ValidationCode(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]).hash(),
+			},
+			commitments: CandidateCommitments {
+				head_data: HeadData(vec![]),
+				upward_messages: vec![].try_into().expect("empty vec fits within bounds"),
+				new_validation_code: None,
+				horizontal_messages: vec![].try_into().expect("empty vec fits within bounds"),
+				processed_downward_messages: 0,
+				hrmp_watermark: 0_u32,
+			},
+		}
+	}
 
 	#[test]
 	fn group_rotation_info_calculations() {
@@ -1958,4 +2057,73 @@ mod tests {
 
 		assert!(zero_b.leading_zeros() >= zero_u.leading_zeros());
 	}
+
+	#[test]
+	fn test_backed_candidate_injected_core_index() {
+		let initial_validator_indices = bitvec![u8, bitvec::order::Lsb0; 0, 1, 0, 1];
+		let mut candidate = BackedCandidate::new(
+			dummy_committed_candidate_receipt(),
+			vec![],
+			initial_validator_indices.clone(),
+			None,
+		);
+
+		// No core index supplied, ElasticScalingMVP is off.
+		let (validator_indices, core_index) = candidate.validator_indices_and_core_index(false);
+		assert_eq!(validator_indices, initial_validator_indices.as_bitslice());
+		assert!(core_index.is_none());
+
+		// No core index supplied, ElasticScalingMVP is on. Still, decoding will be ok if backing
+		// group size is <= 8, to give a chance to parachains that don't have multiple cores
+		// assigned.
+		let (validator_indices, core_index) = candidate.validator_indices_and_core_index(true);
+		assert_eq!(validator_indices, initial_validator_indices.as_bitslice());
+		assert!(core_index.is_none());
+
+		let encoded_validator_indices = candidate.validator_indices.clone();
+		candidate.set_validator_indices_and_core_index(validator_indices.into(), core_index);
+		assert_eq!(candidate.validator_indices, encoded_validator_indices);
+
+	// No core index supplied, ElasticScalingMVP is on. Decoding is corrupted if the
+	// backing group size is larger than 8.
+		let candidate = BackedCandidate::new(
+			dummy_committed_candidate_receipt(),
+			vec![],
+			bitvec![u8, bitvec::order::Lsb0; 0, 1, 0, 1, 0, 1, 0, 1, 0],
+			None,
+		);
+		let (validator_indices, core_index) = candidate.validator_indices_and_core_index(true);
+		assert_eq!(validator_indices, bitvec![u8, bitvec::order::Lsb0; 0].as_bitslice());
+		assert!(core_index.is_some());
+
+		// Core index supplied, ElasticScalingMVP is off. Core index will be treated as normal
+		// validator indices. Runtime will check against this.
+		let candidate = BackedCandidate::new(
+			dummy_committed_candidate_receipt(),
+			vec![],
+			bitvec![u8, bitvec::order::Lsb0; 0, 1, 0, 1],
+			Some(CoreIndex(10)),
+		);
+		let (validator_indices, core_index) = candidate.validator_indices_and_core_index(false);
+		assert_eq!(
+			validator_indices,
+			bitvec![u8, bitvec::order::Lsb0; 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0]
+		);
+		assert!(core_index.is_none());
+
+		// Core index supplied, ElasticScalingMVP is on.
+		let mut candidate = BackedCandidate::new(
+			dummy_committed_candidate_receipt(),
+			vec![],
+			bitvec![u8, bitvec::order::Lsb0; 0, 1, 0, 1],
+			Some(CoreIndex(10)),
+		);
+		let (validator_indices, core_index) = candidate.validator_indices_and_core_index(true);
+		assert_eq!(validator_indices, bitvec![u8, bitvec::order::Lsb0; 0, 1, 0, 1]);
+		assert_eq!(core_index, Some(CoreIndex(10)));
+
+		let encoded_validator_indices = candidate.validator_indices.clone();
+		candidate.set_validator_indices_and_core_index(validator_indices.into(), core_index);
+		assert_eq!(candidate.validator_indices, encoded_validator_indices);
+	}
 }
diff --git a/polkadot/runtime/parachains/Cargo.toml b/polkadot/runtime/parachains/Cargo.toml
index 311a62b6c91..61040145476 100644
--- a/polkadot/runtime/parachains/Cargo.toml
+++ b/polkadot/runtime/parachains/Cargo.toml
@@ -69,6 +69,7 @@ sp-tracing = { path = "../../../substrate/primitives/tracing" }
 sp-crypto-hashing = { path = "../../../substrate/primitives/crypto/hashing" }
 thousands = "0.2.0"
 assert_matches = "1"
+rstest = "0.18.2"
 serde_json = { workspace = true, default-features = true }
 
 [features]
diff --git a/polkadot/runtime/parachains/src/builder.rs b/polkadot/runtime/parachains/src/builder.rs
index 016b3fca589..500bc70cfa7 100644
--- a/polkadot/runtime/parachains/src/builder.rs
+++ b/polkadot/runtime/parachains/src/builder.rs
@@ -587,11 +587,12 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
 					})
 					.collect();
 
-				BackedCandidate::<T::Hash> {
+				BackedCandidate::<T::Hash>::new(
 					candidate,
 					validity_votes,
-					validator_indices: bitvec::bitvec![u8, bitvec::order::Lsb0; 1; group_validators.len()],
-				}
+					bitvec::bitvec![u8, bitvec::order::Lsb0; 1; group_validators.len()],
+					None,
+				)
 			})
 			.collect()
 	}
diff --git a/polkadot/runtime/parachains/src/inclusion/mod.rs b/polkadot/runtime/parachains/src/inclusion/mod.rs
index 90af9cde00a..16e2e93b561 100644
--- a/polkadot/runtime/parachains/src/inclusion/mod.rs
+++ b/polkadot/runtime/parachains/src/inclusion/mod.rs
@@ -47,10 +47,7 @@ use scale_info::TypeInfo;
 use sp_runtime::{traits::One, DispatchError, SaturatedConversion, Saturating};
 #[cfg(feature = "std")]
 use sp_std::fmt;
-use sp_std::{
-	collections::{btree_map::BTreeMap, btree_set::BTreeSet},
-	prelude::*,
-};
+use sp_std::{collections::btree_set::BTreeSet, prelude::*};
 
 pub use pallet::*;
 
@@ -601,18 +598,16 @@ impl<T: Config> Pallet<T> {
 	/// scheduled cores. If these conditions are not met, the execution of the function fails.
 	pub(crate) fn process_candidates<GV>(
 		allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
-		candidates: Vec<BackedCandidate<T::Hash>>,
-		scheduled: &BTreeMap<ParaId, CoreIndex>,
+		candidates: Vec<(BackedCandidate<T::Hash>, CoreIndex)>,
 		group_validators: GV,
+		core_index_enabled: bool,
 	) -> Result<ProcessedCandidates<T::Hash>, DispatchError>
 	where
 		GV: Fn(GroupIndex) -> Option<Vec<ValidatorIndex>>,
 	{
 		let now = <frame_system::Pallet<T>>::block_number();
 
-		ensure!(candidates.len() <= scheduled.len(), Error::<T>::UnscheduledCandidate);
-
-		if scheduled.is_empty() {
+		if candidates.is_empty() {
 			return Ok(ProcessedCandidates::default())
 		}
 
@@ -648,7 +643,7 @@ impl<T: Config> Pallet<T> {
 			//
 			// In the meantime, we do certain sanity checks on the candidates and on the scheduled
 			// list.
-			for (candidate_idx, backed_candidate) in candidates.iter().enumerate() {
+			for (candidate_idx, (backed_candidate, core_index)) in candidates.iter().enumerate() {
 				let relay_parent_hash = backed_candidate.descriptor().relay_parent;
 				let para_id = backed_candidate.descriptor().para_id;
 
@@ -663,7 +658,7 @@ impl<T: Config> Pallet<T> {
 				let relay_parent_number = match check_ctx.verify_backed_candidate(
 					&allowed_relay_parents,
 					candidate_idx,
-					backed_candidate,
+					backed_candidate.candidate(),
 				)? {
 					Err(FailedToCreatePVD) => {
 						log::debug!(
@@ -679,11 +674,22 @@ impl<T: Config> Pallet<T> {
 					Ok(rpn) => rpn,
 				};
 
-				let para_id = backed_candidate.descriptor().para_id;
+				let (validator_indices, _) =
+					backed_candidate.validator_indices_and_core_index(core_index_enabled);
+
+				log::debug!(
+					target: LOG_TARGET,
+					"Candidate {:?} on {:?},
+					core_index_enabled = {}",
+					backed_candidate.hash(),
+					core_index,
+					core_index_enabled
+				);
+
+				check_assignment_in_order(core_index)?;
+
 				let mut backers = bitvec::bitvec![u8, BitOrderLsb0; 0; validators.len()];
 
-				let core_idx = *scheduled.get(&para_id).ok_or(Error::<T>::UnscheduledCandidate)?;
-				check_assignment_in_order(core_idx)?;
 				ensure!(
 					<PendingAvailability<T>>::get(&para_id).is_none() &&
 						<PendingAvailabilityCommitments<T>>::get(&para_id).is_none(),
@@ -694,7 +700,7 @@ impl<T: Config> Pallet<T> {
 				// assigned to core at block `N + 1`. Thus, `relay_parent_number + 1`
 				// will always land in the current session.
 				let group_idx = <scheduler::Pallet<T>>::group_assigned_to_core(
-					core_idx,
+					*core_index,
 					relay_parent_number + One::one(),
 				)
 				.ok_or_else(|| {
@@ -711,7 +717,9 @@ impl<T: Config> Pallet<T> {
 				// check the signatures in the backing and that it is a majority.
 				{
 					let maybe_amount_validated = primitives::check_candidate_backing(
-						&backed_candidate,
+						backed_candidate.candidate().hash(),
+						backed_candidate.validity_votes(),
+						validator_indices,
 						&signing_context,
 						group_vals.len(),
 						|intra_group_vi| {
@@ -738,16 +746,15 @@ impl<T: Config> Pallet<T> {
 
 					let mut backer_idx_and_attestation =
 						Vec::<(ValidatorIndex, ValidityAttestation)>::with_capacity(
-							backed_candidate.validator_indices.count_ones(),
+							validator_indices.count_ones(),
 						);
 					let candidate_receipt = backed_candidate.receipt();
 
-					for ((bit_idx, _), attestation) in backed_candidate
-						.validator_indices
+					for ((bit_idx, _), attestation) in validator_indices
 						.iter()
 						.enumerate()
 						.filter(|(_, signed)| **signed)
-						.zip(backed_candidate.validity_votes.iter().cloned())
+						.zip(backed_candidate.validity_votes().iter().cloned())
 					{
 						let val_idx =
 							group_vals.get(bit_idx).expect("this query succeeded above; qed");
@@ -760,7 +767,7 @@ impl<T: Config> Pallet<T> {
 				}
 
 				core_indices_and_backers.push((
-					(core_idx, para_id),
+					(*core_index, para_id),
 					backers,
 					group_idx,
 					relay_parent_number,
@@ -772,7 +779,7 @@ impl<T: Config> Pallet<T> {
 
 		// one more sweep for actually writing to storage.
 		let core_indices = core_indices_and_backers.iter().map(|(c, ..)| *c).collect();
-		for (candidate, (core, backers, group, relay_parent_number)) in
+		for ((candidate, _), (core, backers, group, relay_parent_number)) in
 			candidates.into_iter().zip(core_indices_and_backers)
 		{
 			let para_id = candidate.descriptor().para_id;
@@ -782,16 +789,18 @@ impl<T: Config> Pallet<T> {
 				bitvec::bitvec![u8, BitOrderLsb0; 0; validators.len()];
 
 			Self::deposit_event(Event::<T>::CandidateBacked(
-				candidate.candidate.to_plain(),
-				candidate.candidate.commitments.head_data.clone(),
+				candidate.candidate().to_plain(),
+				candidate.candidate().commitments.head_data.clone(),
 				core.0,
 				group,
 			));
 
-			let candidate_hash = candidate.candidate.hash();
+			let candidate_hash = candidate.candidate().hash();
 
-			let (descriptor, commitments) =
-				(candidate.candidate.descriptor, candidate.candidate.commitments);
+			let (descriptor, commitments) = (
+				candidate.candidate().descriptor.clone(),
+				candidate.candidate().commitments.clone(),
+			);
 
 			<PendingAvailability<T>>::insert(
 				&para_id,
@@ -1195,10 +1204,10 @@ impl<T: Config> CandidateCheckContext<T> {
 		&self,
 		allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
 		candidate_idx: usize,
-		backed_candidate: &BackedCandidate<<T as frame_system::Config>::Hash>,
+		backed_candidate_receipt: &CommittedCandidateReceipt<<T as frame_system::Config>::Hash>,
 	) -> Result<Result<BlockNumberFor<T>, FailedToCreatePVD>, Error<T>> {
-		let para_id = backed_candidate.descriptor().para_id;
-		let relay_parent = backed_candidate.descriptor().relay_parent;
+		let para_id = backed_candidate_receipt.descriptor().para_id;
+		let relay_parent = backed_candidate_receipt.descriptor().relay_parent;
 
 		// Check that the relay-parent is one of the allowed relay-parents.
 		let (relay_parent_storage_root, relay_parent_number) = {
@@ -1223,13 +1232,13 @@ impl<T: Config> CandidateCheckContext<T> {
 			let expected = persisted_validation_data.hash();
 
 			ensure!(
-				expected == backed_candidate.descriptor().persisted_validation_data_hash,
+				expected == backed_candidate_receipt.descriptor().persisted_validation_data_hash,
 				Error::<T>::ValidationDataHashMismatch,
 			);
 		}
 
 		ensure!(
-			backed_candidate.descriptor().check_collator_signature().is_ok(),
+			backed_candidate_receipt.descriptor().check_collator_signature().is_ok(),
 			Error::<T>::NotCollatorSigned,
 		);
 
@@ -1237,25 +1246,25 @@ impl<T: Config> CandidateCheckContext<T> {
 			// A candidate for a parachain without current validation code is not scheduled.
 			.ok_or_else(|| Error::<T>::UnscheduledCandidate)?;
 		ensure!(
-			backed_candidate.descriptor().validation_code_hash == validation_code_hash,
+			backed_candidate_receipt.descriptor().validation_code_hash == validation_code_hash,
 			Error::<T>::InvalidValidationCodeHash,
 		);
 
 		ensure!(
-			backed_candidate.descriptor().para_head ==
-				backed_candidate.candidate.commitments.head_data.hash(),
+			backed_candidate_receipt.descriptor().para_head ==
+				backed_candidate_receipt.commitments.head_data.hash(),
 			Error::<T>::ParaHeadMismatch,
 		);
 
 		if let Err(err) = self.check_validation_outputs(
 			para_id,
 			relay_parent_number,
-			&backed_candidate.candidate.commitments.head_data,
-			&backed_candidate.candidate.commitments.new_validation_code,
-			backed_candidate.candidate.commitments.processed_downward_messages,
-			&backed_candidate.candidate.commitments.upward_messages,
-			BlockNumberFor::<T>::from(backed_candidate.candidate.commitments.hrmp_watermark),
-			&backed_candidate.candidate.commitments.horizontal_messages,
+			&backed_candidate_receipt.commitments.head_data,
+			&backed_candidate_receipt.commitments.new_validation_code,
+			backed_candidate_receipt.commitments.processed_downward_messages,
+			&backed_candidate_receipt.commitments.upward_messages,
+			BlockNumberFor::<T>::from(backed_candidate_receipt.commitments.hrmp_watermark),
+			&backed_candidate_receipt.commitments.horizontal_messages,
 		) {
 			log::debug!(
 				target: LOG_TARGET,
diff --git a/polkadot/runtime/parachains/src/inclusion/tests.rs b/polkadot/runtime/parachains/src/inclusion/tests.rs
index 557b7b71a9e..d2b5a67c3e4 100644
--- a/polkadot/runtime/parachains/src/inclusion/tests.rs
+++ b/polkadot/runtime/parachains/src/inclusion/tests.rs
@@ -120,6 +120,7 @@ pub(crate) fn back_candidate(
 	keystore: &KeystorePtr,
 	signing_context: &SigningContext,
 	kind: BackingKind,
+	core_index: Option<CoreIndex>,
 ) -> BackedCandidate {
 	let mut validator_indices = bitvec::bitvec![u8, BitOrderLsb0; 0; group.len()];
 	let threshold = effective_minimum_backing_votes(
@@ -155,15 +156,20 @@ pub(crate) fn back_candidate(
 		validity_votes.push(ValidityAttestation::Explicit(signature).into());
 	}
 
-	let backed = BackedCandidate { candidate, validity_votes, validator_indices };
+	let backed =
+		BackedCandidate::new(candidate, validity_votes, validator_indices.clone(), core_index);
 
-	let successfully_backed =
-		primitives::check_candidate_backing(&backed, signing_context, group.len(), |i| {
-			Some(validators[group[i].0 as usize].public().into())
-		})
-		.ok()
-		.unwrap_or(0) >=
-			threshold;
+	let successfully_backed = primitives::check_candidate_backing(
+		backed.candidate().hash(),
+		backed.validity_votes(),
+		validator_indices.as_bitslice(),
+		signing_context,
+		group.len(),
+		|i| Some(validators[group[i].0 as usize].public().into()),
+	)
+	.ok()
+	.unwrap_or(0) >=
+		threshold;
 
 	match kind {
 		BackingKind::Unanimous | BackingKind::Threshold => assert!(successfully_backed),
@@ -919,38 +925,16 @@ fn candidate_checks() {
 		let thread_a_assignment = (thread_a, CoreIndex::from(2));
 		let allowed_relay_parents = default_allowed_relay_parent_tracker();
 
-		// unscheduled candidate.
-		{
-			let mut candidate = TestCandidateBuilder {
-				para_id: chain_a,
-				relay_parent: System::parent_hash(),
-				pov_hash: Hash::repeat_byte(1),
-				persisted_validation_data_hash: make_vdata_hash(chain_a).unwrap(),
-				hrmp_watermark: RELAY_PARENT_NUM,
-				..Default::default()
-			}
-			.build();
-			collator_sign_candidate(Sr25519Keyring::One, &mut candidate);
-
-			let backed = back_candidate(
-				candidate,
-				&validators,
-				group_validators(GroupIndex::from(0)).unwrap().as_ref(),
-				&keystore,
-				&signing_context,
-				BackingKind::Threshold,
-			);
-
-			assert_noop!(
-				ParaInclusion::process_candidates(
-					&allowed_relay_parents,
-					vec![backed],
-					&[chain_b_assignment].into_iter().collect(),
-					&group_validators,
-				),
-				Error::<Test>::UnscheduledCandidate
-			);
-		}
+		// no candidates.
+		assert_eq!(
+			ParaInclusion::process_candidates(
+				&allowed_relay_parents,
+				vec![],
+				&group_validators,
+				false
+			),
+			Ok(ProcessedCandidates::default())
+		);
 
 		// candidates out of order.
 		{
@@ -984,6 +968,7 @@ fn candidate_checks() {
 				&keystore,
 				&signing_context,
 				BackingKind::Threshold,
+				None,
 			);
 
 			let backed_b = back_candidate(
@@ -993,15 +978,16 @@ fn candidate_checks() {
 				&keystore,
 				&signing_context,
 				BackingKind::Threshold,
+				None,
 			);
 
 			// out-of-order manifests as unscheduled.
 			assert_noop!(
 				ParaInclusion::process_candidates(
 					&allowed_relay_parents,
-					vec![backed_b, backed_a],
-					&[chain_a_assignment, chain_b_assignment].into_iter().collect(),
+					vec![(backed_b, chain_b_assignment.1), (backed_a, chain_a_assignment.1)],
 					&group_validators,
+					false
 				),
 				Error::<Test>::ScheduledOutOfOrder
 			);
@@ -1027,14 +1013,15 @@ fn candidate_checks() {
 				&keystore,
 				&signing_context,
 				BackingKind::Lacking,
+				None,
 			);
 
 			assert_noop!(
 				ParaInclusion::process_candidates(
 					&allowed_relay_parents,
-					vec![backed],
-					&[chain_a_assignment].into_iter().collect(),
+					vec![(backed, chain_a_assignment.1)],
 					&group_validators,
+					false
 				),
 				Error::<Test>::InsufficientBacking
 			);
@@ -1075,6 +1062,7 @@ fn candidate_checks() {
 				&keystore,
 				&signing_context,
 				BackingKind::Threshold,
+				None,
 			);
 
 			let backed_b = back_candidate(
@@ -1084,14 +1072,15 @@ fn candidate_checks() {
 				&keystore,
 				&signing_context,
 				BackingKind::Threshold,
+				None,
 			);
 
 			assert_noop!(
 				ParaInclusion::process_candidates(
 					&allowed_relay_parents,
-					vec![backed_b, backed_a],
-					&[chain_a_assignment, chain_b_assignment].into_iter().collect(),
+					vec![(backed_b, chain_b_assignment.1), (backed_a, chain_a_assignment.1)],
 					&group_validators,
+					false
 				),
 				Error::<Test>::DisallowedRelayParent
 			);
@@ -1122,14 +1111,15 @@ fn candidate_checks() {
 				&keystore,
 				&signing_context,
 				BackingKind::Threshold,
+				None,
 			);
 
 			assert_noop!(
 				ParaInclusion::process_candidates(
 					&allowed_relay_parents,
-					vec![backed],
-					&[thread_a_assignment].into_iter().collect(),
+					vec![(backed, thread_a_assignment.1)],
 					&group_validators,
+					false
 				),
 				Error::<Test>::NotCollatorSigned
 			);
@@ -1156,6 +1146,7 @@ fn candidate_checks() {
 				&keystore,
 				&signing_context,
 				BackingKind::Threshold,
+				None,
 			);
 
 			let candidate = TestCandidateBuilder::default().build();
@@ -1177,9 +1168,9 @@ fn candidate_checks() {
 			assert_noop!(
 				ParaInclusion::process_candidates(
 					&allowed_relay_parents,
-					vec![backed],
-					&[chain_a_assignment].into_iter().collect(),
+					vec![(backed, chain_a_assignment.1)],
 					&group_validators,
+					false
 				),
 				Error::<Test>::CandidateScheduledBeforeParaFree
 			);
@@ -1212,14 +1203,15 @@ fn candidate_checks() {
 				&keystore,
 				&signing_context,
 				BackingKind::Threshold,
+				None,
 			);
 
 			assert_noop!(
 				ParaInclusion::process_candidates(
 					&allowed_relay_parents,
-					vec![backed],
-					&[chain_a_assignment].into_iter().collect(),
+					vec![(backed, chain_a_assignment.1)],
 					&group_validators,
+					false
 				),
 				Error::<Test>::CandidateScheduledBeforeParaFree
 			);
@@ -1249,6 +1241,7 @@ fn candidate_checks() {
 				&keystore,
 				&signing_context,
 				BackingKind::Threshold,
+				None,
 			);
 
 			{
@@ -1267,9 +1260,9 @@ fn candidate_checks() {
 			assert_noop!(
 				ParaInclusion::process_candidates(
 					&allowed_relay_parents,
-					vec![backed],
-					&[chain_a_assignment].into_iter().collect(),
+					vec![(backed, chain_a_assignment.1)],
 					&group_validators,
+					false
 				),
 				Error::<Test>::PrematureCodeUpgrade
 			);
@@ -1296,14 +1289,15 @@ fn candidate_checks() {
 				&keystore,
 				&signing_context,
 				BackingKind::Threshold,
+				None,
 			);
 
 			assert_eq!(
 				ParaInclusion::process_candidates(
 					&allowed_relay_parents,
-					vec![backed],
-					&[chain_a_assignment].into_iter().collect(),
+					vec![(backed, chain_a_assignment.1)],
 					&group_validators,
+					false
 				),
 				Err(Error::<Test>::ValidationDataHashMismatch.into()),
 			);
@@ -1331,14 +1325,15 @@ fn candidate_checks() {
 				&keystore,
 				&signing_context,
 				BackingKind::Threshold,
+				None,
 			);
 
 			assert_noop!(
 				ParaInclusion::process_candidates(
 					&allowed_relay_parents,
-					vec![backed],
-					&[chain_a_assignment].into_iter().collect(),
+					vec![(backed, chain_a_assignment.1)],
 					&group_validators,
+					false
 				),
 				Error::<Test>::InvalidValidationCodeHash
 			);
@@ -1366,14 +1361,15 @@ fn candidate_checks() {
 				&keystore,
 				&signing_context,
 				BackingKind::Threshold,
+				None,
 			);
 
 			assert_noop!(
 				ParaInclusion::process_candidates(
 					&allowed_relay_parents,
-					vec![backed],
-					&[chain_a_assignment].into_iter().collect(),
+					vec![(backed, chain_a_assignment.1)],
 					&group_validators,
+					false
 				),
 				Error::<Test>::ParaHeadMismatch
 			);
@@ -1486,6 +1482,7 @@ fn backing_works() {
 			&keystore,
 			&signing_context,
 			BackingKind::Threshold,
+			None,
 		);
 
 		let backed_b = back_candidate(
@@ -1495,6 +1492,7 @@ fn backing_works() {
 			&keystore,
 			&signing_context,
 			BackingKind::Threshold,
+			None,
 		);
 
 		let backed_c = back_candidate(
@@ -1504,15 +1502,20 @@ fn backing_works() {
 			&keystore,
 			&signing_context,
 			BackingKind::Threshold,
+			None,
 		);
 
-		let backed_candidates = vec![backed_a.clone(), backed_b.clone(), backed_c];
+		let backed_candidates = vec![
+			(backed_a.clone(), chain_a_assignment.1),
+			(backed_b.clone(), chain_b_assignment.1),
+			(backed_c, thread_a_assignment.1),
+		];
 		let get_backing_group_idx = {
 			// the order defines the group implicitly for this test case
 			let backed_candidates_with_groups = backed_candidates
 				.iter()
 				.enumerate()
-				.map(|(idx, backed_candidate)| (backed_candidate.hash(), GroupIndex(idx as _)))
+				.map(|(idx, (backed_candidate, _))| (backed_candidate.hash(), GroupIndex(idx as _)))
 				.collect::<Vec<_>>();
 
 			move |candidate_hash_x: CandidateHash| -> Option<GroupIndex> {
@@ -1532,10 +1535,8 @@ fn backing_works() {
 		} = ParaInclusion::process_candidates(
 			&allowed_relay_parents,
 			backed_candidates.clone(),
-			&[chain_a_assignment, chain_b_assignment, thread_a_assignment]
-				.into_iter()
-				.collect(),
 			&group_validators,
+			false,
 		)
 		.expect("candidates scheduled, in order, and backed");
 
@@ -1554,22 +1555,22 @@ fn backing_works() {
 				CandidateHash,
 				(CandidateReceipt, Vec<(ValidatorIndex, ValidityAttestation)>),
 			>::new();
-			backed_candidates.into_iter().for_each(|backed_candidate| {
+			backed_candidates.into_iter().for_each(|(backed_candidate, _)| {
 				let candidate_receipt_with_backers = intermediate
 					.entry(backed_candidate.hash())
 					.or_insert_with(|| (backed_candidate.receipt(), Vec::new()));
-
-				assert_eq!(
-					backed_candidate.validity_votes.len(),
-					backed_candidate.validator_indices.count_ones()
-				);
+				let (validator_indices, None) =
+					backed_candidate.validator_indices_and_core_index(false)
+				else {
+					panic!("Expected no injected core index")
+				};
+				assert_eq!(backed_candidate.validity_votes().len(), validator_indices.count_ones());
 				candidate_receipt_with_backers.1.extend(
-					backed_candidate
-						.validator_indices
+					validator_indices
 						.iter()
 						.enumerate()
 						.filter(|(_, signed)| **signed)
-						.zip(backed_candidate.validity_votes.iter().cloned())
+						.zip(backed_candidate.validity_votes().iter().cloned())
 						.filter_map(|((validator_index_within_group, _), attestation)| {
 							let grp_idx = get_backing_group_idx(backed_candidate.hash()).unwrap();
 							group_validators(grp_idx).map(|validator_indices| {
@@ -1666,6 +1667,257 @@ fn backing_works() {
 	});
 }
 
+#[test]
+fn backing_works_with_elastic_scaling_mvp() {
+	let chain_a = ParaId::from(1_u32);
+	let chain_b = ParaId::from(2_u32);
+	let thread_a = ParaId::from(3_u32);
+
+	// The block number of the relay-parent for testing.
+	const RELAY_PARENT_NUM: BlockNumber = 4;
+
+	let paras = vec![
+		(chain_a, ParaKind::Parachain),
+		(chain_b, ParaKind::Parachain),
+		(thread_a, ParaKind::Parathread),
+	];
+	let validators = vec![
+		Sr25519Keyring::Alice,
+		Sr25519Keyring::Bob,
+		Sr25519Keyring::Charlie,
+		Sr25519Keyring::Dave,
+		Sr25519Keyring::Ferdie,
+	];
+	let keystore: KeystorePtr = Arc::new(LocalKeystore::in_memory());
+	for validator in validators.iter() {
+		Keystore::sr25519_generate_new(
+			&*keystore,
+			PARACHAIN_KEY_TYPE_ID,
+			Some(&validator.to_seed()),
+		)
+		.unwrap();
+	}
+	let validator_public = validator_pubkeys(&validators);
+
+	new_test_ext(genesis_config(paras)).execute_with(|| {
+		shared::Pallet::<Test>::set_active_validators_ascending(validator_public.clone());
+		shared::Pallet::<Test>::set_session_index(5);
+
+		run_to_block(5, |_| None);
+
+		let signing_context =
+			SigningContext { parent_hash: System::parent_hash(), session_index: 5 };
+
+		let group_validators = |group_index: GroupIndex| {
+			match group_index {
+				group_index if group_index == GroupIndex::from(0) => Some(vec![0, 1]),
+				group_index if group_index == GroupIndex::from(1) => Some(vec![2, 3]),
+				group_index if group_index == GroupIndex::from(2) => Some(vec![4]),
+				_ => panic!("Group index out of bounds for 2 parachains and 1 parathread core"),
+			}
+			.map(|vs| vs.into_iter().map(ValidatorIndex).collect::<Vec<_>>())
+		};
+
+		// When processing candidates, we compute the group index from scheduler.
+		let validator_groups = vec![
+			vec![ValidatorIndex(0), ValidatorIndex(1)],
+			vec![ValidatorIndex(2), ValidatorIndex(3)],
+			vec![ValidatorIndex(4)],
+		];
+		Scheduler::set_validator_groups(validator_groups);
+
+		let allowed_relay_parents = default_allowed_relay_parent_tracker();
+
+		let mut candidate_a = TestCandidateBuilder {
+			para_id: chain_a,
+			relay_parent: System::parent_hash(),
+			pov_hash: Hash::repeat_byte(1),
+			persisted_validation_data_hash: make_vdata_hash(chain_a).unwrap(),
+			hrmp_watermark: RELAY_PARENT_NUM,
+			..Default::default()
+		}
+		.build();
+		collator_sign_candidate(Sr25519Keyring::One, &mut candidate_a);
+
+		let mut candidate_b_1 = TestCandidateBuilder {
+			para_id: chain_b,
+			relay_parent: System::parent_hash(),
+			pov_hash: Hash::repeat_byte(2),
+			persisted_validation_data_hash: make_vdata_hash(chain_b).unwrap(),
+			hrmp_watermark: RELAY_PARENT_NUM,
+			..Default::default()
+		}
+		.build();
+		collator_sign_candidate(Sr25519Keyring::One, &mut candidate_b_1);
+
+		let mut candidate_b_2 = TestCandidateBuilder {
+			para_id: chain_b,
+			relay_parent: System::parent_hash(),
+			pov_hash: Hash::repeat_byte(3),
+			persisted_validation_data_hash: make_vdata_hash(chain_b).unwrap(),
+			hrmp_watermark: RELAY_PARENT_NUM,
+			..Default::default()
+		}
+		.build();
+		collator_sign_candidate(Sr25519Keyring::One, &mut candidate_b_2);
+
+		let backed_a = back_candidate(
+			candidate_a.clone(),
+			&validators,
+			group_validators(GroupIndex::from(0)).unwrap().as_ref(),
+			&keystore,
+			&signing_context,
+			BackingKind::Threshold,
+			None,
+		);
+
+		let backed_b_1 = back_candidate(
+			candidate_b_1.clone(),
+			&validators,
+			group_validators(GroupIndex::from(1)).unwrap().as_ref(),
+			&keystore,
+			&signing_context,
+			BackingKind::Threshold,
+			Some(CoreIndex(1)),
+		);
+
+		let backed_b_2 = back_candidate(
+			candidate_b_2.clone(),
+			&validators,
+			group_validators(GroupIndex::from(2)).unwrap().as_ref(),
+			&keystore,
+			&signing_context,
+			BackingKind::Threshold,
+			Some(CoreIndex(2)),
+		);
+
+		let backed_candidates = vec![
+			(backed_a.clone(), CoreIndex(0)),
+			(backed_b_1.clone(), CoreIndex(1)),
+			(backed_b_2.clone(), CoreIndex(2)),
+		];
+		let get_backing_group_idx = {
+			// the order defines the group implicitly for this test case
+			let backed_candidates_with_groups = backed_candidates
+				.iter()
+				.enumerate()
+				.map(|(idx, (backed_candidate, _))| (backed_candidate.hash(), GroupIndex(idx as _)))
+				.collect::<Vec<_>>();
+
+			move |candidate_hash_x: CandidateHash| -> Option<GroupIndex> {
+				backed_candidates_with_groups.iter().find_map(|(candidate_hash, grp)| {
+					if *candidate_hash == candidate_hash_x {
+						Some(*grp)
+					} else {
+						None
+					}
+				})
+			}
+		};
+
+		let ProcessedCandidates {
+			core_indices: occupied_cores,
+			candidate_receipt_with_backing_validator_indices,
+		} = ParaInclusion::process_candidates(
+			&allowed_relay_parents,
+			backed_candidates.clone(),
+			&group_validators,
+			true,
+		)
+		.expect("candidates scheduled, in order, and backed");
+
+		// Both b candidates will be backed. However, only one will be recorded on-chain and proceed
+		// with being made available.
+		assert_eq!(
+			occupied_cores,
+			vec![
+				(CoreIndex::from(0), chain_a),
+				(CoreIndex::from(1), chain_b),
+				(CoreIndex::from(2), chain_b),
+			]
+		);
+
+		// Transform the votes into the setup we expect
+		let mut expected = std::collections::HashMap::<
+			CandidateHash,
+			(CandidateReceipt, Vec<(ValidatorIndex, ValidityAttestation)>),
+		>::new();
+		backed_candidates.into_iter().for_each(|(backed_candidate, _)| {
+			let candidate_receipt_with_backers = expected
+				.entry(backed_candidate.hash())
+				.or_insert_with(|| (backed_candidate.receipt(), Vec::new()));
+			let (validator_indices, _maybe_core_index) =
+				backed_candidate.validator_indices_and_core_index(true);
+			assert_eq!(backed_candidate.validity_votes().len(), validator_indices.count_ones());
+			candidate_receipt_with_backers.1.extend(
+				validator_indices
+					.iter()
+					.enumerate()
+					.filter(|(_, signed)| **signed)
+					.zip(backed_candidate.validity_votes().iter().cloned())
+					.filter_map(|((validator_index_within_group, _), attestation)| {
+						let grp_idx = get_backing_group_idx(backed_candidate.hash()).unwrap();
+						group_validators(grp_idx).map(|validator_indices| {
+							(validator_indices[validator_index_within_group], attestation)
+						})
+					}),
+			);
+		});
+
+		assert_eq!(
+			expected,
+			candidate_receipt_with_backing_validator_indices
+				.into_iter()
+				.map(|c| (c.0.hash(), c))
+				.collect()
+		);
+
+		let backers = {
+			let num_backers = effective_minimum_backing_votes(
+				group_validators(GroupIndex(0)).unwrap().len(),
+				configuration::Pallet::<Test>::config().minimum_backing_votes,
+			);
+			backing_bitfield(&(0..num_backers).collect::<Vec<_>>())
+		};
+		assert_eq!(
+			<PendingAvailability<Test>>::get(&chain_a),
+			Some(CandidatePendingAvailability {
+				core: CoreIndex::from(0),
+				hash: candidate_a.hash(),
+				descriptor: candidate_a.descriptor,
+				availability_votes: default_availability_votes(),
+				relay_parent_number: System::block_number() - 1,
+				backed_in_number: System::block_number(),
+				backers,
+				backing_group: GroupIndex::from(0),
+			})
+		);
+		assert_eq!(
+			<PendingAvailabilityCommitments<Test>>::get(&chain_a),
+			Some(candidate_a.commitments),
+		);
+
+		// Only one candidate for b will be recorded on chain.
+		assert_eq!(
+			<PendingAvailability<Test>>::get(&chain_b),
+			Some(CandidatePendingAvailability {
+				core: CoreIndex::from(2),
+				hash: candidate_b_2.hash(),
+				descriptor: candidate_b_2.descriptor,
+				availability_votes: default_availability_votes(),
+				relay_parent_number: System::block_number() - 1,
+				backed_in_number: System::block_number(),
+				backers: backing_bitfield(&[4]),
+				backing_group: GroupIndex::from(2),
+			})
+		);
+		assert_eq!(
+			<PendingAvailabilityCommitments<Test>>::get(&chain_b),
+			Some(candidate_b_2.commitments),
+		);
+	});
+}
+
 #[test]
 fn can_include_candidate_with_ok_code_upgrade() {
 	let chain_a = ParaId::from(1_u32);
@@ -1740,14 +1992,15 @@ fn can_include_candidate_with_ok_code_upgrade() {
 			&keystore,
 			&signing_context,
 			BackingKind::Threshold,
+			None,
 		);
 
 		let ProcessedCandidates { core_indices: occupied_cores, .. } =
 			ParaInclusion::process_candidates(
 				&allowed_relay_parents,
-				vec![backed_a],
-				&[chain_a_assignment].into_iter().collect(),
+				vec![(backed_a, chain_a_assignment.1)],
 				&group_validators,
+				false,
 			)
 			.expect("candidates scheduled, in order, and backed");
 
@@ -1932,6 +2185,7 @@ fn check_allowed_relay_parents() {
 			&keystore,
 			&signing_context_a,
 			BackingKind::Threshold,
+			None,
 		);
 
 		let backed_b = back_candidate(
@@ -1941,6 +2195,7 @@ fn check_allowed_relay_parents() {
 			&keystore,
 			&signing_context_b,
 			BackingKind::Threshold,
+			None,
 		);
 
 		let backed_c = back_candidate(
@@ -1950,17 +2205,20 @@ fn check_allowed_relay_parents() {
 			&keystore,
 			&signing_context_c,
 			BackingKind::Threshold,
+			None,
 		);
 
-		let backed_candidates = vec![backed_a, backed_b, backed_c];
+		let backed_candidates = vec![
+			(backed_a, chain_a_assignment.1),
+			(backed_b, chain_b_assignment.1),
+			(backed_c, thread_a_assignment.1),
+		];
 
 		ParaInclusion::process_candidates(
 			&allowed_relay_parents,
 			backed_candidates.clone(),
-			&[chain_a_assignment, chain_b_assignment, thread_a_assignment]
-				.into_iter()
-				.collect(),
 			&group_validators,
+			false,
 		)
 		.expect("candidates scheduled, in order, and backed");
 	});
@@ -2189,14 +2447,15 @@ fn para_upgrade_delay_scheduled_from_inclusion() {
 			&keystore,
 			&signing_context,
 			BackingKind::Threshold,
+			None,
 		);
 
 		let ProcessedCandidates { core_indices: occupied_cores, .. } =
 			ParaInclusion::process_candidates(
 				&allowed_relay_parents,
-				vec![backed_a],
-				&[chain_a_assignment].into_iter().collect(),
+				vec![(backed_a, chain_a_assignment.1)],
 				&group_validators,
+				false,
 			)
 			.expect("candidates scheduled, in order, and backed");
 
diff --git a/polkadot/runtime/parachains/src/paras_inherent/benchmarking.rs b/polkadot/runtime/parachains/src/paras_inherent/benchmarking.rs
index 0f6b23ae1b3..ad3fa8e0dc7 100644
--- a/polkadot/runtime/parachains/src/paras_inherent/benchmarking.rs
+++ b/polkadot/runtime/parachains/src/paras_inherent/benchmarking.rs
@@ -120,7 +120,7 @@ benchmarks! {
 		// with `v` validity votes.
 		// let votes = v as usize;
 		let votes = min(scheduler::Pallet::<T>::group_validators(GroupIndex::from(0)).unwrap().len(), v as usize);
-		assert_eq!(benchmark.backed_candidates.get(0).unwrap().validity_votes.len(), votes);
+		assert_eq!(benchmark.backed_candidates.get(0).unwrap().validity_votes().len(), votes);
 
 		benchmark.bitfields.clear();
 		benchmark.disputes.clear();
@@ -177,7 +177,7 @@ benchmarks! {
 		// There is 1 backed
 		assert_eq!(benchmark.backed_candidates.len(), 1);
 		assert_eq!(
-			benchmark.backed_candidates.get(0).unwrap().validity_votes.len(),
+			benchmark.backed_candidates.get(0).unwrap().validity_votes().len(),
 			votes,
 		);
 
diff --git a/polkadot/runtime/parachains/src/paras_inherent/mod.rs b/polkadot/runtime/parachains/src/paras_inherent/mod.rs
index 81e092f0a99..cebf858c24a 100644
--- a/polkadot/runtime/parachains/src/paras_inherent/mod.rs
+++ b/polkadot/runtime/parachains/src/paras_inherent/mod.rs
@@ -43,15 +43,14 @@ use frame_support::{
 use frame_system::pallet_prelude::*;
 use pallet_babe::{self, ParentBlockRandomness};
 use primitives::{
-	effective_minimum_backing_votes, BackedCandidate, CandidateHash, CandidateReceipt,
-	CheckedDisputeStatementSet, CheckedMultiDisputeStatementSet, CoreIndex, DisputeStatementSet,
-	InherentData as ParachainsInherentData, MultiDisputeStatementSet, ScrapedOnChainVotes,
-	SessionIndex, SignedAvailabilityBitfields, SigningContext, UncheckedSignedAvailabilityBitfield,
-	UncheckedSignedAvailabilityBitfields, ValidatorId, ValidatorIndex, ValidityAttestation,
-	PARACHAINS_INHERENT_IDENTIFIER,
+	effective_minimum_backing_votes, vstaging::node_features::FeatureIndex, BackedCandidate,
+	CandidateHash, CandidateReceipt, CheckedDisputeStatementSet, CheckedMultiDisputeStatementSet,
+	CoreIndex, DisputeStatementSet, InherentData as ParachainsInherentData,
+	MultiDisputeStatementSet, ScrapedOnChainVotes, SessionIndex, SignedAvailabilityBitfields,
+	SigningContext, UncheckedSignedAvailabilityBitfield, UncheckedSignedAvailabilityBitfields,
+	ValidatorId, ValidatorIndex, ValidityAttestation, PARACHAINS_INHERENT_IDENTIFIER,
 };
 use rand::{seq::SliceRandom, SeedableRng};
-
 use scale_info::TypeInfo;
 use sp_runtime::traits::{Header as HeaderT, One};
 use sp_std::{
@@ -145,6 +144,10 @@ pub mod pallet {
 		DisputeInvalid,
 		/// A candidate was backed by a disabled validator
 		BackedByDisabled,
+		/// A candidate was backed even though the paraid was not scheduled.
+		BackedOnUnscheduledCore,
+		/// More backed candidates were supplied than there are scheduled cores.
+		UnscheduledCandidate,
 	}
 
 	/// Whether the paras inherent was included within this block.
@@ -585,25 +588,39 @@ impl<T: Config> Pallet<T> {
 		let freed = collect_all_freed_cores::<T, _>(freed_concluded.iter().cloned());
 
 		<scheduler::Pallet<T>>::free_cores_and_fill_claimqueue(freed, now);
-		let scheduled = <scheduler::Pallet<T>>::scheduled_paras()
-			.map(|(core_idx, para_id)| (para_id, core_idx))
-			.collect();
 
 		METRICS.on_candidates_processed_total(backed_candidates.len() as u64);
 
-		let SanitizedBackedCandidates { backed_candidates, votes_from_disabled_were_dropped } =
-			sanitize_backed_candidates::<T, _>(
-				backed_candidates,
-				&allowed_relay_parents,
-				|candidate_idx: usize,
-				 backed_candidate: &BackedCandidate<<T as frame_system::Config>::Hash>|
-				 -> bool {
-					let para_id = backed_candidate.descriptor().para_id;
-					let prev_context = <paras::Pallet<T>>::para_most_recent_context(para_id);
-					let check_ctx = CandidateCheckContext::<T>::new(prev_context);
-
-					// never include a concluded-invalid candidate
-					current_concluded_invalid_disputes.contains(&backed_candidate.hash()) ||
+		let core_index_enabled = configuration::Pallet::<T>::config()
+			.node_features
+			.get(FeatureIndex::ElasticScalingMVP as usize)
+			.map(|b| *b)
+			.unwrap_or(false);
+
+		let mut scheduled: BTreeMap<ParaId, BTreeSet<CoreIndex>> = BTreeMap::new();
+		let mut total_scheduled_cores = 0;
+
+		for (core_idx, para_id) in <scheduler::Pallet<T>>::scheduled_paras() {
+			total_scheduled_cores += 1;
+			scheduled.entry(para_id).or_default().insert(core_idx);
+		}
+
+		let SanitizedBackedCandidates {
+			backed_candidates_with_core,
+			votes_from_disabled_were_dropped,
+			dropped_unscheduled_candidates,
+		} = sanitize_backed_candidates::<T, _>(
+			backed_candidates,
+			&allowed_relay_parents,
+			|candidate_idx: usize,
+			 backed_candidate: &BackedCandidate<<T as frame_system::Config>::Hash>|
+			 -> bool {
+				let para_id = backed_candidate.descriptor().para_id;
+				let prev_context = <paras::Pallet<T>>::para_most_recent_context(para_id);
+				let check_ctx = CandidateCheckContext::<T>::new(prev_context);
+
+				// never include a concluded-invalid candidate
+				current_concluded_invalid_disputes.contains(&backed_candidate.hash()) ||
 					// Instead of checking the candidates with code upgrades twice
 					// move the checking up here and skip it in the training wheels fallback.
 					// That way we avoid possible duplicate checks while assuring all
@@ -611,13 +628,19 @@ impl<T: Config> Pallet<T> {
 					//
 					// NOTE: this is the only place where we check the relay-parent.
 					check_ctx
-						.verify_backed_candidate(&allowed_relay_parents, candidate_idx, backed_candidate)
+						.verify_backed_candidate(&allowed_relay_parents, candidate_idx, backed_candidate.candidate())
 						.is_err()
-				},
-				&scheduled,
-			);
+			},
+			scheduled,
+			core_index_enabled,
+		);
+
+		ensure!(
+			backed_candidates_with_core.len() <= total_scheduled_cores,
+			Error::<T>::UnscheduledCandidate
+		);
 
-		METRICS.on_candidates_sanitized(backed_candidates.len() as u64);
+		METRICS.on_candidates_sanitized(backed_candidates_with_core.len() as u64);
 
 		// In `Enter` context (invoked during execution) there should be no backing votes from
 		// disabled validators because they should have been filtered out during inherent data
@@ -626,15 +649,22 @@ impl<T: Config> Pallet<T> {
 			ensure!(!votes_from_disabled_were_dropped, Error::<T>::BackedByDisabled);
 		}
 
+		// In `Enter` context (invoked during execution) we shouldn't have filtered any candidates
+		// due to a para not being scheduled. They should have been filtered during inherent data
+		// preparation (`ProvideInherent` context). Abort in such cases.
+		if context == ProcessInherentDataContext::Enter {
+			ensure!(!dropped_unscheduled_candidates, Error::<T>::BackedOnUnscheduledCore);
+		}
+
 		// Process backed candidates according to scheduled cores.
 		let inclusion::ProcessedCandidates::<<HeaderFor<T> as HeaderT>::Hash> {
 			core_indices: occupied,
 			candidate_receipt_with_backing_validator_indices,
 		} = <inclusion::Pallet<T>>::process_candidates(
 			&allowed_relay_parents,
-			backed_candidates.clone(),
-			&scheduled,
+			backed_candidates_with_core.clone(),
 			<scheduler::Pallet<T>>::group_validators,
+			core_index_enabled,
 		)?;
 		// Note which of the scheduled cores were actually occupied by a backed candidate.
 		<scheduler::Pallet<T>>::occupied(occupied.into_iter().map(|e| (e.0, e.1)).collect());
@@ -651,8 +681,15 @@ impl<T: Config> Pallet<T> {
 
 		let bitfields = bitfields.into_iter().map(|v| v.into_unchecked()).collect();
 
-		let processed =
-			ParachainsInherentData { bitfields, backed_candidates, disputes, parent_header };
+		let processed = ParachainsInherentData {
+			bitfields,
+			backed_candidates: backed_candidates_with_core
+				.into_iter()
+				.map(|(candidate, _)| candidate)
+				.collect(),
+			disputes,
+			parent_header,
+		};
 		Ok((processed, Some(all_weight_after).into()))
 	}
 }
@@ -774,7 +811,7 @@ fn apply_weight_limit<T: Config + inclusion::Config>(
 		.iter()
 		.enumerate()
 		.filter_map(|(idx, candidate)| {
-			candidate.candidate.commitments.new_validation_code.as_ref().map(|_code| idx)
+			candidate.candidate().commitments.new_validation_code.as_ref().map(|_code| idx)
 		})
 		.collect::<Vec<usize>>();
 
@@ -916,16 +953,22 @@ pub(crate) fn sanitize_bitfields<T: crate::inclusion::Config>(
 // Result from `sanitize_backed_candidates`
 #[derive(Debug, PartialEq)]
 struct SanitizedBackedCandidates<Hash> {
-	// Sanitized backed candidates. The `Vec` is sorted according to the occupied core index.
-	backed_candidates: Vec<BackedCandidate<Hash>>,
+	// Sanitized backed candidates along with the assigned core. The `Vec` is sorted according to
+	// the occupied core index.
+	backed_candidates_with_core: Vec<(BackedCandidate<Hash>, CoreIndex)>,
 	// Set to true if any votes from disabled validators were dropped from the input.
 	votes_from_disabled_were_dropped: bool,
+	// Set to true if any candidates were dropped due to filtering done in
+	// `map_candidates_to_cores`
+	dropped_unscheduled_candidates: bool,
 }
 
 /// Filter out:
 /// 1. any candidates that have a concluded invalid dispute
-/// 2. all backing votes from disabled validators
-/// 3. any candidates that end up with less than `effective_minimum_backing_votes` backing votes
+/// 2. any unscheduled candidates, as well as candidates whose paraid has multiple cores assigned
+///    but no injected core index.
+/// 3. all backing votes from disabled validators
+/// 4. any candidates that end up with less than `effective_minimum_backing_votes` backing votes
 ///
 /// `scheduled` follows the same naming scheme as provided in the
 /// guide: Currently `free` but might become `occupied`.
@@ -944,7 +987,8 @@ fn sanitize_backed_candidates<
 	mut backed_candidates: Vec<BackedCandidate<T::Hash>>,
 	allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
 	mut candidate_has_concluded_invalid_dispute_or_is_invalid: F,
-	scheduled: &BTreeMap<ParaId, CoreIndex>,
+	scheduled: BTreeMap<ParaId, BTreeSet<CoreIndex>>,
+	core_index_enabled: bool,
 ) -> SanitizedBackedCandidates<T::Hash> {
 	// Remove any candidates that were concluded invalid.
 	// This does not assume sorting.
@@ -952,22 +996,23 @@ fn sanitize_backed_candidates<
 		!candidate_has_concluded_invalid_dispute_or_is_invalid(candidate_idx, backed_candidate)
 	});
 
-	// Assure the backed candidate's `ParaId`'s core is free.
-	// This holds under the assumption that `Scheduler::schedule` is called _before_.
-	// We don't check the relay-parent because this is done in the closure when
-	// constructing the inherent and during actual processing otherwise.
-
-	backed_candidates.retain(|backed_candidate| {
-		let desc = backed_candidate.descriptor();
+	let initial_candidate_count = backed_candidates.len();
+	// Map candidates to scheduled cores. Filter out any unscheduled candidates.
+	let mut backed_candidates_with_core = map_candidates_to_cores::<T>(
+		&allowed_relay_parents,
+		scheduled,
+		core_index_enabled,
+		backed_candidates,
+	);
 
-		scheduled.get(&desc.para_id).is_some()
-	});
+	let dropped_unscheduled_candidates =
+		initial_candidate_count != backed_candidates_with_core.len();
 
 	// Filter out backing statements from disabled validators
-	let dropped_disabled = filter_backed_statements_from_disabled_validators::<T>(
-		&mut backed_candidates,
+	let votes_from_disabled_were_dropped = filter_backed_statements_from_disabled_validators::<T>(
+		&mut backed_candidates_with_core,
 		&allowed_relay_parents,
-		scheduled,
+		core_index_enabled,
 	);
 
 	// Sort the `Vec` last, once there is a guarantee that these
@@ -975,14 +1020,12 @@ fn sanitize_backed_candidates<
 	// but more importantly are scheduled for a free core.
 	// This both avoids extra work for obviously invalid candidates,
 	// but also allows this to be done in place.
-	backed_candidates.sort_by(|x, y| {
-		// Never panics, since we filtered all panic arguments out in the previous `fn retain`.
-		scheduled[&x.descriptor().para_id].cmp(&scheduled[&y.descriptor().para_id])
-	});
+	backed_candidates_with_core.sort_by(|(_x, core_x), (_y, core_y)| core_x.cmp(&core_y));
 
 	SanitizedBackedCandidates {
-		backed_candidates,
-		votes_from_disabled_were_dropped: dropped_disabled,
+		dropped_unscheduled_candidates,
+		votes_from_disabled_were_dropped,
+		backed_candidates_with_core,
 	}
 }
 
@@ -1071,9 +1114,12 @@ fn limit_and_sanitize_disputes<
 // few more sanity checks. Returns `true` if at least one statement is removed and `false`
 // otherwise.
 fn filter_backed_statements_from_disabled_validators<T: shared::Config + scheduler::Config>(
-	backed_candidates: &mut Vec<BackedCandidate<<T as frame_system::Config>::Hash>>,
+	backed_candidates_with_core: &mut Vec<(
+		BackedCandidate<<T as frame_system::Config>::Hash>,
+		CoreIndex,
+	)>,
 	allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
-	scheduled: &BTreeMap<ParaId, CoreIndex>,
+	core_index_enabled: bool,
 ) -> bool {
 	let disabled_validators =
 		BTreeSet::<_>::from_iter(shared::Pallet::<T>::disabled_validators().into_iter());
@@ -1083,7 +1129,7 @@ fn filter_backed_statements_from_disabled_validators<T: shared::Config + schedul
 		return false
 	}
 
-	let backed_len_before = backed_candidates.len();
+	let backed_len_before = backed_candidates_with_core.len();
 
 	// Flag which will be returned. Set to `true` if at least one vote is filtered.
 	let mut filtered = false;
@@ -1094,15 +1140,9 @@ fn filter_backed_statements_from_disabled_validators<T: shared::Config + schedul
 	// the validator group assigned to the parachain. To obtain this group we need:
 	// 1. Core index assigned to the parachain which has produced the candidate
 	// 2. The relay chain block number of the candidate
-	backed_candidates.retain_mut(|bc| {
-		// Get `core_idx` assigned to the `para_id` of the candidate
-		let core_idx = match scheduled.get(&bc.descriptor().para_id) {
-			Some(core_idx) => *core_idx,
-			None => {
-				log::debug!(target: LOG_TARGET, "Can't get core idx of a backed candidate for para id {:?}. Dropping the candidate.", bc.descriptor().para_id);
-				return false
-			}
-		};
+	backed_candidates_with_core.retain_mut(|(bc, core_idx)| {
+		let (validator_indices, maybe_core_index) = bc.validator_indices_and_core_index(core_index_enabled);
+		let mut validator_indices = BitVec::<_>::from(validator_indices);
 
 		// Get relay parent block number of the candidate. We need this to get the group index assigned to this core at this block number
 		let relay_parent_block_number = match allowed_relay_parents
@@ -1116,7 +1156,7 @@ fn filter_backed_statements_from_disabled_validators<T: shared::Config + schedul
 
 		// Get the group index for the core
 		let group_idx = match <scheduler::Pallet<T>>::group_assigned_to_core(
-			core_idx,
+			*core_idx,
 			relay_parent_block_number + One::one(),
 		) {
 			Some(group_idx) => group_idx,
@@ -1138,12 +1178,15 @@ fn filter_backed_statements_from_disabled_validators<T: shared::Config + schedul
 		// Bitmask with the disabled indices within the validator group
 		let disabled_indices = BitVec::<u8, bitvec::order::Lsb0>::from_iter(validator_group.iter().map(|idx| disabled_validators.contains(idx)));
 		// The indices of statements from disabled validators in `BackedCandidate`. We have to drop these.
-		let indices_to_drop = disabled_indices.clone() & &bc.validator_indices;
+		let indices_to_drop = disabled_indices.clone() & &validator_indices;
 		// Apply the bitmask to drop the disabled validator from `validator_indices`
-		bc.validator_indices &= !disabled_indices;
+		validator_indices &= !disabled_indices;
+		// Update the backed candidate
+		bc.set_validator_indices_and_core_index(validator_indices, maybe_core_index);
+
 		// Remove the corresponding votes from `validity_votes`
 		for idx in indices_to_drop.iter_ones().rev() {
-			bc.validity_votes.remove(idx);
+			bc.validity_votes_mut().remove(idx);
 		}
 
 		// If at least one statement was dropped we need to return `true`
@@ -1154,10 +1197,9 @@ fn filter_backed_statements_from_disabled_validators<T: shared::Config + schedul
 		// By filtering votes we might render the candidate invalid and cause a failure in
 		// [`process_candidates`]. To avoid this we have to perform a sanity check here. If there
 		// are not enough backing votes after filtering we will remove the whole candidate.
-		if bc.validity_votes.len() < effective_minimum_backing_votes(
+		if bc.validity_votes().len() < effective_minimum_backing_votes(
 			validator_group.len(),
 			minimum_backing_votes
-
 		) {
 			return false
 		}
@@ -1166,5 +1208,101 @@ fn filter_backed_statements_from_disabled_validators<T: shared::Config + schedul
 	});
 
 	// Also return `true` if a whole candidate was dropped from the set
-	filtered || backed_len_before != backed_candidates.len()
+	filtered || backed_len_before != backed_candidates_with_core.len()
+}
+
+/// Map candidates to scheduled cores.
+/// If the para only has one scheduled core and no `CoreIndex` is injected, map the candidate to the
+/// single core. If the para has multiple cores scheduled, only map the candidates which have a
+/// proper core injected. Filter out the rest.
+/// Candidates that cannot be mapped are dropped; the caller detects this by comparing the
+/// input and output lengths.
+fn map_candidates_to_cores<T: configuration::Config + scheduler::Config + inclusion::Config>(
+	allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
+	mut scheduled: BTreeMap<ParaId, BTreeSet<CoreIndex>>,
+	core_index_enabled: bool,
+	candidates: Vec<BackedCandidate<T::Hash>>,
+) -> Vec<(BackedCandidate<T::Hash>, CoreIndex)> {
+	let mut backed_candidates_with_core = Vec::with_capacity(candidates.len());
+
+	// We keep a candidate if the parachain has only one core assigned or if
+	// a core index is provided by the block author and that core is indeed scheduled.
+	for backed_candidate in candidates {
+		let maybe_injected_core_index = get_injected_core_index::<T>(
+			allowed_relay_parents,
+			&backed_candidate,
+			core_index_enabled,
+		);
+
+		let scheduled_cores = scheduled.get_mut(&backed_candidate.descriptor().para_id);
+		// Candidates without scheduled cores are silently filtered out.
+		if let Some(scheduled_cores) = scheduled_cores {
+			if let Some(core_idx) = maybe_injected_core_index {
+				if scheduled_cores.contains(&core_idx) {
+					scheduled_cores.remove(&core_idx);
+					backed_candidates_with_core.push((backed_candidate, core_idx));
+				}
+			} else if scheduled_cores.len() == 1 {
+				backed_candidates_with_core
+					.push((backed_candidate, scheduled_cores.pop_first().expect("Length is 1")));
+			}
+		}
+	}
+
+	backed_candidates_with_core
+}
+
+fn get_injected_core_index<T: configuration::Config + scheduler::Config + inclusion::Config>(
+	allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
+	candidate: &BackedCandidate<T::Hash>,
+	core_index_enabled: bool,
+) -> Option<CoreIndex> {
+	// After stripping the 8-bit extension, the `validator_indices` field length is expected to
+	// be equal to the backing group size. If these don't match, the `CoreIndex` is badly encoded
+	// or not supported.
+	let (validator_indices, maybe_core_idx) =
+		candidate.validator_indices_and_core_index(core_index_enabled);
+
+	let Some(core_idx) = maybe_core_idx else { return None };
+
+	let relay_parent_block_number =
+		match allowed_relay_parents.acquire_info(candidate.descriptor().relay_parent, None) {
+			Some((_, block_num)) => block_num,
+			None => {
+				log::debug!(
+					target: LOG_TARGET,
+					"Relay parent {:?} for candidate {:?} is not in the allowed relay parents. Dropping the candidate.",
+					candidate.descriptor().relay_parent,
+					candidate.candidate().hash(),
+				);
+				return None
+			},
+		};
+
+	// Get the backing group of the candidate backed at `core_idx`.
+	let group_idx = match <scheduler::Pallet<T>>::group_assigned_to_core(
+		core_idx,
+		relay_parent_block_number + One::one(),
+	) {
+		Some(group_idx) => group_idx,
+		None => {
+			log::debug!(
+				target: LOG_TARGET,
+				"Can't get the group index for core idx {:?}. Dropping the candidate {:?}.",
+				core_idx,
+				candidate.candidate().hash(),
+			);
+			return None
+		},
+	};
+
+	let group_validators = match <scheduler::Pallet<T>>::group_validators(group_idx) {
+		Some(validators) => validators,
+		None => return None,
+	};
+
+	if group_validators.len() == validator_indices.len() {
+		Some(core_idx)
+	} else {
+		None
+	}
 }
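
The comment in `get_injected_core_index` above refers to the 8-bit extension of the `validator_indices` bitfield that carries the injected `CoreIndex`. A minimal sketch of the decoding side, with the standalone function and the exact bit order assumed for illustration (the runtime does this inside `BackedCandidate::validator_indices_and_core_index`):

use bitvec::{order::Lsb0, vec::BitVec};

// Illustrative only: split `validator_indices` into the backing bitfield
// proper and the appended 8-bit core index.
fn split_validator_indices(
	bits: &BitVec<u8, Lsb0>,
	core_index_enabled: bool,
) -> (BitVec<u8, Lsb0>, Option<u32>) {
	if !core_index_enabled || bits.len() < 8 {
		// Feature disabled or field too short: nothing to strip.
		return (bits.clone(), None)
	}
	let (validator_indices, core_bits) = bits.split_at(bits.len() - 8);
	// Reassemble the trailing 8 bits, least significant bit first.
	let core_index = core_bits
		.iter()
		.enumerate()
		.fold(0u32, |acc, (i, bit)| acc | ((*bit as u32) << i));
	(validator_indices.to_bitvec(), Some(core_index))
}
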
diff --git a/polkadot/runtime/parachains/src/paras_inherent/tests.rs b/polkadot/runtime/parachains/src/paras_inherent/tests.rs
index 6f3eac35685..defb2f4404f 100644
--- a/polkadot/runtime/parachains/src/paras_inherent/tests.rs
+++ b/polkadot/runtime/parachains/src/paras_inherent/tests.rs
@@ -26,7 +26,10 @@ mod enter {
 	use crate::{
 		builder::{Bench, BenchBuilder},
 		mock::{mock_assigner, new_test_ext, BlockLength, BlockWeights, MockGenesisConfig, Test},
-		scheduler::common::Assignment,
+		scheduler::{
+			common::{Assignment, AssignmentProvider, AssignmentProviderConfig},
+			ParasEntry,
+		},
 	};
 	use assert_matches::assert_matches;
 	use frame_support::assert_ok;
@@ -697,6 +700,25 @@ mod enter {
 				2
 			);
 
+			// One core was scheduled. We should put the assignment back before calling enter().
+			let now = <frame_system::Pallet<Test>>::block_number() + 1;
+			let used_cores = 5;
+			let cores = (0..used_cores)
+				.into_iter()
+				.map(|i| {
+					let AssignmentProviderConfig { ttl, .. } =
+						scheduler::Pallet::<Test>::assignment_provider_config(CoreIndex(i));
+					// Load an assignment into provider so that one is present to pop
+					let assignment =
+						<Test as scheduler::Config>::AssignmentProvider::get_mock_assignment(
+							CoreIndex(i),
+							ParaId::from(i),
+						);
+					(CoreIndex(i), [ParasEntry::new(assignment, now + ttl)].into())
+				})
+				.collect();
+			scheduler::ClaimQueue::<Test>::set(cores);
+
 			assert_ok!(Pallet::<Test>::enter(
 				frame_system::RawOrigin::None.into(),
 				limit_inherent_data,
@@ -980,6 +1002,7 @@ mod sanitizers {
 		AvailabilityBitfield, GroupIndex, Hash, Id as ParaId, SignedAvailabilityBitfield,
 		ValidatorIndex,
 	};
+	use rstest::rstest;
 	use sp_core::crypto::UncheckedFrom;
 
 	use crate::mock::Test;
@@ -1238,12 +1261,13 @@ mod sanitizers {
 		// Backed candidates and scheduled parachains used for `sanitize_backed_candidates` testing
 		struct TestData {
 			backed_candidates: Vec<BackedCandidate>,
-			scheduled_paras: BTreeMap<primitives::Id, CoreIndex>,
+			all_backed_candidates_with_core: Vec<(BackedCandidate, CoreIndex)>,
+			scheduled_paras: BTreeMap<primitives::Id, BTreeSet<CoreIndex>>,
 		}
 
 		// Generate test data for the candidates and assert that the environment is set as expected
 		// (check the comments for details)
-		fn get_test_data() -> TestData {
+		fn get_test_data(core_index_enabled: bool) -> TestData {
 			const RELAY_PARENT_NUM: u32 = 3;
 
 			// Add the relay parent to `shared` pallet. Otherwise some code (e.g. filtering backing
@@ -1285,9 +1309,14 @@ mod sanitizers {
 			shared::Pallet::<Test>::set_active_validators_ascending(validator_ids);
 
 			// Two scheduled parachains - ParaId(1) on CoreIndex(0) and ParaId(2) on CoreIndex(1)
-			let scheduled = (0_usize..2)
+			let scheduled: BTreeMap<ParaId, BTreeSet<CoreIndex>> = (0_usize..2)
 				.into_iter()
-				.map(|idx| (ParaId::from(1_u32 + idx as u32), CoreIndex::from(idx as u32)))
+				.map(|idx| {
+					(
+						ParaId::from(1_u32 + idx as u32),
+						[CoreIndex::from(idx as u32)].into_iter().collect(),
+					)
+				})
 				.collect::<BTreeMap<_, _>>();
 
 			// Set the validator groups in `scheduler`
@@ -1301,7 +1330,7 @@ mod sanitizers {
 				(
 					CoreIndex::from(0),
 					VecDeque::from([ParasEntry::new(
-						Assignment::Pool { para_id: 1.into(), core_index: CoreIndex(1) },
+						Assignment::Pool { para_id: 1.into(), core_index: CoreIndex(0) },
 						RELAY_PARENT_NUM,
 					)]),
 				),
@@ -1319,12 +1348,12 @@ mod sanitizers {
 				match group_index {
 					group_index if group_index == GroupIndex::from(0) => Some(vec![0, 1]),
 					group_index if group_index == GroupIndex::from(1) => Some(vec![2, 3]),
-					_ => panic!("Group index out of bounds for 2 parachains and 1 parathread core"),
+					_ => panic!("Group index out of bounds"),
 				}
 				.map(|m| m.into_iter().map(ValidatorIndex).collect::<Vec<_>>())
 			};
 
-			// Two backed candidates from each parachain
+			// One backed candidate from each parachain
 			let backed_candidates = (0_usize..2)
 				.into_iter()
 				.map(|idx0| {
@@ -1348,6 +1377,7 @@ mod sanitizers {
 						&keystore,
 						&signing_context,
 						BackingKind::Threshold,
+						core_index_enabled.then_some(CoreIndex(idx0 as u32)),
 					);
 					backed
 				})
@@ -1369,13 +1399,373 @@ mod sanitizers {
 				]
 			);
 
-			TestData { backed_candidates, scheduled_paras: scheduled }
+			let all_backed_candidates_with_core = backed_candidates
+				.iter()
+				.map(|candidate| {
+					// Only one entry for this test data.
+					(
+						candidate.clone(),
+						scheduled
+							.get(&candidate.descriptor().para_id)
+							.unwrap()
+							.first()
+							.copied()
+							.unwrap(),
+					)
+				})
+				.collect();
+
+			TestData {
+				backed_candidates,
+				scheduled_paras: scheduled,
+				all_backed_candidates_with_core,
+			}
 		}
 
-		#[test]
-		fn happy_path() {
+		// Generate test data for the candidates and assert that the environment is set as expected
+		// (check the comments for details)
+		// Para 1 scheduled on core 0 and core 1. Two candidates are supplied.
+		// Para 2 scheduled on cores 2 and 3. One candidate supplied.
+		// Para 3 scheduled on core 4. One candidate supplied.
+		// Para 4 scheduled on core 5. Two candidates supplied.
+		// Para 5 scheduled on core 6. No candidates supplied.
+		fn get_test_data_multiple_cores_per_para(core_index_enabled: bool) -> TestData {
+			const RELAY_PARENT_NUM: u32 = 3;
+
+			// Add the relay parent to `shared` pallet. Otherwise some code (e.g. filtering backing
+			// votes) won't behave correctly
+			shared::Pallet::<Test>::add_allowed_relay_parent(
+				default_header().hash(),
+				Default::default(),
+				RELAY_PARENT_NUM,
+				1,
+			);
+
+			let header = default_header();
+			let relay_parent = header.hash();
+			let session_index = SessionIndex::from(0_u32);
+
+			let keystore = LocalKeystore::in_memory();
+			let keystore = Arc::new(keystore) as KeystorePtr;
+			let signing_context = SigningContext { parent_hash: relay_parent, session_index };
+
+			let validators = vec![
+				keyring::Sr25519Keyring::Alice,
+				keyring::Sr25519Keyring::Bob,
+				keyring::Sr25519Keyring::Charlie,
+				keyring::Sr25519Keyring::Dave,
+				keyring::Sr25519Keyring::Eve,
+				keyring::Sr25519Keyring::Ferdie,
+				keyring::Sr25519Keyring::One,
+			];
+			for validator in validators.iter() {
+				Keystore::sr25519_generate_new(
+					&*keystore,
+					PARACHAIN_KEY_TYPE_ID,
+					Some(&validator.to_seed()),
+				)
+				.unwrap();
+			}
+
+			// Set active validators in `shared` pallet
+			let validator_ids =
+				validators.iter().map(|v| v.public().into()).collect::<Vec<ValidatorId>>();
+			shared::Pallet::<Test>::set_active_validators_ascending(validator_ids);
+
+			// Set the validator groups in `scheduler`
+			scheduler::Pallet::<Test>::set_validator_groups(vec![
+				vec![ValidatorIndex(0)],
+				vec![ValidatorIndex(1)],
+				vec![ValidatorIndex(2)],
+				vec![ValidatorIndex(3)],
+				vec![ValidatorIndex(4)],
+				vec![ValidatorIndex(5)],
+				vec![ValidatorIndex(6)],
+			]);
+
+			// Update scheduler's claimqueue with the parachains
+			scheduler::Pallet::<Test>::set_claimqueue(BTreeMap::from([
+				(
+					CoreIndex::from(0),
+					VecDeque::from([ParasEntry::new(
+						Assignment::Pool { para_id: 1.into(), core_index: CoreIndex(0) },
+						RELAY_PARENT_NUM,
+					)]),
+				),
+				(
+					CoreIndex::from(1),
+					VecDeque::from([ParasEntry::new(
+						Assignment::Pool { para_id: 1.into(), core_index: CoreIndex(1) },
+						RELAY_PARENT_NUM,
+					)]),
+				),
+				(
+					CoreIndex::from(2),
+					VecDeque::from([ParasEntry::new(
+						Assignment::Pool { para_id: 2.into(), core_index: CoreIndex(2) },
+						RELAY_PARENT_NUM,
+					)]),
+				),
+				(
+					CoreIndex::from(3),
+					VecDeque::from([ParasEntry::new(
+						Assignment::Pool { para_id: 2.into(), core_index: CoreIndex(3) },
+						RELAY_PARENT_NUM,
+					)]),
+				),
+				(
+					CoreIndex::from(4),
+					VecDeque::from([ParasEntry::new(
+						Assignment::Pool { para_id: 3.into(), core_index: CoreIndex(4) },
+						RELAY_PARENT_NUM,
+					)]),
+				),
+				(
+					CoreIndex::from(5),
+					VecDeque::from([ParasEntry::new(
+						Assignment::Pool { para_id: 4.into(), core_index: CoreIndex(5) },
+						RELAY_PARENT_NUM,
+					)]),
+				),
+				(
+					CoreIndex::from(6),
+					VecDeque::from([ParasEntry::new(
+						Assignment::Pool { para_id: 5.into(), core_index: CoreIndex(6) },
+						RELAY_PARENT_NUM,
+					)]),
+				),
+			]));
+
+			// Callback used for backing candidates
+			let group_validators = |group_index: GroupIndex| {
+				match group_index {
+					group_index if group_index == GroupIndex::from(0) => Some(vec![0]),
+					group_index if group_index == GroupIndex::from(1) => Some(vec![1]),
+					group_index if group_index == GroupIndex::from(2) => Some(vec![2]),
+					group_index if group_index == GroupIndex::from(3) => Some(vec![3]),
+					group_index if group_index == GroupIndex::from(4) => Some(vec![4]),
+					group_index if group_index == GroupIndex::from(5) => Some(vec![5]),
+					group_index if group_index == GroupIndex::from(6) => Some(vec![6]),
+
+					_ => panic!("Group index out of bounds"),
+				}
+				.map(|m| m.into_iter().map(ValidatorIndex).collect::<Vec<_>>())
+			};
+
+			let mut backed_candidates = vec![];
+			let mut all_backed_candidates_with_core = vec![];
+
+			// Para 1
+			{
+				let mut candidate = TestCandidateBuilder {
+					para_id: ParaId::from(1),
+					relay_parent,
+					pov_hash: Hash::repeat_byte(1 as u8),
+					persisted_validation_data_hash: [42u8; 32].into(),
+					hrmp_watermark: RELAY_PARENT_NUM,
+					..Default::default()
+				}
+				.build();
+
+				collator_sign_candidate(Sr25519Keyring::One, &mut candidate);
+
+				let backed: BackedCandidate = back_candidate(
+					candidate,
+					&validators,
+					group_validators(GroupIndex::from(0 as u32)).unwrap().as_ref(),
+					&keystore,
+					&signing_context,
+					BackingKind::Threshold,
+					core_index_enabled.then_some(CoreIndex(0 as u32)),
+				);
+				backed_candidates.push(backed.clone());
+				if core_index_enabled {
+					all_backed_candidates_with_core.push((backed, CoreIndex(0)));
+				}
+
+				let mut candidate = TestCandidateBuilder {
+					para_id: ParaId::from(1),
+					relay_parent,
+					pov_hash: Hash::repeat_byte(2 as u8),
+					persisted_validation_data_hash: [42u8; 32].into(),
+					hrmp_watermark: RELAY_PARENT_NUM,
+					..Default::default()
+				}
+				.build();
+
+				collator_sign_candidate(Sr25519Keyring::One, &mut candidate);
+
+				let backed = back_candidate(
+					candidate,
+					&validators,
+					group_validators(GroupIndex::from(1 as u32)).unwrap().as_ref(),
+					&keystore,
+					&signing_context,
+					BackingKind::Threshold,
+					core_index_enabled.then_some(CoreIndex(1 as u32)),
+				);
+				backed_candidates.push(backed.clone());
+				if core_index_enabled {
+					all_backed_candidates_with_core.push((backed, CoreIndex(1)));
+				}
+			}
+
+			// Para 2
+			{
+				let mut candidate = TestCandidateBuilder {
+					para_id: ParaId::from(2),
+					relay_parent,
+					pov_hash: Hash::repeat_byte(3 as u8),
+					persisted_validation_data_hash: [42u8; 32].into(),
+					hrmp_watermark: RELAY_PARENT_NUM,
+					..Default::default()
+				}
+				.build();
+
+				collator_sign_candidate(Sr25519Keyring::One, &mut candidate);
+
+				let backed = back_candidate(
+					candidate,
+					&validators,
+					group_validators(GroupIndex::from(2 as u32)).unwrap().as_ref(),
+					&keystore,
+					&signing_context,
+					BackingKind::Threshold,
+					core_index_enabled.then_some(CoreIndex(2 as u32)),
+				);
+				backed_candidates.push(backed.clone());
+				if core_index_enabled {
+					all_backed_candidates_with_core.push((backed, CoreIndex(2)));
+				}
+			}
+
+			// Para 3
+			{
+				let mut candidate = TestCandidateBuilder {
+					para_id: ParaId::from(3),
+					relay_parent,
+					pov_hash: Hash::repeat_byte(4 as u8),
+					persisted_validation_data_hash: [42u8; 32].into(),
+					hrmp_watermark: RELAY_PARENT_NUM,
+					..Default::default()
+				}
+				.build();
+
+				collator_sign_candidate(Sr25519Keyring::One, &mut candidate);
+
+				let backed = back_candidate(
+					candidate,
+					&validators,
+					group_validators(GroupIndex::from(4 as u32)).unwrap().as_ref(),
+					&keystore,
+					&signing_context,
+					BackingKind::Threshold,
+					core_index_enabled.then_some(CoreIndex(4 as u32)),
+				);
+				backed_candidates.push(backed.clone());
+				all_backed_candidates_with_core.push((backed, CoreIndex(4)));
+			}
+
+			// Para 4
+			{
+				let mut candidate = TestCandidateBuilder {
+					para_id: ParaId::from(4),
+					relay_parent,
+					pov_hash: Hash::repeat_byte(5 as u8),
+					persisted_validation_data_hash: [42u8; 32].into(),
+					hrmp_watermark: RELAY_PARENT_NUM,
+					..Default::default()
+				}
+				.build();
+
+				collator_sign_candidate(Sr25519Keyring::One, &mut candidate);
+
+				let backed = back_candidate(
+					candidate,
+					&validators,
+					group_validators(GroupIndex::from(5 as u32)).unwrap().as_ref(),
+					&keystore,
+					&signing_context,
+					BackingKind::Threshold,
+					None,
+				);
+				backed_candidates.push(backed.clone());
+				all_backed_candidates_with_core.push((backed, CoreIndex(5)));
+
+				let mut candidate = TestCandidateBuilder {
+					para_id: ParaId::from(4),
+					relay_parent,
+					pov_hash: Hash::repeat_byte(6 as u8),
+					persisted_validation_data_hash: [42u8; 32].into(),
+					hrmp_watermark: RELAY_PARENT_NUM,
+					..Default::default()
+				}
+				.build();
+
+				collator_sign_candidate(Sr25519Keyring::One, &mut candidate);
+
+				let backed = back_candidate(
+					candidate,
+					&validators,
+					group_validators(GroupIndex::from(5 as u32)).unwrap().as_ref(),
+					&keystore,
+					&signing_context,
+					BackingKind::Threshold,
+					core_index_enabled.then_some(CoreIndex(5 as u32)),
+				);
+				backed_candidates.push(backed.clone());
+			}
+
+			// No candidate for para 5.
+
+			// State sanity checks
+			assert_eq!(
+				<scheduler::Pallet<Test>>::scheduled_paras().collect::<Vec<_>>(),
+				vec![
+					(CoreIndex(0), ParaId::from(1)),
+					(CoreIndex(1), ParaId::from(1)),
+					(CoreIndex(2), ParaId::from(2)),
+					(CoreIndex(3), ParaId::from(2)),
+					(CoreIndex(4), ParaId::from(3)),
+					(CoreIndex(5), ParaId::from(4)),
+					(CoreIndex(6), ParaId::from(5)),
+				]
+			);
+			let mut scheduled: BTreeMap<ParaId, BTreeSet<CoreIndex>> = BTreeMap::new();
+			for (core_idx, para_id) in <scheduler::Pallet<Test>>::scheduled_paras() {
+				scheduled.entry(para_id).or_default().insert(core_idx);
+			}
+
+			assert_eq!(
+				shared::Pallet::<Test>::active_validator_indices(),
+				vec![
+					ValidatorIndex(0),
+					ValidatorIndex(1),
+					ValidatorIndex(2),
+					ValidatorIndex(3),
+					ValidatorIndex(4),
+					ValidatorIndex(5),
+					ValidatorIndex(6),
+				]
+			);
+
+			TestData {
+				backed_candidates,
+				scheduled_paras: scheduled,
+				all_backed_candidates_with_core,
+			}
+		}
+
+		#[rstest]
+		#[case(false)]
+		#[case(true)]
+		fn happy_path(#[case] core_index_enabled: bool) {
 			new_test_ext(MockGenesisConfig::default()).execute_with(|| {
-				let TestData { backed_candidates, scheduled_paras: scheduled } = get_test_data();
+				let TestData {
+					backed_candidates,
+					all_backed_candidates_with_core,
+					scheduled_paras: scheduled,
+				} = get_test_data(core_index_enabled);
 
 				let has_concluded_invalid =
 					|_idx: usize, _backed_candidate: &BackedCandidate| -> bool { false };
@@ -1385,47 +1775,95 @@ mod sanitizers {
 						backed_candidates.clone(),
 						&<shared::Pallet<Test>>::allowed_relay_parents(),
 						has_concluded_invalid,
-						&scheduled
+						scheduled,
+						core_index_enabled
 					),
 					SanitizedBackedCandidates {
-						backed_candidates,
-						votes_from_disabled_were_dropped: false
+						backed_candidates_with_core: all_backed_candidates_with_core,
+						votes_from_disabled_were_dropped: false,
+						dropped_unscheduled_candidates: false
 					}
 				);
+			});
+		}
+
+		#[rstest]
+		#[case(false)]
+		#[case(true)]
+		fn test_with_multiple_cores_per_para(#[case] core_index_enabled: bool) {
+			new_test_ext(MockGenesisConfig::default()).execute_with(|| {
+				let TestData {
+					backed_candidates,
+					all_backed_candidates_with_core: expected_all_backed_candidates_with_core,
+					scheduled_paras: scheduled,
+				} = get_test_data_multiple_cores_per_para(core_index_enabled);
+
+				let has_concluded_invalid =
+					|_idx: usize, _backed_candidate: &BackedCandidate| -> bool { false };
 
-				{}
+				assert_eq!(
+					sanitize_backed_candidates::<Test, _>(
+						backed_candidates.clone(),
+						&<shared::Pallet<Test>>::allowed_relay_parents(),
+						has_concluded_invalid,
+						scheduled,
+						core_index_enabled
+					),
+					SanitizedBackedCandidates {
+						backed_candidates_with_core: expected_all_backed_candidates_with_core,
+						votes_from_disabled_were_dropped: false,
+						dropped_unscheduled_candidates: true
+					}
+				);
 			});
 		}
 
 		// nothing is scheduled, so no paraids match, thus all backed candidates are skipped
-		#[test]
-		fn nothing_scheduled() {
+		#[rstest]
+		#[case(false, false)]
+		#[case(true, true)]
+		#[case(false, true)]
+		#[case(true, false)]
+		fn nothing_scheduled(
+			#[case] core_index_enabled: bool,
+			#[case] multiple_cores_per_para: bool,
+		) {
 			new_test_ext(MockGenesisConfig::default()).execute_with(|| {
-				let TestData { backed_candidates, scheduled_paras: _ } = get_test_data();
-				let scheduled = &BTreeMap::new();
+				let TestData { backed_candidates, .. } = if multiple_cores_per_para {
+					get_test_data_multiple_cores_per_para(core_index_enabled)
+				} else {
+					get_test_data(core_index_enabled)
+				};
+				let scheduled = BTreeMap::new();
 				let has_concluded_invalid =
 					|_idx: usize, _backed_candidate: &BackedCandidate| -> bool { false };
 
 				let SanitizedBackedCandidates {
-					backed_candidates: sanitized_backed_candidates,
+					backed_candidates_with_core: sanitized_backed_candidates,
 					votes_from_disabled_were_dropped,
+					dropped_unscheduled_candidates,
 				} = sanitize_backed_candidates::<Test, _>(
 					backed_candidates.clone(),
 					&<shared::Pallet<Test>>::allowed_relay_parents(),
 					has_concluded_invalid,
-					&scheduled,
+					scheduled,
+					core_index_enabled,
 				);
 
 				assert!(sanitized_backed_candidates.is_empty());
 				assert!(!votes_from_disabled_were_dropped);
+				assert!(dropped_unscheduled_candidates);
 			});
 		}
 
 		// candidates that have concluded as invalid are filtered out
-		#[test]
-		fn invalid_are_filtered_out() {
+		#[rstest]
+		#[case(false)]
+		#[case(true)]
+		fn invalid_are_filtered_out(#[case] core_index_enabled: bool) {
 			new_test_ext(MockGenesisConfig::default()).execute_with(|| {
-				let TestData { backed_candidates, scheduled_paras: scheduled } = get_test_data();
+				let TestData { backed_candidates, scheduled_paras: scheduled, .. } =
+					get_test_data(core_index_enabled);
 
 				// mark every second one as concluded invalid
 				let set = {
@@ -1440,45 +1878,55 @@ mod sanitizers {
 				let has_concluded_invalid =
 					|_idx: usize, candidate: &BackedCandidate| set.contains(&candidate.hash());
 				let SanitizedBackedCandidates {
-					backed_candidates: sanitized_backed_candidates,
+					backed_candidates_with_core: sanitized_backed_candidates,
 					votes_from_disabled_were_dropped,
+					dropped_unscheduled_candidates,
 				} = sanitize_backed_candidates::<Test, _>(
 					backed_candidates.clone(),
 					&<shared::Pallet<Test>>::allowed_relay_parents(),
 					has_concluded_invalid,
-					&scheduled,
+					scheduled,
+					core_index_enabled,
 				);
 
 				assert_eq!(sanitized_backed_candidates.len(), backed_candidates.len() / 2);
 				assert!(!votes_from_disabled_were_dropped);
+				assert!(!dropped_unscheduled_candidates);
 			});
 		}
 
-		#[test]
-		fn disabled_non_signing_validator_doesnt_get_filtered() {
+		#[rstest]
+		#[case(false)]
+		#[case(true)]
+		fn disabled_non_signing_validator_doesnt_get_filtered(#[case] core_index_enabled: bool) {
 			new_test_ext(MockGenesisConfig::default()).execute_with(|| {
-				let TestData { mut backed_candidates, scheduled_paras } = get_test_data();
+				let TestData { mut all_backed_candidates_with_core, .. } =
+					get_test_data(core_index_enabled);
 
 				// Disable Eve
 				set_disabled_validators(vec![4]);
 
-				let before = backed_candidates.clone();
+				let before = all_backed_candidates_with_core.clone();
 
 				// Eve is disabled but no backing statement is signed by it so nothing should be
 				// filtered
 				assert!(!filter_backed_statements_from_disabled_validators::<Test>(
-					&mut backed_candidates,
+					&mut all_backed_candidates_with_core,
 					&<shared::Pallet<Test>>::allowed_relay_parents(),
-					&scheduled_paras
+					core_index_enabled
 				));
-				assert_eq!(backed_candidates, before);
+				assert_eq!(all_backed_candidates_with_core, before);
 			});
 		}
-
-		#[test]
-		fn drop_statements_from_disabled_without_dropping_candidate() {
+		#[rstest]
+		#[case(false)]
+		#[case(true)]
+		fn drop_statements_from_disabled_without_dropping_candidate(
+			#[case] core_index_enabled: bool,
+		) {
 			new_test_ext(MockGenesisConfig::default()).execute_with(|| {
-				let TestData { mut backed_candidates, scheduled_paras } = get_test_data();
+				let TestData { mut all_backed_candidates_with_core, .. } =
+					get_test_data(core_index_enabled);
 
 				// Disable Alice
 				set_disabled_validators(vec![0]);
@@ -1491,61 +1939,83 @@ mod sanitizers {
 				configuration::Pallet::<Test>::force_set_active_config(hc);
 
 				// Verify the initial state is as expected
-				assert_eq!(backed_candidates.get(0).unwrap().validity_votes.len(), 2);
 				assert_eq!(
-					backed_candidates.get(0).unwrap().validator_indices.get(0).unwrap(),
-					true
+					all_backed_candidates_with_core.get(0).unwrap().0.validity_votes().len(),
+					2
 				);
-				assert_eq!(
-					backed_candidates.get(0).unwrap().validator_indices.get(1).unwrap(),
-					true
-				);
-				let untouched = backed_candidates.get(1).unwrap().clone();
+				let (validator_indices, maybe_core_index) = all_backed_candidates_with_core
+					.get(0)
+					.unwrap()
+					.0
+					.validator_indices_and_core_index(core_index_enabled);
+				if core_index_enabled {
+					assert!(maybe_core_index.is_some());
+				} else {
+					assert!(maybe_core_index.is_none());
+				}
+
+				assert_eq!(validator_indices.get(0).unwrap(), true);
+				assert_eq!(validator_indices.get(1).unwrap(), true);
+				let untouched = all_backed_candidates_with_core.get(1).unwrap().0.clone();
 
 				assert!(filter_backed_statements_from_disabled_validators::<Test>(
-					&mut backed_candidates,
+					&mut all_backed_candidates_with_core,
 					&<shared::Pallet<Test>>::allowed_relay_parents(),
-					&scheduled_paras
+					core_index_enabled
 				));
 
+				let (validator_indices, maybe_core_index) = all_backed_candidates_with_core
+					.get(0)
+					.unwrap()
+					.0
+					.validator_indices_and_core_index(core_index_enabled);
+				if core_index_enabled {
+					assert!(maybe_core_index.is_some());
+				} else {
+					assert!(maybe_core_index.is_none());
+				}
+
 				// there should still be two backed candidates
-				assert_eq!(backed_candidates.len(), 2);
+				assert_eq!(all_backed_candidates_with_core.len(), 2);
 				// but the first one should have only one validity vote
-				assert_eq!(backed_candidates.get(0).unwrap().validity_votes.len(), 1);
-				// Validator 0 vote should be dropped, validator 1 - retained
 				assert_eq!(
-					backed_candidates.get(0).unwrap().validator_indices.get(0).unwrap(),
-					false
-				);
-				assert_eq!(
-					backed_candidates.get(0).unwrap().validator_indices.get(1).unwrap(),
-					true
+					all_backed_candidates_with_core.get(0).unwrap().0.validity_votes().len(),
+					1
 				);
+				// Validator 0 vote should be dropped, validator 1 - retained
+				assert_eq!(validator_indices.get(0).unwrap(), false);
+				assert_eq!(validator_indices.get(1).unwrap(), true);
 				// the second candidate shouldn't be modified
-				assert_eq!(*backed_candidates.get(1).unwrap(), untouched);
+				assert_eq!(all_backed_candidates_with_core.get(1).unwrap().0, untouched);
 			});
 		}
 
-		#[test]
-		fn drop_candidate_if_all_statements_are_from_disabled() {
+		#[rstest]
+		#[case(false)]
+		#[case(true)]
+		fn drop_candidate_if_all_statements_are_from_disabled(#[case] core_index_enabled: bool) {
 			new_test_ext(MockGenesisConfig::default()).execute_with(|| {
-				let TestData { mut backed_candidates, scheduled_paras } = get_test_data();
+				let TestData { mut all_backed_candidates_with_core, .. } =
+					get_test_data(core_index_enabled);
 
 				// Disable Alice and Bob
 				set_disabled_validators(vec![0, 1]);
 
 				// Verify the initial state is as expected
-				assert_eq!(backed_candidates.get(0).unwrap().validity_votes.len(), 2);
-				let untouched = backed_candidates.get(1).unwrap().clone();
+				assert_eq!(
+					all_backed_candidates_with_core.get(0).unwrap().0.validity_votes().len(),
+					2
+				);
+				let untouched = all_backed_candidates_with_core.get(1).unwrap().0.clone();
 
 				assert!(filter_backed_statements_from_disabled_validators::<Test>(
-					&mut backed_candidates,
+					&mut all_backed_candidates_with_core,
 					&<shared::Pallet<Test>>::allowed_relay_parents(),
-					&scheduled_paras
+					core_index_enabled
 				));
 
-				assert_eq!(backed_candidates.len(), 1);
-				assert_eq!(*backed_candidates.get(0).unwrap(), untouched);
+				assert_eq!(all_backed_candidates_with_core.len(), 1);
+				assert_eq!(all_backed_candidates_with_core.get(0).unwrap().0, untouched);
 			});
 		}
 	}
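
The sanitizer cases above pin down the mapping policy exercised by `map_candidates_to_cores`. Distilled to a standalone model, with plain integers standing in for `ParaId` and `CoreIndex` (a sketch of the policy, not the runtime code):

use std::collections::{BTreeMap, BTreeSet};

// A candidate keeps a core if the author injected a scheduled core index, or
// if its para has exactly one scheduled core and no index was injected.
// Everything else is silently dropped.
fn map_to_cores(
	mut scheduled: BTreeMap<u32, BTreeSet<u32>>,
	candidates: Vec<(u32, Option<u32>)>,
) -> Vec<(u32, u32)> {
	let mut out = Vec::new();
	for (para, maybe_core) in candidates {
		let Some(cores) = scheduled.get_mut(&para) else { continue };
		match maybe_core {
			// Injected index: keep only if that core is still scheduled.
			Some(core) if cores.remove(&core) => out.push((para, core)),
			// No injected index: unambiguous only with a single scheduled core.
			None if cores.len() == 1 => {
				let core = cores.pop_first().expect("len is 1");
				out.push((para, core));
			},
			_ => {},
		}
	}
	out
}

fn main() {
	let scheduled = BTreeMap::from([
		(1, BTreeSet::from([0, 1])), // para 1 holds two cores
		(3, BTreeSet::from([4])),    // para 3 holds one core
	]);
	// Para 1's first candidate injects no index (ambiguous: dropped), the
	// second injects core 1; para 5 is not scheduled at all.
	let out = map_to_cores(scheduled, vec![(1, None), (1, Some(1)), (3, None), (5, None)]);
	assert_eq!(out, vec![(1, 1), (3, 4)]);
}
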
diff --git a/polkadot/runtime/parachains/src/paras_inherent/weights.rs b/polkadot/runtime/parachains/src/paras_inherent/weights.rs
index 05cc53fae04..0f4e5be572a 100644
--- a/polkadot/runtime/parachains/src/paras_inherent/weights.rs
+++ b/polkadot/runtime/parachains/src/paras_inherent/weights.rs
@@ -149,11 +149,11 @@ pub fn backed_candidate_weight<T: frame_system::Config + Config>(
 	candidate: &BackedCandidate<T::Hash>,
 ) -> Weight {
 	set_proof_size_to_tx_size(
-		if candidate.candidate.commitments.new_validation_code.is_some() {
+		if candidate.candidate().commitments.new_validation_code.is_some() {
 			<<T as Config>::WeightInfo as WeightInfo>::enter_backed_candidate_code_upgrade()
 		} else {
 			<<T as Config>::WeightInfo as WeightInfo>::enter_backed_candidates_variable(
-				candidate.validity_votes.len() as u32,
+				candidate.validity_votes().len() as u32,
 			)
 		},
 		candidate,
diff --git a/polkadot/zombienet_tests/functional/0012-elastic-scaling-mvp.toml b/polkadot/zombienet_tests/functional/0012-elastic-scaling-mvp.toml
new file mode 100644
index 00000000000..0dfd814e10a
--- /dev/null
+++ b/polkadot/zombienet_tests/functional/0012-elastic-scaling-mvp.toml
@@ -0,0 +1,38 @@
+[settings]
+timeout = 1000
+bootnode = true
+
+[relaychain.genesis.runtimeGenesis.patch.configuration.config]
+  max_validators_per_core = 2
+  needed_approvals = 4
+  coretime_cores = 2
+
+[relaychain]
+default_image = "{{ZOMBIENET_INTEGRATION_TEST_IMAGE}}"
+chain = "rococo-local"
+default_command = "polkadot"
+
+[relaychain.default_resources]
+limits = { memory = "4G", cpu = "2" }
+requests = { memory = "2G", cpu = "1" }
+
+  [[relaychain.nodes]]
+  name = "alice"
+  validator = "true"
+
+  [[relaychain.node_groups]]
+  name = "validator"
+  count = 3
+  args = [ "-lparachain=debug,runtime=debug"]
+
+[[parachains]]
+id = 2000
+default_command = "polkadot-parachain"
+add_to_genesis = false
+register_para = true
+onboard_as_parachain = false
+
+  [parachains.collator]
+  name = "collator2000"
+  command = "polkadot-parachain"
+  args = [ "-lparachain=debug" ]
diff --git a/polkadot/zombienet_tests/functional/0012-elastic-scaling-mvp.zndsl b/polkadot/zombienet_tests/functional/0012-elastic-scaling-mvp.zndsl
new file mode 100644
index 00000000000..a7193c9282b
--- /dev/null
+++ b/polkadot/zombienet_tests/functional/0012-elastic-scaling-mvp.zndsl
@@ -0,0 +1,28 @@
+Description: Test that a paraid acquiring multiple cores does not brick itself if the ElasticScalingMVP feature is enabled
+Network: ./0012-elastic-scaling-mvp.toml
+Creds: config
+
+# Check authority status.
+validator: reports node_roles is 4
+
+validator: reports substrate_block_height{status="finalized"} is at least 10 within 100 seconds
+
+# Ensure parachain was able to make progress.
+validator: parachain 2000 block height is at least 10 within 200 seconds
+
+# Register the second core assigned to this parachain.
+alice: js-script ./0012-register-para.js return is 0 within 600 seconds
+
+validator: reports substrate_block_height{status="finalized"} is at least 35 within 100 seconds
+
+# Parachain will now be stalled
+validator: parachain 2000 block height is lower than 20 within 300 seconds
+
+# Enable the ElasticScalingMVP node feature.
+alice: js-script ./0012-enable-node-feature.js with "1" return is 0 within 600 seconds
+
+# Wait two sessions for the config to be updated.
+sleep 120 seconds
+
+# Ensure parachain is now making progress.
+validator: parachain 2000 block height is at least 30 within 200 seconds
diff --git a/polkadot/zombienet_tests/functional/0012-enable-node-feature.js b/polkadot/zombienet_tests/functional/0012-enable-node-feature.js
new file mode 100644
index 00000000000..4822e1f6644
--- /dev/null
+++ b/polkadot/zombienet_tests/functional/0012-enable-node-feature.js
@@ -0,0 +1,37 @@
+async function run(nodeName, networkInfo, index) {
+  const { wsUri, userDefinedTypes } = networkInfo.nodesByName[nodeName];
+  const api = await zombie.connect(wsUri, userDefinedTypes);
+
+  await zombie.util.cryptoWaitReady();
+
+  // account to submit tx
+  const keyring = new zombie.Keyring({ type: "sr25519" });
+  const alice = keyring.addFromUri("//Alice");
+
+  await new Promise(async (resolve, reject) => {
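+    // setNodeFeature(index, value) flips one bit in the runtime's node_features
+    // bitfield; the zndsl test passes index 1, the ElasticScalingMVP bit.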
+    const unsub = await api.tx.sudo
+      .sudo(api.tx.configuration.setNodeFeature(Number(index), true))
+      .signAndSend(alice, ({ status, isError }) => {
+        if (status.isInBlock) {
+          console.log(
+            `Transaction included at blockhash ${status.asInBlock}`,
+          );
+        } else if (status.isFinalized) {
+          console.log(
+            `Transaction finalized at blockHash ${status.asFinalized}`,
+          );
+          unsub();
+          return resolve();
+        } else if (isError) {
+          console.log(`Transaction error`);
+          reject(`Transaction error`);
+        }
+      });
+  });
+
+  return 0;
+}
+
+module.exports = { run };
diff --git a/polkadot/zombienet_tests/functional/0012-register-para.js b/polkadot/zombienet_tests/functional/0012-register-para.js
new file mode 100644
index 00000000000..25c7e4f5ffd
--- /dev/null
+++ b/polkadot/zombienet_tests/functional/0012-register-para.js
@@ -0,0 +1,37 @@
+async function run(nodeName, networkInfo, _jsArgs) {
+  const { wsUri, userDefinedTypes } = networkInfo.nodesByName[nodeName];
+  const api = await zombie.connect(wsUri, userDefinedTypes);
+
+  await zombie.util.cryptoWaitReady();
+
+  // account to submit tx
+  const keyring = new zombie.Keyring({ type: "sr25519" });
+  const alice = keyring.addFromUri("//Alice");
+
+  await new Promise(async (resolve, reject) => {
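+    // assignCore(core, begin, assignment, end_hint): starting at relay block 35,
+    // give core 0 entirely (57600 of 57600 parts) to task 2000, with no end hint.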
+    const unsub = await api.tx.sudo
+      .sudo(api.tx.coretime.assignCore(0, 35, [[{ task: 2000 }, 57600]], null))
+      .signAndSend(alice, ({ status, isError }) => {
+        if (status.isInBlock) {
+          console.log(
+            `Transaction included at blockhash ${status.asInBlock}`,
+          );
+        } else if (status.isFinalized) {
+          console.log(
+            `Transaction finalized at blockHash ${status.asFinalized}`,
+          );
+          unsub();
+          return resolve();
+        } else if (isError) {
+          console.log(`Transaction error`);
+          reject(`Transaction error`);
+        }
+      });
+  });
+
+  return 0;
+}
+
+module.exports = { run };
diff --git a/prdoc/pr_3231.prdoc b/prdoc/pr_3231.prdoc
new file mode 100644
index 00000000000..26e96d3635b
--- /dev/null
+++ b/prdoc/pr_3231.prdoc
@@ -0,0 +1,11 @@
+title: Allow parachain which acquires multiple coretime cores to make progress
+
+doc:
+  - audience: Node Operator
+    description: |
+      Adds the needed changes so that parachains which acquire multiple coretime cores can still make progress.
+      Only one of the cores can be occupied at a time.
+      This only works if the ElasticScalingMVP node feature is enabled in the runtime and the block-authoring
+      validator is updated to include this change.
+
+crates: [ ]
-- 
GitLab