From 2700dbf2dda8b7f593447c939e1a26dacdb8ce45 Mon Sep 17 00:00:00 2001
From: Andrei Sandu <54316454+sandreim@users.noreply.github.com>
Date: Thu, 31 Oct 2024 18:22:12 +0200
Subject: [PATCH] `candidate-validation`: RFC103 implementation (#5847)

Part of https://github.com/paritytech/polkadot-sdk/issues/5047
On top of https://github.com/paritytech/polkadot-sdk/pull/5679

---------

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
Co-authored-by: GitHub Action <action@github.com>
---
 Cargo.lock                                    |   1 +
 .../node/core/candidate-validation/Cargo.toml |   2 +
 .../node/core/candidate-validation/src/lib.rs | 166 +++++--
 .../core/candidate-validation/src/tests.rs    | 407 +++++++++++++++++-
 polkadot/node/primitives/src/lib.rs           |   4 +
 polkadot/primitives/src/vstaging/mod.rs       |  12 +
 prdoc/pr_5847.prdoc                           |  19 +
 7 files changed, 571 insertions(+), 40 deletions(-)
 create mode 100644 prdoc/pr_5847.prdoc

diff --git a/Cargo.lock b/Cargo.lock
index a6c2bc1fd31..1f171ad756c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -14485,6 +14485,7 @@ dependencies = [
  "polkadot-parachain-primitives",
  "polkadot-primitives",
  "polkadot-primitives-test-helpers",
+ "rstest",
  "sp-application-crypto 30.0.0",
  "sp-core 28.0.0",
  "sp-keyring",
diff --git a/polkadot/node/core/candidate-validation/Cargo.toml b/polkadot/node/core/candidate-validation/Cargo.toml
index fcacc38cae6..87855dbce41 100644
--- a/polkadot/node/core/candidate-validation/Cargo.toml
+++ b/polkadot/node/core/candidate-validation/Cargo.toml
@@ -38,3 +38,5 @@ polkadot-node-subsystem-test-helpers = { workspace = true }
 sp-maybe-compressed-blob = { workspace = true, default-features = true }
 sp-core = { workspace = true, default-features = true }
 polkadot-primitives-test-helpers = { workspace = true }
+rstest = { workspace = true }
+polkadot-primitives = { workspace = true, features = ["test"] }
diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs
index a48669c2482..1e732e2f1f0 100644
--- a/polkadot/node/core/candidate-validation/src/lib.rs
+++ b/polkadot/node/core/candidate-validation/src/lib.rs
@@ -37,7 +37,7 @@ use polkadot_node_subsystem::{
 	overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult,
 	SubsystemSender,
 };
-use polkadot_node_subsystem_util as util;
+use polkadot_node_subsystem_util::{self as util, runtime::ClaimQueueSnapshot};
 use polkadot_overseer::ActiveLeavesUpdate;
 use polkadot_parachain_primitives::primitives::ValidationResult as WasmValidationResult;
 use polkadot_primitives::{
@@ -46,8 +46,9 @@ use polkadot_primitives::{
 		DEFAULT_LENIENT_PREPARATION_TIMEOUT, DEFAULT_PRECHECK_PREPARATION_TIMEOUT,
 	},
 	vstaging::{
-		CandidateDescriptorV2 as CandidateDescriptor, CandidateEvent,
+		transpose_claim_queue, CandidateDescriptorV2 as CandidateDescriptor, CandidateEvent,
 		CandidateReceiptV2 as CandidateReceipt,
+		CommittedCandidateReceiptV2 as CommittedCandidateReceipt,
 	},
 	AuthorityDiscoveryId, CandidateCommitments, ExecutorParams, Hash, PersistedValidationData,
 	PvfExecKind as RuntimePvfExecKind, PvfPrepKind, SessionIndex, ValidationCode,
@@ -148,6 +149,25 @@ impl<Context> CandidateValidationSubsystem {
 	}
 }
 
+// Returns the claim queue at relay parent and logs a warning if it is not available.
+async fn claim_queue<Sender>(relay_parent: Hash, sender: &mut Sender) -> Option<ClaimQueueSnapshot>
+where
+	Sender: SubsystemSender<RuntimeApiMessage>,
+{
+	match util::runtime::fetch_claim_queue(sender, relay_parent).await {
+		Ok(maybe_cq) => maybe_cq,
+		Err(err) => {
+			gum::warn!(
+				target: LOG_TARGET,
+				?relay_parent,
+				?err,
+				"Claim queue not available"
+			);
+			None
+		},
+	}
+}
+
 fn handle_validation_message<S>(
 	mut sender: S,
 	validation_host: ValidationHost,
@@ -167,24 +187,40 @@ where
 			exec_kind,
 			response_sender,
 			..
-		} => async move {
-			let _timer = metrics.time_validate_from_exhaustive();
-			let res = validate_candidate_exhaustive(
-				validation_host,
-				validation_data,
-				validation_code,
-				candidate_receipt,
-				pov,
-				executor_params,
-				exec_kind,
-				&metrics,
-			)
-			.await;
+		} =>
+			async move {
+				let _timer = metrics.time_validate_from_exhaustive();
+				let relay_parent = candidate_receipt.descriptor.relay_parent();
+
+				let maybe_claim_queue = claim_queue(relay_parent, &mut sender).await;
+
+				let maybe_expected_session_index =
+					match util::request_session_index_for_child(relay_parent, &mut sender)
+						.await
+						.await
+					{
+						Ok(Ok(expected_session_index)) => Some(expected_session_index),
+						_ => None,
+					};
+
+				let res = validate_candidate_exhaustive(
+					maybe_expected_session_index,
+					validation_host,
+					validation_data,
+					validation_code,
+					candidate_receipt,
+					pov,
+					executor_params,
+					exec_kind,
+					&metrics,
+					maybe_claim_queue,
+				)
+				.await;
 
-			metrics.on_validation_event(&res);
-			let _ = response_sender.send(res);
-		}
-		.boxed(),
+				metrics.on_validation_event(&res);
+				let _ = response_sender.send(res);
+			}
+			.boxed(),
 		CandidateValidationMessage::PreCheck {
 			relay_parent,
 			validation_code_hash,
@@ -637,6 +673,7 @@ where
 }
 
 async fn validate_candidate_exhaustive(
+	maybe_expected_session_index: Option<SessionIndex>,
 	mut validation_backend: impl ValidationBackend + Send,
 	persisted_validation_data: PersistedValidationData,
 	validation_code: ValidationCode,
@@ -645,11 +682,13 @@ async fn validate_candidate_exhaustive(
 	executor_params: ExecutorParams,
 	exec_kind: PvfExecKind,
 	metrics: &Metrics,
+	maybe_claim_queue: Option<ClaimQueueSnapshot>,
 ) -> Result<ValidationResult, ValidationFailed> {
 	let _timer = metrics.time_validate_candidate_exhaustive();
-
 	let validation_code_hash = validation_code.hash();
+	let relay_parent = candidate_receipt.descriptor.relay_parent();
 	let para_id = candidate_receipt.descriptor.para_id();
+
 	gum::debug!(
 		target: LOG_TARGET,
 		?validation_code_hash,
@@ -657,6 +696,27 @@ async fn validate_candidate_exhaustive(
 		"About to validate a candidate.",
 	);
 
+	// We only check the session index for backing.
+	match (exec_kind, candidate_receipt.descriptor.session_index()) {
+		(PvfExecKind::Backing | PvfExecKind::BackingSystemParas, Some(session_index)) => {
+			let Some(expected_session_index) = maybe_expected_session_index else {
+				let error = "cannot fetch session index from the runtime";
+				gum::warn!(
+					target: LOG_TARGET,
+					?relay_parent,
+					error,
+				);
+
+				return Err(ValidationFailed(error.into()))
+			};
+
+			if session_index != expected_session_index {
+				return Ok(ValidationResult::Invalid(InvalidCandidate::InvalidSessionIndex))
+			}
+		},
+		(_, _) => {},
+	};
+
 	if let Err(e) = perform_basic_checks(
 		&candidate_receipt.descriptor,
 		persisted_validation_data.max_pov_size,
@@ -754,15 +814,21 @@ async fn validate_candidate_exhaustive(
 				gum::info!(target: LOG_TARGET, ?para_id, "Invalid candidate (para_head)");
 				Ok(ValidationResult::Invalid(InvalidCandidate::ParaHeadHashMismatch))
 			} else {
-				let outputs = CandidateCommitments {
-					head_data: res.head_data,
-					upward_messages: res.upward_messages,
-					horizontal_messages: res.horizontal_messages,
-					new_validation_code: res.new_validation_code,
-					processed_downward_messages: res.processed_downward_messages,
-					hrmp_watermark: res.hrmp_watermark,
+				let committed_candidate_receipt = CommittedCandidateReceipt {
+					descriptor: candidate_receipt.descriptor.clone(),
+					commitments: CandidateCommitments {
+						head_data: res.head_data,
+						upward_messages: res.upward_messages,
+						horizontal_messages: res.horizontal_messages,
+						new_validation_code: res.new_validation_code,
+						processed_downward_messages: res.processed_downward_messages,
+						hrmp_watermark: res.hrmp_watermark,
+					},
 				};
-				if candidate_receipt.commitments_hash != outputs.hash() {
+
+				if candidate_receipt.commitments_hash !=
+					committed_candidate_receipt.commitments.hash()
+				{
 					gum::info!(
 						target: LOG_TARGET,
 						?para_id,
@@ -773,7 +839,48 @@ async fn validate_candidate_exhaustive(
 					// invalid.
 					Ok(ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch))
 				} else {
-					Ok(ValidationResult::Valid(outputs, (*persisted_validation_data).clone()))
+					let core_index = candidate_receipt.descriptor.core_index();
+
+					match (core_index, exec_kind) {
+						// Core selectors are optional for V2 descriptors, but we still check the
+						// descriptor core index.
+						(
+							Some(_core_index),
+							PvfExecKind::Backing | PvfExecKind::BackingSystemParas,
+						) => {
+							let Some(claim_queue) = maybe_claim_queue else {
+								let error = "cannot fetch the claim queue from the runtime";
+								gum::warn!(
+									target: LOG_TARGET,
+									?relay_parent,
+									error
+								);
+
+								return Err(ValidationFailed(error.into()))
+							};
+
+							if let Err(err) = committed_candidate_receipt
+								.check_core_index(&transpose_claim_queue(claim_queue.0))
+							{
+								gum::warn!(
+									target: LOG_TARGET,
+									?err,
+									candidate_hash = ?candidate_receipt.hash(),
+									"Candidate core index is invalid",
+								);
+								return Ok(ValidationResult::Invalid(
+									InvalidCandidate::InvalidCoreIndex,
+								))
+							}
+						},
+						// No checks for approvals and disputes
+						(_, _) => {},
+					}
+
+					Ok(ValidationResult::Valid(
+						committed_candidate_receipt.commitments,
+						(*persisted_validation_data).clone(),
+					))
 				}
 			},
 	}
@@ -1003,6 +1110,7 @@ fn perform_basic_checks(
 		return Err(InvalidCandidate::CodeHashMismatch)
 	}
 
+	// No-op for `v2` receipts.
 	if let Err(()) = candidate.check_collator_signature() {
 		return Err(InvalidCandidate::BadSignature)
 	}
diff --git a/polkadot/node/core/candidate-validation/src/tests.rs b/polkadot/node/core/candidate-validation/src/tests.rs
index 997a347631a..391247858ed 100644
--- a/polkadot/node/core/candidate-validation/src/tests.rs
+++ b/polkadot/node/core/candidate-validation/src/tests.rs
@@ -14,7 +14,10 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
 
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::{
+	collections::BTreeMap,
+	sync::atomic::{AtomicUsize, Ordering},
+};
 
 use super::*;
 use crate::PvfExecKind;
@@ -26,12 +29,18 @@ use polkadot_node_subsystem::messages::AllMessages;
 use polkadot_node_subsystem_util::reexports::SubsystemContext;
 use polkadot_overseer::ActivatedLeaf;
 use polkadot_primitives::{
-	vstaging::CandidateDescriptorV2, CandidateDescriptor, CoreIndex, GroupIndex, HeadData,
-	Id as ParaId, OccupiedCoreAssumption, SessionInfo, UpwardMessage, ValidatorId,
+	vstaging::{
+		CandidateDescriptorV2, ClaimQueueOffset, CoreSelector, MutateDescriptorV2, UMPSignal,
+		UMP_SEPARATOR,
+	},
+	CandidateDescriptor, CoreIndex, GroupIndex, HeadData, Id as ParaId, OccupiedCoreAssumption,
+	SessionInfo, UpwardMessage, ValidatorId,
 };
 use polkadot_primitives_test_helpers::{
 	dummy_collator, dummy_collator_signature, dummy_hash, make_valid_candidate_descriptor,
+	make_valid_candidate_descriptor_v2,
 };
+use rstest::rstest;
 use sp_core::{sr25519::Public, testing::TaskExecutor};
 use sp_keyring::Sr25519Keyring;
 use sp_keystore::{testing::MemoryKeystore, Keystore};
@@ -467,25 +476,24 @@ impl ValidationBackend for MockValidateCandidateBackend {
 }
 
 #[test]
-fn candidate_validation_ok_is_ok() {
+fn session_index_checked_only_in_backing() {
 	let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() };
 
 	let pov = PoV { block_data: BlockData(vec![1; 32]) };
 	let head_data = HeadData(vec![1, 1, 1]);
 	let validation_code = ValidationCode(vec![2; 16]);
 
-	let descriptor = make_valid_candidate_descriptor(
+	let descriptor = make_valid_candidate_descriptor_v2(
 		ParaId::from(1_u32),
 		dummy_hash(),
-		validation_data.hash(),
+		CoreIndex(0),
+		100,
+		dummy_hash(),
 		pov.hash(),
 		validation_code.hash(),
 		head_data.hash(),
 		dummy_hash(),
-		Sr25519Keyring::Alice,
-	)
-	.into();
-
+	);
 	let check = perform_basic_checks(
 		&descriptor,
 		validation_data.max_pov_size,
@@ -514,15 +522,59 @@ fn candidate_validation_ok_is_ok() {
 
 	let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() };
 
+	// The session index is invalid
+	let v = executor::block_on(validate_candidate_exhaustive(
+		Some(1),
+		MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result.clone())),
+		validation_data.clone(),
+		validation_code.clone(),
+		candidate_receipt.clone(),
+		Arc::new(pov.clone()),
+		ExecutorParams::default(),
+		PvfExecKind::Backing,
+		&Default::default(),
+		Default::default(),
+	))
+	.unwrap();
+
+	assert_matches!(v, ValidationResult::Invalid(InvalidCandidate::InvalidSessionIndex));
+
+	// Approval doesn't fail since the check is ommited.
 	let v = executor::block_on(validate_candidate_exhaustive(
+		Some(1),
+		MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result.clone())),
+		validation_data.clone(),
+		validation_code.clone(),
+		candidate_receipt.clone(),
+		Arc::new(pov.clone()),
+		ExecutorParams::default(),
+		PvfExecKind::Approval,
+		&Default::default(),
+		Default::default(),
+	))
+	.unwrap();
+
+	assert_matches!(v, ValidationResult::Valid(outputs, used_validation_data) => {
+		assert_eq!(outputs.head_data, HeadData(vec![1, 1, 1]));
+		assert_eq!(outputs.upward_messages, Vec::<UpwardMessage>::new());
+		assert_eq!(outputs.horizontal_messages, Vec::new());
+		assert_eq!(outputs.new_validation_code, Some(vec![2, 2, 2].into()));
+		assert_eq!(outputs.hrmp_watermark, 0);
+		assert_eq!(used_validation_data, validation_data);
+	});
+
+	// Approval doesn't fail since the check is ommited.
+	let v = executor::block_on(validate_candidate_exhaustive(
+		Some(1),
 		MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)),
 		validation_data.clone(),
 		validation_code,
 		candidate_receipt,
 		Arc::new(pov),
 		ExecutorParams::default(),
-		PvfExecKind::Backing,
+		PvfExecKind::Dispute,
 		&Default::default(),
+		Default::default(),
 	))
 	.unwrap();
 
@@ -536,6 +588,323 @@ fn candidate_validation_ok_is_ok() {
 	});
 }
 
+#[rstest]
+#[case(true)]
+#[case(false)]
+fn candidate_validation_ok_is_ok(#[case] v2_descriptor: bool) {
+	let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() };
+
+	let pov = PoV { block_data: BlockData(vec![1; 32]) };
+	let head_data = HeadData(vec![1, 1, 1]);
+	let validation_code = ValidationCode(vec![2; 16]);
+
+	let descriptor = if v2_descriptor {
+		make_valid_candidate_descriptor_v2(
+			ParaId::from(1_u32),
+			dummy_hash(),
+			CoreIndex(1),
+			1,
+			dummy_hash(),
+			pov.hash(),
+			validation_code.hash(),
+			head_data.hash(),
+			dummy_hash(),
+		)
+	} else {
+		make_valid_candidate_descriptor(
+			ParaId::from(1_u32),
+			dummy_hash(),
+			validation_data.hash(),
+			pov.hash(),
+			validation_code.hash(),
+			head_data.hash(),
+			dummy_hash(),
+			Sr25519Keyring::Alice,
+		)
+		.into()
+	};
+
+	let check = perform_basic_checks(
+		&descriptor,
+		validation_data.max_pov_size,
+		&pov,
+		&validation_code.hash(),
+	);
+	assert!(check.is_ok());
+
+	let mut validation_result = WasmValidationResult {
+		head_data,
+		new_validation_code: Some(vec![2, 2, 2].into()),
+		upward_messages: Default::default(),
+		horizontal_messages: Default::default(),
+		processed_downward_messages: 0,
+		hrmp_watermark: 0,
+	};
+
+	if v2_descriptor {
+		validation_result.upward_messages.force_push(UMP_SEPARATOR);
+		validation_result
+			.upward_messages
+			.force_push(UMPSignal::SelectCore(CoreSelector(0), ClaimQueueOffset(1)).encode());
+	}
+
+	let commitments = CandidateCommitments {
+		head_data: validation_result.head_data.clone(),
+		upward_messages: validation_result.upward_messages.clone(),
+		horizontal_messages: validation_result.horizontal_messages.clone(),
+		new_validation_code: validation_result.new_validation_code.clone(),
+		processed_downward_messages: validation_result.processed_downward_messages,
+		hrmp_watermark: validation_result.hrmp_watermark,
+	};
+
+	let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() };
+	let mut cq = BTreeMap::new();
+	let _ = cq.insert(CoreIndex(0), vec![1.into(), 2.into()].into());
+	let _ = cq.insert(CoreIndex(1), vec![1.into(), 1.into()].into());
+
+	let v = executor::block_on(validate_candidate_exhaustive(
+		Some(1),
+		MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)),
+		validation_data.clone(),
+		validation_code,
+		candidate_receipt,
+		Arc::new(pov),
+		ExecutorParams::default(),
+		PvfExecKind::Backing,
+		&Default::default(),
+		Some(ClaimQueueSnapshot(cq)),
+	))
+	.unwrap();
+
+	assert_matches!(v, ValidationResult::Valid(outputs, used_validation_data) => {
+		assert_eq!(outputs.head_data, HeadData(vec![1, 1, 1]));
+		assert_eq!(outputs.upward_messages, commitments.upward_messages);
+		assert_eq!(outputs.horizontal_messages, Vec::new());
+		assert_eq!(outputs.new_validation_code, Some(vec![2, 2, 2].into()));
+		assert_eq!(outputs.hrmp_watermark, 0);
+		assert_eq!(used_validation_data, validation_data);
+	});
+}
+
+#[test]
+fn invalid_session_or_core_index() {
+	let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() };
+
+	let pov = PoV { block_data: BlockData(vec![1; 32]) };
+	let head_data = HeadData(vec![1, 1, 1]);
+	let validation_code = ValidationCode(vec![2; 16]);
+
+	let descriptor = make_valid_candidate_descriptor_v2(
+		ParaId::from(1_u32),
+		dummy_hash(),
+		CoreIndex(1),
+		100,
+		dummy_hash(),
+		pov.hash(),
+		validation_code.hash(),
+		head_data.hash(),
+		dummy_hash(),
+	);
+
+	let check = perform_basic_checks(
+		&descriptor,
+		validation_data.max_pov_size,
+		&pov,
+		&validation_code.hash(),
+	);
+	assert!(check.is_ok());
+
+	let mut validation_result = WasmValidationResult {
+		head_data,
+		new_validation_code: Some(vec![2, 2, 2].into()),
+		upward_messages: Default::default(),
+		horizontal_messages: Default::default(),
+		processed_downward_messages: 0,
+		hrmp_watermark: 0,
+	};
+
+	validation_result.upward_messages.force_push(UMP_SEPARATOR);
+	validation_result
+		.upward_messages
+		.force_push(UMPSignal::SelectCore(CoreSelector(1), ClaimQueueOffset(0)).encode());
+
+	let commitments = CandidateCommitments {
+		head_data: validation_result.head_data.clone(),
+		upward_messages: validation_result.upward_messages.clone(),
+		horizontal_messages: validation_result.horizontal_messages.clone(),
+		new_validation_code: validation_result.new_validation_code.clone(),
+		processed_downward_messages: validation_result.processed_downward_messages,
+		hrmp_watermark: validation_result.hrmp_watermark,
+	};
+
+	let mut candidate_receipt =
+		CandidateReceipt { descriptor, commitments_hash: commitments.hash() };
+
+	let err = executor::block_on(validate_candidate_exhaustive(
+		Some(1),
+		MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result.clone())),
+		validation_data.clone(),
+		validation_code.clone(),
+		candidate_receipt.clone(),
+		Arc::new(pov.clone()),
+		ExecutorParams::default(),
+		PvfExecKind::Backing,
+		&Default::default(),
+		Default::default(),
+	))
+	.unwrap();
+
+	assert_matches!(err, ValidationResult::Invalid(InvalidCandidate::InvalidSessionIndex));
+
+	let err = executor::block_on(validate_candidate_exhaustive(
+		Some(1),
+		MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result.clone())),
+		validation_data.clone(),
+		validation_code.clone(),
+		candidate_receipt.clone(),
+		Arc::new(pov.clone()),
+		ExecutorParams::default(),
+		PvfExecKind::BackingSystemParas,
+		&Default::default(),
+		Default::default(),
+	))
+	.unwrap();
+
+	assert_matches!(err, ValidationResult::Invalid(InvalidCandidate::InvalidSessionIndex));
+
+	candidate_receipt.descriptor.set_session_index(1);
+
+	let result = executor::block_on(validate_candidate_exhaustive(
+		Some(1),
+		MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result.clone())),
+		validation_data.clone(),
+		validation_code.clone(),
+		candidate_receipt.clone(),
+		Arc::new(pov.clone()),
+		ExecutorParams::default(),
+		PvfExecKind::Backing,
+		&Default::default(),
+		Some(Default::default()),
+	))
+	.unwrap();
+	assert_matches!(result, ValidationResult::Invalid(InvalidCandidate::InvalidCoreIndex));
+
+	let result = executor::block_on(validate_candidate_exhaustive(
+		Some(1),
+		MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result.clone())),
+		validation_data.clone(),
+		validation_code.clone(),
+		candidate_receipt.clone(),
+		Arc::new(pov.clone()),
+		ExecutorParams::default(),
+		PvfExecKind::BackingSystemParas,
+		&Default::default(),
+		Some(Default::default()),
+	))
+	.unwrap();
+	assert_matches!(result, ValidationResult::Invalid(InvalidCandidate::InvalidCoreIndex));
+
+	let v = executor::block_on(validate_candidate_exhaustive(
+		Some(1),
+		MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result.clone())),
+		validation_data.clone(),
+		validation_code.clone(),
+		candidate_receipt.clone(),
+		Arc::new(pov.clone()),
+		ExecutorParams::default(),
+		PvfExecKind::Approval,
+		&Default::default(),
+		Default::default(),
+	))
+	.unwrap();
+
+	// Validation doesn't fail for approvals, core/session index is not checked.
+	assert_matches!(v, ValidationResult::Valid(outputs, used_validation_data) => {
+		assert_eq!(outputs.head_data, HeadData(vec![1, 1, 1]));
+		assert_eq!(outputs.upward_messages, commitments.upward_messages);
+		assert_eq!(outputs.horizontal_messages, Vec::new());
+		assert_eq!(outputs.new_validation_code, Some(vec![2, 2, 2].into()));
+		assert_eq!(outputs.hrmp_watermark, 0);
+		assert_eq!(used_validation_data, validation_data);
+	});
+
+	// Dispute check passes because we don't check core or session index
+	let v = executor::block_on(validate_candidate_exhaustive(
+		Some(1),
+		MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result.clone())),
+		validation_data.clone(),
+		validation_code.clone(),
+		candidate_receipt.clone(),
+		Arc::new(pov.clone()),
+		ExecutorParams::default(),
+		PvfExecKind::Dispute,
+		&Default::default(),
+		Default::default(),
+	))
+	.unwrap();
+
+	// Validation doesn't fail for approvals, core/session index is not checked.
+	assert_matches!(v, ValidationResult::Valid(outputs, used_validation_data) => {
+		assert_eq!(outputs.head_data, HeadData(vec![1, 1, 1]));
+		assert_eq!(outputs.upward_messages, commitments.upward_messages);
+		assert_eq!(outputs.horizontal_messages, Vec::new());
+		assert_eq!(outputs.new_validation_code, Some(vec![2, 2, 2].into()));
+		assert_eq!(outputs.hrmp_watermark, 0);
+		assert_eq!(used_validation_data, validation_data);
+	});
+
+	// Populate claim queue.
+	let mut cq = BTreeMap::new();
+	let _ = cq.insert(CoreIndex(0), vec![1.into(), 2.into()].into());
+	let _ = cq.insert(CoreIndex(1), vec![1.into(), 2.into()].into());
+
+	let v = executor::block_on(validate_candidate_exhaustive(
+		Some(1),
+		MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result.clone())),
+		validation_data.clone(),
+		validation_code.clone(),
+		candidate_receipt.clone(),
+		Arc::new(pov.clone()),
+		ExecutorParams::default(),
+		PvfExecKind::Backing,
+		&Default::default(),
+		Some(ClaimQueueSnapshot(cq.clone())),
+	))
+	.unwrap();
+
+	assert_matches!(v, ValidationResult::Valid(outputs, used_validation_data) => {
+		assert_eq!(outputs.head_data, HeadData(vec![1, 1, 1]));
+		assert_eq!(outputs.upward_messages, commitments.upward_messages);
+		assert_eq!(outputs.horizontal_messages, Vec::new());
+		assert_eq!(outputs.new_validation_code, Some(vec![2, 2, 2].into()));
+		assert_eq!(outputs.hrmp_watermark, 0);
+		assert_eq!(used_validation_data, validation_data);
+	});
+
+	let v = executor::block_on(validate_candidate_exhaustive(
+		Some(1),
+		MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result.clone())),
+		validation_data.clone(),
+		validation_code.clone(),
+		candidate_receipt.clone(),
+		Arc::new(pov.clone()),
+		ExecutorParams::default(),
+		PvfExecKind::BackingSystemParas,
+		&Default::default(),
+		Some(ClaimQueueSnapshot(cq)),
+	))
+	.unwrap();
+
+	assert_matches!(v, ValidationResult::Valid(outputs, used_validation_data) => {
+		assert_eq!(outputs.head_data, HeadData(vec![1, 1, 1]));
+		assert_eq!(outputs.upward_messages, commitments.upward_messages);
+		assert_eq!(outputs.horizontal_messages, Vec::new());
+		assert_eq!(outputs.new_validation_code, Some(vec![2, 2, 2].into()));
+		assert_eq!(outputs.hrmp_watermark, 0);
+		assert_eq!(used_validation_data, validation_data);
+	});
+}
+
 #[test]
 fn candidate_validation_bad_return_is_invalid() {
 	let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() };
