From 417c54c61c3fc77bd0402eec2c1ccfdb4ad38f8e Mon Sep 17 00:00:00 2001
From: Andrei Sandu <54316454+sandreim@users.noreply.github.com>
Date: Wed, 27 Mar 2024 16:44:10 +0200
Subject: [PATCH] collation-generation + collator-protocol: collate on multiple
 assigned cores (#3795)

This works only for collators that implement the `collator_fn` allowing
`collation-generation` subsystem to pull collations triggered on new
heads.

Also enables
`request_v2::CollationFetchingResponse::CollationWithParentHeadData` for
test adder/undying collators.

TODO:
- [x] fix tests
- [x] new tests
- [x] PR doc

---------

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
---
 .../consensus/aura/src/collators/lookahead.rs |  39 ++-
 .../node/collation-generation/src/error.rs    |   2 +
 polkadot/node/collation-generation/src/lib.rs | 183 +++++++-----
 .../node/collation-generation/src/tests.rs    | 261 +++++++++++++++++-
 .../src/collator_side/mod.rs                  | 129 ++++++---
 .../src/collator_side/tests/mod.rs            |   1 +
 .../tests/prospective_parachains.rs           |   2 +
 .../src/collator_side/validators_buffer.rs    |  18 +-
 polkadot/node/primitives/src/lib.rs           |   6 +-
 polkadot/node/subsystem-types/src/messages.rs |   2 +
 polkadot/node/subsystem-util/src/lib.rs       |   3 +-
 .../test-parachains/adder/collator/Cargo.toml |   2 +-
 .../undying/collator/Cargo.toml               |   2 +-
 polkadot/runtime/test-runtime/src/lib.rs      |  34 ++-
 prdoc/pr_3795.prdoc                           |  14 +
 15 files changed, 557 insertions(+), 141 deletions(-)
 create mode 100644 prdoc/pr_3795.prdoc

diff --git a/cumulus/client/consensus/aura/src/collators/lookahead.rs b/cumulus/client/consensus/aura/src/collators/lookahead.rs
index 161f10d55a1..58005833617 100644
--- a/cumulus/client/consensus/aura/src/collators/lookahead.rs
+++ b/cumulus/client/consensus/aura/src/collators/lookahead.rs
@@ -49,7 +49,7 @@ use polkadot_node_subsystem::messages::{
 	CollationGenerationMessage, RuntimeApiMessage, RuntimeApiRequest,
 };
 use polkadot_overseer::Handle as OverseerHandle;
-use polkadot_primitives::{CollatorPair, Id as ParaId, OccupiedCoreAssumption};
+use polkadot_primitives::{CollatorPair, CoreIndex, Id as ParaId, OccupiedCoreAssumption};
 
 use futures::{channel::oneshot, prelude::*};
 use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf};
@@ -184,7 +184,15 @@ where
 		while let Some(relay_parent_header) = import_notifications.next().await {
 			let relay_parent = relay_parent_header.hash();
 
-			if !is_para_scheduled(relay_parent, params.para_id, &mut params.overseer_handle).await {
+			// TODO: Currently we use just the first core here, but for elastic scaling
+			// we iterate and build on all of the cores returned.
+			let core_index = if let Some(core_index) =
+				cores_scheduled_for_para(relay_parent, params.para_id, &mut params.overseer_handle)
+					.await
+					.get(0)
+			{
+				*core_index
+			} else {
 				tracing::trace!(
 					target: crate::LOG_TARGET,
 					?relay_parent,
@@ -193,7 +201,7 @@ where
 				);
 
 				continue
-			}
+			};
 
 			let max_pov_size = match params
 				.relay_client
@@ -396,6 +404,7 @@ where
 										parent_head: parent_header.encode().into(),
 										validation_code_hash,
 										result_sender: None,
+										core_index,
 									},
 								),
 								"SubmitCollation",
@@ -480,14 +489,12 @@ async fn max_ancestry_lookback(
 	}
 }
 
-// Checks if there exists a scheduled core for the para at the provided relay parent.
-//
-// Falls back to `false` in case of an error.
-async fn is_para_scheduled(
+// Return all the cores assigned to the para at the provided relay parent.
+async fn cores_scheduled_for_para(
 	relay_parent: PHash,
 	para_id: ParaId,
 	overseer_handle: &mut OverseerHandle,
-) -> bool {
+) -> Vec<CoreIndex> {
 	let (tx, rx) = oneshot::channel();
 	let request = RuntimeApiRequest::AvailabilityCores(tx);
 	overseer_handle
@@ -503,7 +510,7 @@ async fn is_para_scheduled(
 				?relay_parent,
 				"Failed to query availability cores runtime API",
 			);
-			return false
+			return Vec::new()
 		},
 		Err(oneshot::Canceled) => {
 			tracing::error!(
@@ -511,9 +518,19 @@ async fn is_para_scheduled(
 				?relay_parent,
 				"Sender for availability cores runtime request dropped",
 			);
-			return false
+			return Vec::new()
 		},
 	};
 
-	cores.iter().any(|core| core.para_id() == Some(para_id))
+	cores
+		.iter()
+		.enumerate()
+		.filter_map(|(index, core)| {
+			if core.para_id() == Some(para_id) {
+				Some(CoreIndex(index as u32))
+			} else {
+				None
+			}
+		})
+		.collect()
 }
