From 68e05636877448d1d9d4944706af63a2f7677a46 Mon Sep 17 00:00:00 2001
From: Alin Dima <alin@parity.io>
Date: Mon, 4 Nov 2024 09:39:13 +0200
Subject: [PATCH] collation-generation: use v2 receipts (#5908)

Part of https://github.com/paritytech/polkadot-sdk/issues/5047

Plus some cleanups

---------

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
Co-authored-by: Andrei Sandu <andrei-mihail@parity.io>
Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com>
Co-authored-by: GitHub Action <action@github.com>
---
 Cargo.lock                                    |    2 +
 polkadot/node/collation-generation/Cargo.toml |    1 +
 .../node/collation-generation/src/error.rs    |    9 +-
 polkadot/node/collation-generation/src/lib.rs |  576 ++++-----
 .../node/collation-generation/src/metrics.rs  |   68 +-
 .../node/collation-generation/src/tests.rs    | 1078 ++++-------------
 polkadot/primitives/Cargo.toml                |    2 +
 polkadot/primitives/src/vstaging/mod.rs       |   13 +
 prdoc/pr_5908.prdoc                           |   14 +
 9 files changed, 529 insertions(+), 1234 deletions(-)
 create mode 100644 prdoc/pr_5908.prdoc

diff --git a/Cargo.lock b/Cargo.lock
index 1f171ad756c..520b088f913 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -14299,6 +14299,7 @@ dependencies = [
  "polkadot-primitives",
  "polkadot-primitives-test-helpers",
  "rstest",
+ "schnellru",
  "sp-core 28.0.0",
  "sp-keyring",
  "sp-maybe-compressed-blob 11.0.0",
@@ -15139,6 +15140,7 @@ dependencies = [
  "sp-runtime 31.0.1",
  "sp-staking",
  "sp-std 14.0.0",
+ "thiserror",
 ]
 
 [[package]]
diff --git a/polkadot/node/collation-generation/Cargo.toml b/polkadot/node/collation-generation/Cargo.toml
index 855b6b0e86e..777458673f5 100644
--- a/polkadot/node/collation-generation/Cargo.toml
+++ b/polkadot/node/collation-generation/Cargo.toml
@@ -21,6 +21,7 @@ sp-core = { workspace = true, default-features = true }
 sp-maybe-compressed-blob = { workspace = true, default-features = true }
 thiserror = { workspace = true }
 codec = { features = ["bit-vec", "derive"], workspace = true }
+schnellru = { workspace = true }
 
 [dev-dependencies]
 polkadot-node-subsystem-test-helpers = { workspace = true }
diff --git a/polkadot/node/collation-generation/src/error.rs b/polkadot/node/collation-generation/src/error.rs
index f04e3c4f20b..68902f58579 100644
--- a/polkadot/node/collation-generation/src/error.rs
+++ b/polkadot/node/collation-generation/src/error.rs
@@ -14,6 +14,7 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
 
+use polkadot_primitives::vstaging::CandidateReceiptError;
 use thiserror::Error;
 
 #[derive(Debug, Error)]
@@ -30,8 +31,12 @@ pub enum Error {
 	UtilRuntime(#[from] polkadot_node_subsystem_util::runtime::Error),
 	#[error(transparent)]
 	Erasure(#[from] polkadot_erasure_coding::Error),
-	#[error("Parachain backing state not available in runtime.")]
-	MissingParaBackingState,
+	#[error("Collation submitted before initialization")]
+	SubmittedBeforeInit,
+	#[error("V2 core index check failed: {0}")]
+	CandidateReceiptCheck(CandidateReceiptError),
+	#[error("PoV size {0} exceeded maximum size of {1}")]
+	POVSizeExceeded(usize, usize),
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs
index f04f69cbd38..9e975acf10b 100644
--- a/polkadot/node/collation-generation/src/lib.rs
+++ b/polkadot/node/collation-generation/src/lib.rs
@@ -32,27 +32,34 @@
 #![deny(missing_docs)]
 
 use codec::Encode;
-use futures::{channel::oneshot, future::FutureExt, join, select};
+use error::{Error, Result};
+use futures::{channel::oneshot, future::FutureExt, select};
 use polkadot_node_primitives::{
 	AvailableData, Collation, CollationGenerationConfig, CollationSecondedSignal, PoV,
 	SubmitCollationParams,
 };
 use polkadot_node_subsystem::{
-	messages::{CollationGenerationMessage, CollatorProtocolMessage},
-	overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem,
-	SubsystemContext, SubsystemError, SubsystemResult,
+	messages::{CollationGenerationMessage, CollatorProtocolMessage, RuntimeApiMessage},
+	overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem,
+	SubsystemContext, SubsystemError, SubsystemResult, SubsystemSender,
 };
 use polkadot_node_subsystem_util::{
-	request_async_backing_params, request_availability_cores, request_para_backing_state,
-	request_persisted_validation_data, request_validation_code, request_validation_code_hash,
-	request_validators, runtime::fetch_claim_queue,
+	request_claim_queue, request_persisted_validation_data, request_session_index_for_child,
+	request_validation_code_hash, request_validators,
+	runtime::{request_node_features, ClaimQueueSnapshot},
 };
 use polkadot_primitives::{
 	collator_signature_payload,
-	vstaging::{CandidateReceiptV2 as CandidateReceipt, CoreState},
+	node_features::FeatureIndex,
+	vstaging::{
+		transpose_claim_queue, CandidateDescriptorV2, CandidateReceiptV2 as CandidateReceipt,
+		CommittedCandidateReceiptV2, TransposedClaimQueue,
+	},
 	CandidateCommitments, CandidateDescriptor, CollatorPair, CoreIndex, Hash, Id as ParaId,
-	OccupiedCoreAssumption, PersistedValidationData, ScheduledCore, ValidationCodeHash,
+	NodeFeatures, OccupiedCoreAssumption, PersistedValidationData, SessionIndex,
+	ValidationCodeHash,
 };
+use schnellru::{ByLength, LruMap};
 use sp_core::crypto::Pair;
 use std::sync::Arc;
 
@@ -69,6 +76,7 @@ const LOG_TARGET: &'static str = "parachain::collation-generation";
 /// Collation Generation Subsystem
 pub struct CollationGenerationSubsystem {
 	config: Option<Arc<CollationGenerationConfig>>,
+	session_info_cache: SessionInfoCache,
 	metrics: Metrics,
 }
 
@@ -76,7 +84,7 @@ pub struct CollationGenerationSubsystem {
 impl CollationGenerationSubsystem {
 	/// Create a new instance of the `CollationGenerationSubsystem`.
 	pub fn new(metrics: Metrics) -> Self {
-		Self { config: None, metrics }
+		Self { config: None, metrics, session_info_cache: SessionInfoCache::new() }
 	}
 
 	/// Run this subsystem
@@ -117,19 +125,8 @@ impl CollationGenerationSubsystem {
 				activated,
 				..
 			}))) => {
-				// follow the procedure from the guide
-				if let Some(config) = &self.config {
-					let metrics = self.metrics.clone();
-					if let Err(err) = handle_new_activations(
-						config.clone(),
-						activated.into_iter().map(|v| v.hash),
-						ctx,
-						metrics,
-					)
-					.await
-					{
-						gum::warn!(target: LOG_TARGET, err = ?err, "failed to handle new activations");
-					}
+				if let Err(err) = self.handle_new_activation(activated.map(|v| v.hash), ctx).await {
+					gum::warn!(target: LOG_TARGET, err = ?err, "failed to handle new activation");
 				}
 
 				false
@@ -154,14 +151,8 @@ impl CollationGenerationSubsystem {
 			Ok(FromOrchestra::Communication {
 				msg: CollationGenerationMessage::SubmitCollation(params),
 			}) => {
-				if let Some(config) = &self.config {
-					if let Err(err) =
-						handle_submit_collation(params, config, ctx, &self.metrics).await
-					{
-						gum::error!(target: LOG_TARGET, ?err, "Failed to submit collation");
-					}
-				} else {
-					gum::error!(target: LOG_TARGET, "Collation submitted before initialization");
+				if let Err(err) = self.handle_submit_collation(params, ctx).await {
+					gum::error!(target: LOG_TARGET, ?err, "Failed to submit collation");
 				}
 
 				false
@@ -178,175 +169,132 @@ impl CollationGenerationSubsystem {
 			},
 		}
 	}
-}
-
-#[overseer::subsystem(CollationGeneration, error=SubsystemError, prefix=self::overseer)]
-impl<Context> CollationGenerationSubsystem {
-	fn start(self, ctx: Context) -> SpawnedSubsystem {
-		let future = async move {
-			self.run(ctx).await;
-			Ok(())
-		}
-		.boxed();
-
-		SpawnedSubsystem { name: "collation-generation-subsystem", future }
-	}
-}
 
-#[overseer::contextbounds(CollationGeneration, prefix = self::overseer)]
-async fn handle_new_activations<Context>(
-	config: Arc<CollationGenerationConfig>,
-	activated: impl IntoIterator<Item = Hash>,
-	ctx: &mut Context,
-	metrics: Metrics,
-) -> crate::error::Result<()> {
-	// follow the procedure from the guide:
-	// https://paritytech.github.io/polkadot-sdk/book/node/collators/collation-generation.html
-
-	// If there is no collation function provided, bail out early.
-	// Important: Lookahead collator and slot based collator do not use `CollatorFn`.
-	if config.collator.is_none() {
-		return Ok(())
-	}
-
-	let para_id = config.para_id;
-
-	let _overall_timer = metrics.time_new_activations();
-
-	for relay_parent in activated {
-		let _relay_parent_timer = metrics.time_new_activations_relay_parent();
-
-		let (availability_cores, validators, async_backing_params) = join!(
-			request_availability_cores(relay_parent, ctx.sender()).await,
-			request_validators(relay_parent, ctx.sender()).await,
-			request_async_backing_params(relay_parent, ctx.sender()).await,
-		);
-
-		let availability_cores = availability_cores??;
-		let async_backing_params = async_backing_params?.ok();
-		let n_validators = validators??.len();
-		let maybe_claim_queue = fetch_claim_queue(ctx.sender(), relay_parent)
-			.await
-			.map_err(crate::error::Error::UtilRuntime)?;
-
-		// The loop bellow will fill in cores that the para is allowed to build on.
-		let mut cores_to_build_on = Vec::new();
-
-		// This assumption refers to all cores of the parachain, taking elastic scaling
-		// into account.
-		let mut para_assumption = None;
-		for (core_idx, core) in availability_cores.into_iter().enumerate() {
-			// This nested assumption refers only to the core being iterated.
-			let (core_assumption, scheduled_core) = match core {
-				CoreState::Scheduled(scheduled_core) =>
-					(OccupiedCoreAssumption::Free, scheduled_core),
-				CoreState::Occupied(occupied_core) => match async_backing_params {
-					Some(params) if params.max_candidate_depth >= 1 => {
-						// maximum candidate depth when building on top of a block
-						// pending availability is necessarily 1 - the depth of the
-						// pending block is 0 so the child has depth 1.
-
-						// Use claim queue if available, or fallback to `next_up_on_available`
-						let res = match maybe_claim_queue {
-							Some(ref claim_queue) => {
-								// read what's in the claim queue for this core at depth 0.
-								claim_queue
-									.get_claim_for(CoreIndex(core_idx as u32), 0)
-									.map(|para_id| ScheduledCore { para_id, collator: None })
-							},
-							None => {
-								// Runtime doesn't support claim queue runtime api. Fallback to
-								// `next_up_on_available`
-								occupied_core.next_up_on_available
-							},
-						};
+	async fn handle_submit_collation<Context>(
+		&mut self,
+		params: SubmitCollationParams,
+		ctx: &mut Context,
+	) -> Result<()> {
+		let Some(config) = &self.config else {
+			return Err(Error::SubmittedBeforeInit);
+		};
+		let _timer = self.metrics.time_submit_collation();
 
-						match res {
-							Some(res) => (OccupiedCoreAssumption::Included, res),
-							None => continue,
-						}
-					},
-					_ => {
-						gum::trace!(
-							target: LOG_TARGET,
-							core_idx = %core_idx,
-							relay_parent = ?relay_parent,
-							"core is occupied. Keep going.",
-						);
-						continue
-					},
-				},
-				CoreState::Free => {
-					gum::trace!(
-						target: LOG_TARGET,
-						core_idx = %core_idx,
-						"core is not assigned to any para. Keep going.",
-					);
-					continue
-				},
-			};
+		let SubmitCollationParams {
+			relay_parent,
+			collation,
+			parent_head,
+			validation_code_hash,
+			result_sender,
+			core_index,
+		} = params;
 
-			if scheduled_core.para_id != config.para_id {
-				gum::trace!(
+		let mut validation_data = match request_persisted_validation_data(
+			relay_parent,
+			config.para_id,
+			OccupiedCoreAssumption::TimedOut,
+			ctx.sender(),
+		)
+		.await
+		.await??
+		{
+			Some(v) => v,
+			None => {
+				gum::debug!(
 					target: LOG_TARGET,
-					core_idx = %core_idx,
 					relay_parent = ?relay_parent,
 					our_para = %config.para_id,
-					their_para = %scheduled_core.para_id,
-					"core is not assigned to our para. Keep going.",
+					"No validation data for para - does it exist at this relay-parent?",
 				);
-			} else {
-				// This does not work for elastic scaling, but it should be enough for single
-				// core parachains. If async backing runtime is available we later override
-				// the assumption based on the `para_backing_state` API response.
-				para_assumption = Some(core_assumption);
-				// Accumulate cores for building collation(s) outside the loop.
-				cores_to_build_on.push(CoreIndex(core_idx as u32));
-			}
-		}
+				return Ok(())
+			},
+		};
 
-		// Skip to next relay parent if there is no core assigned to us.
-		if cores_to_build_on.is_empty() {
-			continue
+		// We need to swap the parent-head data, but all other fields here will be correct.
+		validation_data.parent_head = parent_head;
+
+		let claim_queue = request_claim_queue(relay_parent, ctx.sender()).await.await??;
+
+		let session_index =
+			request_session_index_for_child(relay_parent, ctx.sender()).await.await??;
+
+		let session_info =
+			self.session_info_cache.get(relay_parent, session_index, ctx.sender()).await?;
+		let collation = PreparedCollation {
+			collation,
+			relay_parent,
+			para_id: config.para_id,
+			validation_data,
+			validation_code_hash,
+			n_validators: session_info.n_validators,
+			core_index,
+			session_index,
+		};
+
+		construct_and_distribute_receipt(
+			collation,
+			config.key.clone(),
+			ctx.sender(),
+			result_sender,
+			&mut self.metrics,
+			session_info.v2_receipts,
+			&transpose_claim_queue(claim_queue),
+		)
+		.await?;
+
+		Ok(())
+	}
+
+	async fn handle_new_activation<Context>(
+		&mut self,
+		maybe_activated: Option<Hash>,
+		ctx: &mut Context,
+	) -> Result<()> {
+		let Some(config) = &self.config else {
+			return Ok(());
+		};
+
+		let Some(relay_parent) = maybe_activated else { return Ok(()) };
+
+		// If there is no collation function provided, bail out early.
+		// Important: Lookahead collator and slot based collator do not use `CollatorFn`.
+		if config.collator.is_none() {
+			return Ok(())
 		}
 
-		// If at least one core is assigned to us, `para_assumption` is `Some`.
-		let Some(mut para_assumption) = para_assumption else { continue };
-
-		// If it is none it means that neither async backing or elastic scaling (which
-		// depends on it) are supported. We'll use the `para_assumption` we got from
-		// iterating cores.
-		if async_backing_params.is_some() {
-			// We are being very optimistic here, but one of the cores could pend availability some
-			// more block, ore even time out.
-			// For timeout assumption the collator can't really know because it doesn't receive
-			// bitfield gossip.
-			let para_backing_state =
-				request_para_backing_state(relay_parent, config.para_id, ctx.sender())
-					.await
-					.await??
-					.ok_or(crate::error::Error::MissingParaBackingState)?;
-
-			// Override the assumption about the para's assigned cores.
-			para_assumption = if para_backing_state.pending_availability.is_empty() {
-				OccupiedCoreAssumption::Free
-			} else {
-				OccupiedCoreAssumption::Included
-			}
+		let para_id = config.para_id;
+
+		let _timer = self.metrics.time_new_activation();
+
+		let session_index =
+			request_session_index_for_child(relay_parent, ctx.sender()).await.await??;
+
+		let session_info =
+			self.session_info_cache.get(relay_parent, session_index, ctx.sender()).await?;
+		let n_validators = session_info.n_validators;
+
+		let claim_queue =
+			ClaimQueueSnapshot::from(request_claim_queue(relay_parent, ctx.sender()).await.await??);
+
+		let cores_to_build_on = claim_queue
+			.iter_claims_at_depth(0)
+			.filter_map(|(core_idx, para_id)| (para_id == config.para_id).then_some(core_idx))
+			.collect::<Vec<_>>();
+
+		// Nothing to do if no core assigned to us.
+		if cores_to_build_on.is_empty() {
+			return Ok(())
 		}
 
-		gum::debug!(
-			target: LOG_TARGET,
-			relay_parent = ?relay_parent,
-			our_para = %para_id,
-			?para_assumption,
-			"Occupied core(s) assumption",
-		);
+		// We are being very optimistic here, but one of the cores could be pending availability
+		// for some more blocks, or even time out. We assume all cores are being freed.
 
 		let mut validation_data = match request_persisted_validation_data(
 			relay_parent,
 			para_id,
-			para_assumption,
+			// Just use included assumption always. If there are no pending candidates it's a
+			// no-op.
+			OccupiedCoreAssumption::Included,
 			ctx.sender(),
 		)
 		.await
@@ -360,17 +308,20 @@ async fn handle_new_activations<Context>(
 					our_para = %para_id,
 					"validation data is not available",
 				);
-				continue
+				return Ok(())
 			},
 		};
 
-		let validation_code_hash = match obtain_validation_code_hash_with_assumption(
+		let validation_code_hash = match request_validation_code_hash(
 			relay_parent,
 			para_id,
-			para_assumption,
+			// Just use included assumption always. If there are no pending candidates it's a
+			// no-op.
+			OccupiedCoreAssumption::Included,
 			ctx.sender(),
 		)
-		.await?
+		.await
+		.await??
 		{
 			Some(v) => v,
 			None => {
@@ -380,17 +331,19 @@ async fn handle_new_activations<Context>(
 					our_para = %para_id,
 					"validation code hash is not found.",
 				);
-				continue
+				return Ok(())
 			},
 		};
 
 		let task_config = config.clone();
-		let metrics = metrics.clone();
+		let metrics = self.metrics.clone();
 		let mut task_sender = ctx.sender().clone();
 
 		ctx.spawn(
 			"chained-collation-builder",
 			Box::pin(async move {
+				let transposed_claim_queue = transpose_claim_queue(claim_queue.0);
+
 				for core_index in cores_to_build_on {
 					let collator_fn = match task_config.collator.as_ref() {
 						Some(x) => x,
@@ -411,7 +364,7 @@ async fn handle_new_activations<Context>(
 						};
 
 					let parent_head = collation.head_data.clone();
-					construct_and_distribute_receipt(
+					if let Err(err) = construct_and_distribute_receipt(
 						PreparedCollation {
 							collation,
 							para_id,
@@ -420,13 +373,24 @@ async fn handle_new_activations<Context>(
 							validation_code_hash,
 							n_validators,
 							core_index,
+							session_index,
 						},
 						task_config.key.clone(),
 						&mut task_sender,
 						result_sender,
 						&metrics,
+						session_info.v2_receipts,
+						&transposed_claim_queue,
 					)
-					.await;
+					.await
+					{
+						gum::error!(
+							target: LOG_TARGET,
+							"Failed to construct and distribute collation: {}",
+							err
+						);
+						return
+					}
 
 					// Chain the collations. All else stays the same as we build the chained
 					// collation on same relay parent.
@@ -434,76 +398,64 @@ async fn handle_new_activations<Context>(
 				}
 			}),
 		)?;
-	}
 
-	Ok(())
+		Ok(())
+	}
 }
 
-#[overseer::contextbounds(CollationGeneration, prefix = self::overseer)]
-async fn handle_submit_collation<Context>(
-	params: SubmitCollationParams,
-	config: &CollationGenerationConfig,
-	ctx: &mut Context,
-	metrics: &Metrics,
-) -> crate::error::Result<()> {
-	let _timer = metrics.time_submit_collation();
+#[overseer::subsystem(CollationGeneration, error=SubsystemError, prefix=self::overseer)]
+impl<Context> CollationGenerationSubsystem {
+	fn start(self, ctx: Context) -> SpawnedSubsystem {
+		let future = async move {
+			self.run(ctx).await;
+			Ok(())
+		}
+		.boxed();
 
-	let SubmitCollationParams {
-		relay_parent,
-		collation,
-		parent_head,
-		validation_code_hash,
-		result_sender,
-		core_index,
-	} = params;
+		SpawnedSubsystem { name: "collation-generation-subsystem", future }
+	}
+}
 
-	let validators = request_validators(relay_parent, ctx.sender()).await.await??;
-	let n_validators = validators.len();
+#[derive(Clone)]
+struct PerSessionInfo {
+	v2_receipts: bool,
+	n_validators: usize,
+}
 
-	// We need to swap the parent-head data, but all other fields here will be correct.
-	let mut validation_data = match request_persisted_validation_data(
-		relay_parent,
-		config.para_id,
-		OccupiedCoreAssumption::TimedOut,
-		ctx.sender(),
-	)
-	.await
-	.await??
-	{
-		Some(v) => v,
-		None => {
-			gum::debug!(
-				target: LOG_TARGET,
-				relay_parent = ?relay_parent,
-				our_para = %config.para_id,
-				"No validation data for para - does it exist at this relay-parent?",
-			);
-			return Ok(())
-		},
-	};
+struct SessionInfoCache(LruMap<SessionIndex, PerSessionInfo>);
 
-	validation_data.parent_head = parent_head;
+impl SessionInfoCache {
+	fn new() -> Self {
+		Self(LruMap::new(ByLength::new(2)))
+	}
 
-	let collation = PreparedCollation {
-		collation,
-		relay_parent,
-		para_id: config.para_id,
-		validation_data,
-		validation_code_hash,
-		n_validators,
-		core_index,
-	};
+	async fn get<Sender: SubsystemSender<RuntimeApiMessage>>(
+		&mut self,
+		relay_parent: Hash,
+		session_index: SessionIndex,
+		sender: &mut Sender,
+	) -> Result<PerSessionInfo> {
+		if let Some(info) = self.0.get(&session_index) {
+			return Ok(info.clone())
+		}
 
-	construct_and_distribute_receipt(
-		collation,
-		config.key.clone(),
-		ctx.sender(),
-		result_sender,
-		metrics,
-	)
-	.await;
+		let n_validators =
+			request_validators(relay_parent, &mut sender.clone()).await.await??.len();
 
-	Ok(())
+		let node_features = request_node_features(relay_parent, session_index, sender)
+			.await?
+			.unwrap_or(NodeFeatures::EMPTY);
+
+		let info = PerSessionInfo {
+			v2_receipts: node_features
+				.get(FeatureIndex::CandidateReceiptV2 as usize)
+				.map(|b| *b)
+				.unwrap_or(false),
+			n_validators,
+		};
+		self.0.insert(session_index, info);
+		Ok(self.0.get(&session_index).expect("Just inserted").clone())
+	}
 }
 
 struct PreparedCollation {
@@ -514,6 +466,7 @@ struct PreparedCollation {
 	validation_code_hash: ValidationCodeHash,
 	n_validators: usize,
 	core_index: CoreIndex,
+	session_index: SessionIndex,
 }
 
 /// Takes a prepared collation, along with its context, and produces a candidate receipt
@@ -524,7 +477,9 @@ async fn construct_and_distribute_receipt(
 	sender: &mut impl overseer::CollationGenerationSenderTrait,
 	result_sender: Option<oneshot::Sender<CollationSecondedSignal>>,
 	metrics: &Metrics,
-) {
+	v2_receipts: bool,
+	transposed_claim_queue: &TransposedClaimQueue,
+) -> Result<()> {
 	let PreparedCollation {
 		collation,
 		para_id,
@@ -533,6 +488,7 @@ async fn construct_and_distribute_receipt(
 		validation_code_hash,
 		n_validators,
 		core_index,
+		session_index,
 	} = collation;
 
 	let persisted_validation_data_hash = validation_data.hash();
@@ -550,15 +506,7 @@ async fn construct_and_distribute_receipt(
 		// As such, honest collators never produce an uncompressed PoV which starts with
 		// a compression magic number, which would lead validators to reject the collation.
 		if encoded_size > validation_data.max_pov_size as usize {
-			gum::debug!(
-				target: LOG_TARGET,
-				para_id = %para_id,
-				size = encoded_size,
-				max_size = validation_data.max_pov_size,
-				"PoV exceeded maximum size"
-			);
-
-			return
+			return Err(Error::POVSizeExceeded(encoded_size, validation_data.max_pov_size as usize))
 		}
 
 		pov
@@ -574,18 +522,7 @@ async fn construct_and_distribute_receipt(
 		&validation_code_hash,
 	);
 
-	let erasure_root = match erasure_root(n_validators, validation_data, pov.clone()) {
-		Ok(erasure_root) => erasure_root,
-		Err(err) => {
-			gum::error!(
-				target: LOG_TARGET,
-				para_id = %para_id,
-				err = ?err,
-				"failed to calculate erasure root",
-			);
-			return
-		},
-	};
+	let erasure_root = erasure_root(n_validators, validation_data, pov.clone())?;
 
 	let commitments = CandidateCommitments {
 		upward_messages: collation.upward_messages,
@@ -596,35 +533,67 @@ async fn construct_and_distribute_receipt(
 		hrmp_watermark: collation.hrmp_watermark,
 	};
 
-	let ccr = CandidateReceipt {
-		commitments_hash: commitments.hash(),
-		descriptor: CandidateDescriptor {
-			signature: key.sign(&signature_payload),
-			para_id,
-			relay_parent,
-			collator: key.public(),
-			persisted_validation_data_hash,
-			pov_hash,
-			erasure_root,
-			para_head: commitments.head_data.hash(),
-			validation_code_hash,
+	let receipt = if v2_receipts {
+		let ccr = CommittedCandidateReceiptV2 {
+			descriptor: CandidateDescriptorV2::new(
+				para_id,
+				relay_parent,
+				core_index,
+				session_index,
+				persisted_validation_data_hash,
+				pov_hash,
+				erasure_root,
+				commitments.head_data.hash(),
+				validation_code_hash,
+			),
+			commitments,
+		};
+
+		ccr.check_core_index(&transposed_claim_queue)
+			.map_err(Error::CandidateReceiptCheck)?;
+
+		ccr.to_plain()
+	} else {
+		if commitments.selected_core().is_some() {
+			gum::warn!(
+				target: LOG_TARGET,
+				?pov_hash,
+				?relay_parent,
+				para_id = %para_id,
+				"Candidate commitments contain UMP signal without v2 receipts being enabled.",
+			);
+		}
+		CandidateReceipt {
+			commitments_hash: commitments.hash(),
+			descriptor: CandidateDescriptor {
+				signature: key.sign(&signature_payload),
+				para_id,
+				relay_parent,
+				collator: key.public(),
+				persisted_validation_data_hash,
+				pov_hash,
+				erasure_root,
+				para_head: commitments.head_data.hash(),
+				validation_code_hash,
+			}
+			.into(),
 		}
-		.into(),
 	};
 
 	gum::debug!(
 		target: LOG_TARGET,
-		candidate_hash = ?ccr.hash(),
+		candidate_hash = ?receipt.hash(),
 		?pov_hash,
 		?relay_parent,
 		para_id = %para_id,
+		?core_index,
 		"candidate is generated",
 	);
 	metrics.on_collation_generated();
 
 	sender
 		.send_message(CollatorProtocolMessage::DistributeCollation {
-			candidate_receipt: ccr,
+			candidate_receipt: receipt,
 			parent_head_data_hash,
 			pov,
 			parent_head_data,
@@ -632,40 +601,15 @@ async fn construct_and_distribute_receipt(
 			core_index,
 		})
 		.await;
-}
 
-async fn obtain_validation_code_hash_with_assumption(
-	relay_parent: Hash,
-	para_id: ParaId,
-	assumption: OccupiedCoreAssumption,
-	sender: &mut impl overseer::CollationGenerationSenderTrait,
-) -> crate::error::Result<Option<ValidationCodeHash>> {
-	match request_validation_code_hash(relay_parent, para_id, assumption, sender)
-		.await
-		.await?
-	{
-		Ok(Some(v)) => Ok(Some(v)),
-		Ok(None) => Ok(None),
-		Err(RuntimeApiError::NotSupported { .. }) => {
-			match request_validation_code(relay_parent, para_id, assumption, sender).await.await? {
-				Ok(Some(v)) => Ok(Some(v.hash())),
-				Ok(None) => Ok(None),
-				Err(e) => {
-					// We assume that the `validation_code` API is always available, so any error
-					// is unexpected.
-					Err(e.into())
-				},
-			}
-		},
-		Err(e @ RuntimeApiError::Execution { .. }) => Err(e.into()),
-	}
+	Ok(())
 }
 
 fn erasure_root(
 	n_validators: usize,
 	persisted_validation: PersistedValidationData,
 	pov: PoV,
-) -> crate::error::Result<Hash> {
+) -> Result<Hash> {
 	let available_data =
 		AvailableData { validation_data: persisted_validation, pov: Arc::new(pov) };
 
diff --git a/polkadot/node/collation-generation/src/metrics.rs b/polkadot/node/collation-generation/src/metrics.rs
index c7690ec82c4..80566dcd6fa 100644
--- a/polkadot/node/collation-generation/src/metrics.rs
+++ b/polkadot/node/collation-generation/src/metrics.rs
@@ -19,9 +19,7 @@ use polkadot_node_subsystem_util::metrics::{self, prometheus};
 #[derive(Clone)]
 pub(crate) struct MetricsInner {
 	pub(crate) collations_generated_total: prometheus::Counter<prometheus::U64>,
-	pub(crate) new_activations_overall: prometheus::Histogram,
-	pub(crate) new_activations_per_relay_parent: prometheus::Histogram,
-	pub(crate) new_activations_per_availability_core: prometheus::Histogram,
+	pub(crate) new_activation: prometheus::Histogram,
 	pub(crate) submit_collation: prometheus::Histogram,
 }
 
@@ -37,26 +35,8 @@ impl Metrics {
 	}
 
 	/// Provide a timer for new activations which updates on drop.
-	pub fn time_new_activations(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
-		self.0.as_ref().map(|metrics| metrics.new_activations_overall.start_timer())
-	}
-
-	/// Provide a timer per relay parents which updates on drop.
-	pub fn time_new_activations_relay_parent(
-		&self,
-	) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
-		self.0
-			.as_ref()
-			.map(|metrics| metrics.new_activations_per_relay_parent.start_timer())
-	}
-
-	/// Provide a timer per availability core which updates on drop.
-	pub fn time_new_activations_availability_core(
-		&self,
-	) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
-		self.0
-			.as_ref()
-			.map(|metrics| metrics.new_activations_per_availability_core.start_timer())
+	pub fn time_new_activation(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
+		self.0.as_ref().map(|metrics| metrics.new_activation.start_timer())
 	}
 
 	/// Provide a timer for submitting a collation which updates on drop.
@@ -71,44 +51,22 @@ impl metrics::Metrics for Metrics {
 			collations_generated_total: prometheus::register(
 				prometheus::Counter::new(
 					"polkadot_parachain_collations_generated_total",
-					"Number of collations generated."
-				)?,
-				registry,
-			)?,
-			new_activations_overall: prometheus::register(
-				prometheus::Histogram::with_opts(
-					prometheus::HistogramOpts::new(
-						"polkadot_parachain_collation_generation_new_activations",
-						"Time spent within fn handle_new_activations",
-					)
-				)?,
-				registry,
-			)?,
-			new_activations_per_relay_parent: prometheus::register(
-				prometheus::Histogram::with_opts(
-					prometheus::HistogramOpts::new(
-						"polkadot_parachain_collation_generation_per_relay_parent",
-						"Time spent handling a particular relay parent within fn handle_new_activations"
-					)
+					"Number of collations generated.",
 				)?,
 				registry,
 			)?,
-			new_activations_per_availability_core: prometheus::register(
-				prometheus::Histogram::with_opts(
-					prometheus::HistogramOpts::new(
-						"polkadot_parachain_collation_generation_per_availability_core",
-						"Time spent handling a particular availability core for a relay parent in fn handle_new_activations",
-					)
-				)?,
+			new_activation: prometheus::register(
+				prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
+					"polkadot_parachain_collation_generation_new_activations",
+					"Time spent within fn handle_new_activation",
+				))?,
 				registry,
 			)?,
 			submit_collation: prometheus::register(
-				prometheus::Histogram::with_opts(
-					prometheus::HistogramOpts::new(
-						"polkadot_parachain_collation_generation_submit_collation",
-						"Time spent preparing and submitting a collation to the network protocol",
-					)
-				)?,
+				prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
+					"polkadot_parachain_collation_generation_submit_collation",
+					"Time spent preparing and submitting a collation to the network protocol",
+				))?,
 				registry,
 			)?,
 		};
diff --git a/polkadot/node/collation-generation/src/tests.rs b/polkadot/node/collation-generation/src/tests.rs
index 78b35fde0ea..f81c14cdf8f 100644
--- a/polkadot/node/collation-generation/src/tests.rs
+++ b/polkadot/node/collation-generation/src/tests.rs
@@ -17,26 +17,20 @@
 use super::*;
 use assert_matches::assert_matches;
 use futures::{
-	lock::Mutex,
 	task::{Context as FuturesContext, Poll},
-	Future,
+	Future, StreamExt,
 };
 use polkadot_node_primitives::{BlockData, Collation, CollationResult, MaybeCompressedPoV, PoV};
 use polkadot_node_subsystem::{
-	errors::RuntimeApiError,
 	messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest},
 	ActivatedLeaf,
 };
-use polkadot_node_subsystem_test_helpers::{subsystem_test_harness, TestSubsystemContextHandle};
+use polkadot_node_subsystem_test_helpers::TestSubsystemContextHandle;
 use polkadot_node_subsystem_util::TimeoutExt;
 use polkadot_primitives::{
-	vstaging::async_backing::{BackingState, CandidatePendingAvailability},
-	AsyncBackingParams, BlockNumber, CollatorPair, HeadData, PersistedValidationData,
-	ScheduledCore, ValidationCode,
-};
-use polkadot_primitives_test_helpers::{
-	dummy_candidate_descriptor_v2, dummy_hash, dummy_head_data, dummy_validator, make_candidate,
+	node_features, vstaging::CandidateDescriptorVersion, CollatorPair, PersistedValidationData,
 };
+use polkadot_primitives_test_helpers::dummy_head_data;
 use rstest::rstest;
 use sp_keyring::sr25519::Keyring as Sr25519Keyring;
 use std::{
@@ -63,7 +57,7 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(test: impl FnOnce(VirtualOv
 		async move {
 			let mut virtual_overseer = test_fut.await;
 			// Ensure we have handled all responses.
-			if let Ok(Some(msg)) = virtual_overseer.rx.try_next() {
+			if let Some(msg) = virtual_overseer.rx.next().timeout(TIMEOUT).await {
 				panic!("Did not handle all responses: {:?}", msg);
 			}
 			// Conclude.
@@ -85,20 +79,6 @@ fn test_collation() -> Collation {
 	}
 }
 
-fn test_collation_compressed() -> Collation {
-	let mut collation = test_collation();
-	let compressed = collation.proof_of_validity.clone().into_compressed();
-	collation.proof_of_validity = MaybeCompressedPoV::Compressed(compressed);
-	collation
-}
-
-fn test_validation_data() -> PersistedValidationData {
-	let mut persisted_validation_data = PersistedValidationData::default();
-	persisted_validation_data.max_pov_size = 1024;
-	persisted_validation_data
-}
-
-// Box<dyn Future<Output = Collation> + Unpin + Send
 struct TestCollator;
 
 impl Future for TestCollator {
@@ -137,531 +117,11 @@ fn test_config_no_collator<Id: Into<ParaId>>(para_id: Id) -> CollationGeneration
 	}
 }
 
-fn scheduled_core_for<Id: Into<ParaId>>(para_id: Id) -> ScheduledCore {
-	ScheduledCore { para_id: para_id.into(), collator: None }
-}
-
-fn dummy_candidate_pending_availability(
-	para_id: ParaId,
-	candidate_relay_parent: Hash,
-	relay_parent_number: BlockNumber,
-) -> CandidatePendingAvailability {
-	let (candidate, _pvd) = make_candidate(
-		candidate_relay_parent,
-		relay_parent_number,
-		para_id,
-		dummy_head_data(),
-		HeadData(vec![1]),
-		ValidationCode(vec![1, 2, 3]).hash(),
-	);
-	let candidate_hash = candidate.hash();
-
-	CandidatePendingAvailability {
-		candidate_hash,
-		descriptor: candidate.descriptor,
-		commitments: candidate.commitments,
-		relay_parent_number,
-		max_pov_size: 5 * 1024 * 1024,
-	}
-}
-
-fn dummy_backing_state(pending_availability: Vec<CandidatePendingAvailability>) -> BackingState {
-	let constraints = helpers::dummy_constraints(
-		0,
-		vec![0],
-		dummy_head_data(),
-		ValidationCodeHash::from(Hash::repeat_byte(42)),
-	);
-
-	BackingState { constraints, pending_availability }
-}
-
-#[rstest]
-#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT - 1)]
-#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)]
-fn requests_availability_per_relay_parent(#[case] runtime_version: u32) {
-	let activated_hashes: Vec<Hash> =
-		vec![[1; 32].into(), [4; 32].into(), [9; 32].into(), [16; 32].into()];
-
-	let requested_availability_cores = Arc::new(Mutex::new(Vec::new()));
-
-	let overseer_requested_availability_cores = requested_availability_cores.clone();
-	let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
-		loop {
-			match handle.try_recv().await {
-				None => break,
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::AvailabilityCores(tx)))) => {
-					overseer_requested_availability_cores.lock().await.push(hash);
-					tx.send(Ok(vec![])).unwrap();
-				}
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(_hash, RuntimeApiRequest::Validators(tx)))) => {
-					tx.send(Ok(vec![dummy_validator(); 3])).unwrap();
-				}
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::AsyncBackingParams(
-						tx,
-					),
-				))) => {
-					tx.send(Err(RuntimeApiError::NotSupported { runtime_api_name: "doesnt_matter" })).unwrap();
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::Version(tx),
-				))) => {
-					tx.send(Ok(runtime_version)).unwrap();
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::ClaimQueue(tx),
-				))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => {
-					tx.send(Ok(BTreeMap::new())).unwrap();
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::ParaBackingState(_para_id, tx),
-				))) => {
-					tx.send(Ok(Some(dummy_backing_state(vec![])))).unwrap();
-				},
-				Some(msg) => panic!("didn't expect any other overseer requests given no availability cores; got {:?}", msg),
-			}
-		}
-	};
-
-	let subsystem_activated_hashes = activated_hashes.clone();
-	subsystem_test_harness(overseer, |mut ctx| async move {
-		handle_new_activations(
-			Arc::new(test_config(123u32)),
-			subsystem_activated_hashes,
-			&mut ctx,
-			Metrics(None),
-		)
-		.await
-		.unwrap();
-	});
-
-	let mut requested_availability_cores = Arc::try_unwrap(requested_availability_cores)
-		.expect("overseer should have shut down by now")
-		.into_inner();
-	requested_availability_cores.sort();
-
-	assert_eq!(requested_availability_cores, activated_hashes);
-}
-
-#[rstest]
-#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT - 1)]
-#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)]
-fn requests_validation_data_for_scheduled_matches(#[case] runtime_version: u32) {
-	let activated_hashes: Vec<Hash> = vec![
-		Hash::repeat_byte(1),
-		Hash::repeat_byte(4),
-		Hash::repeat_byte(9),
-		Hash::repeat_byte(16),
-	];
-
-	let requested_validation_data = Arc::new(Mutex::new(Vec::new()));
-
-	let overseer_requested_validation_data = requested_validation_data.clone();
-	let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
-		loop {
-			match handle.try_recv().await {
-				None => break,
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					hash,
-					RuntimeApiRequest::AvailabilityCores(tx),
-				))) => {
-					tx.send(Ok(vec![
-						CoreState::Free,
-						// this is weird, see explanation below
-						CoreState::Scheduled(scheduled_core_for(
-							(hash.as_fixed_bytes()[0] * 4) as u32,
-						)),
-						CoreState::Scheduled(scheduled_core_for(
-							(hash.as_fixed_bytes()[0] * 5) as u32,
-						)),
-					]))
-					.unwrap();
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					hash,
-					RuntimeApiRequest::PersistedValidationData(
-						_para_id,
-						_occupied_core_assumption,
-						tx,
-					),
-				))) => {
-					overseer_requested_validation_data.lock().await.push(hash);
-					tx.send(Ok(None)).unwrap();
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::Validators(tx),
-				))) => {
-					tx.send(Ok(vec![dummy_validator(); 3])).unwrap();
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::AsyncBackingParams(tx),
-				))) => {
-					tx.send(Err(RuntimeApiError::NotSupported {
-						runtime_api_name: "doesnt_matter",
-					}))
-					.unwrap();
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::Version(tx),
-				))) => {
-					tx.send(Ok(runtime_version)).unwrap();
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::ClaimQueue(tx),
-				))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => {
-					tx.send(Ok(BTreeMap::new())).unwrap();
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::ParaBackingState(_para_id, tx),
-				))) => {
-					tx.send(Ok(Some(dummy_backing_state(vec![])))).unwrap();
-				},
-				Some(msg) => {
-					panic!("didn't expect any other overseer requests; got {:?}", msg)
-				},
-			}
-		}
-	};
-
-	subsystem_test_harness(overseer, |mut ctx| async move {
-		handle_new_activations(
-			Arc::new(test_config(16)),
-			activated_hashes,
-			&mut ctx,
-			Metrics(None),
-		)
-		.await
-		.unwrap();
-	});
-
-	let requested_validation_data = Arc::try_unwrap(requested_validation_data)
-		.expect("overseer should have shut down by now")
-		.into_inner();
-
-	// the only activated hash should be from the 4 hash:
-	// each activated hash generates two scheduled cores: one with its value * 4, one with its value
-	// * 5 given that the test configuration has a `para_id` of 16, there's only one way to get that
-	// value: with the 4 hash.
-	assert_eq!(requested_validation_data, vec![[4; 32].into()]);
-}
-
-#[rstest]
-#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT - 1)]
-#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)]
-fn sends_distribute_collation_message(#[case] runtime_version: u32) {
-	let activated_hashes: Vec<Hash> = vec![
-		Hash::repeat_byte(1),
-		Hash::repeat_byte(4),
-		Hash::repeat_byte(9),
-		Hash::repeat_byte(16),
-	];
-
-	// empty vec doesn't allocate on the heap, so it's ok we throw it away
-	let to_collator_protocol = Arc::new(Mutex::new(Vec::new()));
-	let inner_to_collator_protocol = to_collator_protocol.clone();
-
-	let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
-		loop {
-			match handle.try_recv().await {
-				None => break,
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					hash,
-					RuntimeApiRequest::AvailabilityCores(tx),
-				))) => {
-					tx.send(Ok(vec![
-						CoreState::Free,
-						// this is weird, see explanation below
-						CoreState::Scheduled(scheduled_core_for(
-							(hash.as_fixed_bytes()[0] * 4) as u32,
-						)),
-						CoreState::Scheduled(scheduled_core_for(
-							(hash.as_fixed_bytes()[0] * 5) as u32,
-						)),
-					]))
-					.unwrap();
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::PersistedValidationData(
-						_para_id,
-						_occupied_core_assumption,
-						tx,
-					),
-				))) => {
-					tx.send(Ok(Some(test_validation_data()))).unwrap();
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::Validators(tx),
-				))) => {
-					tx.send(Ok(vec![dummy_validator(); 3])).unwrap();
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::ValidationCodeHash(
-						_para_id,
-						OccupiedCoreAssumption::Free,
-						tx,
-					),
-				))) => {
-					tx.send(Ok(Some(ValidationCode(vec![1, 2, 3]).hash()))).unwrap();
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::AsyncBackingParams(tx),
-				))) => {
-					tx.send(Err(RuntimeApiError::NotSupported {
-						runtime_api_name: "doesnt_matter",
-					}))
-					.unwrap();
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::Version(tx),
-				))) => {
-					tx.send(Ok(runtime_version)).unwrap();
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::ClaimQueue(tx),
-				))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => {
-					tx.send(Ok(BTreeMap::new())).unwrap();
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::ParaBackingState(_para_id, tx),
-				))) => {
-					tx.send(Ok(Some(dummy_backing_state(vec![])))).unwrap();
-				},
-				Some(msg @ AllMessages::CollatorProtocol(_)) => {
-					inner_to_collator_protocol.lock().await.push(msg);
-				},
-				Some(msg) => {
-					panic!("didn't expect any other overseer requests; got {:?}", msg)
-				},
-			}
-		}
-	};
-
-	let config = Arc::new(test_config(16));
-	let subsystem_config = config.clone();
-
-	subsystem_test_harness(overseer, |mut ctx| async move {
-		handle_new_activations(subsystem_config, activated_hashes, &mut ctx, Metrics(None))
-			.await
-			.unwrap();
-	});
-
-	let mut to_collator_protocol = Arc::try_unwrap(to_collator_protocol)
-		.expect("subsystem should have shut down by now")
-		.into_inner();
-
-	// we expect a single message to be sent, containing a candidate receipt.
-	// we don't care too much about the `commitments_hash` right now, but let's ensure that we've
-	// calculated the correct descriptor
-	let expect_pov_hash = test_collation_compressed().proof_of_validity.into_compressed().hash();
-	let expect_validation_data_hash = test_validation_data().hash();
-	let expect_relay_parent = Hash::repeat_byte(4);
-	let expect_validation_code_hash = ValidationCode(vec![1, 2, 3]).hash();
-	let expect_payload = collator_signature_payload(
-		&expect_relay_parent,
-		&config.para_id,
-		&expect_validation_data_hash,
-		&expect_pov_hash,
-		&expect_validation_code_hash,
-	);
-	let expect_descriptor = CandidateDescriptor {
-		signature: config.key.sign(&expect_payload),
-		para_id: config.para_id,
-		relay_parent: expect_relay_parent,
-		collator: config.key.public(),
-		persisted_validation_data_hash: expect_validation_data_hash,
-		pov_hash: expect_pov_hash,
-		erasure_root: dummy_hash(), // this isn't something we're checking right now
-		para_head: test_collation().head_data.hash(),
-		validation_code_hash: expect_validation_code_hash,
-	};
-
-	assert_eq!(to_collator_protocol.len(), 1);
-	match AllMessages::from(to_collator_protocol.pop().unwrap()) {
-		AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation {
-			candidate_receipt,
-			..
-		}) => {
-			let CandidateReceipt { descriptor, .. } = candidate_receipt;
-			// signature generation is non-deterministic, so we can't just assert that the
-			// expected descriptor is correct. What we can do is validate that the produced
-			// descriptor has a valid signature, then just copy in the generated signature
-			// and check the rest of the fields for equality.
-			assert!(CollatorPair::verify(
-				&descriptor.signature().unwrap(),
-				&collator_signature_payload(
-					&descriptor.relay_parent(),
-					&descriptor.para_id(),
-					&descriptor.persisted_validation_data_hash(),
-					&descriptor.pov_hash(),
-					&descriptor.validation_code_hash(),
-				)
-				.as_ref(),
-				&descriptor.collator().unwrap(),
-			));
-			let expect_descriptor = {
-				let mut expect_descriptor = expect_descriptor;
-				expect_descriptor.signature = descriptor.signature().clone().unwrap();
-				expect_descriptor.erasure_root = descriptor.erasure_root();
-				expect_descriptor.into()
-			};
-			assert_eq!(descriptor, expect_descriptor);
-		},
-		_ => panic!("received wrong message type"),
-	}
-}
-
-#[rstest]
-#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT - 1)]
-#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)]
-fn fallback_when_no_validation_code_hash_api(#[case] runtime_version: u32) {
-	// This is a variant of the above test, but with the validation code hash API disabled.
-
-	let activated_hashes: Vec<Hash> = vec![
-		Hash::repeat_byte(1),
-		Hash::repeat_byte(4),
-		Hash::repeat_byte(9),
-		Hash::repeat_byte(16),
-	];
-
-	// empty vec doesn't allocate on the heap, so it's ok we throw it away
-	let to_collator_protocol = Arc::new(Mutex::new(Vec::new()));
-	let inner_to_collator_protocol = to_collator_protocol.clone();
-
-	let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
-		loop {
-			match handle.try_recv().await {
-				None => break,
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					hash,
-					RuntimeApiRequest::AvailabilityCores(tx),
-				))) => {
-					tx.send(Ok(vec![
-						CoreState::Free,
-						CoreState::Scheduled(scheduled_core_for(
-							(hash.as_fixed_bytes()[0] * 4) as u32,
-						)),
-						CoreState::Scheduled(scheduled_core_for(
-							(hash.as_fixed_bytes()[0] * 5) as u32,
-						)),
-					]))
-					.unwrap();
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::PersistedValidationData(
-						_para_id,
-						_occupied_core_assumption,
-						tx,
-					),
-				))) => {
-					tx.send(Ok(Some(test_validation_data()))).unwrap();
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::Validators(tx),
-				))) => {
-					tx.send(Ok(vec![dummy_validator(); 3])).unwrap();
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::ValidationCodeHash(
-						_para_id,
-						OccupiedCoreAssumption::Free,
-						tx,
-					),
-				))) => {
-					tx.send(Err(RuntimeApiError::NotSupported {
-						runtime_api_name: "validation_code_hash",
-					}))
-					.unwrap();
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::ValidationCode(_para_id, OccupiedCoreAssumption::Free, tx),
-				))) => {
-					tx.send(Ok(Some(ValidationCode(vec![1, 2, 3])))).unwrap();
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::AsyncBackingParams(tx),
-				))) => {
-					tx.send(Err(RuntimeApiError::NotSupported {
-						runtime_api_name: "doesnt_matter",
-					}))
-					.unwrap();
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::Version(tx),
-				))) => {
-					tx.send(Ok(runtime_version)).unwrap();
-				},
-				Some(msg @ AllMessages::CollatorProtocol(_)) => {
-					inner_to_collator_protocol.lock().await.push(msg);
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::ClaimQueue(tx),
-				))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => {
-					tx.send(Ok(Default::default())).unwrap();
-				},
-				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					_hash,
-					RuntimeApiRequest::ParaBackingState(_para_id, tx),
-				))) => {
-					tx.send(Ok(Some(dummy_backing_state(vec![])))).unwrap();
-				},
-				Some(msg) => {
-					panic!("didn't expect any other overseer requests; got {:?}", msg)
-				},
-			}
-		}
-	};
-
-	let config = Arc::new(test_config(16u32));
-	let subsystem_config = config.clone();
-
-	// empty vec doesn't allocate on the heap, so it's ok we throw it away
-	subsystem_test_harness(overseer, |mut ctx| async move {
-		handle_new_activations(subsystem_config, activated_hashes, &mut ctx, Metrics(None))
-			.await
-			.unwrap();
-	});
-
-	let to_collator_protocol = Arc::try_unwrap(to_collator_protocol)
-		.expect("subsystem should have shut down by now")
-		.into_inner();
-
-	let expect_validation_code_hash = ValidationCode(vec![1, 2, 3]).hash();
-
-	assert_eq!(to_collator_protocol.len(), 1);
-	match &to_collator_protocol[0] {
-		AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation {
-			candidate_receipt,
-			..
-		}) => {
-			let CandidateReceipt { descriptor, .. } = candidate_receipt;
-			assert_eq!(expect_validation_code_hash, descriptor.validation_code_hash());
-		},
-		_ => panic!("received wrong message type"),
-	}
+fn node_features_with_v2_enabled() -> NodeFeatures {
+	let mut node_features = NodeFeatures::new();
+	node_features.resize(node_features::FeatureIndex::CandidateReceiptV2 as usize + 1, false);
+	node_features.set(node_features::FeatureIndex::CandidateReceiptV2 as u8 as usize, true);
+	node_features
 }
 
 #[test]