@@ -566,6 +935,7 @@ fn candidate_validation_bad_return_is_invalid() {
 	let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() };
 
 	let v = executor::block_on(validate_candidate_exhaustive(
+		Some(1),
 		MockValidateCandidateBackend::with_hardcoded_result(Err(ValidationError::Invalid(
 			WasmInvalidCandidate::HardTimeout,
 		))),
@@ -576,6 +946,7 @@ fn candidate_validation_bad_return_is_invalid() {
 		ExecutorParams::default(),
 		PvfExecKind::Backing,
 		&Default::default(),
+		Default::default(),
 	))
 	.unwrap();
 
@@ -647,6 +1018,7 @@ fn candidate_validation_one_ambiguous_error_is_valid() {
 	let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() };
 
 	let v = executor::block_on(validate_candidate_exhaustive(
+		Some(1),
 		MockValidateCandidateBackend::with_hardcoded_result_list(vec![
 			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)),
 			Ok(validation_result),
@@ -658,6 +1030,7 @@ fn candidate_validation_one_ambiguous_error_is_valid() {
 		ExecutorParams::default(),
 		PvfExecKind::Approval,
 		&Default::default(),
+		Default::default(),
 	))
 	.unwrap();
 
@@ -688,6 +1061,7 @@ fn candidate_validation_multiple_ambiguous_errors_is_invalid() {
 	let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() };
 
 	let v = executor::block_on(validate_candidate_exhaustive(
+		Some(1),
 		MockValidateCandidateBackend::with_hardcoded_result_list(vec![
 			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)),
 			Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)),
@@ -699,6 +1073,7 @@ fn candidate_validation_multiple_ambiguous_errors_is_invalid() {
 		ExecutorParams::default(),
 		PvfExecKind::Approval,
 		&Default::default(),
+		Default::default(),
 	))
 	.unwrap();
 
@@ -806,6 +1181,7 @@ fn candidate_validation_retry_on_error_helper(
 	let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() };
 
 	return executor::block_on(validate_candidate_exhaustive(
+		Some(1),
 		MockValidateCandidateBackend::with_hardcoded_result_list(mock_errors),
 		validation_data,
 		validation_code,
@@ -814,6 +1190,7 @@ fn candidate_validation_retry_on_error_helper(
 		ExecutorParams::default(),
 		exec_kind,
 		&Default::default(),
+		Default::default(),
 	))
 }
 
@@ -847,6 +1224,7 @@ fn candidate_validation_timeout_is_internal_error() {
 	let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() };
 
 	let v = executor::block_on(validate_candidate_exhaustive(
+		Some(1),
 		MockValidateCandidateBackend::with_hardcoded_result(Err(ValidationError::Invalid(
 			WasmInvalidCandidate::HardTimeout,
 		))),
@@ -857,6 +1235,7 @@ fn candidate_validation_timeout_is_internal_error() {
 		ExecutorParams::default(),
 		PvfExecKind::Backing,
 		&Default::default(),
+		Default::default(),
 	));
 
 	assert_matches!(v, Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)));
