From 7df1ae3b8111d534cce108b2b405b6a33fcdedc3 Mon Sep 17 00:00:00 2001
From: Alin Dima <alin@parity.io>
Date: Tue, 6 Feb 2024 10:21:28 +0200
Subject: [PATCH] prospective-parachains: respond with multiple backable
 candidates (#3160)

Fixes https://github.com/paritytech/polkadot-sdk/issues/3129
---
 .../src/fragment_tree.rs                      | 163 ++++-
 .../core/prospective-parachains/src/lib.rs    |  74 +-
 .../core/prospective-parachains/src/tests.rs  | 638 +++++++++++++++++-
 polkadot/node/core/provisioner/src/lib.rs     |   7 +-
 polkadot/node/core/provisioner/src/tests.rs   |  19 +-
 polkadot/node/subsystem-types/src/messages.rs |   8 +-
 .../node/backing/prospective-parachains.md    |   7 +-
 .../src/node/utility/provisioner.md           |   8 +-
 prdoc/pr_3160.prdoc                           |  12 +
 9 files changed, 844 insertions(+), 92 deletions(-)
 create mode 100644 prdoc/pr_3160.prdoc

diff --git a/polkadot/node/core/prospective-parachains/src/fragment_tree.rs b/polkadot/node/core/prospective-parachains/src/fragment_tree.rs
index f7b97f07827..04ee42a9de0 100644
--- a/polkadot/node/core/prospective-parachains/src/fragment_tree.rs
+++ b/polkadot/node/core/prospective-parachains/src/fragment_tree.rs
@@ -756,53 +756,127 @@ impl FragmentTree {
 		depths.iter_ones().collect()
 	}
 
-	/// Select a candidate after the given `required_path` which passes
-	/// the predicate.
+	/// Select `count` candidates after the given `required_path` which pass
+	/// the predicate and have not already been backed on chain.
 	///
-	/// If there are multiple possibilities, this will select the first one.
-	///
-	/// This returns `None` if there is no candidate meeting those criteria.
+	/// Does an exhaustive search into the tree starting after `required_path`.
+	/// If there are multiple possibilities of size `count`, this will select the first one.
+	/// If there is no chain of size `count` that matches the criteria, this will return the largest
+	/// chain it could find with the criteria.
+	/// If there are no candidates meeting those criteria, returns an empty `Vec`.
+	/// Cycles are accepted, see module docs for the `Cycles` section.
 	///
 	/// The intention of the `required_path` is to allow queries on the basis of
 	/// one or more candidates which were previously pending availability becoming
 	/// available and opening up more room on the core.
-	pub(crate) fn select_child(
+	pub(crate) fn select_children(
 		&self,
 		required_path: &[CandidateHash],
+		count: u32,
 		pred: impl Fn(&CandidateHash) -> bool,
-	) -> Option<CandidateHash> {
+	) -> Vec<CandidateHash> {
 		let base_node = {
 			// traverse the required path.
 			let mut node = NodePointer::Root;
 			for required_step in required_path {
-				node = self.node_candidate_child(node, &required_step)?;
+				if let Some(next_node) = self.node_candidate_child(node, &required_step) {
+					node = next_node;
+				} else {
+					return vec![]
+				};
 			}
 
 			node
 		};
 
-		// TODO [now]: taking the first selection might introduce bias
+		// TODO: taking the first best selection might introduce bias
 		// or become gameable.
 		//
 		// For plausibly unique parachains, this shouldn't matter much.
 		// figure out alternative selection criteria?
-		match base_node {
+		self.select_children_inner(base_node, count, count, &pred, &mut vec![])
+	}
+
+	// Try finding a candidate chain starting from `base_node` of length `expected_count`.
+	// If not possible, return the longest one we could find.
+	// Does a depth-first search, since we're optimistic that there won't be more than one such
+	// chains (parachains shouldn't usually have forks). So in the usual case, this will conclude
+	// in `O(expected_count)`.
+	// Cycles are accepted, but this doesn't allow for infinite execution time, because the maximum
+	// depth we'll reach is `expected_count`.
+	//
+	// Worst case performance is `O(num_forks ^ expected_count)`.
+	// Although an exponential function, this is actually a constant that can only be altered via
+	// sudo/governance, because:
+	// 1. `num_forks` at a given level is at most `max_candidate_depth * max_validators_per_core`
+	//    (because each validator in the assigned group can second `max_candidate_depth`
+	//    candidates). The prospective-parachains subsystem assumes that the number of para forks is
+	//    limited by collator-protocol and backing subsystems. In practice, this is a constant which
+	//    can only be altered by sudo or governance.
+	// 2. `expected_count` is equal to the number of cores a para is scheduled on (in an elastic
+	//    scaling scenario). For non-elastic-scaling, this is just 1. In practice, this should be a
+	//    small number (1-3), capped by the total number of available cores (a constant alterable
+	//    only via governance/sudo).
+	fn select_children_inner(
+		&self,
+		base_node: NodePointer,
+		expected_count: u32,
+		remaining_count: u32,
+		pred: &dyn Fn(&CandidateHash) -> bool,
+		accumulator: &mut Vec<CandidateHash>,
+	) -> Vec<CandidateHash> {
+		if remaining_count == 0 {
+			// The best option is the chain we've accumulated so far.
+			return accumulator.to_vec();
+		}
+
+		let children: Vec<_> = match base_node {
 			NodePointer::Root => self
 				.nodes
 				.iter()
-				.take_while(|n| n.parent == NodePointer::Root)
-				.filter(|n| self.scope.get_pending_availability(&n.candidate_hash).is_none())
-				.filter(|n| pred(&n.candidate_hash))
-				.map(|n| n.candidate_hash)
-				.next(),
-			NodePointer::Storage(ptr) => self.nodes[ptr]
-				.children
-				.iter()
-				.filter(|n| self.scope.get_pending_availability(&n.1).is_none())
-				.filter(|n| pred(&n.1))
-				.map(|n| n.1)
-				.next(),
+				.enumerate()
+				.take_while(|(_, n)| n.parent == NodePointer::Root)
+				.filter(|(_, n)| self.scope.get_pending_availability(&n.candidate_hash).is_none())
+				.filter(|(_, n)| pred(&n.candidate_hash))
+				.map(|(ptr, n)| (NodePointer::Storage(ptr), n.candidate_hash))
+				.collect(),
+			NodePointer::Storage(base_node_ptr) => {
+				let base_node = &self.nodes[base_node_ptr];
+
+				base_node
+					.children
+					.iter()
+					.filter(|(_, hash)| self.scope.get_pending_availability(&hash).is_none())
+					.filter(|(_, hash)| pred(&hash))
+					.map(|(ptr, hash)| (*ptr, *hash))
+					.collect()
+			},
+		};
+
+		let mut best_result = accumulator.clone();
+		for (child_ptr, child_hash) in children {
+			accumulator.push(child_hash);
+
+			let result = self.select_children_inner(
+				child_ptr,
+				expected_count,
+				remaining_count - 1,
+				&pred,
+				accumulator,
+			);
+
+			accumulator.pop();
+
+			// Short-circuit the search if we've found the right length. Otherwise, we'll
+			// search for a max.
+			if result.len() == expected_count as usize {
+				return result
+			} else if best_result.len() < result.len() {
+				best_result = result;
+			}
 		}
+
+		best_result
 	}
 
 	fn populate_from_bases(&mut self, storage: &CandidateStorage, initial_bases: Vec<NodePointer>) {
@@ -987,6 +1061,7 @@ mod tests {
 	use polkadot_node_subsystem_util::inclusion_emulator::InboundHrmpLimitations;
 	use polkadot_primitives::{BlockNumber, CandidateCommitments, CandidateDescriptor, HeadData};
 	use polkadot_primitives_test_helpers as test_helpers;
+	use std::iter;
 
 	fn make_constraints(
 		min_relay_parent_number: BlockNumber,
@@ -1524,6 +1599,21 @@ mod tests {
 		assert_eq!(tree.nodes[2].candidate_hash, candidate_a_hash);
 		assert_eq!(tree.nodes[3].candidate_hash, candidate_a_hash);
 		assert_eq!(tree.nodes[4].candidate_hash, candidate_a_hash);
+
+		for count in 1..10 {
+			assert_eq!(
+				tree.select_children(&[], count, |_| true),
+				iter::repeat(candidate_a_hash)
+					.take(std::cmp::min(count as usize, max_depth + 1))
+					.collect::<Vec<_>>()
+			);
+			assert_eq!(
+				tree.select_children(&[candidate_a_hash], count - 1, |_| true),
+				iter::repeat(candidate_a_hash)
+					.take(std::cmp::min(count as usize - 1, max_depth))
+					.collect::<Vec<_>>()
+			);
+		}
 	}
 
 	#[test]
@@ -1591,6 +1681,35 @@ mod tests {
 		assert_eq!(tree.nodes[2].candidate_hash, candidate_a_hash);
 		assert_eq!(tree.nodes[3].candidate_hash, candidate_b_hash);
 		assert_eq!(tree.nodes[4].candidate_hash, candidate_a_hash);
+
+		assert_eq!(tree.select_children(&[], 1, |_| true), vec![candidate_a_hash],);
+		assert_eq!(
+			tree.select_children(&[], 2, |_| true),
+			vec![candidate_a_hash, candidate_b_hash],
+		);
+		assert_eq!(
+			tree.select_children(&[], 3, |_| true),
+			vec![candidate_a_hash, candidate_b_hash, candidate_a_hash],
+		);
+		assert_eq!(
+			tree.select_children(&[candidate_a_hash], 2, |_| true),
+			vec![candidate_b_hash, candidate_a_hash],
+		);
+
+		assert_eq!(
+			tree.select_children(&[], 6, |_| true),
+			vec![
+				candidate_a_hash,
+				candidate_b_hash,
+				candidate_a_hash,
+				candidate_b_hash,
+				candidate_a_hash
+			],
+		);
+		assert_eq!(
+			tree.select_children(&[candidate_a_hash, candidate_b_hash], 6, |_| true),
+			vec![candidate_a_hash, candidate_b_hash, candidate_a_hash,],
+		);
 	}
 
 	#[test]
diff --git a/polkadot/node/core/prospective-parachains/src/lib.rs b/polkadot/node/core/prospective-parachains/src/lib.rs
index aba3d092a41..6e6915b9272 100644
--- a/polkadot/node/core/prospective-parachains/src/lib.rs
+++ b/polkadot/node/core/prospective-parachains/src/lib.rs
@@ -146,12 +146,20 @@ async fn run_iteration<Context>(
 					handle_candidate_seconded(view, para, candidate_hash),
 				ProspectiveParachainsMessage::CandidateBacked(para, candidate_hash) =>
 					handle_candidate_backed(&mut *ctx, view, para, candidate_hash).await?,
-				ProspectiveParachainsMessage::GetBackableCandidate(
+				ProspectiveParachainsMessage::GetBackableCandidates(
 					relay_parent,
 					para,
+					count,
 					required_path,
 					tx,
-				) => answer_get_backable_candidate(&view, relay_parent, para, required_path, tx),
+				) => answer_get_backable_candidates(
+					&view,
+					relay_parent,
+					para,
+					count,
+					required_path,
+					tx,
+				),
 				ProspectiveParachainsMessage::GetHypotheticalFrontier(request, tx) =>
 					answer_hypothetical_frontier_request(&view, request, tx),
 				ProspectiveParachainsMessage::GetTreeMembership(para, candidate, tx) =>
@@ -552,12 +560,13 @@ async fn handle_candidate_backed<Context>(
 	Ok(())
 }
 
-fn answer_get_backable_candidate(
+fn answer_get_backable_candidates(
 	view: &View,
 	relay_parent: Hash,
 	para: ParaId,
+	count: u32,
 	required_path: Vec<CandidateHash>,
-	tx: oneshot::Sender<Option<(CandidateHash, Hash)>>,
+	tx: oneshot::Sender<Vec<(CandidateHash, Hash)>>,
 ) {
 	let data = match view.active_leaves.get(&relay_parent) {
 		None => {
@@ -568,7 +577,7 @@ fn answer_get_backable_candidate(
 				"Requested backable candidate for inactive relay-parent."
 			);
 
-			let _ = tx.send(None);
+			let _ = tx.send(vec![]);
 			return
 		},
 		Some(d) => d,
@@ -583,7 +592,7 @@ fn answer_get_backable_candidate(
 				"Requested backable candidate for inactive para."
 			);
 
-			let _ = tx.send(None);
+			let _ = tx.send(vec![]);
 			return
 		},
 		Some(tree) => tree,
@@ -598,15 +607,32 @@ fn answer_get_backable_candidate(
 				"No candidate storage for active para",
 			);
 
-			let _ = tx.send(None);
+			let _ = tx.send(vec![]);
 			return
 		},
 		Some(s) => s,
 	};
 
-	let Some(child_hash) =
-		tree.select_child(&required_path, |candidate| storage.is_backed(candidate))
-	else {
+	let backable_candidates: Vec<_> = tree
+		.select_children(&required_path, count, |candidate| storage.is_backed(candidate))
+		.into_iter()
+		.filter_map(|child_hash| {
+			storage.relay_parent_by_candidate_hash(&child_hash).map_or_else(
+				|| {
+					gum::error!(
+						target: LOG_TARGET,
+						?child_hash,
+						para_id = ?para,
+						"Candidate is present in fragment tree but not in candidate's storage!",
+					);
+					None
+				},
+				|parent_hash| Some((child_hash, parent_hash)),
+			)
+		})
+		.collect();
+
+	if backable_candidates.is_empty() {
 		gum::trace!(
 			target: LOG_TARGET,
 			?required_path,
@@ -614,30 +640,16 @@ fn answer_get_backable_candidate(
 			%relay_parent,
 			"Could not find any backable candidate",
 		);
-
-		let _ = tx.send(None);
-		return
-	};
-	let Some(candidate_relay_parent) = storage.relay_parent_by_candidate_hash(&child_hash) else {
-		gum::error!(
+	} else {
+		gum::trace!(
 			target: LOG_TARGET,
-			?child_hash,
-			para_id = ?para,
-			"Candidate is present in fragment tree but not in candidate's storage!",
+			?relay_parent,
+			?backable_candidates,
+			"Found backable candidates",
 		);
-		let _ = tx.send(None);
-		return
-	};
-
-	gum::trace!(
-		target: LOG_TARGET,
-		?relay_parent,
-		candidate_hash = ?child_hash,
-		?candidate_relay_parent,
-		"Found backable candidate",
-	);
+	}
 
-	let _ = tx.send(Some((child_hash, candidate_relay_parent)));
+	let _ = tx.send(backable_candidates);
 }
 
 fn answer_hypothetical_frontier_request(
diff --git a/polkadot/node/core/prospective-parachains/src/tests.rs b/polkadot/node/core/prospective-parachains/src/tests.rs
index 7e369245c0e..732736b101d 100644
--- a/polkadot/node/core/prospective-parachains/src/tests.rs
+++ b/polkadot/node/core/prospective-parachains/src/tests.rs
@@ -403,25 +403,28 @@ async fn get_membership(
 	assert_eq!(resp, expected_membership_response);
 }
 
-async fn get_backable_candidate(
+async fn get_backable_candidates(
 	virtual_overseer: &mut VirtualOverseer,
 	leaf: &TestLeaf,
 	para_id: ParaId,
 	required_path: Vec<CandidateHash>,
-	expected_result: Option<(CandidateHash, Hash)>,
+	count: u32,
+	expected_result: Vec<(CandidateHash, Hash)>,
 ) {
 	let (tx, rx) = oneshot::channel();
 	virtual_overseer
 		.send(overseer::FromOrchestra::Communication {
-			msg: ProspectiveParachainsMessage::GetBackableCandidate(
+			msg: ProspectiveParachainsMessage::GetBackableCandidates(
 				leaf.hash,
 				para_id,
+				count,
 				required_path,
 				tx,
 			),
 		})
 		.await;
 	let resp = rx.await.unwrap();
+	assert_eq!(resp.len(), expected_result.len());
 	assert_eq!(resp, expected_result);
 }
 
@@ -849,9 +852,9 @@ fn check_candidate_on_multiple_forks() {
 	assert_eq!(view.candidate_storage.get(&2.into()).unwrap().len(), (0, 0));
 }
 
-// Backs some candidates and tests `GetBackableCandidate`.
+// Backs some candidates and tests `GetBackableCandidates` when requesting a single candidate.
 #[test]
-fn check_backable_query() {
+fn check_backable_query_single_candidate() {
 	let test_state = TestState::default();
 	let view = test_harness(|mut virtual_overseer| async move {
 		// Leaf A
@@ -896,26 +899,38 @@ fn check_backable_query() {
 		introduce_candidate(&mut virtual_overseer, candidate_b.clone(), pvd_b).await;
 
 		// Should not get any backable candidates.
-		get_backable_candidate(
+		get_backable_candidates(
 			&mut virtual_overseer,
 			&leaf_a,
 			1.into(),
 			vec![candidate_hash_a],
-			None,
+			1,
+			vec![],
 		)
 		.await;
+		get_backable_candidates(
+			&mut virtual_overseer,
+			&leaf_a,
+			1.into(),
+			vec![candidate_hash_a],
+			0,
+			vec![],
+		)
+		.await;
+		get_backable_candidates(&mut virtual_overseer, &leaf_a, 1.into(), vec![], 0, vec![]).await;
 
 		// Second candidates.
 		second_candidate(&mut virtual_overseer, candidate_a.clone()).await;
 		second_candidate(&mut virtual_overseer, candidate_b.clone()).await;
 
 		// Should not get any backable candidates.
-		get_backable_candidate(
+		get_backable_candidates(
 			&mut virtual_overseer,
 			&leaf_a,
 			1.into(),
 			vec![candidate_hash_a],
-			None,
+			1,
+			vec![],
 		)
 		.await;
 
@@ -923,31 +938,46 @@ fn check_backable_query() {
 		back_candidate(&mut virtual_overseer, &candidate_a, candidate_hash_a).await;
 		back_candidate(&mut virtual_overseer, &candidate_b, candidate_hash_b).await;
 
+		// Should not get any backable candidates for the other para.
+		get_backable_candidates(&mut virtual_overseer, &leaf_a, 2.into(), vec![], 1, vec![]).await;
+		get_backable_candidates(
+			&mut virtual_overseer,
+			&leaf_a,
+			2.into(),
+			vec![candidate_hash_a],
+			1,
+			vec![],
+		)
+		.await;
+
 		// Get backable candidate.
-		get_backable_candidate(
+		get_backable_candidates(
 			&mut virtual_overseer,
 			&leaf_a,
 			1.into(),
 			vec![],
-			Some((candidate_hash_a, leaf_a.hash)),
+			1,
+			vec![(candidate_hash_a, leaf_a.hash)],
 		)
 		.await;
-		get_backable_candidate(
+		get_backable_candidates(
 			&mut virtual_overseer,
 			&leaf_a,
 			1.into(),
 			vec![candidate_hash_a],
-			Some((candidate_hash_b, leaf_a.hash)),
+			1,
+			vec![(candidate_hash_b, leaf_a.hash)],
 		)
 		.await;
 
 		// Should not get anything at the wrong path.
-		get_backable_candidate(
+		get_backable_candidates(
 			&mut virtual_overseer,
 			&leaf_a,
 			1.into(),
 			vec![candidate_hash_b],
-			None,
+			1,
+			vec![],
 		)
 		.await;
 
@@ -961,6 +991,572 @@ fn check_backable_query() {
 	assert_eq!(view.candidate_storage.get(&2.into()).unwrap().len(), (0, 0));
 }
 
+// Backs some candidates and tests `GetBackableCandidates` when requesting a multiple candidates.
+#[test]
+fn check_backable_query_multiple_candidates() {
+	macro_rules! make_and_back_candidate {
+		($test_state:ident, $virtual_overseer:ident, $leaf:ident, $parent:expr, $index:expr) => {{
+			let (mut candidate, pvd) = make_candidate(
+				$leaf.hash,
+				$leaf.number,
+				1.into(),
+				$parent.commitments.head_data.clone(),
+				HeadData(vec![$index]),
+				$test_state.validation_code_hash,
+			);
+			// Set a field to make this candidate unique.
+			candidate.descriptor.para_head = Hash::from_low_u64_le($index);
+			let candidate_hash = candidate.hash();
+			introduce_candidate(&mut $virtual_overseer, candidate.clone(), pvd).await;
+			second_candidate(&mut $virtual_overseer, candidate.clone()).await;
+			back_candidate(&mut $virtual_overseer, &candidate, candidate_hash).await;
+
+			(candidate, candidate_hash)
+		}};
+	}
+
+	// Parachain 1 looks like this:
+	//          +---A----+
+	//          |        |
+	//     +----B---+    C
+	//     |    |   |    |
+	//     D    E   F    H
+	//              |    |
+	//              G    I
+	//                   |
+	//                   J
+	{
+		let test_state = TestState::default();
+		let view = test_harness(|mut virtual_overseer| async move {
+			// Leaf A
+			let leaf_a = TestLeaf {
+				number: 100,
+				hash: Hash::from_low_u64_be(130),
+				para_data: vec![
+					(1.into(), PerParaData::new(97, HeadData(vec![1, 2, 3]))),
+					(2.into(), PerParaData::new(100, HeadData(vec![2, 3, 4]))),
+				],
+			};
+
+			// Activate leaves.
+			activate_leaf(&mut virtual_overseer, &leaf_a, &test_state).await;
+
+			// Candidate A
+			let (candidate_a, pvd_a) = make_candidate(
+				leaf_a.hash,
+				leaf_a.number,
+				1.into(),
+				HeadData(vec![1, 2, 3]),
+				HeadData(vec![1]),
+				test_state.validation_code_hash,
+			);
+			let candidate_hash_a = candidate_a.hash();
+			introduce_candidate(&mut virtual_overseer, candidate_a.clone(), pvd_a).await;
+			second_candidate(&mut virtual_overseer, candidate_a.clone()).await;
+			back_candidate(&mut virtual_overseer, &candidate_a, candidate_hash_a).await;
+
+			let (candidate_b, candidate_hash_b) =
+				make_and_back_candidate!(test_state, virtual_overseer, leaf_a, &candidate_a, 2);
+			let (candidate_c, candidate_hash_c) =
+				make_and_back_candidate!(test_state, virtual_overseer, leaf_a, &candidate_a, 3);
+			let (_candidate_d, candidate_hash_d) =
+				make_and_back_candidate!(test_state, virtual_overseer, leaf_a, &candidate_b, 4);
+			let (_candidate_e, candidate_hash_e) =
+				make_and_back_candidate!(test_state, virtual_overseer, leaf_a, &candidate_b, 5);
+			let (candidate_f, candidate_hash_f) =
+				make_and_back_candidate!(test_state, virtual_overseer, leaf_a, &candidate_b, 6);
+			let (_candidate_g, candidate_hash_g) =
+				make_and_back_candidate!(test_state, virtual_overseer, leaf_a, &candidate_f, 7);
+			let (candidate_h, candidate_hash_h) =
+				make_and_back_candidate!(test_state, virtual_overseer, leaf_a, &candidate_c, 8);
+			let (candidate_i, candidate_hash_i) =
+				make_and_back_candidate!(test_state, virtual_overseer, leaf_a, &candidate_h, 9);
+			let (_candidate_j, candidate_hash_j) =
+				make_and_back_candidate!(test_state, virtual_overseer, leaf_a, &candidate_i, 10);
+
+			// Should not get any backable candidates for the other para.
+			get_backable_candidates(&mut virtual_overseer, &leaf_a, 2.into(), vec![], 1, vec![])
+				.await;
+			get_backable_candidates(&mut virtual_overseer, &leaf_a, 2.into(), vec![], 5, vec![])
+				.await;
+			get_backable_candidates(
+				&mut virtual_overseer,
+				&leaf_a,
+				2.into(),
+				vec![candidate_hash_a],
+				1,
+				vec![],
+			)
+			.await;
+
+			// Test various scenarios with various counts.
+
+			// empty required_path
+			{
+				get_backable_candidates(
+					&mut virtual_overseer,
+					&leaf_a,
+					1.into(),
+					vec![],
+					1,
+					vec![(candidate_hash_a, leaf_a.hash)],
+				)
+				.await;
+				get_backable_candidates(
+					&mut virtual_overseer,
+					&leaf_a,
+					1.into(),
+					vec![],
+					4,
+					vec![
+						(candidate_hash_a, leaf_a.hash),
+						(candidate_hash_b, leaf_a.hash),
+						(candidate_hash_f, leaf_a.hash),
+						(candidate_hash_g, leaf_a.hash),
+					],
+				)
+				.await;
+			}
+
+			// required path of 1
+			{
+				get_backable_candidates(
+					&mut virtual_overseer,
+					&leaf_a,
+					1.into(),
+					vec![candidate_hash_a],
+					1,
+					vec![(candidate_hash_b, leaf_a.hash)],
+				)
+				.await;
+				get_backable_candidates(
+					&mut virtual_overseer,
+					&leaf_a,
+					1.into(),
+					vec![candidate_hash_a],
+					2,
+					vec![(candidate_hash_b, leaf_a.hash), (candidate_hash_d, leaf_a.hash)],
+				)
+				.await;
+				get_backable_candidates(
+					&mut virtual_overseer,
+					&leaf_a,
+					1.into(),
+					vec![candidate_hash_a],
+					3,
+					vec![
+						(candidate_hash_b, leaf_a.hash),
+						(candidate_hash_f, leaf_a.hash),
+						(candidate_hash_g, leaf_a.hash),
+					],
+				)
+				.await;
+
+				// If the requested count exceeds the largest chain, return the longest
+				// chain we can get.
+				for count in 5..10 {
+					get_backable_candidates(
+						&mut virtual_overseer,
+						&leaf_a,
+						1.into(),
+						vec![candidate_hash_a],
+						count,
+						vec![
+							(candidate_hash_c, leaf_a.hash),
+							(candidate_hash_h, leaf_a.hash),
+							(candidate_hash_i, leaf_a.hash),
+							(candidate_hash_j, leaf_a.hash),
+						],
+					)
+					.await;
+				}
+			}
+
+			// required path of 2
+			{
+				get_backable_candidates(
+					&mut virtual_overseer,
+					&leaf_a,
+					1.into(),
+					vec![candidate_hash_a, candidate_hash_b],
+					1,
+					vec![(candidate_hash_d, leaf_a.hash)],
+				)
+				.await;
+				get_backable_candidates(
+					&mut virtual_overseer,
+					&leaf_a,
+					1.into(),
+					vec![candidate_hash_a, candidate_hash_c],
+					1,
+					vec![(candidate_hash_h, leaf_a.hash)],
+				)
+				.await;
+				// If the requested count exceeds the largest chain, return the longest
+				// chain we can get.
+				for count in 4..10 {
+					get_backable_candidates(
+						&mut virtual_overseer,
+						&leaf_a,
+						1.into(),
+						vec![candidate_hash_a, candidate_hash_c],
+						count,
+						vec![
+							(candidate_hash_h, leaf_a.hash),
+							(candidate_hash_i, leaf_a.hash),
+							(candidate_hash_j, leaf_a.hash),
+						],
+					)
+					.await;
+				}
+			}
+
+			// No more candidates in any chain.
+			{
+				let required_paths = vec![
+					vec![candidate_hash_a, candidate_hash_b, candidate_hash_e],
+					vec![
+						candidate_hash_a,
+						candidate_hash_c,
+						candidate_hash_h,
+						candidate_hash_i,
+						candidate_hash_j,
+					],
+				];
+				for path in required_paths {
+					for count in 1..4 {
+						get_backable_candidates(
+							&mut virtual_overseer,
+							&leaf_a,
+							1.into(),
+							path.clone(),
+							count,
+							vec![],
+						)
+						.await;
+					}
+				}
+			}
+
+			// Should not get anything at the wrong path.
+			get_backable_candidates(
+				&mut virtual_overseer,
+				&leaf_a,
+				1.into(),
+				vec![candidate_hash_b],
+				1,
+				vec![],
+			)
+			.await;
+			get_backable_candidates(
+				&mut virtual_overseer,
+				&leaf_a,
+				1.into(),
+				vec![candidate_hash_b, candidate_hash_a],
+				3,
+				vec![],
+			)
+			.await;
+			get_backable_candidates(
+				&mut virtual_overseer,
+				&leaf_a,
+				1.into(),
+				vec![candidate_hash_a, candidate_hash_b, candidate_hash_c],
+				3,
+				vec![],
+			)
+			.await;
+
+			virtual_overseer
+		});
+
+		assert_eq!(view.active_leaves.len(), 1);
+		assert_eq!(view.candidate_storage.len(), 2);
+		// 10 candidates and 7 parents on para 1.
+		assert_eq!(view.candidate_storage.get(&1.into()).unwrap().len(), (7, 10));
+		assert_eq!(view.candidate_storage.get(&2.into()).unwrap().len(), (0, 0));
+	}
+
+	// A tree with multiple roots.
+	// Parachain 1 looks like this:
+	//       (imaginary root)
+	//          |        |
+	//     +----B---+    A
+	//	   |    |   |    |
+	//     |    |   |    C
+	//     D    E   F    |
+	//              |    H
+	//              G    |
+	//                   I
+	//                   |
+	//                   J
+	{
+		let test_state = TestState::default();
+		let view = test_harness(|mut virtual_overseer| async move {
+			// Leaf A
+			let leaf_a = TestLeaf {
+				number: 100,
+				hash: Hash::from_low_u64_be(130),
+				para_data: vec![
+					(1.into(), PerParaData::new(97, HeadData(vec![1, 2, 3]))),
+					(2.into(), PerParaData::new(100, HeadData(vec![2, 3, 4]))),
+				],
+			};
+
+			// Activate leaves.
+			activate_leaf(&mut virtual_overseer, &leaf_a, &test_state).await;
+
+			// Candidate B
+			let (candidate_b, pvd_b) = make_candidate(
+				leaf_a.hash,
+				leaf_a.number,
+				1.into(),
+				HeadData(vec![1, 2, 3]),
+				HeadData(vec![2]),
+				test_state.validation_code_hash,
+			);
+			let candidate_hash_b = candidate_b.hash();
+			introduce_candidate(&mut virtual_overseer, candidate_b.clone(), pvd_b).await;
+			second_candidate(&mut virtual_overseer, candidate_b.clone()).await;
+			back_candidate(&mut virtual_overseer, &candidate_b, candidate_hash_b).await;
+
+			// Candidate A
+			let (candidate_a, pvd_a) = make_candidate(
+				leaf_a.hash,
+				leaf_a.number,
+				1.into(),
+				HeadData(vec![1, 2, 3]),
+				HeadData(vec![1]),
+				test_state.validation_code_hash,
+			);
+			let candidate_hash_a = candidate_a.hash();
+			introduce_candidate(&mut virtual_overseer, candidate_a.clone(), pvd_a).await;
+			second_candidate(&mut virtual_overseer, candidate_a.clone()).await;
+			back_candidate(&mut virtual_overseer, &candidate_a, candidate_hash_a).await;
+
+			let (candidate_c, candidate_hash_c) =
+				make_and_back_candidate!(test_state, virtual_overseer, leaf_a, &candidate_a, 3);
+			let (_candidate_d, candidate_hash_d) =
+				make_and_back_candidate!(test_state, virtual_overseer, leaf_a, &candidate_b, 4);
+			let (_candidate_e, candidate_hash_e) =
+				make_and_back_candidate!(test_state, virtual_overseer, leaf_a, &candidate_b, 5);
+			let (candidate_f, candidate_hash_f) =
+				make_and_back_candidate!(test_state, virtual_overseer, leaf_a, &candidate_b, 6);
+			let (_candidate_g, candidate_hash_g) =
+				make_and_back_candidate!(test_state, virtual_overseer, leaf_a, &candidate_f, 7);
+			let (candidate_h, candidate_hash_h) =
+				make_and_back_candidate!(test_state, virtual_overseer, leaf_a, &candidate_c, 8);
+			let (candidate_i, candidate_hash_i) =
+				make_and_back_candidate!(test_state, virtual_overseer, leaf_a, &candidate_h, 9);
+			let (_candidate_j, candidate_hash_j) =
+				make_and_back_candidate!(test_state, virtual_overseer, leaf_a, &candidate_i, 10);
+
+			// Should not get any backable candidates for the other para.
+			get_backable_candidates(&mut virtual_overseer, &leaf_a, 2.into(), vec![], 1, vec![])
+				.await;
+			get_backable_candidates(&mut virtual_overseer, &leaf_a, 2.into(), vec![], 5, vec![])
+				.await;
+			get_backable_candidates(
+				&mut virtual_overseer,
+				&leaf_a,
+				2.into(),
+				vec![candidate_hash_a],
+				1,
+				vec![],
+			)
+			.await;
+
+			// Test various scenarios with various counts.
+
+			// empty required_path
+			{
+				get_backable_candidates(
+					&mut virtual_overseer,
+					&leaf_a,
+					1.into(),
+					vec![],
+					1,
+					vec![(candidate_hash_b, leaf_a.hash)],
+				)
+				.await;
+				get_backable_candidates(
+					&mut virtual_overseer,
+					&leaf_a,
+					1.into(),
+					vec![],
+					2,
+					vec![(candidate_hash_b, leaf_a.hash), (candidate_hash_d, leaf_a.hash)],
+				)
+				.await;
+				get_backable_candidates(
+					&mut virtual_overseer,
+					&leaf_a,
+					1.into(),
+					vec![],
+					4,
+					vec![
+						(candidate_hash_a, leaf_a.hash),
+						(candidate_hash_c, leaf_a.hash),
+						(candidate_hash_h, leaf_a.hash),
+						(candidate_hash_i, leaf_a.hash),
+					],
+				)
+				.await;
+			}
+
+			// required path of 1
+			{
+				get_backable_candidates(
+					&mut virtual_overseer,
+					&leaf_a,
+					1.into(),
+					vec![candidate_hash_a],
+					1,
+					vec![(candidate_hash_c, leaf_a.hash)],
+				)
+				.await;
+				get_backable_candidates(
+					&mut virtual_overseer,
+					&leaf_a,
+					1.into(),
+					vec![candidate_hash_b],
+					1,
+					vec![(candidate_hash_d, leaf_a.hash)],
+				)
+				.await;
+				get_backable_candidates(
+					&mut virtual_overseer,
+					&leaf_a,
+					1.into(),
+					vec![candidate_hash_a],
+					2,
+					vec![(candidate_hash_c, leaf_a.hash), (candidate_hash_h, leaf_a.hash)],
+				)
+				.await;
+
+				// If the requested count exceeds the largest chain, return the longest
+				// chain we can get.
+				for count in 2..10 {
+					get_backable_candidates(
+						&mut virtual_overseer,
+						&leaf_a,
+						1.into(),
+						vec![candidate_hash_b],
+						count,
+						vec![(candidate_hash_f, leaf_a.hash), (candidate_hash_g, leaf_a.hash)],
+					)
+					.await;
+				}
+			}
+
+			// required path of 2
+			{
+				get_backable_candidates(
+					&mut virtual_overseer,
+					&leaf_a,
+					1.into(),
+					vec![candidate_hash_b, candidate_hash_f],
+					1,
+					vec![(candidate_hash_g, leaf_a.hash)],
+				)
+				.await;
+				get_backable_candidates(
+					&mut virtual_overseer,
+					&leaf_a,
+					1.into(),
+					vec![candidate_hash_a, candidate_hash_c],
+					1,
+					vec![(candidate_hash_h, leaf_a.hash)],
+				)
+				.await;
+				// If the requested count exceeds the largest chain, return the longest
+				// chain we can get.
+				for count in 4..10 {
+					get_backable_candidates(
+						&mut virtual_overseer,
+						&leaf_a,
+						1.into(),
+						vec![candidate_hash_a, candidate_hash_c],
+						count,
+						vec![
+							(candidate_hash_h, leaf_a.hash),
+							(candidate_hash_i, leaf_a.hash),
+							(candidate_hash_j, leaf_a.hash),
+						],
+					)
+					.await;
+				}
+			}
+
+			// No more candidates in any chain.
+			{
+				let required_paths = vec![
+					vec![candidate_hash_b, candidate_hash_f, candidate_hash_g],
+					vec![candidate_hash_b, candidate_hash_e],
+					vec![candidate_hash_b, candidate_hash_d],
+					vec![
+						candidate_hash_a,
+						candidate_hash_c,
+						candidate_hash_h,
+						candidate_hash_i,
+						candidate_hash_j,
+					],
+				];
+				for path in required_paths {
+					for count in 1..4 {
+						get_backable_candidates(
+							&mut virtual_overseer,
+							&leaf_a,
+							1.into(),
+							path.clone(),
+							count,
+							vec![],
+						)
+						.await;
+					}
+				}
+			}
+
+			// Should not get anything at the wrong path.
+			get_backable_candidates(
+				&mut virtual_overseer,
+				&leaf_a,
+				1.into(),
+				vec![candidate_hash_d],
+				1,
+				vec![],
+			)
+			.await;
+			get_backable_candidates(
+				&mut virtual_overseer,
+				&leaf_a,
+				1.into(),
+				vec![candidate_hash_b, candidate_hash_a],
+				3,
+				vec![],
+			)
+			.await;
+			get_backable_candidates(
+				&mut virtual_overseer,
+				&leaf_a,
+				1.into(),
+				vec![candidate_hash_a, candidate_hash_c, candidate_hash_d],
+				3,
+				vec![],
+			)
+			.await;
+
+			virtual_overseer
+		});
+
+		assert_eq!(view.active_leaves.len(), 1);
+		assert_eq!(view.candidate_storage.len(), 2);
+		// 10 candidates and 7 parents on para 1.
+		assert_eq!(view.candidate_storage.get(&1.into()).unwrap().len(), (7, 10));
+		assert_eq!(view.candidate_storage.get(&2.into()).unwrap().len(), (0, 0));
+	}
+}
+
 // Test depth query.
 #[test]
 fn check_hypothetical_frontier_query() {
@@ -1448,12 +2044,13 @@ fn persists_pending_availability_candidate() {
 		second_candidate(&mut virtual_overseer, candidate_b.clone()).await;
 		back_candidate(&mut virtual_overseer, &candidate_b, candidate_hash_b).await;
 
-		get_backable_candidate(
+		get_backable_candidates(
 			&mut virtual_overseer,
 			&leaf_b,
 			para_id,
 			vec![candidate_hash_a],
-			Some((candidate_hash_b, leaf_b_hash)),
+			1,
+			vec![(candidate_hash_b, leaf_b_hash)],
 		)
 		.await;
 
@@ -1512,12 +2109,13 @@ fn backwards_compatible() {
 		second_candidate(&mut virtual_overseer, candidate_a.clone()).await;
 		back_candidate(&mut virtual_overseer, &candidate_a, candidate_hash_a).await;
 
-		get_backable_candidate(
+		get_backable_candidates(
 			&mut virtual_overseer,
 			&leaf_a,
 			para_id,
 			vec![],
-			Some((candidate_hash_a, candidate_relay_parent)),
+			1,
+			vec![(candidate_hash_a, candidate_relay_parent)],
 		)
 		.await;
 
@@ -1537,7 +2135,7 @@ fn backwards_compatible() {
 		)
 		.await;
 
-		get_backable_candidate(&mut virtual_overseer, &leaf_b, para_id, vec![], None).await;
+		get_backable_candidates(&mut virtual_overseer, &leaf_b, para_id, vec![], 1, vec![]).await;
 
 		virtual_overseer
 	});
diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs
index 3970b857261..51f768d782e 100644
--- a/polkadot/node/core/provisioner/src/lib.rs
+++ b/polkadot/node/core/provisioner/src/lib.rs
@@ -806,15 +806,18 @@ async fn get_backable_candidate(
 ) -> Result<Option<(CandidateHash, Hash)>, Error> {
 	let (tx, rx) = oneshot::channel();
 	sender
-		.send_message(ProspectiveParachainsMessage::GetBackableCandidate(
+		.send_message(ProspectiveParachainsMessage::GetBackableCandidates(
 			relay_parent,
 			para_id,
+			1, // core count hardcoded to 1, until elastic scaling is implemented and enabled.
 			required_path,
 			tx,
 		))
 		.await;
 
-	rx.await.map_err(Error::CanceledBackableCandidate)
+	rx.await
+		.map_err(Error::CanceledBackableCandidate)
+		.map(|res| res.get(0).copied())
 }
 
 /// The availability bitfield for a given core is the transpose
diff --git a/polkadot/node/core/provisioner/src/tests.rs b/polkadot/node/core/provisioner/src/tests.rs
index 1d7bdfcfcb8..b26df8ddb91 100644
--- a/polkadot/node/core/provisioner/src/tests.rs
+++ b/polkadot/node/core/provisioner/src/tests.rs
@@ -373,13 +373,18 @@ mod select_candidates {
 					let _ = sender.send(response);
 				},
 				AllMessages::ProspectiveParachains(
-					ProspectiveParachainsMessage::GetBackableCandidate(.., tx),
-				) => match prospective_parachains_mode {
-					ProspectiveParachainsMode::Enabled { .. } => {
-						let _ = tx.send(candidates_iter.next());
-					},
-					ProspectiveParachainsMode::Disabled =>
-						panic!("unexpected prospective parachains request"),
+					ProspectiveParachainsMessage::GetBackableCandidates(_, _, count, _, tx),
+				) => {
+					assert_eq!(count, 1);
+
+					match prospective_parachains_mode {
+						ProspectiveParachainsMode::Enabled { .. } => {
+							let _ =
+								tx.send(candidates_iter.next().map_or_else(Vec::new, |c| vec![c]));
+						},
+						ProspectiveParachainsMode::Disabled =>
+							panic!("unexpected prospective parachains request"),
+					}
 				},
 				_ => panic!("Unexpected message: {:?}", from_job),
 			}
diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs
index 1d5d82b57fd..549e43a671d 100644
--- a/polkadot/node/subsystem-types/src/messages.rs
+++ b/polkadot/node/subsystem-types/src/messages.rs
@@ -1128,14 +1128,16 @@ pub enum ProspectiveParachainsMessage {
 	/// has been backed. This requires that the candidate was successfully introduced in
 	/// the past.
 	CandidateBacked(ParaId, CandidateHash),
-	/// Get a backable candidate hash along with its relay parent for the given parachain,
+	/// Get N backable candidate hashes along with their relay parents for the given parachain,
 	/// under the given relay-parent hash, which is a descendant of the given candidate hashes.
+	/// N should represent the number of scheduled cores of this ParaId.
 	/// Returns `None` on the channel if no such candidate exists.
-	GetBackableCandidate(
+	GetBackableCandidates(
 		Hash,
 		ParaId,
+		u32,
 		Vec<CandidateHash>,
-		oneshot::Sender<Option<(CandidateHash, Hash)>>,
+		oneshot::Sender<Vec<(CandidateHash, Hash)>>,
 	),
 	/// Get the hypothetical frontier membership of candidates with the given properties
 	/// under the specified active leaves' fragment trees.
diff --git a/polkadot/roadmap/implementers-guide/src/node/backing/prospective-parachains.md b/polkadot/roadmap/implementers-guide/src/node/backing/prospective-parachains.md
index 286aeddb986..8f00ff08494 100644
--- a/polkadot/roadmap/implementers-guide/src/node/backing/prospective-parachains.md
+++ b/polkadot/roadmap/implementers-guide/src/node/backing/prospective-parachains.md
@@ -92,9 +92,10 @@ prospective validation data. This is unlikely to change.
     been backed.
   - Sent by the Backing Subsystem after it successfully imports a
     statement giving a candidate the necessary quorum of backing votes.
-- `ProspectiveParachainsMessage::GetBackableCandidate`
-  - Get a backable candidate hash along with its relay parent for a given parachain,
-    under a given relay-parent (leaf) hash, which is a descendant of given candidate hashes.
+- `ProspectiveParachainsMessage::GetBackableCandidates`
+  - Get the requested number of backable candidate hashes along with their relay parent for a given
+    parachain,under a given relay-parent (leaf) hash, which are descendants of given candidate
+    hashes.
   - Sent by the Provisioner when requesting backable candidates, when
     selecting candidates for a given relay-parent.
 - `ProspectiveParachainsMessage::GetHypotheticalFrontier`
diff --git a/polkadot/roadmap/implementers-guide/src/node/utility/provisioner.md b/polkadot/roadmap/implementers-guide/src/node/utility/provisioner.md
index 0b4fe6a4587..b017259da8c 100644
--- a/polkadot/roadmap/implementers-guide/src/node/utility/provisioner.md
+++ b/polkadot/roadmap/implementers-guide/src/node/utility/provisioner.md
@@ -187,16 +187,16 @@ this process is a vector of `CandidateHash`s, sorted in order of their core inde
 
 #### Required Path
 
-Required path is a parameter for `ProspectiveParachainsMessage::GetBackableCandidate`, which the provisioner sends in
+Required path is a parameter for `ProspectiveParachainsMessage::GetBackableCandidates`, which the provisioner sends in
 candidate selection.
 
-An empty required path indicates that the requested candidate should be a direct child of the most recently included
+An empty required path indicates that the requested candidate chain should start with the most recently included
 parablock for the given `para_id` as of the given relay parent.
 
 In contrast, a required path with one or more entries prompts [prospective
 parachains](../backing/prospective-parachains.md) to step forward through its fragment tree for the given `para_id` and
-relay parent until the desired parablock is reached. We then select a direct child of that parablock to pass to the
-provisioner.
+relay parent until the desired parablock is reached. We then select the chain starting with the direct child of that
+parablock to pass to the provisioner.
 
 The parablocks making up a required path do not need to have been previously seen as included in relay chain blocks.
 Thus the ability to provision backable candidates based on a required path effectively decouples backing from inclusion.
diff --git a/prdoc/pr_3160.prdoc b/prdoc/pr_3160.prdoc
new file mode 100644
index 00000000000..22305b6635a
--- /dev/null
+++ b/prdoc/pr_3160.prdoc
@@ -0,0 +1,12 @@
+title: "prospective-parachains: allow requesting a chain of backable candidates"
+
+topic: Node
+
+doc:
+  - audience: Node Dev
+    description: |
+      Enable requesting a chain of multiple backable candidates. Will be used by the provisioner
+      to build paras inherent data for elastic scaling.
+
+crates:
+  - name: polkadot-node-core-prospective-parachains
-- 
GitLab