@@ -717,31 +177,15 @@ fn submit_collation_leads_to_distribution() {
 			})
 			.await;
 
-		assert_matches!(
-			overseer_recv(&mut virtual_overseer).await,
-			AllMessages::RuntimeApi(RuntimeApiMessage::Request(rp, RuntimeApiRequest::Validators(tx))) => {
-				assert_eq!(rp, relay_parent);
-				let _ = tx.send(Ok(vec![
-					Sr25519Keyring::Alice.public().into(),
-					Sr25519Keyring::Bob.public().into(),
-					Sr25519Keyring::Charlie.public().into(),
-				]));
-			}
-		);
-
-		assert_matches!(
-			overseer_recv(&mut virtual_overseer).await,
-			AllMessages::RuntimeApi(RuntimeApiMessage::Request(rp, RuntimeApiRequest::PersistedValidationData(id, a, tx))) => {
-				assert_eq!(rp, relay_parent);
-				assert_eq!(id, para_id);
-				assert_eq!(a, OccupiedCoreAssumption::TimedOut);
-
-				// Candidate receipt should be constructed with the real parent head.
-				let mut pvd = expected_pvd.clone();
-				pvd.parent_head = vec![4, 5, 6].into();
-				let _ = tx.send(Ok(Some(pvd)));
-			}
-		);
+		helpers::handle_runtime_calls_on_submit_collation(
+			&mut virtual_overseer,
+			relay_parent,
+			para_id,
+			expected_pvd.clone(),
+			NodeFeatures::EMPTY,
+			Default::default(),
+		)
+		.await;
 
 		assert_matches!(
 			overseer_recv(&mut virtual_overseer).await,
@@ -762,78 +206,16 @@ fn submit_collation_leads_to_distribution() {
 	});
 }
 