diff --git a/polkadot/node/collation-generation/src/error.rs b/polkadot/node/collation-generation/src/error.rs
index ac5db6cd7f2..852c50f3068 100644
--- a/polkadot/node/collation-generation/src/error.rs
+++ b/polkadot/node/collation-generation/src/error.rs
@@ -28,6 +28,8 @@ pub enum Error {
 	Util(#[from] polkadot_node_subsystem_util::Error),
 	#[error(transparent)]
 	Erasure(#[from] polkadot_erasure_coding::Error),
+	#[error("Parachain backing state not available in runtime.")]
+	MissingParaBackingState,
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs
index 3b1a8f5ff23..3164f6078bc 100644
--- a/polkadot/node/collation-generation/src/lib.rs
+++ b/polkadot/node/collation-generation/src/lib.rs
@@ -44,8 +44,8 @@ use polkadot_node_subsystem::{
 };
 use polkadot_node_subsystem_util::{
 	has_required_runtime, request_async_backing_params, request_availability_cores,
-	request_claim_queue, request_persisted_validation_data, request_validation_code,
-	request_validation_code_hash, request_validators,
+	request_claim_queue, request_para_backing_state, request_persisted_validation_data,
+	request_validation_code, request_validation_code_hash, request_validators,
 };
 use polkadot_primitives::{
 	collator_signature_payload, CandidateCommitments, CandidateDescriptor, CandidateReceipt,
@@ -212,6 +212,7 @@ async fn handle_new_activations<Context>(
 	if config.collator.is_none() {
 		return Ok(())
 	}
+	let para_id = config.para_id;
 
 	let _overall_timer = metrics.time_new_activations();
 
@@ -225,25 +226,23 @@ async fn handle_new_activations<Context>(
 		);
 
 		let availability_cores = availability_cores??;
-		let n_validators = validators??.len();
 		let async_backing_params = async_backing_params?.ok();
+		let n_validators = validators??.len();
 		let maybe_claim_queue = fetch_claim_queue(ctx.sender(), relay_parent).await?;
 
-		for (core_idx, core) in availability_cores.into_iter().enumerate() {
-			let _availability_core_timer = metrics.time_new_activations_availability_core();
+		// The loop bellow will fill in cores that the para is allowed to build on.
+		let mut cores_to_build_on = Vec::new();
 
-			let (scheduled_core, assumption) = match core {
-				CoreState::Scheduled(scheduled_core) =>
-					(scheduled_core, OccupiedCoreAssumption::Free),
+		for (core_idx, core) in availability_cores.into_iter().enumerate() {
+			let scheduled_core = match core {
+				CoreState::Scheduled(scheduled_core) => scheduled_core,
 				CoreState::Occupied(occupied_core) => match async_backing_params {
 					Some(params) if params.max_candidate_depth >= 1 => {
 						// maximum candidate depth when building on top of a block
 						// pending availability is necessarily 1 - the depth of the
 						// pending block is 0 so the child has depth 1.
 
-						// TODO [now]: this assumes that next up == current.
-						// in practice we should only set `OccupiedCoreAssumption::Included`
-						// when the candidate occupying the core is also of the same para.
+						// Use claim queue if available, or fallback to `next_up_on_available`
 						let res = match maybe_claim_queue {
 							Some(ref claim_queue) => {
 								// read what's in the claim queue for this core
@@ -257,8 +256,7 @@ async fn handle_new_activations<Context>(
 								// `next_up_on_available`
 								occupied_core.next_up_on_available
 							},
-						}
-						.map(|scheduled| (scheduled, OccupiedCoreAssumption::Included));
+						};
 
 						match res {
 							Some(res) => res,
@@ -279,7 +277,7 @@ async fn handle_new_activations<Context>(
 					gum::trace!(
 						target: LOG_TARGET,
 						core_idx = %core_idx,
-						"core is free. Keep going.",
+						"core is not assigned to any para. Keep going.",
 					);
 					continue
 				},
@@ -294,64 +292,90 @@ async fn handle_new_activations<Context>(
 					their_para = %scheduled_core.para_id,
 					"core is not assigned to our para. Keep going.",
 				);
-				continue
+			} else {
+				// Accumulate cores for building collation(s) outside the loop.
+				cores_to_build_on.push(CoreIndex(core_idx as u32));
 			}
+		}
 
-			// we get validation data and validation code synchronously for each core instead of
-			// within the subtask loop, because we have only a single mutable handle to the
-			// context, so the work can't really be distributed
-
-			let validation_data = match request_persisted_validation_data(
-				relay_parent,
-				scheduled_core.para_id,
-				assumption,
-				ctx.sender(),
-			)
-			.await
-			.await??
-			{
-				Some(v) => v,
-				None => {
-					gum::trace!(
-						target: LOG_TARGET,
-						core_idx = %core_idx,
-						relay_parent = ?relay_parent,
-						our_para = %config.para_id,
-						their_para = %scheduled_core.para_id,
-						"validation data is not available",
-					);
-					continue
-				},
-			};
+		// Skip to next relay parent if there is no core assigned to us.
+		if cores_to_build_on.is_empty() {
+			continue
+		}
 
-			let validation_code_hash = match obtain_validation_code_hash_with_assumption(
-				relay_parent,
-				scheduled_core.para_id,
-				assumption,
-				ctx.sender(),
-			)
-			.await?
-			{
-				Some(v) => v,
-				None => {
-					gum::trace!(
-						target: LOG_TARGET,
-						core_idx = %core_idx,
-						relay_parent = ?relay_parent,
-						our_para = %config.para_id,
-						their_para = %scheduled_core.para_id,
-						"validation code hash is not found.",
-					);
-					continue
-				},
-			};
+		let para_backing_state =
+			request_para_backing_state(relay_parent, config.para_id, ctx.sender())
+				.await
+				.await??
+				.ok_or(crate::error::Error::MissingParaBackingState)?;
+
+		// We are being very optimistic here, but one of the cores could pend availability some more
+		// block, ore even time out.
+		// For timeout assumption the collator can't really know because it doesn't receive bitfield
+		// gossip.
+		let assumption = if para_backing_state.pending_availability.is_empty() {
+			OccupiedCoreAssumption::Free
+		} else {
+			OccupiedCoreAssumption::Included
+		};
+
+		gum::debug!(
+			target: LOG_TARGET,
+			relay_parent = ?relay_parent,
+			our_para = %config.para_id,
+			?assumption,
+			"Occupied core(s) assumption",
+		);
+
+		let mut validation_data = match request_persisted_validation_data(
+			relay_parent,
+			config.para_id,
+			assumption,
+			ctx.sender(),
+		)
+		.await
+		.await??
+		{
+			Some(v) => v,
+			None => {
+				gum::debug!(
+					target: LOG_TARGET,
+					relay_parent = ?relay_parent,
+					our_para = %config.para_id,
+					"validation data is not available",
+				);
+				continue
+			},
+		};
 
-			let task_config = config.clone();
-			let metrics = metrics.clone();
-			let mut task_sender = ctx.sender().clone();
-			ctx.spawn(
-				"collation-builder",
-				Box::pin(async move {
+		let validation_code_hash = match obtain_validation_code_hash_with_assumption(
+			relay_parent,
+			config.para_id,
+			assumption,
+			ctx.sender(),
+		)
+		.await?
+		{
+			Some(v) => v,
+			None => {
+				gum::debug!(
+					target: LOG_TARGET,
+					relay_parent = ?relay_parent,
+					our_para = %config.para_id,
+					"validation code hash is not found.",
+				);
+				continue
+			},
+		};
+
+		let task_config = config.clone();
+		let metrics = metrics.clone();
+		let mut task_sender = ctx.sender().clone();
+
+		ctx.spawn(
+			"chained-collation-builder",
+			Box::pin(async move {
+				for core_index in cores_to_build_on {
 					let collator_fn = match task_config.collator.as_ref() {
 						Some(x) => x,
 						None => return,
@@ -363,21 +387,23 @@ async fn handle_new_activations<Context>(
 							None => {
 								gum::debug!(
 									target: LOG_TARGET,
-									para_id = %scheduled_core.para_id,
+									?para_id,
 									"collator returned no collation on collate",
 								);
 								return
 							},
 						};
 
+					let parent_head = collation.head_data.clone();
 					construct_and_distribute_receipt(
 						PreparedCollation {
 							collation,
-							para_id: scheduled_core.para_id,
+							para_id,
 							relay_parent,
-							validation_data,
+							validation_data: validation_data.clone(),
 							validation_code_hash,
 							n_validators,
+							core_index,
 						},
 						task_config.key.clone(),
 						&mut task_sender,
@@ -385,9 +411,13 @@ async fn handle_new_activations<Context>(
 						&metrics,
 					)
 					.await;
-				}),
-			)?;
-		}
+
+					// Chain the collations. All else stays the same as we build the chained
+					// collation on same relay parent.
+					validation_data.parent_head = parent_head;
+				}
+			}),
+		)?;
 	}
 
 	Ok(())
@@ -408,6 +438,7 @@ async fn handle_submit_collation<Context>(
 		parent_head,
 		validation_code_hash,
 		result_sender,
+		core_index,
 	} = params;
 
 	let validators = request_validators(relay_parent, ctx.sender()).await.await??;
@@ -444,6 +475,7 @@ async fn handle_submit_collation<Context>(
 		validation_data,
 		validation_code_hash,
 		n_validators,
+		core_index,
 	};
 
 	construct_and_distribute_receipt(
@@ -465,6 +497,7 @@ struct PreparedCollation {
 	validation_data: PersistedValidationData,
 	validation_code_hash: ValidationCodeHash,
 	n_validators: usize,
+	core_index: CoreIndex,
 }
 
 /// Takes a prepared collation, along with its context, and produces a candidate receipt
@@ -483,6 +516,7 @@ async fn construct_and_distribute_receipt(
 		validation_data,
 		validation_code_hash,
 		n_validators,
+		core_index,
 	} = collation;
 
 	let persisted_validation_data_hash = validation_data.hash();
@@ -578,6 +612,7 @@ async fn construct_and_distribute_receipt(
 			pov,
 			parent_head_data,
 			result_sender,
+			core_index,
 		})
 		.await;
 }
diff --git a/polkadot/node/collation-generation/src/tests.rs b/polkadot/node/collation-generation/src/tests.rs
index 9b16980e6af..3cb3e61a35a 100644
--- a/polkadot/node/collation-generation/src/tests.rs
+++ b/polkadot/node/collation-generation/src/tests.rs
@@ -30,13 +30,16 @@ use polkadot_node_subsystem::{
 use polkadot_node_subsystem_test_helpers::{subsystem_test_harness, TestSubsystemContextHandle};
 use polkadot_node_subsystem_util::TimeoutExt;
 use polkadot_primitives::{
-	AsyncBackingParams, CollatorPair, HeadData, Id as ParaId, Id, PersistedValidationData,
+	async_backing::{BackingState, CandidatePendingAvailability},
+	AsyncBackingParams, BlockNumber, CollatorPair, HeadData, PersistedValidationData,
 	ScheduledCore, ValidationCode,
 };
 use rstest::rstest;
 use sp_keyring::sr25519::Keyring as Sr25519Keyring;
 use std::pin::Pin;
-use test_helpers::{dummy_candidate_descriptor, dummy_hash, dummy_head_data, dummy_validator};
+use test_helpers::{
+	dummy_candidate_descriptor, dummy_hash, dummy_head_data, dummy_validator, make_candidate,
+};
 
 type VirtualOverseer = TestSubsystemContextHandle<CollationGenerationMessage>;
 
@@ -105,9 +108,9 @@ impl Future for TestCollator {
 
 impl Unpin for TestCollator {}
 
-async fn overseer_recv(overseer: &mut VirtualOverseer) -> AllMessages {
-	const TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2000);
+const TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2000);
 
+async fn overseer_recv(overseer: &mut VirtualOverseer) -> AllMessages {
 	overseer
 		.recv()
 		.timeout(TIMEOUT)
@@ -135,6 +138,41 @@ fn scheduled_core_for<Id: Into<ParaId>>(para_id: Id) -> ScheduledCore {
 	ScheduledCore { para_id: para_id.into(), collator: None }
 }
 
+fn dummy_candidate_pending_availability(
+	para_id: ParaId,
+	candidate_relay_parent: Hash,
+	relay_parent_number: BlockNumber,
+) -> CandidatePendingAvailability {
+	let (candidate, _pvd) = make_candidate(
+		candidate_relay_parent,
+		relay_parent_number,
+		para_id,
+		dummy_head_data(),
+		HeadData(vec![1]),
+		ValidationCode(vec![1, 2, 3]).hash(),
+	);
+	let candidate_hash = candidate.hash();
+
+	CandidatePendingAvailability {
+		candidate_hash,
+		descriptor: candidate.descriptor,
+		commitments: candidate.commitments,
+		relay_parent_number,
+		max_pov_size: 5 * 1024 * 1024,
+	}
+}
+
+fn dummy_backing_state(pending_availability: Vec<CandidatePendingAvailability>) -> BackingState {
+	let constraints = helpers::dummy_constraints(
+		0,
+		vec![0],
+		dummy_head_data(),
+		ValidationCodeHash::from(Hash::repeat_byte(42)),
+	);
+
+	BackingState { constraints, pending_availability }
+}
+
 #[rstest]
 #[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT - 1)]
 #[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)]
@@ -176,6 +214,12 @@ fn requests_availability_per_relay_parent(#[case] runtime_version: u32) {
 				))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => {
 					tx.send(Ok(BTreeMap::new())).unwrap();
 				},
+				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					_hash,
+					RuntimeApiRequest::ParaBackingState(_para_id, tx),
+				))) => {
+					tx.send(Ok(Some(dummy_backing_state(vec![])))).unwrap();
+				},
 				Some(msg) => panic!("didn't expect any other overseer requests given no availability cores; got {:?}", msg),
 			}
 		}
@@ -273,6 +317,12 @@ fn requests_validation_data_for_scheduled_matches(#[case] runtime_version: u32)
 				))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => {
 					tx.send(Ok(BTreeMap::new())).unwrap();
 				},
+				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					_hash,
+					RuntimeApiRequest::ParaBackingState(_para_id, tx),
+				))) => {
+					tx.send(Ok(Some(dummy_backing_state(vec![])))).unwrap();
+				},
 				Some(msg) => {
 					panic!("didn't expect any other overseer requests; got {:?}", msg)
 				},
@@ -384,6 +434,12 @@ fn sends_distribute_collation_message(#[case] runtime_version: u32) {
 				))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => {
 					tx.send(Ok(BTreeMap::new())).unwrap();
 				},