@@ -896,6 +1275,7 @@ fn candidate_validation_commitment_hash_mismatch_is_invalid() {
 	};
 
 	let result = executor::block_on(validate_candidate_exhaustive(
+		Some(1),
 		MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)),
 		validation_data,
 		validation_code,
@@ -904,6 +1284,7 @@ fn candidate_validation_commitment_hash_mismatch_is_invalid() {
 		ExecutorParams::default(),
 		PvfExecKind::Backing,
 		&Default::default(),
+		Default::default(),
 	))
 	.unwrap();
 
@@ -947,6 +1328,7 @@ fn candidate_validation_code_mismatch_is_invalid() {
 	>(pool.clone());
 
 	let v = executor::block_on(validate_candidate_exhaustive(
+		Some(1),
 		MockValidateCandidateBackend::with_hardcoded_result(Err(ValidationError::Invalid(
 			WasmInvalidCandidate::HardTimeout,
 		))),
@@ -957,6 +1339,7 @@ fn candidate_validation_code_mismatch_is_invalid() {
 		ExecutorParams::default(),
 		PvfExecKind::Backing,
 		&Default::default(),
+		Default::default(),
 	))
 	.unwrap();
 
@@ -1007,6 +1390,7 @@ fn compressed_code_works() {
 	let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() };
 
 	let v = executor::block_on(validate_candidate_exhaustive(
+		Some(1),
 		MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)),
 		validation_data,
 		validation_code,
@@ -1015,6 +1399,7 @@ fn compressed_code_works() {
 		ExecutorParams::default(),
 		PvfExecKind::Backing,
 		&Default::default(),
+		Default::default(),
 	));
 
 	assert_matches!(v, Ok(ValidationResult::Valid(_, _)));
diff --git a/polkadot/node/primitives/src/lib.rs b/polkadot/node/primitives/src/lib.rs
index e2e7aa92b11..6985e86098b 100644
--- a/polkadot/node/primitives/src/lib.rs
+++ b/polkadot/node/primitives/src/lib.rs
@@ -348,6 +348,10 @@ pub enum InvalidCandidate {
 	CodeHashMismatch,
 	/// Validation has generated different candidate commitments.
 	CommitmentsHashMismatch,
+	/// The candidate receipt contains an invalid session index.
+	InvalidSessionIndex,
+	/// The candidate receipt contains an invalid core index.
+	InvalidCoreIndex,
 }
 
 /// Result of the validation of the candidate.
diff --git a/polkadot/primitives/src/vstaging/mod.rs b/polkadot/primitives/src/vstaging/mod.rs
index 265fcd899d7..21aab41902b 100644
--- a/polkadot/primitives/src/vstaging/mod.rs
+++ b/polkadot/primitives/src/vstaging/mod.rs
@@ -208,6 +208,10 @@ pub trait MutateDescriptorV2<H> {
 	fn set_erasure_root(&mut self, erasure_root: Hash);
 	/// Set the para head of the descriptor.
 	fn set_para_head(&mut self, para_head: Hash);
+	/// Set the core index of the descriptor.
+	fn set_core_index(&mut self, core_index: CoreIndex);
+	/// Set the session index of the descriptor.
+	fn set_session_index(&mut self, session_index: SessionIndex);
 }
 
 #[cfg(feature = "test")]
@@ -228,6 +232,14 @@ impl<H> MutateDescriptorV2<H> for CandidateDescriptorV2<H> {
 		self.version = version;
 	}
 
+	fn set_core_index(&mut self, core_index: CoreIndex) {
+		self.core_index = core_index.0 as u16;
+	}
+
+	fn set_session_index(&mut self, session_index: SessionIndex) {
+		self.session_index = session_index;
+	}
+
 	fn set_persisted_validation_data_hash(&mut self, persisted_validation_data_hash: Hash) {
 		self.persisted_validation_data_hash = persisted_validation_data_hash;
 	}
diff --git a/prdoc/pr_5847.prdoc b/prdoc/pr_5847.prdoc
new file mode 100644
index 00000000000..fdbf6423da6
--- /dev/null
+++ b/prdoc/pr_5847.prdoc
@@ -0,0 +1,19 @@
+title: '`candidate-validation`: RFC103 implementation'
+doc:
+- audience: Node Dev
+  description: |
+    Introduces support for new v2 descriptor `core_index` and `session_index` fields.
+    The subsystem will check the values of the new fields only during backing validations.
+crates:
+- name: polkadot-node-primitives
+  bump: major
+- name: polkadot-primitives
+  bump: major
+- name: cumulus-relay-chain-inprocess-interface
+  bump: minor
+- name: cumulus-relay-chain-interface
+  bump: minor
+- name: cumulus-client-consensus-aura
+  bump: minor
+- name: polkadot-node-core-candidate-validation
+  bump: major
-- 
GitLab