-// There is one core in `Occupied` state and async backing is enabled. On new head activation
-// `CollationGeneration` should produce and distribute a new collation.
-#[rstest]
-#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT - 1)]
-#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)]
-fn distribute_collation_for_occupied_core_with_async_backing_enabled(#[case] runtime_version: u32) {
-	let activated_hash: Hash = [1; 32].into();
-	let para_id = ParaId::from(5);
-
-	// One core, in occupied state. The data in `CoreState` and `ClaimQueue` should match.
-	let cores: Vec<CoreState> =
-		vec![CoreState::Occupied(polkadot_primitives::vstaging::OccupiedCore {
-			next_up_on_available: Some(ScheduledCore { para_id, collator: None }),
-			occupied_since: 1,
-			time_out_at: 10,
-			next_up_on_time_out: Some(ScheduledCore { para_id, collator: None }),
-			availability: Default::default(), // doesn't matter
-			group_responsible: polkadot_primitives::GroupIndex(0),
-			candidate_hash: Default::default(),
-			candidate_descriptor: dummy_candidate_descriptor_v2(dummy_hash()),
-		})];
-	let claim_queue = BTreeMap::from([(CoreIndex::from(0), VecDeque::from([para_id]))]).into();
-
-	test_harness(|mut virtual_overseer| async move {
-		helpers::initialize_collator(&mut virtual_overseer, para_id).await;
-		helpers::activate_new_head(&mut virtual_overseer, activated_hash).await;
-
-		let pending_availability =
-			vec![dummy_candidate_pending_availability(para_id, activated_hash, 1)];
-		helpers::handle_runtime_calls_on_new_head_activation(
-			&mut virtual_overseer,
-			activated_hash,
-			AsyncBackingParams { max_candidate_depth: 1, allowed_ancestry_len: 1 },
-			cores,
-			runtime_version,
-			claim_queue,
-		)
-		.await;
-		helpers::handle_cores_processing_for_a_leaf(
-			&mut virtual_overseer,
-			activated_hash,
-			para_id,
-			// `CoreState` is `Occupied` => `OccupiedCoreAssumption` is `Included`
-			OccupiedCoreAssumption::Included,
-			1,
-			pending_availability,
-			runtime_version,
-		)
-		.await;
-
-		virtual_overseer
-	});
-}
-
 #[test]