+				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					_hash,
+					RuntimeApiRequest::ParaBackingState(_para_id, tx),
+				))) => {
+					tx.send(Ok(Some(dummy_backing_state(vec![])))).unwrap();
+				},
 				Some(msg @ AllMessages::CollatorProtocol(_)) => {
 					inner_to_collator_protocol.lock().await.push(msg);
 				},
@@ -564,6 +620,12 @@ fn fallback_when_no_validation_code_hash_api(#[case] runtime_version: u32) {
 					let res = BTreeMap::<CoreIndex, VecDeque<ParaId>>::new();
 					tx.send(Ok(res)).unwrap();
 				},
+				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					_hash,
+					RuntimeApiRequest::ParaBackingState(_para_id, tx),
+				))) => {
+					tx.send(Ok(Some(dummy_backing_state(vec![])))).unwrap();
+				},
 				Some(msg) => {
 					panic!("didn't expect any other overseer requests; got {:?}", msg)
 				},
@@ -611,6 +673,7 @@ fn submit_collation_is_no_op_before_initialization() {
 					parent_head: vec![1, 2, 3].into(),
 					validation_code_hash: Hash::repeat_byte(1).into(),
 					result_sender: None,
+					core_index: CoreIndex(0),
 				}),
 			})
 			.await;
