From e8e201f0ffc657d227ab869a778e37a876af2666 Mon Sep 17 00:00:00 2001
From: Andrei Sandu <54316454+sandreim@users.noreply.github.com>
Date: Wed, 3 Apr 2024 11:34:50 +0300
Subject: [PATCH] statement-distribution: fix filtering of statements for
 elastic parachains (#3879)

fixes https://github.com/paritytech/polkadot-sdk/issues/3775

Additionally moves the claim queue fetch utilities into
`subsystem-util`.

TODO:
- [x] fix tests
- [x] add elastic scaling tests

---------

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
---
 .../node/collation-generation/src/error.rs    |   2 +
 polkadot/node/collation-generation/src/lib.rs |  54 ++------
 .../node/collation-generation/src/tests.rs    |  15 +-
 .../statement-distribution/src/error.rs       |   3 +
 .../statement-distribution/src/v2/mod.rs      | 131 +++++++++++++-----
 .../src/v2/tests/cluster.rs                   |  60 ++++++++
 .../src/v2/tests/grid.rs                      |  23 ++-
 .../src/v2/tests/mod.rs                       |  29 +++-
 polkadot/node/subsystem-util/src/vstaging.rs  |  46 +++++-
 9 files changed, 265 insertions(+), 98 deletions(-)

diff --git a/polkadot/node/collation-generation/src/error.rs b/polkadot/node/collation-generation/src/error.rs
index 852c50f3068..f04e3c4f20b 100644
--- a/polkadot/node/collation-generation/src/error.rs
+++ b/polkadot/node/collation-generation/src/error.rs
@@ -27,6 +27,8 @@ pub enum Error {
 	#[error(transparent)]
 	Util(#[from] polkadot_node_subsystem_util::Error),
 	#[error(transparent)]
+	UtilRuntime(#[from] polkadot_node_subsystem_util::runtime::Error),
+	#[error(transparent)]
 	Erasure(#[from] polkadot_erasure_coding::Error),
 	#[error("Parachain backing state not available in runtime.")]
 	MissingParaBackingState,
diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs
index 3164f6078bc..fb82871bb15 100644
--- a/polkadot/node/collation-generation/src/lib.rs
+++ b/polkadot/node/collation-generation/src/lib.rs
@@ -38,25 +38,23 @@ use polkadot_node_primitives::{
 	SubmitCollationParams,
 };
 use polkadot_node_subsystem::{
-	messages::{CollationGenerationMessage, CollatorProtocolMessage, RuntimeApiRequest},
+	messages::{CollationGenerationMessage, CollatorProtocolMessage},
 	overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem,
 	SubsystemContext, SubsystemError, SubsystemResult,
 };
 use polkadot_node_subsystem_util::{
-	has_required_runtime, request_async_backing_params, request_availability_cores,
-	request_claim_queue, request_para_backing_state, request_persisted_validation_data,
-	request_validation_code, request_validation_code_hash, request_validators,
+	request_async_backing_params, request_availability_cores, request_para_backing_state,
+	request_persisted_validation_data, request_validation_code, request_validation_code_hash,
+	request_validators,
+	vstaging::{fetch_claim_queue, fetch_next_scheduled_on_core},
 };
 use polkadot_primitives::{
 	collator_signature_payload, CandidateCommitments, CandidateDescriptor, CandidateReceipt,
 	CollatorPair, CoreIndex, CoreState, Hash, Id as ParaId, OccupiedCoreAssumption,
-	PersistedValidationData, ScheduledCore, ValidationCodeHash,
+	PersistedValidationData, ValidationCodeHash,
 };
 use sp_core::crypto::Pair;
-use std::{
-	collections::{BTreeMap, VecDeque},
-	sync::Arc,
-};
+use std::sync::Arc;
 
 mod error;
 
@@ -228,7 +226,9 @@ async fn handle_new_activations<Context>(
 		let availability_cores = availability_cores??;
 		let async_backing_params = async_backing_params?.ok();
 		let n_validators = validators??.len();
-		let maybe_claim_queue = fetch_claim_queue(ctx.sender(), relay_parent).await?;
+		let maybe_claim_queue = fetch_claim_queue(ctx.sender(), relay_parent)
+			.await
+			.map_err(crate::error::Error::UtilRuntime)?;
 
 		// The loop bellow will fill in cores that the para is allowed to build on.
 		let mut cores_to_build_on = Vec::new();
@@ -655,37 +655,3 @@ fn erasure_root(
 	let chunks = polkadot_erasure_coding::obtain_chunks_v1(n_validators, &available_data)?;
 	Ok(polkadot_erasure_coding::branches(&chunks).root())
 }
-
-// Checks if the runtime supports `request_claim_queue` and executes it. Returns `Ok(None)`
-// otherwise. Any [`RuntimeApiError`]s are bubbled up to the caller.
-async fn fetch_claim_queue(
-	sender: &mut impl overseer::CollationGenerationSenderTrait,
-	relay_parent: Hash,
-) -> crate::error::Result<Option<BTreeMap<CoreIndex, VecDeque<ParaId>>>> {
-	if has_required_runtime(
-		sender,
-		relay_parent,
-		RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT,
-	)
-	.await
-	{
-		let res = request_claim_queue(relay_parent, sender).await.await??;
-		Ok(Some(res))
-	} else {
-		gum::trace!(target: LOG_TARGET, "Runtime doesn't support `request_claim_queue`");
-		Ok(None)
-	}
-}
-
-// Returns the next scheduled `ParaId` for a core in the claim queue, wrapped in `ScheduledCore`.
-// This function is supposed to be used in `handle_new_activations` hence the return type.
-fn fetch_next_scheduled_on_core(
-	claim_queue: &BTreeMap<CoreIndex, VecDeque<ParaId>>,
-	core_idx: CoreIndex,
-) -> Option<ScheduledCore> {
-	claim_queue
-		.get(&core_idx)?
-		.front()
-		.cloned()
-		.map(|para_id| ScheduledCore { para_id, collator: None })
-}
diff --git a/polkadot/node/collation-generation/src/tests.rs b/polkadot/node/collation-generation/src/tests.rs
index 923a21e86fb..781d27188df 100644
--- a/polkadot/node/collation-generation/src/tests.rs
+++ b/polkadot/node/collation-generation/src/tests.rs
@@ -28,7 +28,7 @@ use polkadot_node_subsystem::{
 	ActivatedLeaf,
 };
 use polkadot_node_subsystem_test_helpers::{subsystem_test_harness, TestSubsystemContextHandle};
-use polkadot_node_subsystem_util::TimeoutExt;
+use polkadot_node_subsystem_util::{vstaging::ClaimQueueSnapshot, TimeoutExt};
 use polkadot_primitives::{
 	async_backing::{BackingState, CandidatePendingAvailability},
 	AsyncBackingParams, BlockNumber, CollatorPair, HeadData, PersistedValidationData,
@@ -36,7 +36,10 @@ use polkadot_primitives::{
 };
 use rstest::rstest;
 use sp_keyring::sr25519::Keyring as Sr25519Keyring;
-use std::pin::Pin;
+use std::{
+	collections::{BTreeMap, VecDeque},
+	pin::Pin,
+};
 use test_helpers::{
 	dummy_candidate_descriptor, dummy_hash, dummy_head_data, dummy_validator, make_candidate,
 };
@@ -617,7 +620,7 @@ fn fallback_when_no_validation_code_hash_api(#[case] runtime_version: u32) {
 					_hash,
 					RuntimeApiRequest::ClaimQueue(tx),
 				))) if runtime_version >= RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT => {
-					let res = BTreeMap::<CoreIndex, VecDeque<ParaId>>::new();
+					let res = ClaimQueueSnapshot::new();
 					tx.send(Ok(res)).unwrap();
 				},
 				Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
@@ -780,7 +783,7 @@ fn distribute_collation_for_occupied_core_with_async_backing_enabled(#[case] run
 		candidate_hash: Default::default(),
 		candidate_descriptor: dummy_candidate_descriptor(dummy_hash()),
 	})];
-	let claim_queue = BTreeMap::from([(CoreIndex::from(0), VecDeque::from([para_id]))]);
+	let claim_queue = ClaimQueueSnapshot::from([(CoreIndex::from(0), VecDeque::from([para_id]))]);
 
 	test_harness(|mut virtual_overseer| async move {
 		helpers::initialize_collator(&mut virtual_overseer, para_id).await;
@@ -962,7 +965,7 @@ fn no_collation_is_distributed_for_occupied_core_with_async_backing_disabled(
 		candidate_hash: Default::default(),
 		candidate_descriptor: dummy_candidate_descriptor(dummy_hash()),
 	})];
-	let claim_queue = BTreeMap::from([(CoreIndex::from(0), VecDeque::from([para_id]))]);
+	let claim_queue = ClaimQueueSnapshot::from([(CoreIndex::from(0), VecDeque::from([para_id]))]);
 
 	test_harness(|mut virtual_overseer| async move {
 		helpers::initialize_collator(&mut virtual_overseer, para_id).await;
@@ -1050,7 +1053,7 @@ mod helpers {
 		async_backing_params: AsyncBackingParams,
 		cores: Vec<CoreState>,
 		runtime_version: u32,
-		claim_queue: BTreeMap<CoreIndex, VecDeque<ParaId>>,
+		claim_queue: ClaimQueueSnapshot,
 	) {
 		assert_matches!(
 			overseer_recv(virtual_overseer).await,
diff --git a/polkadot/node/network/statement-distribution/src/error.rs b/polkadot/node/network/statement-distribution/src/error.rs
index a712ab6da43..d7f52162fe2 100644
--- a/polkadot/node/network/statement-distribution/src/error.rs
+++ b/polkadot/node/network/statement-distribution/src/error.rs
@@ -81,6 +81,9 @@ pub enum Error {
 	#[error("Fetching validator groups failed {0:?}")]
 	FetchValidatorGroups(RuntimeApiError),
 
+	#[error("Fetching claim queue failed {0:?}")]
+	FetchClaimQueue(runtime::Error),
+
 	#[error("Attempted to share statement when not a validator or not assigned")]
 	InvalidShare,
 
diff --git a/polkadot/node/network/statement-distribution/src/v2/mod.rs b/polkadot/node/network/statement-distribution/src/v2/mod.rs
index d782e37f10b..b9f6f705ed8 100644
--- a/polkadot/node/network/statement-distribution/src/v2/mod.rs
+++ b/polkadot/node/network/statement-distribution/src/v2/mod.rs
@@ -46,6 +46,7 @@ use polkadot_node_subsystem_util::{
 	backing_implicit_view::View as ImplicitView,
 	reputation::ReputationAggregator,
 	runtime::{request_min_backing_votes, ProspectiveParachainsMode},
+	vstaging::fetch_claim_queue,
 };
 use polkadot_primitives::{
 	AuthorityDiscoveryId, CandidateHash, CompactStatement, CoreIndex, CoreState, GroupIndex,
@@ -149,10 +150,9 @@ pub(crate) const REQUEST_RETRY_DELAY: Duration = Duration::from_secs(1);
 struct PerRelayParentState {
 	local_validator: Option<LocalValidatorState>,
 	statement_store: StatementStore,
-	availability_cores: Vec<CoreState>,
-	group_rotation_info: GroupRotationInfo,
 	seconding_limit: usize,
 	session: SessionIndex,
+	groups_per_para: HashMap<ParaId, Vec<GroupIndex>>,
 }
 
 impl PerRelayParentState {
@@ -563,11 +563,13 @@ pub(crate) async fn handle_active_leaves_update<Context>(
 	activated: &ActivatedLeaf,
 	leaf_mode: ProspectiveParachainsMode,
 ) -> JfyiErrorResult<()> {
-	let seconding_limit = match leaf_mode {
+	let max_candidate_depth = match leaf_mode {
 		ProspectiveParachainsMode::Disabled => return Ok(()),
-		ProspectiveParachainsMode::Enabled { max_candidate_depth, .. } => max_candidate_depth + 1,
+		ProspectiveParachainsMode::Enabled { max_candidate_depth, .. } => max_candidate_depth,
 	};
 
+	let seconding_limit = max_candidate_depth + 1;
+
 	state
 		.implicit_view
 		.activate_leaf(ctx.sender(), activated.hash)
@@ -693,15 +695,23 @@ pub(crate) async fn handle_active_leaves_update<Context>(
 			}
 		});
 
+		let groups_per_para = determine_groups_per_para(
+			ctx.sender(),
+			new_relay_parent,
+			availability_cores,
+			group_rotation_info,
+			max_candidate_depth,
+		)
+		.await;
+
 		state.per_relay_parent.insert(
 			new_relay_parent,
 			PerRelayParentState {
 				local_validator,
 				statement_store: StatementStore::new(&per_session.groups),
-				availability_cores,
-				group_rotation_info,
 				seconding_limit,
 				session: session_index,
+				groups_per_para,
 			},
 		);
 	}
@@ -2126,17 +2136,64 @@ async fn provide_candidate_to_grid<Context>(
 	}
 }
 
-fn group_for_para(
-	availability_cores: &[CoreState],
-	group_rotation_info: &GroupRotationInfo,
-	para_id: ParaId,
-) -> Option<GroupIndex> {
-	// Note: this won't work well for on-demand parachains as it assumes that core assignments are
-	// fixed across blocks.
-	let core_index = availability_cores.iter().position(|c| c.para_id() == Some(para_id));
+// Utility function to populate per relay parent `ParaId` to `GroupIndex` mappings.
+async fn determine_groups_per_para(
+	sender: &mut impl overseer::StatementDistributionSenderTrait,
+	relay_parent: Hash,
+	availability_cores: Vec<CoreState>,
+	group_rotation_info: GroupRotationInfo,
+	max_candidate_depth: usize,
+) -> HashMap<ParaId, Vec<GroupIndex>> {
+	let maybe_claim_queue = fetch_claim_queue(sender, relay_parent)
+			.await
+			.unwrap_or_else(|err| {
+				gum::debug!(
+					target: LOG_TARGET,
+					?relay_parent,
+					?err,
+					"determine_groups_per_para: `claim_queue` API not available, falling back to iterating availability cores"
+				);
+				None
+			});
+
+	let n_cores = availability_cores.len();
+
+	// Determine the core indices occupied by each para at the current relay parent. To support
+	// on-demand parachains we also consider the core indices at next block if core has a candidate
+	// pending availability.
+	let para_core_indices: Vec<_> = if let Some(claim_queue) = maybe_claim_queue {
+		claim_queue
+			.into_iter()
+			.filter_map(|(core_index, paras)| Some((*paras.front()?, core_index)))
+			.collect()
+	} else {
+		availability_cores
+			.into_iter()
+			.enumerate()
+			.filter_map(|(index, core)| match core {
+				CoreState::Scheduled(scheduled_core) =>
+					Some((scheduled_core.para_id, CoreIndex(index as u32))),
+				CoreState::Occupied(occupied_core) =>
+					if max_candidate_depth >= 1 {
+						occupied_core
+							.next_up_on_available
+							.map(|scheduled_core| (scheduled_core.para_id, CoreIndex(index as u32)))
+					} else {
+						None
+					},
+				CoreState::Free => None,
+			})
+			.collect()
+	};
 
-	core_index
-		.map(|c| group_rotation_info.group_for_core(CoreIndex(c as _), availability_cores.len()))
+	let mut groups_per_para = HashMap::new();
+	// Map from `CoreIndex` to `GroupIndex` and collect as `HashMap`.
+	for (para, core_index) in para_core_indices {
+		let group_index = group_rotation_info.group_for_core(core_index, n_cores);
+		groups_per_para.entry(para).or_insert_with(Vec::new).push(group_index)
+	}
+
+	groups_per_para
 }
 
 #[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
@@ -2192,18 +2249,23 @@ async fn fragment_tree_update_inner<Context>(
 			let confirmed_candidate = state.candidates.get_confirmed(&candidate_hash);
 			let prs = state.per_relay_parent.get_mut(&receipt.descriptor().relay_parent);
 			if let (Some(confirmed), Some(prs)) = (confirmed_candidate, prs) {
-				let group_index = group_for_para(
-					&prs.availability_cores,
-					&prs.group_rotation_info,
-					receipt.descriptor().para_id,
-				);
-
 				let per_session = state.per_session.get(&prs.session);
-				if let (Some(per_session), Some(group_index)) = (per_session, group_index) {
+				let group_index = confirmed.group_index();
+
+				// Sanity check if group_index is valid for this para at relay parent.
+				let Some(expected_groups) = prs.groups_per_para.get(&receipt.descriptor().para_id)
+				else {
+					continue
+				};
+				if !expected_groups.iter().any(|g| *g == group_index) {
+					continue
+				}
+
+				if let Some(per_session) = per_session {
 					send_backing_fresh_statements(
 						ctx,
 						candidate_hash,
-						group_index,
+						confirmed.group_index(),
 						&receipt.descriptor().relay_parent,
 						prs,
 						confirmed,
@@ -2311,13 +2373,12 @@ async fn handle_incoming_manifest_common<'a, Context>(
 		Some(x) => x,
 	};
 
-	let expected_group = group_for_para(
-		&relay_parent_state.availability_cores,
-		&relay_parent_state.group_rotation_info,
-		para_id,
-	);
+	let Some(expected_groups) = relay_parent_state.groups_per_para.get(&para_id) else {
+		modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await;
+		return None
+	};
 
-	if expected_group != Some(manifest_summary.claimed_group_index) {
+	if !expected_groups.iter().any(|g| g == &manifest_summary.claimed_group_index) {
 		modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await;
 		return None
 	}
@@ -3037,13 +3098,11 @@ pub(crate) async fn handle_response<Context>(
 			relay_parent_state.session,
 			|v| per_session.session_info.validators.get(v).map(|x| x.clone()),
 			|para, g_index| {
-				let expected_group = group_for_para(
-					&relay_parent_state.availability_cores,
-					&relay_parent_state.group_rotation_info,
-					para,
-				);
+				let Some(expected_groups) = relay_parent_state.groups_per_para.get(&para) else {
+					return false
+				};
 
-				Some(g_index) == expected_group
+				expected_groups.iter().any(|g| g == &g_index)
 			},
 			disabled_mask,
 		);
diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs b/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs
index a944a9cd6d0..4fb033e08ce 100644
--- a/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs
+++ b/polkadot/node/network/statement-distribution/src/v2/tests/cluster.rs
@@ -312,6 +312,66 @@ fn useful_cluster_statement_from_non_cluster_peer_rejected() {
 	});
 }
 
+// Both validators in the test are part of backing groups assigned to same parachain
+#[test]
+fn elastic_scaling_useful_cluster_statement_from_non_cluster_peer_rejected() {
+	let config = TestConfig {
+		validator_count: 20,
+		group_size: 3,
+		local_validator: LocalRole::Validator,
+		async_backing_params: None,
+	};
+
+	let relay_parent = Hash::repeat_byte(1);
+	let peer_a = PeerId::random();
+
+	test_harness(config, |state, mut overseer| async move {
+		let candidate_hash = CandidateHash(Hash::repeat_byte(42));
+
+		let test_leaf = state.make_dummy_leaf_with_multiple_cores_per_para(relay_parent, 3);
+
+		// Peer A is not in our group, but its group is assigned to same para as we are.
+		let not_our_group = GroupIndex(1);
+
+		let that_group_validators = state.group_validators(not_our_group, false);
+		let v_non = that_group_validators[0];
+
+		connect_peer(
+			&mut overseer,
+			peer_a.clone(),
+			Some(vec![state.discovery_id(v_non)].into_iter().collect()),
+		)
+		.await;
+
+		send_peer_view_change(&mut overseer, peer_a.clone(), view![relay_parent]).await;
+		activate_leaf(&mut overseer, &test_leaf, &state, true, vec![]).await;
+
+		let statement = state
+			.sign_statement(
+				v_non,
+				CompactStatement::Seconded(candidate_hash),
+				&SigningContext { parent_hash: relay_parent, session_index: 1 },
+			)
+			.as_unchecked()
+			.clone();
+
+		send_peer_message(
+			&mut overseer,
+			peer_a.clone(),
+			protocol_v2::StatementDistributionMessage::Statement(relay_parent, statement),
+		)
+		.await;
+
+		assert_matches!(
+			overseer.recv().await,
+			AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Single(p, r)))
+				if p == peer_a && r == COST_UNEXPECTED_STATEMENT_INVALID_SENDER.into() => { }
+		);
+
+		overseer
+	});
+}
+
 #[test]
 fn statement_from_non_cluster_originator_unexpected() {
 	let config = TestConfig {
diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs b/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs
index 38a12cf32e3..9d00a92e742 100644
--- a/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs
+++ b/polkadot/node/network/statement-distribution/src/v2/tests/grid.rs
@@ -1829,9 +1829,7 @@ fn advertisement_not_re_sent_when_peer_re_enters_view() {
 	});
 }
 
-// Grid statements imported to backing once candidate enters hypothetical frontier.
-#[test]
-fn grid_statements_imported_to_backing() {
+fn inner_grid_statements_imported_to_backing(groups_for_first_para: usize) {
 	let validator_count = 6;
 	let group_size = 3;
 	let config = TestConfig {
@@ -1851,9 +1849,12 @@ fn grid_statements_imported_to_backing() {
 		let local_group_index = local_validator.group_index.unwrap();
 
 		let other_group = next_group_index(local_group_index, validator_count, group_size);
-		let other_para = ParaId::from(other_group.0);
 
-		let test_leaf = state.make_dummy_leaf(relay_parent);
+		// Other para is same para for elastic scaling test (groups_for_first_para > 1)
+		let other_para = ParaId::from((groups_for_first_para == 1) as u32);
+
+		let test_leaf =
+			state.make_dummy_leaf_with_multiple_cores_per_para(relay_parent, groups_for_first_para);
 
 		let (candidate, pvd) = make_candidate(
 			relay_parent,
@@ -2018,6 +2019,18 @@ fn grid_statements_imported_to_backing() {
 		overseer
 	});
 }
+// Grid statements imported to backing once candidate enters hypothetical frontier.
+#[test]
+fn grid_statements_imported_to_backing() {
+	inner_grid_statements_imported_to_backing(1);
+}
+
+// Grid statements imported to backing once candidate enters hypothetical frontier.
+// All statements are for candidates of the same parachain but from different backing groups.
+#[test]
+fn elastic_scaling_grid_statements_imported_to_backing() {
+	inner_grid_statements_imported_to_backing(2);
+}
 
 #[test]
 fn advertisements_rejected_from_incorrect_peers() {
diff --git a/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs b/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs
index 82986a0330e..e98b1107931 100644
--- a/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs
+++ b/polkadot/node/network/statement-distribution/src/v2/tests/mod.rs
@@ -177,20 +177,39 @@ impl TestState {
 	}
 
 	fn make_dummy_leaf(&self, relay_parent: Hash) -> TestLeaf {
+		self.make_dummy_leaf_with_multiple_cores_per_para(relay_parent, 1)
+	}
+
+	fn make_dummy_leaf_with_multiple_cores_per_para(
+		&self,
+		relay_parent: Hash,
+		groups_for_first_para: usize,
+	) -> TestLeaf {
 		TestLeaf {
 			number: 1,
 			hash: relay_parent,
 			parent_hash: Hash::repeat_byte(0),
 			session: 1,
 			availability_cores: self.make_availability_cores(|i| {
-				CoreState::Scheduled(ScheduledCore {
-					para_id: ParaId::from(i as u32),
-					collator: None,
-				})
+				let para_id = if i < groups_for_first_para {
+					ParaId::from(0u32)
+				} else {
+					ParaId::from(i as u32)
+				};
+
+				CoreState::Scheduled(ScheduledCore { para_id, collator: None })
 			}),
 			disabled_validators: Default::default(),
 			para_data: (0..self.session_info.validator_groups.len())
-				.map(|i| (ParaId::from(i as u32), PerParaData::new(1, vec![1, 2, 3].into())))
+				.map(|i| {
+					let para_id = if i < groups_for_first_para {
+						ParaId::from(0u32)
+					} else {
+						ParaId::from(i as u32)
+					};
+
+					(para_id, PerParaData::new(1, vec![1, 2, 3].into()))
+				})
 				.collect(),
 			minimum_backing_votes: 2,
 		}
diff --git a/polkadot/node/subsystem-util/src/vstaging.rs b/polkadot/node/subsystem-util/src/vstaging.rs
index 3e807eff538..25ea7ce7c9b 100644
--- a/polkadot/node/subsystem-util/src/vstaging.rs
+++ b/polkadot/node/subsystem-util/src/vstaging.rs
@@ -19,14 +19,19 @@
 //! This module is intended to contain common boiler plate code handling unreleased runtime API
 //! calls.
 
+use std::collections::{BTreeMap, VecDeque};
+
 use polkadot_node_subsystem_types::messages::{RuntimeApiMessage, RuntimeApiRequest};
 use polkadot_overseer::SubsystemSender;
-use polkadot_primitives::{Hash, ValidatorIndex};
+use polkadot_primitives::{CoreIndex, Hash, Id as ParaId, ScheduledCore, ValidatorIndex};
 
-use crate::{has_required_runtime, request_disabled_validators, runtime};
+use crate::{has_required_runtime, request_claim_queue, request_disabled_validators, runtime};
 
 const LOG_TARGET: &'static str = "parachain::subsystem-util-vstaging";
 
+/// A snapshot of the runtime claim queue at an arbitrary relay chain block.
+pub type ClaimQueueSnapshot = BTreeMap<CoreIndex, VecDeque<ParaId>>;
+
 // TODO: https://github.com/paritytech/polkadot-sdk/issues/1940
 /// Returns disabled validators list if the runtime supports it. Otherwise logs a debug messages and
 /// returns an empty vec.
@@ -54,3 +59,40 @@ pub async fn get_disabled_validators_with_fallback<Sender: SubsystemSender<Runti
 
 	Ok(disabled_validators)
 }
+
+/// Checks if the runtime supports `request_claim_queue` and attempts to fetch the claim queue.
+/// Returns `ClaimQueueSnapshot` or `None` if claim queue API is not supported by runtime.
+/// Any specific `RuntimeApiError` is bubbled up to the caller.
+pub async fn fetch_claim_queue(
+	sender: &mut impl SubsystemSender<RuntimeApiMessage>,
+	relay_parent: Hash,
+) -> Result<Option<ClaimQueueSnapshot>, runtime::Error> {
+	if has_required_runtime(
+		sender,
+		relay_parent,
+		RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT,
+	)
+	.await
+	{
+		let res = request_claim_queue(relay_parent, sender)
+			.await
+			.await
+			.map_err(runtime::Error::RuntimeRequestCanceled)??;
+		Ok(Some(res))
+	} else {
+		gum::trace!(target: LOG_TARGET, "Runtime doesn't support `request_claim_queue`");
+		Ok(None)
+	}
+}
+
+/// Returns the next scheduled `ParaId` for a core in the claim queue, wrapped in `ScheduledCore`.
+pub fn fetch_next_scheduled_on_core(
+	claim_queue: &ClaimQueueSnapshot,
+	core_idx: CoreIndex,
+) -> Option<ScheduledCore> {
+	claim_queue
+		.get(&core_idx)?
+		.front()
+		.cloned()
+		.map(|para_id| ScheduledCore { para_id, collator: None })
+}
-- 
GitLab