-fn distribute_collation_for_occupied_core_pre_async_backing() {
+fn distribute_collation_only_for_assigned_para_id_at_offset_0() {
 	let activated_hash: Hash = [1; 32].into();
 	let para_id = ParaId::from(5);
-	let total_cores = 3;
-
-	// Use runtime version before async backing
-	let runtime_version = RuntimeApiRequest::ASYNC_BACKING_STATE_RUNTIME_REQUIREMENT - 1;
 
-	let cores = (0..total_cores)
+	let claim_queue = (0..=5)
 		.into_iter()
-		.map(|_idx| CoreState::Scheduled(ScheduledCore { para_id, collator: None }))
-		.collect::<Vec<_>>();
-
-	let claim_queue = cores
-		.iter()
-		.enumerate()
-		.map(|(idx, _core)| (CoreIndex::from(idx as u32), VecDeque::from([para_id])))
+		// Assign para_id 5 to every core at the second and third claim-queue depths. This
+		// shouldn't matter.
+		.map(|idx| (CoreIndex(idx), VecDeque::from([ParaId::from(idx), para_id, para_id])))
 		.collect::<BTreeMap<_, _>>();
 
 	test_harness(|mut virtual_overseer| async move {
@@ -842,10 +224,8 @@ fn distribute_collation_for_occupied_core_pre_async_backing() {
 		helpers::handle_runtime_calls_on_new_head_activation(
 			&mut virtual_overseer,
 			activated_hash,
-			AsyncBackingParams { max_candidate_depth: 1, allowed_ancestry_len: 1 },
-			cores,
-			runtime_version,
 			claim_queue,
+			NodeFeatures::EMPTY,
 		)
 		.await;
 
@@ -853,11 +233,7 @@ fn distribute_collation_for_occupied_core_pre_async_backing() {
 			&mut virtual_overseer,
 			activated_hash,
 			para_id,
-			// `CoreState` is `Free` => `OccupiedCoreAssumption` is `Free`
-			OccupiedCoreAssumption::Free,
-			total_cores,
-			vec![],
-			runtime_version,
+			vec![5], // Only core 5 is assigned to paraid 5.
 		)
 		.await;
 
@@ -865,48 +241,22 @@ fn distribute_collation_for_occupied_core_pre_async_backing() {
 	});
 }
 
-// There are variable number of cores of cores in `Occupied` state and async backing is enabled.
-// On new head activation `CollationGeneration` should produce and distribute a new collation
-// with proper assumption about the para candidate chain availability at next block.
+// There is a variable number of cores assigned to the para id.
+// On new head activation `CollationGeneration` should produce and distribute the right number of
+// new collations, with the proper assumption about the para's candidate chain availability at the next block.
 #[rstest]
 #[case(0)]
 #[case(1)]
 #[case(2)]
-fn distribute_collation_for_occupied_cores_with_async_backing_enabled_and_elastic_scaling(
-	#[case] candidates_pending_avail: u32,
-) {
+#[case(3)]
+fn distribute_collation_with_elastic_scaling(#[case] total_cores: u32) {
 	let activated_hash: Hash = [1; 32].into();
 	let para_id = ParaId::from(5);
-	// Using latest runtime with the fancy claim queue exposed.
-	let runtime_version = RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT;
 
-	let cores = (0..3)
+	let claim_queue = (0..total_cores)
 		.into_iter()
-		.map(|idx| {
-			CoreState::Occupied(polkadot_primitives::vstaging::OccupiedCore {
-				next_up_on_available: Some(ScheduledCore { para_id, collator: None }),
-				occupied_since: 0,
-				time_out_at: 10,
-				next_up_on_time_out: Some(ScheduledCore { para_id, collator: None }),
-				availability: Default::default(), // doesn't matter
-				group_responsible: polkadot_primitives::GroupIndex(idx as u32),
-				candidate_hash: Default::default(),
-				candidate_descriptor: dummy_candidate_descriptor_v2(dummy_hash()),
-			})
-		})
-		.collect::<Vec<_>>();
-
-	let pending_availability = (0..candidates_pending_avail)
-		.into_iter()
-		.map(|_idx| dummy_candidate_pending_availability(para_id, activated_hash, 0))
-		.collect::<Vec<_>>();
-
-	let claim_queue = cores
-		.iter()
-		.enumerate()
-		.map(|(idx, _core)| (CoreIndex::from(idx as u32), VecDeque::from([para_id])))
+		.map(|idx| (CoreIndex(idx), VecDeque::from([para_id])))
 		.collect::<BTreeMap<_, _>>();
-	let total_cores = cores.len();
 
 	test_harness(|mut virtual_overseer| async move {
 		helpers::initialize_collator(&mut virtual_overseer, para_id).await;
@@ -914,10 +264,8 @@ fn distribute_collation_for_occupied_cores_with_async_backing_enabled_and_elasti
 		helpers::handle_runtime_calls_on_new_head_activation(
 			&mut virtual_overseer,
 			activated_hash,
-			AsyncBackingParams { max_candidate_depth: 1, allowed_ancestry_len: 1 },
-			cores,
-			runtime_version,
 			claim_queue,
+			NodeFeatures::EMPTY,
 		)
 		.await;
 
@@ -925,16 +273,7 @@ fn distribute_collation_for_occupied_cores_with_async_backing_enabled_and_elasti
 			&mut virtual_overseer,
 			activated_hash,
 			para_id,
-			// if at least 1 cores is occupied => `OccupiedCoreAssumption` is `Included`
-			// else assumption is `Free`.
-			if candidates_pending_avail > 0 {
-				OccupiedCoreAssumption::Included
-			} else {
-				OccupiedCoreAssumption::Free
-			},
-			total_cores,
-			pending_availability,
-			runtime_version,
+			(0..total_cores).collect(),
 		)
 		.await;
 
@@ -942,136 +281,128 @@ fn distribute_collation_for_occupied_cores_with_async_backing_enabled_and_elasti
 	});
 }
 
-// There are variable number of cores of cores in `Free` state and async backing is enabled.
-// On new head activation `CollationGeneration` should produce and distribute a new collation
-// with proper assumption about the para candidate chain availability at next block.
 #[rstest]
-#[case(0)]
-#[case(1)]
-#[case(2)]
-fn distribute_collation_for_free_cores_with_async_backing_enabled_and_elastic_scaling(
-	#[case] total_cores: usize,
-) {
-	let activated_hash: Hash = [1; 32].into();
+#[case(true)]
+#[case(false)]
+fn test_candidate_receipt_versioning(#[case] v2_receipts: bool) {
+	let relay_parent = Hash::repeat_byte(0);
+	let validation_code_hash = ValidationCodeHash::from(Hash::repeat_byte(42));
+	let parent_head = dummy_head_data();
 	let para_id = ParaId::from(5);
-	// Using latest runtime with the fancy claim queue exposed.
-	let runtime_version = RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT;
-
-	let cores = (0..total_cores)
-		.into_iter()
-		.map(|_idx| CoreState::Scheduled(ScheduledCore { para_id, collator: None }))
-		.collect::<Vec<_>>();
-
-	let claim_queue = cores
-		.iter()
-		.enumerate()
-		.map(|(idx, _core)| (CoreIndex::from(idx as u32), VecDeque::from([para_id])))
-		.collect::<BTreeMap<_, _>>();
+	let expected_pvd = PersistedValidationData {
+		parent_head: parent_head.clone(),
+		relay_parent_number: 10,
+		relay_parent_storage_root: Hash::repeat_byte(1),
+		max_pov_size: 1024,
+	};
+	let node_features =
+		if v2_receipts { node_features_with_v2_enabled() } else { NodeFeatures::EMPTY };
+	let expected_descriptor_version =
+		if v2_receipts { CandidateDescriptorVersion::V2 } else { CandidateDescriptorVersion::V1 };
 
 	test_harness(|mut virtual_overseer| async move {
-		helpers::initialize_collator(&mut virtual_overseer, para_id).await;
-		helpers::activate_new_head(&mut virtual_overseer, activated_hash).await;
-		helpers::handle_runtime_calls_on_new_head_activation(
-			&mut virtual_overseer,
-			activated_hash,
-			AsyncBackingParams { max_candidate_depth: 1, allowed_ancestry_len: 1 },
-			cores,
-			runtime_version,
-			claim_queue,
-		)
-		.await;
+		virtual_overseer
+			.send(FromOrchestra::Communication {
+				msg: CollationGenerationMessage::Initialize(test_config_no_collator(para_id)),
+			})
+			.await;
 
-		helpers::handle_cores_processing_for_a_leaf(
+		virtual_overseer
+			.send(FromOrchestra::Communication {
+				msg: CollationGenerationMessage::SubmitCollation(SubmitCollationParams {
+					relay_parent,
+					collation: test_collation(),
+					parent_head: dummy_head_data(),
+					validation_code_hash,
+					result_sender: None,
+					core_index: CoreIndex(0),
+				}),
+			})
+			.await;
+
+		helpers::handle_runtime_calls_on_submit_collation(
 			&mut virtual_overseer,
-			activated_hash,
+			relay_parent,
 			para_id,
-			// `CoreState` is `Free` => `OccupiedCoreAssumption` is `Free`
-			OccupiedCoreAssumption::Free,
-			total_cores,
-			vec![],
-			runtime_version,
+			expected_pvd.clone(),
+			node_features,
+			[(CoreIndex(0), [para_id].into_iter().collect())].into_iter().collect(),
 		)
 		.await;
 
+		assert_matches!(
+			overseer_recv(&mut virtual_overseer).await,
+			AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation {
+				candidate_receipt,
+				parent_head_data_hash,
+				..
+			}) => {
+				let CandidateReceipt { descriptor, .. } = candidate_receipt;
+				assert_eq!(parent_head_data_hash, parent_head.hash());
+				assert_eq!(descriptor.persisted_validation_data_hash(), expected_pvd.hash());
+				assert_eq!(descriptor.para_head(), dummy_head_data().hash());
+				assert_eq!(descriptor.validation_code_hash(), validation_code_hash);
+				// Check that the right version was indeed used.
+				assert_eq!(descriptor.version(), expected_descriptor_version);
+			}
+		);
+
 		virtual_overseer
 	});
 }
 
-// There is one core in `Occupied` state and async backing is disabled. On new head activation
-// no new collation should be generated.
-#[rstest]
-#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT - 1)]
-#[case(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)]
-fn no_collation_is_distributed_for_occupied_core_with_async_backing_disabled(
-	#[case] runtime_version: u32,
-) {
-	let activated_hash: Hash = [1; 32].into();
+#[test]
+fn v2_receipts_failed_core_index_check() {
+	let relay_parent = Hash::repeat_byte(0);
+	let validation_code_hash = ValidationCodeHash::from(Hash::repeat_byte(42));
+	let parent_head = dummy_head_data();
 	let para_id = ParaId::from(5);
-
-	// One core, in occupied state. The data in `CoreState` and `ClaimQueue` should match.
-	let cores: Vec<CoreState> =
-		vec![CoreState::Occupied(polkadot_primitives::vstaging::OccupiedCore {
-			next_up_on_available: Some(ScheduledCore { para_id, collator: None }),
-			occupied_since: 1,
-			time_out_at: 10,
-			next_up_on_time_out: Some(ScheduledCore { para_id, collator: None }),
-			availability: Default::default(), // doesn't matter
-			group_responsible: polkadot_primitives::GroupIndex(0),
-			candidate_hash: Default::default(),
-			candidate_descriptor: dummy_candidate_descriptor_v2(dummy_hash()),
-		})];
-	let claim_queue = BTreeMap::from([(CoreIndex::from(0), VecDeque::from([para_id]))]).into();
+	let expected_pvd = PersistedValidationData {
+		parent_head: parent_head.clone(),
+		relay_parent_number: 10,
+		relay_parent_storage_root: Hash::repeat_byte(1),
+		max_pov_size: 1024,
+	};
 
 	test_harness(|mut virtual_overseer| async move {
-		helpers::initialize_collator(&mut virtual_overseer, para_id).await;
-		helpers::activate_new_head(&mut virtual_overseer, activated_hash).await;
+		virtual_overseer
+			.send(FromOrchestra::Communication {
+				msg: CollationGenerationMessage::Initialize(test_config_no_collator(para_id)),
+			})
+			.await;
 
-		helpers::handle_runtime_calls_on_new_head_activation(
+		virtual_overseer
+			.send(FromOrchestra::Communication {
+				msg: CollationGenerationMessage::SubmitCollation(SubmitCollationParams {
+					relay_parent,
+					collation: test_collation(),
+					parent_head: dummy_head_data(),
+					validation_code_hash,
+					result_sender: None,
+					core_index: CoreIndex(0),
+				}),
+			})
+			.await;
+
+		helpers::handle_runtime_calls_on_submit_collation(
 			&mut virtual_overseer,
-			activated_hash,
-			AsyncBackingParams { max_candidate_depth: 0, allowed_ancestry_len: 0 },
-			cores,
-			runtime_version,
-			claim_queue,
+			relay_parent,
+			para_id,
+			expected_pvd.clone(),
+			node_features_with_v2_enabled(),
+			// The core index commitment is for core 0, but no assignment is added for core 0.
+			[(CoreIndex(1), [para_id].into_iter().collect())].into_iter().collect(),
 		)
 		.await;
 
+		// No collation is distributed.
+
 		virtual_overseer
 	});
 }
-
 mod helpers {
-	use polkadot_primitives::{
-		async_backing::{Constraints, InboundHrmpLimitations},
-		BlockNumber,
-	};
-
 	use super::*;
-
-	// A set for dummy constraints for `ParaBackingState``
-	pub(crate) fn dummy_constraints(
-		min_relay_parent_number: BlockNumber,
-		valid_watermarks: Vec<BlockNumber>,
-		required_parent: HeadData,
-		validation_code_hash: ValidationCodeHash,
-	) -> Constraints {
-		Constraints {
-			min_relay_parent_number,
-			max_pov_size: 5 * 1024 * 1024,
-			max_code_size: 1_000_000,
-			ump_remaining: 10,
-			ump_remaining_bytes: 1_000,
-			max_ump_num_per_candidate: 10,
-			dmp_remaining_messages: vec![],
-			hrmp_inbound: InboundHrmpLimitations { valid_watermarks },
-			hrmp_channels_out: vec![],
-			max_hrmp_num_per_candidate: 0,
-			required_parent,
-			validation_code_hash,
-			upgrade_restriction: None,
-			future_validation_code: None,
-		}
-	}
+	use std::collections::{BTreeMap, VecDeque};
 
 	// Sends `Initialize` with a collator config
 	pub async fn initialize_collator(virtual_overseer: &mut VirtualOverseer, para_id: ParaId) {
@@ -1098,22 +429,18 @@ mod helpers {
 			.await;
 	}
 
-	// Handle all runtime calls performed in `handle_new_activations`. Conditionally expects a
-	// `CLAIM_QUEUE_RUNTIME_REQUIREMENT` call if the passed `runtime_version` is greater or equal to
-	// `CLAIM_QUEUE_RUNTIME_REQUIREMENT`
+	// Handle all runtime calls performed in `handle_new_activation`.
 	pub async fn handle_runtime_calls_on_new_head_activation(
 		virtual_overseer: &mut VirtualOverseer,
 		activated_hash: Hash,
-		async_backing_params: AsyncBackingParams,
-		cores: Vec<CoreState>,
-		runtime_version: u32,
 		claim_queue: BTreeMap<CoreIndex, VecDeque<ParaId>>,
+		node_features: NodeFeatures,
 	) {
 		assert_matches!(
 			overseer_recv(virtual_overseer).await,
-			AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::AvailabilityCores(tx))) => {
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::SessionIndexForChild(tx))) => {
 				assert_eq!(hash, activated_hash);
-				let _ = tx.send(Ok(cores));
+				tx.send(Ok(1)).unwrap();
 			}
 		);
 
@@ -1121,73 +448,46 @@ mod helpers {
 			overseer_recv(virtual_overseer).await,
 			AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::Validators(tx))) => {
 				assert_eq!(hash, activated_hash);
-				let _ = tx.send(Ok(vec![
+				tx.send(Ok(vec![
 					Sr25519Keyring::Alice.public().into(),
 					Sr25519Keyring::Bob.public().into(),
 					Sr25519Keyring::Charlie.public().into(),
-				]));
+				])).unwrap();
 			}
 		);
 
-		let async_backing_response =
-			if runtime_version >= RuntimeApiRequest::ASYNC_BACKING_STATE_RUNTIME_REQUIREMENT {
-				Ok(async_backing_params)
-			} else {
-				Err(RuntimeApiError::NotSupported { runtime_api_name: "async_backing_params" })
-			};
-
 		assert_matches!(
 			overseer_recv(virtual_overseer).await,
 			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-								hash,
-								RuntimeApiRequest::AsyncBackingParams(
-									tx,
-								),
-							)) => {
+				hash,
+				RuntimeApiRequest::NodeFeatures(session_index, tx),
+			)) => {
+				assert_eq!(1, session_index);
 				assert_eq!(hash, activated_hash);
-				let _ = tx.send(async_backing_response);
+
+				tx.send(Ok(node_features)).unwrap();
 			}
 		);
 
 		assert_matches!(
 			overseer_recv(virtual_overseer).await,
-			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-								hash,
-								RuntimeApiRequest::Version(tx),
-							)) => {
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::ClaimQueue(tx))) => {
 				assert_eq!(hash, activated_hash);
-				let _ = tx.send(Ok(runtime_version));
+				tx.send(Ok(claim_queue)).unwrap();
 			}
 		);