@@ -647,6 +710,7 @@ fn submit_collation_leads_to_distribution() {
 					parent_head: vec![1, 2, 3].into(),
 					validation_code_hash,
 					result_sender: None,
+					core_index: CoreIndex(0),
 				}),
 			})
 			.await;
@@ -721,6 +785,9 @@ fn distribute_collation_for_occupied_core_with_async_backing_enabled(#[case] run
 	test_harness(|mut virtual_overseer| async move {
 		helpers::initialize_collator(&mut virtual_overseer, para_id).await;
 		helpers::activate_new_head(&mut virtual_overseer, activated_hash).await;
+
+		let pending_availability =
+			vec![dummy_candidate_pending_availability(para_id, activated_hash, 1)];
 		helpers::handle_runtime_calls_on_new_head_activation(
 			&mut virtual_overseer,
 			activated_hash,
@@ -728,14 +795,140 @@ fn distribute_collation_for_occupied_core_with_async_backing_enabled(#[case] run
 			cores,
 			runtime_version,
 			claim_queue,
+			pending_availability,
+		)
+		.await;
+		helpers::handle_core_processing_for_a_leaf(
+			&mut virtual_overseer,
+			activated_hash,
+			para_id,
+			// `CoreState` is `Occupied` => `OccupiedCoreAssumption` is `Included`
+			OccupiedCoreAssumption::Included,
+			1,
+		)
+		.await;
+
+		virtual_overseer
+	});
+}
+
+// There are variable number of cores of cores in `Occupied` state and async backing is enabled.
+// On new head activation `CollationGeneration` should produce and distribute a new collation
+// with proper assumption about the para candidate chain availability at next block.
+#[rstest]
+#[case(0)]
+#[case(1)]
+#[case(2)]
+fn distribute_collation_for_occupied_cores_with_async_backing_enabled_and_elastic_scaling(
+	#[case] candidates_pending_avail: u32,
+) {
+	let activated_hash: Hash = [1; 32].into();
+	let para_id = ParaId::from(5);
+
+	let cores = (0..candidates_pending_avail)
+		.into_iter()
+		.map(|idx| {
+			CoreState::Occupied(polkadot_primitives::OccupiedCore {
+				next_up_on_available: Some(ScheduledCore { para_id, collator: None }),
+				occupied_since: 0,
+				time_out_at: 10,
+				next_up_on_time_out: Some(ScheduledCore { para_id, collator: None }),
+				availability: Default::default(), // doesn't matter
+				group_responsible: polkadot_primitives::GroupIndex(idx as u32),
+				candidate_hash: Default::default(),
+				candidate_descriptor: dummy_candidate_descriptor(dummy_hash()),
+			})
+		})
+		.collect::<Vec<_>>();
+
+	let pending_availability = (0..candidates_pending_avail)
+		.into_iter()
+		.map(|_idx| dummy_candidate_pending_availability(para_id, activated_hash, 0))
+		.collect::<Vec<_>>();
+
+	let claim_queue = cores
+		.iter()
+		.enumerate()
+		.map(|(idx, _core)| (CoreIndex::from(idx as u32), VecDeque::from([para_id])))
+		.collect::<BTreeMap<_, _>>();
+	let total_cores = cores.len();
+
+	test_harness(|mut virtual_overseer| async move {
+		helpers::initialize_collator(&mut virtual_overseer, para_id).await;
+		helpers::activate_new_head(&mut virtual_overseer, activated_hash).await;
+		helpers::handle_runtime_calls_on_new_head_activation(
+			&mut virtual_overseer,
+			activated_hash,
+			AsyncBackingParams { max_candidate_depth: 1, allowed_ancestry_len: 1 },
+			cores,
+			// Using latest runtime with the fancy claim queue exposed.
+			RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT,
+			claim_queue,
+			pending_availability,
 		)
 		.await;
+
 		helpers::handle_core_processing_for_a_leaf(
 			&mut virtual_overseer,
 			activated_hash,
 			para_id,
 			// `CoreState` is `Occupied` => `OccupiedCoreAssumption` is `Included`
 			OccupiedCoreAssumption::Included,
+			total_cores,
+		)
+		.await;
+
+		virtual_overseer
+	});
+}
+
+// There are variable number of cores of cores in `Free` state and async backing is enabled.
+// On new head activation `CollationGeneration` should produce and distribute a new collation
+// with proper assumption about the para candidate chain availability at next block.
+#[rstest]
+#[case(0)]
+#[case(1)]
+#[case(2)]
+fn distribute_collation_for_free_cores_with_async_backing_enabled_and_elastic_scaling(
+	#[case] candidates_pending_avail: u32,
+) {
+	let activated_hash: Hash = [1; 32].into();
+	let para_id = ParaId::from(5);
+
+	let cores = (0..candidates_pending_avail)
+		.into_iter()
+		.map(|_idx| CoreState::Scheduled(ScheduledCore { para_id, collator: None }))
+		.collect::<Vec<_>>();
+
+	let claim_queue = cores
+		.iter()
+		.enumerate()
+		.map(|(idx, _core)| (CoreIndex::from(idx as u32), VecDeque::from([para_id])))
+		.collect::<BTreeMap<_, _>>();
+	let total_cores = cores.len();
+
+	test_harness(|mut virtual_overseer| async move {
+		helpers::initialize_collator(&mut virtual_overseer, para_id).await;
+		helpers::activate_new_head(&mut virtual_overseer, activated_hash).await;
+		helpers::handle_runtime_calls_on_new_head_activation(
+			&mut virtual_overseer,
+			activated_hash,
+			AsyncBackingParams { max_candidate_depth: 1, allowed_ancestry_len: 1 },
+			cores,
+			// Using latest runtime with the fancy claim queue exposed.
+			RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT,
+			claim_queue,
+			vec![],
+		)
+		.await;
+
+		helpers::handle_core_processing_for_a_leaf(
+			&mut virtual_overseer,
+			activated_hash,
+			para_id,
+			// `CoreState` is `Free` => `OccupiedCoreAssumption` is `Free`
+			OccupiedCoreAssumption::Free,
+			total_cores,
 		)
 		.await;
 
@@ -777,6 +970,7 @@ fn no_collation_is_distributed_for_occupied_core_with_async_backing_disabled(
 			cores,
 			runtime_version,
 			claim_queue,
+			vec![],
 		)
 		.await;
 
@@ -785,8 +979,38 @@ fn no_collation_is_distributed_for_occupied_core_with_async_backing_disabled(
 }
 
 mod helpers {
+	use polkadot_primitives::{
+		async_backing::{Constraints, InboundHrmpLimitations},
+		BlockNumber,
+	};
+
 	use super::*;
 
+	// A set for dummy constraints for `ParaBackingState``
+	pub(crate) fn dummy_constraints(
+		min_relay_parent_number: BlockNumber,
+		valid_watermarks: Vec<BlockNumber>,
+		required_parent: HeadData,
+		validation_code_hash: ValidationCodeHash,
+	) -> Constraints {
+		Constraints {
+			min_relay_parent_number,
+			max_pov_size: 5 * 1024 * 1024,
+			max_code_size: 1_000_000,
+			ump_remaining: 10,
+			ump_remaining_bytes: 1_000,
+			max_ump_num_per_candidate: 10,
+			dmp_remaining_messages: vec![],
+			hrmp_inbound: InboundHrmpLimitations { valid_watermarks },
+			hrmp_channels_out: vec![],
+			max_hrmp_num_per_candidate: 0,
+			required_parent,
+			validation_code_hash,
+			upgrade_restriction: None,
+			future_validation_code: None,
+		}
+	}
+
 	// Sends `Initialize` with a collator config
 	pub async fn initialize_collator(virtual_overseer: &mut VirtualOverseer, para_id: ParaId) {
 		virtual_overseer
@@ -822,7 +1046,8 @@ mod helpers {
 		async_backing_params: AsyncBackingParams,
 		cores: Vec<CoreState>,
 		runtime_version: u32,
-		claim_queue: BTreeMap<CoreIndex, VecDeque<Id>>,
+		claim_queue: BTreeMap<CoreIndex, VecDeque<ParaId>>,
+		pending_availability: Vec<CandidatePendingAvailability>,
 	) {
 		assert_matches!(
 			overseer_recv(virtual_overseer).await,
@@ -857,6 +1082,25 @@ mod helpers {
 			}
 		);
 
+		// Process the `ParaBackingState` message, and return some dummy state.
+		let message = overseer_recv(virtual_overseer).await;
+		let para_id = match message {
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				_,
+				RuntimeApiRequest::ParaBackingState(p_id, _),
+			)) => p_id,
+			_ => panic!("received unexpected message {:?}", message),
+		};
+
+		assert_matches!(
+			message,
+			AllMessages::RuntimeApi(
+				RuntimeApiMessage::Request(parent, RuntimeApiRequest::ParaBackingState(p_id, tx))
+			) if parent == activated_hash && p_id == para_id => {
+				tx.send(Ok(Some(dummy_backing_state(pending_availability)))).unwrap();
+			}
+		);
+
 		assert_matches!(
 			overseer_recv(virtual_overseer).await,
 			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
@@ -889,7 +1133,14 @@ mod helpers {
 		activated_hash: Hash,
 		para_id: ParaId,
 		expected_occupied_core_assumption: OccupiedCoreAssumption,
+		cores_assigned: usize,
 	) {
+		// Expect no messages if no cores is assigned to the para
+		if cores_assigned == 0 {
+			assert!(overseer_recv(virtual_overseer).timeout(TIMEOUT / 2).await.is_none());
+			return
+		}
+
 		// Some hardcoded data - if needed, extract to parameters
 		let validation_code_hash = ValidationCodeHash::from(Hash::repeat_byte(42));
 		let parent_head = HeadData::from(vec![1, 2, 3]);
diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs
index 9f306f288a1..e6aa55235b7 100644
--- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs
+++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs
@@ -203,20 +203,40 @@ struct PeerData {
 	version: CollationVersion,
 }
 
+/// A type wrapping a collation and it's designated core index.
+struct CollationWithCoreIndex(Collation, CoreIndex);
+
+impl CollationWithCoreIndex {
+	/// Returns inner collation ref.
+	pub fn collation(&self) -> &Collation {
+		&self.0
+	}
+
+	/// Returns inner collation mut ref.
+	pub fn collation_mut(&mut self) -> &mut Collation {
+		&mut self.0
+	}
+
+	/// Returns inner core index.
+	pub fn core_index(&self) -> &CoreIndex {
+		&self.1
+	}
+}
+
 struct PerRelayParent {
 	prospective_parachains_mode: ProspectiveParachainsMode,
-	/// Validators group responsible for backing candidates built
+	/// Per core index validators group responsible for backing candidates built
 	/// on top of this relay parent.
-	validator_group: ValidatorGroup,
+	validator_group: HashMap<CoreIndex, ValidatorGroup>,
 	/// Distributed collations.
-	collations: HashMap<CandidateHash, Collation>,
+	collations: HashMap<CandidateHash, CollationWithCoreIndex>,
 }
 
 impl PerRelayParent {
 	fn new(mode: ProspectiveParachainsMode) -> Self {
 		Self {
 			prospective_parachains_mode: mode,
-			validator_group: ValidatorGroup::default(),
+			validator_group: HashMap::default(),
 			collations: HashMap::new(),
 		}
 	}
@@ -350,6 +370,7 @@ async fn distribute_collation<Context>(
 	pov: PoV,
 	parent_head_data: HeadData,
 	result_sender: Option<oneshot::Sender<CollationSecondedSignal>>,
+	core_index: CoreIndex,
 ) -> Result<()> {
 	let candidate_relay_parent = receipt.descriptor.relay_parent;
 	let candidate_hash = receipt.hash();
@@ -422,7 +443,22 @@ async fn distribute_collation<Context>(
 		);
 	}
 
-	let our_core = our_cores[0];
+	// Double check that the specified `core_index` is among the ones our para has assignments for.
+	if !our_cores.iter().any(|assigned_core| assigned_core == &core_index) {
+		gum::warn!(
+			target: LOG_TARGET,
+			para_id = %id,
+			relay_parent = ?candidate_relay_parent,
+			cores = ?our_cores,
+			?core_index,
+			"Attempting to distribute collation for a core we are not assigned to ",
+		);
+
+		return Ok(())
+	}
+
+	let our_core = core_index;
+
 	// Determine the group on that core.
 	//
 	// When prospective parachains are disabled, candidate relay parent here is
@@ -464,10 +500,12 @@ async fn distribute_collation<Context>(
 		"Accepted collation, connecting to validators."
 	);
 
-	let validators_at_relay_parent = &mut per_relay_parent.validator_group.validators;
-	if validators_at_relay_parent.is_empty() {
-		*validators_at_relay_parent = validators;
-	}
+	// Insert validator group for the `core_index` at relay parent.
+	per_relay_parent.validator_group.entry(core_index).or_insert_with(|| {
+		let mut group = ValidatorGroup::default();
+		group.validators = validators;
+		group
+	});
 
 	// Update a set of connected validators if necessary.
 	connect_to_validators(ctx, &state.validator_groups_buf).await;
@@ -484,7 +522,10 @@ async fn distribute_collation<Context>(
 
 	per_relay_parent.collations.insert(
 		candidate_hash,
-		Collation { receipt, pov, parent_head_data, status: CollationStatus::Created },
+		CollationWithCoreIndex(
+			Collation { receipt, pov, parent_head_data, status: CollationStatus::Created },
+			core_index,
+		),
 	);
 
 	// If prospective parachains are disabled, a leaf should be known to peer.
@@ -690,7 +731,10 @@ async fn advertise_collation<Context>(
 	advertisement_timeouts: &mut FuturesUnordered<ResetInterestTimeout>,
 	metrics: &Metrics,
 ) {
-	for (candidate_hash, collation) in per_relay_parent.collations.iter_mut() {
+	for (candidate_hash, collation_and_core) in per_relay_parent.collations.iter_mut() {
+		let core_index = *collation_and_core.core_index();
+		let collation = collation_and_core.collation_mut();
+
 		// Check that peer will be able to request the collation.
 		if let CollationVersion::V1 = protocol_version {
 			if per_relay_parent.prospective_parachains_mode.is_enabled() {
@@ -704,11 +748,17 @@ async fn advertise_collation<Context>(
 			}
 		}
 
-		let should_advertise =
-			per_relay_parent
-				.validator_group
-				.should_advertise_to(candidate_hash, peer_ids, &peer);
+		let Some(validator_group) = per_relay_parent.validator_group.get_mut(&core_index) else {
+			gum::debug!(
+				target: LOG_TARGET,
+				?relay_parent,
+				?core_index,
+				"Skipping advertising to validator, validator group for core not found",
+			);
+			return
+		};
 
+		let should_advertise = validator_group.should_advertise_to(candidate_hash, peer_ids, &peer);
 		match should_advertise {
 			ShouldAdvertiseTo::Yes => {},
 			ShouldAdvertiseTo::NotAuthority | ShouldAdvertiseTo::AlreadyAdvertised => {
@@ -756,9 +806,7 @@ async fn advertise_collation<Context>(
 		))
 		.await;
 
-		per_relay_parent
-			.validator_group
-			.advertised_to_peer(candidate_hash, &peer_ids, peer);
+		validator_group.advertised_to_peer(candidate_hash, &peer_ids, peer);
 
 		advertisement_timeouts.push(ResetInterestTimeout::new(
 			*candidate_hash,
@@ -790,6 +838,7 @@ async fn process_msg<Context>(
 			pov,
 			parent_head_data,
 			result_sender,
+			core_index,
 		} => {
 			let _span1 = state
 				.span_per_relay_parent
@@ -820,6 +869,7 @@ async fn process_msg<Context>(
 						pov,
 						parent_head_data,
 						result_sender,
+						core_index,
 					)
 					.await?;
 				},
@@ -1053,7 +1103,7 @@ async fn handle_incoming_request<Context>(
 			};
 			let mode = per_relay_parent.prospective_parachains_mode;
 
-			let collation = match &req {
+			let collation_with_core = match &req {
 				VersionedCollationRequest::V1(_) if !mode.is_enabled() =>
 					per_relay_parent.collations.values_mut().next(),
 				VersionedCollationRequest::V2(req) =>
@@ -1070,22 +1120,24 @@ async fn handle_incoming_request<Context>(
 					return Ok(())
 				},
 			};
-			let (receipt, pov, parent_head_data) = if let Some(collation) = collation {
-				collation.status.advance_to_requested();
-				(
-					collation.receipt.clone(),
-					collation.pov.clone(),
-					collation.parent_head_data.clone(),
-				)
-			} else {
-				gum::warn!(
-					target: LOG_TARGET,
-					relay_parent = %relay_parent,
-					"received a `RequestCollation` for a relay parent we don't have collation stored.",
-				);
+			let (receipt, pov, parent_head_data) =
+				if let Some(collation_with_core) = collation_with_core {
+					let collation = collation_with_core.collation_mut();
+					collation.status.advance_to_requested();
+					(
+						collation.receipt.clone(),
+						collation.pov.clone(),
+						collation.parent_head_data.clone(),
+					)
+				} else {
+					gum::warn!(
+						target: LOG_TARGET,
+						relay_parent = %relay_parent,
+						"received a `RequestCollation` for a relay parent we don't have collation stored.",
+					);
 
-				return Ok(())
-			};
+					return Ok(())
+				};
 
 			state.metrics.on_collation_sent_requested();
 
@@ -1340,7 +1392,9 @@ where
 				.remove(removed)
 				.map(|per_relay_parent| per_relay_parent.collations)
 				.unwrap_or_default();
-			for collation in collations.into_values() {
+			for collation_with_core in collations.into_values() {
+				let collation = collation_with_core.collation();
+
 				let candidate_hash = collation.receipt.hash();
 				state.collation_result_senders.remove(&candidate_hash);
 				state.validator_groups_buf.remove_candidate(&candidate_hash);
@@ -1477,7 +1531,7 @@ async fn run_inner<Context>(
 					continue
 				};
 
-				let next_collation = {
+				let next_collation_with_core = {
 					let per_relay_parent = match state.per_relay_parent.get(&relay_parent) {
 						Some(per_relay_parent) => per_relay_parent,
 						None => continue,
@@ -1497,7 +1551,8 @@ async fn run_inner<Context>(
 					}
 				};
 
-				if let Some(collation) = next_collation {
+				if let Some(collation_with_core) = next_collation_with_core {
+					let collation = collation_with_core.collation();
 					let receipt = collation.receipt.clone();
 					let pov = collation.pov.clone();
 					let parent_head_data = collation.parent_head_data.clone();
diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs
index 38e6780eb7d..bcf0b34e631 100644
--- a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs
+++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs
@@ -377,6 +377,7 @@ async fn distribute_collation_with_receipt(
 			pov: pov.clone(),
 			parent_head_data: HeadData(vec![1, 2, 3]),
 			result_sender: None,
+			core_index: CoreIndex(0),
 		},
 	)
 	.await;
diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs
index e419cd5444f..70705354563 100644
--- a/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs
+++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs
@@ -277,6 +277,7 @@ fn distribute_collation_from_implicit_view() {
 					pov: pov.clone(),
 					parent_head_data: HeadData(vec![1, 2, 3]),
 					result_sender: None,
+					core_index: CoreIndex(0),
 				},
 			)
 			.await;
@@ -358,6 +359,7 @@ fn distribute_collation_up_to_limit() {
 					pov: pov.clone(),
 					parent_head_data: HeadData(vec![1, 2, 3]),
 					result_sender: None,
+					core_index: CoreIndex(0),
 				},
 			)
 			.await;
diff --git a/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs b/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs
index 1533f2eda5a..fbb3ff4328a 100644
--- a/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs
+++ b/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs
@@ -45,14 +45,22 @@ use futures::FutureExt;
 use polkadot_node_network_protocol::PeerId;
 use polkadot_primitives::{AuthorityDiscoveryId, CandidateHash, GroupIndex, SessionIndex};
 
+/// Elastic scaling: how many candidates per relay chain block the collator supports building.
+pub const MAX_CHAINED_CANDIDATES_PER_RCB: NonZeroUsize = match NonZeroUsize::new(3) {
+	Some(cap) => cap,
+	None => panic!("max candidates per rcb cannot be zero"),
+};
+
 /// The ring buffer stores at most this many unique validator groups.
 ///
 /// This value should be chosen in way that all groups assigned to our para
-/// in the view can fit into the buffer.
-pub const VALIDATORS_BUFFER_CAPACITY: NonZeroUsize = match NonZeroUsize::new(3) {
-	Some(cap) => cap,
-	None => panic!("buffer capacity must be non-zero"),
-};
+/// in the view can fit into the buffer multiplied by amount of candidates we support per relay
+/// chain block in the case of elastic scaling.
+pub const VALIDATORS_BUFFER_CAPACITY: NonZeroUsize =
+	match NonZeroUsize::new(3 * MAX_CHAINED_CANDIDATES_PER_RCB.get()) {
+		Some(cap) => cap,
+		None => panic!("buffer capacity must be non-zero"),
+	};
 
 /// Unique identifier of a validators group.
 #[derive(Debug)]
diff --git a/polkadot/node/primitives/src/lib.rs b/polkadot/node/primitives/src/lib.rs
index b102cf06c38..b127d87d4ea 100644
--- a/polkadot/node/primitives/src/lib.rs
+++ b/polkadot/node/primitives/src/lib.rs
@@ -31,8 +31,8 @@ use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
 
 use polkadot_primitives::{
 	BlakeTwo256, BlockNumber, CandidateCommitments, CandidateHash, CollatorPair,
-	CommittedCandidateReceipt, CompactStatement, EncodeAs, Hash, HashT, HeadData, Id as ParaId,
-	PersistedValidationData, SessionIndex, Signed, UncheckedSigned, ValidationCode,
+	CommittedCandidateReceipt, CompactStatement, CoreIndex, EncodeAs, Hash, HashT, HeadData,
+	Id as ParaId, PersistedValidationData, SessionIndex, Signed, UncheckedSigned, ValidationCode,
 	ValidationCodeHash, ValidatorIndex, MAX_CODE_SIZE, MAX_POV_SIZE,
 };
 pub use sp_consensus_babe::{
@@ -524,6 +524,8 @@ pub struct SubmitCollationParams {
 	/// okay to just drop it. However, if it is called, it should be called with the signed
 	/// statement of a parachain validator seconding the collation.
 	pub result_sender: Option<futures::channel::oneshot::Sender<CollationSecondedSignal>>,
+	/// The core index on which the resulting candidate should be backed
+	pub core_index: CoreIndex,
 }
 
 /// This is the data we keep available for each candidate included in the relay chain.
diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs
index 5d05d2b56ed..d84b0b6dd14 100644
--- a/polkadot/node/subsystem-types/src/messages.rs
+++ b/polkadot/node/subsystem-types/src/messages.rs
@@ -228,6 +228,8 @@ pub enum CollatorProtocolMessage {
 		/// The result sender should be informed when at least one parachain validator seconded the
 		/// collation. It is also completely okay to just drop the sender.
 		result_sender: Option<oneshot::Sender<CollationSecondedSignal>>,
+		/// The core index where the candidate should be backed.
+		core_index: CoreIndex,
 	},
 	/// Report a collator as having provided an invalid collation. This should lead to disconnect
 	/// and blacklist of the collator.
diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs
index 6ff09ed5f22..83b046f0bf0 100644
--- a/polkadot/node/subsystem-util/src/lib.rs
+++ b/polkadot/node/subsystem-util/src/lib.rs
@@ -30,7 +30,7 @@ use polkadot_node_subsystem::{
 	messages::{RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
 	overseer, SubsystemSender,
 };
-use polkadot_primitives::{slashing, CoreIndex, ExecutorParams};
+use polkadot_primitives::{async_backing::BackingState, slashing, CoreIndex, ExecutorParams};
 
 pub use overseer::{
 	gen::{OrchestraError as OverseerError, Timeout},
@@ -308,6 +308,7 @@ specialize_requests! {
 	fn request_disabled_validators() -> Vec<ValidatorIndex>; DisabledValidators;
 	fn request_async_backing_params() -> AsyncBackingParams; AsyncBackingParams;
 	fn request_claim_queue() -> BTreeMap<CoreIndex, VecDeque<ParaId>>; ClaimQueue;
+	fn request_para_backing_state(para_id: ParaId) -> Option<BackingState>; ParaBackingState;
 }
 
 /// Requests executor parameters from the runtime effective at given relay-parent. First obtains
diff --git a/polkadot/parachain/test-parachains/adder/collator/Cargo.toml b/polkadot/parachain/test-parachains/adder/collator/Cargo.toml
index cb283c27119..30bce806f9f 100644
--- a/polkadot/parachain/test-parachains/adder/collator/Cargo.toml
+++ b/polkadot/parachain/test-parachains/adder/collator/Cargo.toml
@@ -24,7 +24,7 @@ log = { workspace = true, default-features = true }
 test-parachain-adder = { path = ".." }
 polkadot-primitives = { path = "../../../../primitives" }
 polkadot-cli = { path = "../../../../cli" }
-polkadot-service = { path = "../../../../node/service", features = ["rococo-native"] }
+polkadot-service = { path = "../../../../node/service", features = ["elastic-scaling-experimental", "rococo-native"] }
 polkadot-node-primitives = { path = "../../../../node/primitives" }
 polkadot-node-subsystem = { path = "../../../../node/subsystem" }
 
diff --git a/polkadot/parachain/test-parachains/undying/collator/Cargo.toml b/polkadot/parachain/test-parachains/undying/collator/Cargo.toml
index 238b98a6680..bede10a7673 100644
--- a/polkadot/parachain/test-parachains/undying/collator/Cargo.toml
+++ b/polkadot/parachain/test-parachains/undying/collator/Cargo.toml
@@ -24,7 +24,7 @@ log = { workspace = true, default-features = true }
 test-parachain-undying = { path = ".." }
 polkadot-primitives = { path = "../../../../primitives" }
 polkadot-cli = { path = "../../../../cli" }
-polkadot-service = { path = "../../../../node/service", features = ["rococo-native"] }
+polkadot-service = { path = "../../../../node/service", features = ["elastic-scaling-experimental", "rococo-native"] }
 polkadot-node-primitives = { path = "../../../../node/primitives" }
 polkadot-node-subsystem = { path = "../../../../node/subsystem" }
 
diff --git a/polkadot/runtime/test-runtime/src/lib.rs b/polkadot/runtime/test-runtime/src/lib.rs
index 8a3cd9309db..62c3741c56d 100644
--- a/polkadot/runtime/test-runtime/src/lib.rs
+++ b/polkadot/runtime/test-runtime/src/lib.rs
@@ -27,10 +27,11 @@ use sp_std::{collections::btree_map::BTreeMap, prelude::*};
 use polkadot_runtime_parachains::{
 	assigner_parachains as parachains_assigner_parachains,
 	configuration as parachains_configuration, disputes as parachains_disputes,
-	disputes::slashing as parachains_slashing, dmp as parachains_dmp, hrmp as parachains_hrmp,
-	inclusion as parachains_inclusion, initializer as parachains_initializer,
-	origin as parachains_origin, paras as parachains_paras,
-	paras_inherent as parachains_paras_inherent, runtime_api_impl::v7 as runtime_impl,
+	disputes::slashing as parachains_slashing,
+	dmp as parachains_dmp, hrmp as parachains_hrmp, inclusion as parachains_inclusion,
+	initializer as parachains_initializer, origin as parachains_origin, paras as parachains_paras,
+	paras_inherent as parachains_paras_inherent,
+	runtime_api_impl::{v7 as runtime_impl, vstaging as staging_runtime_impl},
 	scheduler as parachains_scheduler, session_info as parachains_session_info,
 	shared as parachains_shared,
 };
@@ -829,6 +830,7 @@ sp_api::impl_runtime_apis! {
 		}
 	}
 
+	#[api_version(10)]
 	impl primitives::runtime_api::ParachainHost<Block> for Runtime {
 		fn validators() -> Vec<ValidatorId> {
 			runtime_impl::validators::<Runtime>()
@@ -956,6 +958,30 @@ sp_api::impl_runtime_apis! {
 				key_ownership_proof,
 			)
 		}
+
+		fn minimum_backing_votes() -> u32 {
+			runtime_impl::minimum_backing_votes::<Runtime>()
+		}
+
+		fn para_backing_state(para_id: ParaId) -> Option<primitives::async_backing::BackingState> {
+			runtime_impl::backing_state::<Runtime>(para_id)
+		}
+
+		fn async_backing_params() -> primitives::AsyncBackingParams {
+			runtime_impl::async_backing_params::<Runtime>()
+		}
+
+		fn approval_voting_params() -> primitives::vstaging::ApprovalVotingParams {
+			staging_runtime_impl::approval_voting_params::<Runtime>()
+		}
+
+		fn disabled_validators() -> Vec<ValidatorIndex> {
+			staging_runtime_impl::disabled_validators::<Runtime>()
+		}
+
+		fn node_features() -> primitives::vstaging::NodeFeatures {
+			staging_runtime_impl::node_features::<Runtime>()
+		}
 	}
 
 	impl beefy_primitives::BeefyApi<Block, BeefyId> for Runtime {
diff --git a/prdoc/pr_3795.prdoc b/prdoc/pr_3795.prdoc
new file mode 100644
index 00000000000..da01fcbec82
--- /dev/null
+++ b/prdoc/pr_3795.prdoc
@@ -0,0 +1,14 @@
+title: Enable collators to build on multiple cores
+
+doc:
+  - audience: Node Dev
+    description: |
+      Introduces a `CoreIndex` parameter in `SubmitCollationParams`. This enables
+      the collators to make use of potentially multiple cores assigned at some relay
+      chain block. This extra parameter is used by the collator protocol and collation
+      generation subsystems to forward the collation to the approapriate backing group.
+
+crates:
+- name: polkadot-node-collation-generation
+- name: polkadot-collator-protocol
+  bump: minor
-- 
GitLab