From 59f868d1e9502cb4e434127cac6e439d01d7dd2b Mon Sep 17 00:00:00 2001
From: Tsvetomir Dimitrov <tsvetomir@parity.io>
Date: Mon, 8 Apr 2024 08:58:12 +0300
Subject: [PATCH] Deprecate `para_id()` from `CoreState` in polkadot primitives
 (#3979)

With Coretime enabled, we can no longer assume a static 1:1 mapping
between core index and para id. This mapping should instead be obtained
from the scheduler/claim queue on a block-by-block basis.
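
For illustration, a minimal sketch of that per-block lookup using the
`ClaimQueueSnapshot` helper from the node subsystem utilities (the same
helper the statement-distribution changes below rely on); position `0` is
the first claim for the core at the given relay parent:

    use polkadot_node_subsystem_util::vstaging::ClaimQueueSnapshot;
    use polkadot_primitives::{CoreIndex, Id as ParaId};

    // First (position 0) claim for a core at the current relay parent. The
    // snapshot itself is fetched per leaf via `fetch_claim_queue`.
    fn first_claim_for_core(
        claim_queue: &ClaimQueueSnapshot,
        core: CoreIndex,
    ) -> Option<ParaId> {
        claim_queue.get_claim_for(core, 0)
    }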

This PR deprecates `para_id()` on `CoreState`, modifies it to return the
scheduled `ParaId` (`next_up_on_available`) for occupied cores, and
removes its usages in the code.
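
Where the claim queue runtime API is not yet available, call sites now
inspect `CoreState` directly instead of calling `para_id()`. A sketch of
the fallback pattern used in this PR (an occupied core only counts towards
a para's assignment when async backing allows building ahead, i.e.
`max_candidate_depth >= 1`):

    use polkadot_primitives::{CoreState, Id as ParaId};

    // Resolve the para scheduled on a core from its `CoreState`, without
    // the deprecated `para_id()` helper.
    fn scheduled_para(core: &CoreState, max_candidate_depth: usize) -> Option<ParaId> {
        match core {
            CoreState::Scheduled(scheduled_core) => Some(scheduled_core.para_id),
            CoreState::Occupied(occupied_core) if max_candidate_depth >= 1 => occupied_core
                .next_up_on_available
                .as_ref()
                .map(|scheduled_core| scheduled_core.para_id),
            CoreState::Free | CoreState::Occupied(_) => None,
        }
    }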

Closes https://github.com/paritytech/polkadot-sdk/issues/3948

---------

Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com>
---
 .../consensus/aura/src/collators/lookahead.rs | 53 +++++++++++++------
 .../core/prospective-parachains/src/tests.rs  | 10 +++-
 polkadot/node/core/provisioner/src/tests.rs   |  6 ++-
 .../statement-distribution/src/v2/mod.rs      | 53 +++++++++++--------
 polkadot/primitives/src/v7/mod.rs             | 10 +++-
 prdoc/pr_3979.prdoc                           | 19 +++++++
 6 files changed, 108 insertions(+), 43 deletions(-)
 create mode 100644 prdoc/pr_3979.prdoc

diff --git a/cumulus/client/consensus/aura/src/collators/lookahead.rs b/cumulus/client/consensus/aura/src/collators/lookahead.rs
index 58005833617..2b774128c1f 100644
--- a/cumulus/client/consensus/aura/src/collators/lookahead.rs
+++ b/cumulus/client/consensus/aura/src/collators/lookahead.rs
@@ -49,7 +49,9 @@ use polkadot_node_subsystem::messages::{
 	CollationGenerationMessage, RuntimeApiMessage, RuntimeApiRequest,
 };
 use polkadot_overseer::Handle as OverseerHandle;
-use polkadot_primitives::{CollatorPair, CoreIndex, Id as ParaId, OccupiedCoreAssumption};
+use polkadot_primitives::{
+	AsyncBackingParams, CollatorPair, CoreIndex, CoreState, Id as ParaId, OccupiedCoreAssumption,
+};
 
 use futures::{channel::oneshot, prelude::*};
 use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf};
@@ -186,10 +188,14 @@ where
 
 			// TODO: Currently we use just the first core here, but for elastic scaling
 			// we iterate and build on all of the cores returned.
-			let core_index = if let Some(core_index) =
-				cores_scheduled_for_para(relay_parent, params.para_id, &mut params.overseer_handle)
-					.await
-					.get(0)
+			let core_index = if let Some(core_index) = cores_scheduled_for_para(
+				relay_parent,
+				params.para_id,
+				&mut params.overseer_handle,
+				&mut params.relay_client,
+			)
+			.await
+			.get(0)
 			{
 				*core_index
 			} else {
@@ -223,7 +229,10 @@ where
 			let parent_search_params = ParentSearchParams {
 				relay_parent,
 				para_id: params.para_id,
-				ancestry_lookback: max_ancestry_lookback(relay_parent, &params.relay_client).await,
+				ancestry_lookback: async_backing_params(relay_parent, &params.relay_client)
+					.await
+					.map(|c| c.allowed_ancestry_len as usize)
+					.unwrap_or(0),
 				max_depth: PARENT_SEARCH_DEPTH,
 				ignore_alternative_branches: true,
 			};
@@ -461,21 +470,19 @@ where
 	Some(SlotClaim::unchecked::<P>(author_pub, slot, timestamp))
 }
 
-/// Reads allowed ancestry length parameter from the relay chain storage at the given relay parent.
-///
-/// Falls back to 0 in case of an error.
-async fn max_ancestry_lookback(
+/// Reads async backing parameters from the relay chain storage at the given relay parent.
+async fn async_backing_params(
 	relay_parent: PHash,
 	relay_client: &impl RelayChainInterface,
-) -> usize {
+) -> Option<AsyncBackingParams> {
 	match load_abridged_host_configuration(relay_parent, relay_client).await {
-		Ok(Some(config)) => config.async_backing_params.allowed_ancestry_len as usize,
+		Ok(Some(config)) => Some(config.async_backing_params),
 		Ok(None) => {
 			tracing::error!(
 				target: crate::LOG_TARGET,
 				"Active config is missing in relay chain storage",
 			);
-			0
+			None
 		},
 		Err(err) => {
 			tracing::error!(
@@ -484,7 +491,7 @@ async fn max_ancestry_lookback(
 				?relay_parent,
 				"Failed to read active config from relay chain client",
 			);
-			0
+			None
 		},
 	}
 }
@@ -494,7 +501,9 @@ async fn cores_scheduled_for_para(
 	relay_parent: PHash,
 	para_id: ParaId,
 	overseer_handle: &mut OverseerHandle,
+	relay_client: &impl RelayChainInterface,
 ) -> Vec<CoreIndex> {
+	// Get `AvailabilityCores` from the runtime.
 	let (tx, rx) = oneshot::channel();
 	let request = RuntimeApiRequest::AvailabilityCores(tx);
 	overseer_handle
@@ -522,11 +531,25 @@ async fn cores_scheduled_for_para(
 		},
 	};
 
+	let max_candidate_depth = async_backing_params(relay_parent, relay_client)
+		.await
+		.map(|c| c.max_candidate_depth)
+		.unwrap_or(0);
+
 	cores
 		.iter()
 		.enumerate()
 		.filter_map(|(index, core)| {
-			if core.para_id() == Some(para_id) {
+			let core_para_id = match core {
+				CoreState::Scheduled(scheduled_core) => Some(scheduled_core.para_id),
+				CoreState::Occupied(occupied_core) if max_candidate_depth >= 1 => occupied_core
+					.next_up_on_available
+					.as_ref()
+					.map(|scheduled_core| scheduled_core.para_id),
+				CoreState::Free | CoreState::Occupied(_) => None,
+			};
+
+			if core_para_id == Some(para_id) {
 				Some(CoreIndex(index as u32))
 			} else {
 				None
diff --git a/polkadot/node/core/prospective-parachains/src/tests.rs b/polkadot/node/core/prospective-parachains/src/tests.rs
index 0e0079c02bb..8989911a332 100644
--- a/polkadot/node/core/prospective-parachains/src/tests.rs
+++ b/polkadot/node/core/prospective-parachains/src/tests.rs
@@ -1797,7 +1797,10 @@ fn persists_pending_availability_candidate() {
 	test_state.availability_cores = test_state
 		.availability_cores
 		.into_iter()
-		.filter(|core| core.para_id().map_or(false, |id| id == para_id))
+		.filter(|core| match core {
+			CoreState::Scheduled(scheduled_core) => scheduled_core.para_id == para_id,
+			_ => false,
+		})
 		.collect();
 	assert_eq!(test_state.availability_cores.len(), 1);
 
@@ -1896,7 +1899,10 @@ fn backwards_compatible() {
 	test_state.availability_cores = test_state
 		.availability_cores
 		.into_iter()
-		.filter(|core| core.para_id().map_or(false, |id| id == para_id))
+		.filter(|core| match core {
+			CoreState::Scheduled(scheduled_core) => scheduled_core.para_id == para_id,
+			_ => false,
+		})
 		.collect();
 	assert_eq!(test_state.availability_cores.len(), 1);
 
diff --git a/polkadot/node/core/provisioner/src/tests.rs b/polkadot/node/core/provisioner/src/tests.rs
index 823b1d86e46..d463b7f1663 100644
--- a/polkadot/node/core/provisioner/src/tests.rs
+++ b/polkadot/node/core/provisioner/src/tests.rs
@@ -918,7 +918,11 @@ mod select_candidates {
 		let committed_receipts: Vec<_> = (0..mock_cores.len())
 			.map(|i| {
 				let mut descriptor = dummy_candidate_descriptor(dummy_hash());
-				descriptor.para_id = mock_cores[i].para_id().unwrap();
+				descriptor.para_id = if let Scheduled(scheduled_core) = &mock_cores[i] {
+					scheduled_core.para_id
+				} else {
+					panic!("`mock_cores` is not initialized with `Scheduled`?")
+				};
 				descriptor.persisted_validation_data_hash = empty_hash;
 				descriptor.pov_hash = Hash::from_low_u64_be(i as u64);
 				CommittedCandidateReceipt {
diff --git a/polkadot/node/network/statement-distribution/src/v2/mod.rs b/polkadot/node/network/statement-distribution/src/v2/mod.rs
index f5a8ec4a269..68caa5f0e70 100644
--- a/polkadot/node/network/statement-distribution/src/v2/mod.rs
+++ b/polkadot/node/network/statement-distribution/src/v2/mod.rs
@@ -46,7 +46,7 @@ use polkadot_node_subsystem_util::{
 	backing_implicit_view::View as ImplicitView,
 	reputation::ReputationAggregator,
 	runtime::{request_min_backing_votes, ProspectiveParachainsMode},
-	vstaging::fetch_claim_queue,
+	vstaging::{fetch_claim_queue, ClaimQueueSnapshot},
 };
 use polkadot_primitives::{
 	AuthorityDiscoveryId, CandidateHash, CompactStatement, CoreIndex, CoreState, GroupIndex,
@@ -681,6 +681,13 @@ pub(crate) async fn handle_active_leaves_update<Context>(
 				.map_err(JfyiError::FetchValidatorGroups)?
 				.1;
 
+		let maybe_claim_queue = fetch_claim_queue(ctx.sender(), new_relay_parent)
+			.await
+			.unwrap_or_else(|err| {
+				gum::debug!(target: LOG_TARGET, ?new_relay_parent, ?err, "handle_active_leaves_update: `claim_queue` API not available");
+				None
+			});
+
 		let local_validator = per_session.local_validator.and_then(|v| {
 			if let LocalValidatorIndex::Active(idx) = v {
 				find_active_validator_state(
@@ -688,7 +695,9 @@ pub(crate) async fn handle_active_leaves_update<Context>(
 					&per_session.groups,
 					&availability_cores,
 					&group_rotation_info,
+					&maybe_claim_queue,
 					seconding_limit,
+					max_candidate_depth,
 				)
 			} else {
 				Some(LocalValidatorState { grid_tracker: GridTracker::default(), active: None })
@@ -696,10 +705,9 @@ pub(crate) async fn handle_active_leaves_update<Context>(
 		});
 
 		let groups_per_para = determine_groups_per_para(
-			ctx.sender(),
-			new_relay_parent,
 			availability_cores,
 			group_rotation_info,
+			&maybe_claim_queue,
 			max_candidate_depth,
 		)
 		.await;
@@ -752,7 +760,9 @@ fn find_active_validator_state(
 	groups: &Groups,
 	availability_cores: &[CoreState],
 	group_rotation_info: &GroupRotationInfo,
+	maybe_claim_queue: &Option<ClaimQueueSnapshot>,
 	seconding_limit: usize,
+	max_candidate_depth: usize,
 ) -> Option<LocalValidatorState> {
 	if groups.all().is_empty() {
 		return None
@@ -760,18 +770,28 @@ fn find_active_validator_state(
 
 	let our_group = groups.by_validator_index(validator_index)?;
 
-	// note: this won't work well for on-demand parachains because it only works
-	// when core assignments to paras are static throughout the session.
-
-	let core = group_rotation_info.core_for_group(our_group, availability_cores.len());
-	let para = availability_cores.get(core.0 as usize).and_then(|c| c.para_id());
+	let core_index = group_rotation_info.core_for_group(our_group, availability_cores.len());
+	let para_assigned_to_core = if let Some(claim_queue) = maybe_claim_queue {
+		claim_queue.get_claim_for(core_index, 0)
+	} else {
+		availability_cores
+			.get(core_index.0 as usize)
+			.and_then(|core_state| match core_state {
+				CoreState::Scheduled(scheduled_core) => Some(scheduled_core.para_id),
+				CoreState::Occupied(occupied_core) if max_candidate_depth >= 1 => occupied_core
+					.next_up_on_available
+					.as_ref()
+					.map(|scheduled_core| scheduled_core.para_id),
+				CoreState::Free | CoreState::Occupied(_) => None,
+			})
+	};
 	let group_validators = groups.get(our_group)?.to_owned();
 
 	Some(LocalValidatorState {
 		active: Some(ActiveValidatorState {
 			index: validator_index,
 			group: our_group,
-			assignment: para,
+			assignment: para_assigned_to_core,
 			cluster_tracker: ClusterTracker::new(group_validators, seconding_limit)
 				.expect("group is non-empty because we are in it; qed"),
 		}),
@@ -2138,24 +2158,11 @@ async fn provide_candidate_to_grid<Context>(
 
 // Utility function to populate per relay parent `ParaId` to `GroupIndex` mappings.
 async fn determine_groups_per_para(
-	sender: &mut impl overseer::StatementDistributionSenderTrait,
-	relay_parent: Hash,
 	availability_cores: Vec<CoreState>,
 	group_rotation_info: GroupRotationInfo,
+	maybe_claim_queue: &Option<ClaimQueueSnapshot>,
 	max_candidate_depth: usize,
 ) -> HashMap<ParaId, Vec<GroupIndex>> {
-	let maybe_claim_queue = fetch_claim_queue(sender, relay_parent)
-			.await
-			.unwrap_or_else(|err| {
-				gum::debug!(
-					target: LOG_TARGET,
-					?relay_parent,
-					?err,
-					"determine_groups_per_para: `claim_queue` API not available, falling back to iterating availability cores"
-				);
-				None
-			});
-
 	let n_cores = availability_cores.len();
 
 	// Determine the core indices occupied by each para at the current relay parent. To support
diff --git a/polkadot/primitives/src/v7/mod.rs b/polkadot/primitives/src/v7/mod.rs
index d4f4a633577..5647bfe68d5 100644
--- a/polkadot/primitives/src/v7/mod.rs
+++ b/polkadot/primitives/src/v7/mod.rs
@@ -1086,10 +1086,16 @@ pub enum CoreState<H = Hash, N = BlockNumber> {
 }
 
 impl<N> CoreState<N> {
-	/// If this core state has a `para_id`, return it.
+	/// Returns the scheduled `ParaId` for the core or `None` if nothing is scheduled.
+	///
+	/// This function is deprecated. `ClaimQueue` should be used to obtain the scheduled `ParaId`s
+	/// for each core.
+	#[deprecated(
+		note = "`para_id` will be removed. Use `ClaimQueue` to query the scheduled `para_id` instead."
+	)]
 	pub fn para_id(&self) -> Option<Id> {
 		match self {
-			Self::Occupied(ref core) => Some(core.para_id()),
+			Self::Occupied(ref core) => core.next_up_on_available.as_ref().map(|n| n.para_id),
 			Self::Scheduled(core) => Some(core.para_id),
 			Self::Free => None,
 		}
diff --git a/prdoc/pr_3979.prdoc b/prdoc/pr_3979.prdoc
new file mode 100644
index 00000000000..b092ae697ba
--- /dev/null
+++ b/prdoc/pr_3979.prdoc
@@ -0,0 +1,19 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: Deprecate `para_id()` from `CoreState` in polkadot primitives
+
+doc:
+  - audience: "Node Dev"
+    description: |
+     `CoreState`'s `para_id()` function is deprecated in favour of direct usage of the
+     `ClaimQueue`. This is the preferred approach because it provides a better view of what is
+     scheduled on each core.
+
+crates:
+  - name: polkadot-primitives
+    bump: minor
+  - name: polkadot-statement-distribution
+    bump: minor
+  - name: cumulus-client-consensus-aura
+    bump: minor
-- 
GitLab