-
-		if runtime_version == RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT {
-			assert_matches!(
-				overseer_recv(virtual_overseer).await,
-				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-									hash,
-									RuntimeApiRequest::ClaimQueue(tx),
-								)) => {
-					assert_eq!(hash, activated_hash);
-					let _ = tx.send(Ok(claim_queue.into()));
-				}
-			);
-		}
 	}
 
-	// Handles all runtime requests performed in `handle_new_activations` for the case when a
+	// Handles all runtime requests performed in `handle_new_activation` for the case when a
 	// collation should be prepared for the new leaf
 	pub async fn handle_cores_processing_for_a_leaf(
 		virtual_overseer: &mut VirtualOverseer,
 		activated_hash: Hash,
 		para_id: ParaId,
-		expected_occupied_core_assumption: OccupiedCoreAssumption,
-		cores_assigned: usize,
-		pending_availability: Vec<CandidatePendingAvailability>,
-		runtime_version: u32,
+		cores_assigned: Vec<u32>,
 	) {
 		// Expect no messages if no cores is assigned to the para
-		if cores_assigned == 0 {
-			assert!(overseer_recv(virtual_overseer).timeout(TIMEOUT / 2).await.is_none());
+		if cores_assigned.is_empty() {
 			return
 		}
 
@@ -1201,23 +501,12 @@ mod helpers {
 			max_pov_size: 1024,
 		};
 
-		if runtime_version >= RuntimeApiRequest::ASYNC_BACKING_STATE_RUNTIME_REQUIREMENT {
-			assert_matches!(
-				overseer_recv(virtual_overseer).await,
-				AllMessages::RuntimeApi(
-					RuntimeApiMessage::Request(parent, RuntimeApiRequest::ParaBackingState(p_id, tx))
-				) if parent == activated_hash && p_id == para_id => {
-					tx.send(Ok(Some(dummy_backing_state(pending_availability)))).unwrap();
-				}
-			);
-		}
-
 		assert_matches!(
 			overseer_recv(virtual_overseer).await,
 			AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::PersistedValidationData(id, a, tx))) => {
 				assert_eq!(hash, activated_hash);
 				assert_eq!(id, para_id);
-				assert_eq!(a, expected_occupied_core_assumption);
+				assert_eq!(a, OccupiedCoreAssumption::Included);
 
 				let _ = tx.send(Ok(Some(pvd.clone())));
 			}
@@ -1235,20 +524,22 @@ mod helpers {
 			)) => {
 				assert_eq!(hash, activated_hash);
 				assert_eq!(id, para_id);
-				assert_eq!(assumption, expected_occupied_core_assumption);
+				assert_eq!(assumption, OccupiedCoreAssumption::Included);
 
 				let _ = tx.send(Ok(Some(validation_code_hash)));
 			}
 		);
 
-		for _ in 0..cores_assigned {
+		for core in cores_assigned {
 			assert_matches!(
 				overseer_recv(virtual_overseer).await,
 				AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation{
 					candidate_receipt,
 					parent_head_data_hash,
+					core_index,
 					..
 				}) => {
+					assert_eq!(CoreIndex(core), core_index);
 					assert_eq!(parent_head_data_hash, parent_head.hash());
 					assert_eq!(candidate_receipt.descriptor().persisted_validation_data_hash(), pvd.hash());
 					assert_eq!(candidate_receipt.descriptor().para_head(), dummy_head_data().hash());
@@ -1257,4 +548,69 @@ mod helpers {
 			);
 		}
 	}
+
+	// Handles all runtime requests performed in `handle_submit_collation`
+	pub async fn handle_runtime_calls_on_submit_collation(
+		virtual_overseer: &mut VirtualOverseer,
+		relay_parent: Hash,
+		para_id: ParaId,
+		expected_pvd: PersistedValidationData,
+		node_features: NodeFeatures,
+		claim_queue: BTreeMap<CoreIndex, VecDeque<ParaId>>,
+	) {
+		assert_matches!(
+			overseer_recv(virtual_overseer).await,
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(rp, RuntimeApiRequest::PersistedValidationData(id, a, tx))) => {
+				assert_eq!(rp, relay_parent);
+				assert_eq!(id, para_id);
+				assert_eq!(a, OccupiedCoreAssumption::TimedOut);
+
+				tx.send(Ok(Some(expected_pvd))).unwrap();
+			}
+		);
+
+		assert_matches!(
+			overseer_recv(virtual_overseer).await,
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				rp,
+				RuntimeApiRequest::ClaimQueue(tx),
+			)) => {
+				assert_eq!(rp, relay_parent);
+				tx.send(Ok(claim_queue)).unwrap();
+			}
+		);
+
+		assert_matches!(
+			overseer_recv(virtual_overseer).await,
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(rp, RuntimeApiRequest::SessionIndexForChild(tx))) => {
+				assert_eq!(rp, relay_parent);
+				tx.send(Ok(1)).unwrap();
+			}
+		);
+
+		assert_matches!(
+			overseer_recv(virtual_overseer).await,
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(rp, RuntimeApiRequest::Validators(tx))) => {
+				assert_eq!(rp, relay_parent);
+				tx.send(Ok(vec![
+					Sr25519Keyring::Alice.public().into(),
+					Sr25519Keyring::Bob.public().into(),
+					Sr25519Keyring::Charlie.public().into(),
+				])).unwrap();
+			}
+		);
+
+		assert_matches!(
+			overseer_recv(virtual_overseer).await,
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				rp,
+				RuntimeApiRequest::NodeFeatures(session_index, tx),
+			)) => {
+				assert_eq!(1, session_index);
+				assert_eq!(rp, relay_parent);
+
+				tx.send(Ok(node_features.clone())).unwrap();
+			}
+		);
+	}
 }
diff --git a/polkadot/primitives/Cargo.toml b/polkadot/primitives/Cargo.toml
index a8cd6cb5f4e..dd269caa2d6 100644
--- a/polkadot/primitives/Cargo.toml
+++ b/polkadot/primitives/Cargo.toml
@@ -16,6 +16,7 @@ codec = { features = ["bit-vec", "derive"], workspace = true }
 scale-info = { features = ["bit-vec", "derive", "serde"], workspace = true }
 log = { workspace = true }
 serde = { features = ["alloc", "derive"], workspace = true }
+thiserror = { workspace = true, optional = true }
 
 sp-application-crypto = { features = ["serde"], workspace = true }
 sp-inherents = { workspace = true }
@@ -59,6 +60,7 @@ std = [
 	"sp-runtime/std",
 	"sp-staking/std",
 	"sp-std/std",
+	"thiserror",
 ]
 runtime-benchmarks = [
 	"polkadot-parachain-primitives/runtime-benchmarks",
diff --git a/polkadot/primitives/src/vstaging/mod.rs b/polkadot/primitives/src/vstaging/mod.rs
index 21aab41902b..94b7b200e68 100644
--- a/polkadot/primitives/src/vstaging/mod.rs
+++ b/polkadot/primitives/src/vstaging/mod.rs
@@ -465,19 +465,32 @@ impl CandidateCommitments {
 
 /// CandidateReceipt construction errors.
 #[derive(PartialEq, Eq, Clone, Encode, Decode, TypeInfo, RuntimeDebug)]
+#[cfg_attr(feature = "std", derive(thiserror::Error))]
 pub enum CandidateReceiptError {
 	/// The specified core index is invalid.
+	#[cfg_attr(feature = "std", error("The specified core index is invalid"))]
 	InvalidCoreIndex,
 	/// The core index in commitments doesn't match the one in descriptor
+	#[cfg_attr(
+		feature = "std",
+		error("The core index in commitments doesn't match the one in descriptor")
+	)]
 	CoreIndexMismatch,
 	/// The core selector or claim queue offset is invalid.
+	#[cfg_attr(feature = "std", error("The core selector or claim queue offset is invalid"))]
 	InvalidSelectedCore,
 	/// The parachain is not assigned to any core at specified claim queue offset.
+	#[cfg_attr(
+		feature = "std",
+		error("The parachain is not assigned to any core at specified claim queue offset")
+	)]
 	NoAssignment,
 	/// No core was selected. The `SelectCore` commitment is mandatory for
 	/// v2 receipts if parachains has multiple cores assigned.
+	#[cfg_attr(feature = "std", error("Core selector not present"))]
 	NoCoreSelected,
 	/// Unknown version.
+	#[cfg_attr(feature = "std", error("Unknown internal version"))]
 	UnknownVersion(InternalVersion),
 }
 
diff --git a/prdoc/pr_5908.prdoc b/prdoc/pr_5908.prdoc
new file mode 100644
index 00000000000..8f05819451a
--- /dev/null
+++ b/prdoc/pr_5908.prdoc
@@ -0,0 +1,14 @@
+title: "collation-generation: use v2 receipts"
+
+doc:
+  - audience: Node Dev
+    description: |
+      Implementation of [RFC 103](https://github.com/polkadot-fellows/RFCs/pull/103) for the collation-generation subsystem.
+      Also removes the usage of AsyncBackingParams.
+
+crates:
+  - name: polkadot-node-collation-generation
+    bump: major
+    validate: false
+  - name: polkadot-primitives
+    bump: minor
-- 
GitLab