diff --git a/Cargo.lock b/Cargo.lock
index a96bb680b750b287f6b424311cb1a63d0d598b11..d2b7a47f84c93c461e78770d1bda0f2a80a12099 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3730,6 +3730,7 @@ dependencies = [
  "sp-timestamp",
  "sp-tracing 16.0.0",
  "sp-trie",
+ "sp-version",
  "substrate-prometheus-endpoint",
  "tracing",
 ]
@@ -3784,12 +3785,15 @@ dependencies = [
  "parity-scale-codec",
  "parking_lot 0.12.1",
  "polkadot-node-primitives",
+ "polkadot-node-subsystem",
  "polkadot-parachain-primitives",
  "polkadot-primitives",
  "polkadot-test-client",
  "portpicker",
+ "rstest",
  "sc-cli",
  "sc-client-api",
+ "sp-api",
  "sp-blockchain",
  "sp-consensus",
  "sp-core",
@@ -3797,6 +3801,7 @@ dependencies = [
  "sp-keystore",
  "sp-runtime",
  "sp-state-machine",
+ "sp-version",
  "substrate-test-utils",
  "tokio",
  "tracing",
@@ -3830,9 +3835,11 @@ dependencies = [
 name = "cumulus-client-pov-recovery"
 version = "0.7.0"
 dependencies = [
+ "assert_matches",
  "async-trait",
  "cumulus-primitives-core",
  "cumulus-relay-chain-interface",
+ "cumulus-test-client",
  "cumulus-test-service",
  "futures",
  "futures-timer",
@@ -3843,12 +3850,18 @@ dependencies = [
  "polkadot-primitives",
  "portpicker",
  "rand 0.8.5",
+ "rstest",
  "sc-cli",
  "sc-client-api",
  "sc-consensus",
+ "sc-utils",
+ "sp-api",
+ "sp-blockchain",
  "sp-consensus",
  "sp-maybe-compressed-blob",
  "sp-runtime",
+ "sp-tracing 16.0.0",
+ "sp-version",
  "substrate-test-utils",
  "tokio",
  "tracing",
@@ -4219,6 +4232,7 @@ dependencies = [
  "sp-api",
  "sp-blockchain",
  "sp-state-machine",
+ "sp-version",
  "thiserror",
 ]
 
diff --git a/cumulus/client/consensus/common/Cargo.toml b/cumulus/client/consensus/common/Cargo.toml
index d369304e2e33f6ec51b02f7853b51fee9c4581ae..09c2f58d45e4e04ca420d6280c39eed12e39ad4f 100644
--- a/cumulus/client/consensus/common/Cargo.toml
+++ b/cumulus/client/consensus/common/Cargo.toml
@@ -28,6 +28,7 @@ sp-core = { path = "../../../../substrate/primitives/core" }
 sp-runtime = { path = "../../../../substrate/primitives/runtime" }
 sp-timestamp = { path = "../../../../substrate/primitives/timestamp" }
 sp-trie = { path = "../../../../substrate/primitives/trie" }
+sp-version = { path = "../../../../substrate/primitives/version" }
 prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../../substrate/utils/prometheus" }
 
 # Polkadot
diff --git a/cumulus/client/consensus/common/src/tests.rs b/cumulus/client/consensus/common/src/tests.rs
index aca9226570723c3285b2817e6bcdca22b141968b..2a944bc7f9fa221d63c48678f2899d50251b3655 100644
--- a/cumulus/client/consensus/common/src/tests.rs
+++ b/cumulus/client/consensus/common/src/tests.rs
@@ -38,6 +38,7 @@ use polkadot_primitives::HeadData;
 use sc_client_api::{Backend as _, UsageProvider};
 use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
 use sp_consensus::{BlockOrigin, BlockStatus};
+use sp_version::RuntimeVersion;
 use std::{
 	collections::{BTreeMap, HashMap},
 	pin::Pin,
@@ -153,6 +154,14 @@ impl RelayChainInterface for Relaychain {
 		unimplemented!("Not needed for test")
 	}
 
+	async fn candidates_pending_availability(
+		&self,
+		_: PHash,
+		_: ParaId,
+	) -> RelayChainResult<Vec<CommittedCandidateReceipt>> {
+		unimplemented!("Not needed for test")
+	}
+
 	async fn session_index_for_child(&self, _: PHash) -> RelayChainResult<SessionIndex> {
 		Ok(0)
 	}
@@ -247,6 +256,10 @@ impl RelayChainInterface for Relaychain {
 			extrinsics_root: PHash::zero(),
 		}))
 	}
+
+	async fn version(&self, _: PHash) -> RelayChainResult<RuntimeVersion> {
+		unimplemented!("Not needed for test")
+	}
 }
 
 fn sproof_with_best_parent(client: &Client) -> RelayStateSproofBuilder {
diff --git a/cumulus/client/network/Cargo.toml b/cumulus/client/network/Cargo.toml
index d4fc752872589fbfd361f5df49939874d208ab3d..0dd7c4fdb0f60ac2f70f0fb697c901f960515b9a 100644
--- a/cumulus/client/network/Cargo.toml
+++ b/cumulus/client/network/Cargo.toml
@@ -24,11 +24,14 @@ sp-consensus = { path = "../../../substrate/primitives/consensus/common" }
 sp-core = { path = "../../../substrate/primitives/core" }
 sp-runtime = { path = "../../../substrate/primitives/runtime" }
 sp-state-machine = { path = "../../../substrate/primitives/state-machine" }
+sp-api = { path = "../../../substrate/primitives/api" }
+sp-version = { path = "../../../substrate/primitives/version" }
 
 # Polkadot
 polkadot-node-primitives = { path = "../../../polkadot/node/primitives" }
 polkadot-parachain-primitives = { path = "../../../polkadot/parachain" }
 polkadot-primitives = { path = "../../../polkadot/primitives" }
+polkadot-node-subsystem = { path = "../../../polkadot/node/subsystem" }
 
 # Cumulus
 cumulus-relay-chain-interface = { path = "../relay-chain-interface" }
@@ -37,6 +40,7 @@ cumulus-relay-chain-interface = { path = "../relay-chain-interface" }
 portpicker = "0.1.1"
 tokio = { version = "1.32.0", features = ["macros"] }
 url = "2.4.0"
+rstest = "0.18.2"
 
 # Substrate
 sc-cli = { path = "../../../substrate/client/cli" }
diff --git a/cumulus/client/network/src/lib.rs b/cumulus/client/network/src/lib.rs
index f442ed5840bddcd4a859d149635ff740e4d232f4..dab15bba590a0f46720d45bdaddc92f594fb8a9f 100644
--- a/cumulus/client/network/src/lib.rs
+++ b/cumulus/client/network/src/lib.rs
@@ -20,6 +20,7 @@
 //! that use the relay chain provided consensus. See [`RequireSecondedInBlockAnnounce`]
 //! and [`WaitToAnnounce`] for more information about this implementation.
 
+use sp_api::RuntimeApiInfo;
 use sp_consensus::block_validation::{
 	BlockAnnounceValidator as BlockAnnounceValidatorT, Validation,
 };
@@ -28,6 +29,7 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
 
 use cumulus_relay_chain_interface::RelayChainInterface;
 use polkadot_node_primitives::{CollationSecondedSignal, Statement};
+use polkadot_node_subsystem::messages::RuntimeApiRequest;
 use polkadot_parachain_primitives::primitives::HeadData;
 use polkadot_primitives::{
 	CandidateReceipt, CompactStatement, Hash as PHash, Id as ParaId, OccupiedCoreAssumption,
@@ -266,18 +268,41 @@ where
 		Ok(para_head)
 	}
 
-	/// Get the backed block hash of the given parachain in the relay chain.
-	async fn backed_block_hash(
+	/// Get the backed block hashes of the given parachain in the relay chain.
+	async fn backed_block_hashes(
 		relay_chain_interface: &RCInterface,
 		hash: PHash,
 		para_id: ParaId,
-	) -> Result<Option<PHash>, BoxedError> {
-		let candidate_receipt = relay_chain_interface
-			.candidate_pending_availability(hash, para_id)
+	) -> Result<impl Iterator<Item = PHash>, BoxedError> {
+		let runtime_api_version = relay_chain_interface
+			.version(hash)
 			.await
 			.map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?;
+		let parachain_host_runtime_api_version =
+			runtime_api_version
+				.api_version(
+					&<dyn polkadot_primitives::runtime_api::ParachainHost<
+						polkadot_primitives::Block,
+					>>::ID,
+				)
+				.unwrap_or_default();
+
+		// If the relay chain runtime does not support the new runtime API, fallback to the
+		// deprecated one.
+		let candidate_receipts = if parachain_host_runtime_api_version <
+			RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT
+		{
+			#[allow(deprecated)]
+			relay_chain_interface
+				.candidate_pending_availability(hash, para_id)
+				.await
+				.map(|c| c.into_iter().collect::<Vec<_>>())
+		} else {
+			relay_chain_interface.candidates_pending_availability(hash, para_id).await
+		}
+		.map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?;
 
-		Ok(candidate_receipt.map(|cr| cr.descriptor.para_head))
+		Ok(candidate_receipts.into_iter().map(|cr| cr.descriptor.para_head))
 	}
 
 	/// Handle a block announcement with empty data (no statement) attached to it.
@@ -298,15 +323,20 @@ where
 		let best_head =
 			Self::included_block(&relay_chain_interface, relay_chain_best_hash, para_id).await?;
 		let known_best_number = best_head.number();
-		let backed_block = || async {
-			Self::backed_block_hash(&relay_chain_interface, relay_chain_best_hash, para_id).await
-		};
 
 		if best_head == header {
 			tracing::debug!(target: LOG_TARGET, "Announced block matches best block.",);
 
-			Ok(Validation::Success { is_new_best: true })
-		} else if Some(HeadData(header.encode()).hash()) == backed_block().await? {
+			return Ok(Validation::Success { is_new_best: true })
+		}
+
+		let mut backed_blocks =
+			Self::backed_block_hashes(&relay_chain_interface, relay_chain_best_hash, para_id)
+				.await?;
+
+		let head_hash = HeadData(header.encode()).hash();
+
+		if backed_blocks.any(|block_hash| block_hash == head_hash) {
 			tracing::debug!(target: LOG_TARGET, "Announced block matches latest backed block.",);
 
 			Ok(Validation::Success { is_new_best: true })
diff --git a/cumulus/client/network/src/tests.rs b/cumulus/client/network/src/tests.rs
index 3f5757d5eac13cd751a89e77545a937b3223c9a0..eb0d7f0e01b391279648a5aea6031a275cf409a5 100644
--- a/cumulus/client/network/src/tests.rs
+++ b/cumulus/client/network/src/tests.rs
@@ -34,6 +34,7 @@ use polkadot_test_client::{
 	Client as PClient, ClientBlockImportExt, DefaultTestClientBuilderExt, FullBackend as PBackend,
 	InitPolkadotBlockBuilder, TestClientBuilder, TestClientBuilderExt,
 };
+use rstest::rstest;
 use sc_client_api::{Backend, BlockchainEvents};
 use sp_blockchain::HeaderBackend;
 use sp_consensus::BlockOrigin;
@@ -42,7 +43,8 @@ use sp_keyring::Sr25519Keyring;
 use sp_keystore::{testing::MemoryKeystore, Keystore, KeystorePtr};
 use sp_runtime::RuntimeAppPublic;
 use sp_state_machine::StorageValue;
-use std::{collections::BTreeMap, time::Duration};
+use sp_version::RuntimeVersion;
+use std::{borrow::Cow, collections::BTreeMap, time::Duration};
 
 fn check_error(error: crate::BoxedError, check_error: impl Fn(&BlockAnnounceError) -> bool) {
 	let error = *error
@@ -53,6 +55,33 @@ fn check_error(error: crate::BoxedError, check_error: impl Fn(&BlockAnnounceErro
 	}
 }
 
+fn dummy_candidate() -> CommittedCandidateReceipt {
+	CommittedCandidateReceipt {
+		descriptor: CandidateDescriptor {
+			para_head: polkadot_parachain_primitives::primitives::HeadData(
+				default_header().encode(),
+			)
+			.hash(),
+			para_id: 0u32.into(),
+			relay_parent: PHash::random(),
+			collator: CollatorPair::generate().0.public(),
+			persisted_validation_data_hash: PHash::random(),
+			pov_hash: PHash::random(),
+			erasure_root: PHash::random(),
+			signature: sp_core::sr25519::Signature::default().into(),
+			validation_code_hash: ValidationCodeHash::from(PHash::random()),
+		},
+		commitments: CandidateCommitments {
+			upward_messages: Default::default(),
+			horizontal_messages: Default::default(),
+			new_validation_code: None,
+			head_data: HeadData(Vec::new()),
+			processed_downward_messages: 0,
+			hrmp_watermark: 0,
+		},
+	}
+}
+
 #[derive(Clone)]
 struct DummyRelayChainInterface {
 	data: Arc<Mutex<ApiData>>,
@@ -69,6 +98,8 @@ impl DummyRelayChainInterface {
 			data: Arc::new(Mutex::new(ApiData {
 				validators: vec![Sr25519Keyring::Alice.public().into()],
 				has_pending_availability: false,
+				runtime_version:
+					RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT,
 			})),
 			relay_client: Arc::new(builder.build()),
 			relay_backend,
@@ -131,36 +162,37 @@ impl RelayChainInterface for DummyRelayChainInterface {
 		_: PHash,
 		_: ParaId,
 	) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
+		if self.data.lock().runtime_version >=
+			RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT
+		{
+			panic!("Should have used candidates_pending_availability instead");
+		}
+
 		if self.data.lock().has_pending_availability {
-			Ok(Some(CommittedCandidateReceipt {
-				descriptor: CandidateDescriptor {
-					para_head: polkadot_parachain_primitives::primitives::HeadData(
-						default_header().encode(),
-					)
-					.hash(),
-					para_id: 0u32.into(),
-					relay_parent: PHash::random(),
-					collator: CollatorPair::generate().0.public(),
-					persisted_validation_data_hash: PHash::random(),
-					pov_hash: PHash::random(),
-					erasure_root: PHash::random(),
-					signature: sp_core::sr25519::Signature::default().into(),
-					validation_code_hash: ValidationCodeHash::from(PHash::random()),
-				},
-				commitments: CandidateCommitments {
-					upward_messages: Default::default(),
-					horizontal_messages: Default::default(),
-					new_validation_code: None,
-					head_data: HeadData(Vec::new()),
-					processed_downward_messages: 0,
-					hrmp_watermark: 0,
-				},
-			}))
+			Ok(Some(dummy_candidate()))
 		} else {
 			Ok(None)
 		}
 	}
 
+	async fn candidates_pending_availability(
+		&self,
+		_: PHash,
+		_: ParaId,
+	) -> RelayChainResult<Vec<CommittedCandidateReceipt>> {
+		if self.data.lock().runtime_version <
+			RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT
+		{
+			panic!("Should have used candidate_pending_availability instead");
+		}
+
+		if self.data.lock().has_pending_availability {
+			Ok(vec![dummy_candidate()])
+		} else {
+			Ok(vec![])
+		}
+	}
+
 	async fn session_index_for_child(&self, _: PHash) -> RelayChainResult<SessionIndex> {
 		Ok(0)
 	}
@@ -264,6 +296,28 @@ impl RelayChainInterface for DummyRelayChainInterface {
 
 		Ok(header)
 	}
+
+	async fn version(&self, _: PHash) -> RelayChainResult<RuntimeVersion> {
+		let version = self.data.lock().runtime_version;
+
+		let apis = sp_version::create_apis_vec!([(
+			<dyn polkadot_primitives::runtime_api::ParachainHost<polkadot_primitives::Block>>::ID,
+			version
+		)])
+		.into_owned()
+		.to_vec();
+
+		Ok(RuntimeVersion {
+			spec_name: sp_version::create_runtime_str!("test"),
+			impl_name: sp_version::create_runtime_str!("test"),
+			authoring_version: 1,
+			spec_version: 1,
+			impl_version: 0,
+			apis: Cow::Owned(apis),
+			transaction_version: 5,
+			state_version: 1,
+		})
+	}
 }
 
 fn make_validator_and_api() -> (
@@ -574,11 +628,14 @@ fn relay_parent_not_imported_when_block_announce_is_processed() {
 
 /// Ensures that when we receive a block announcement without a statement included, while the block
 /// is not yet included by the node checking the announcement, but the node is already backed.
-#[test]
-fn block_announced_without_statement_and_block_only_backed() {
+#[rstest]
+#[case(RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT)]
+#[case(10)]
+fn block_announced_without_statement_and_block_only_backed(#[case] runtime_version: u32) {
 	block_on(async move {
 		let (mut validator, api) = make_validator_and_api();
 		api.data.lock().has_pending_availability = true;
+		api.data.lock().runtime_version = runtime_version;
 
 		let header = default_header();
 
@@ -592,4 +649,5 @@ fn block_announced_without_statement_and_block_only_backed() {
 struct ApiData {
 	validators: Vec<ValidatorId>,
 	has_pending_availability: bool,
+	runtime_version: u32,
 }
diff --git a/cumulus/client/pov-recovery/Cargo.toml b/cumulus/client/pov-recovery/Cargo.toml
index 7afe7fae34bd799ed75d557ad5f0ca5067743f7f..539802d6938663e1268e887c17f505d23c72c1c3 100644
--- a/cumulus/client/pov-recovery/Cargo.toml
+++ b/cumulus/client/pov-recovery/Cargo.toml
@@ -22,6 +22,8 @@ sc-consensus = { path = "../../../substrate/client/consensus/common" }
 sp-consensus = { path = "../../../substrate/primitives/consensus/common" }
 sp-maybe-compressed-blob = { path = "../../../substrate/primitives/maybe-compressed-blob" }
 sp-runtime = { path = "../../../substrate/primitives/runtime" }
+sp-api = { path = "../../../substrate/primitives/api" }
+sp-version = { path = "../../../substrate/primitives/version" }
 
 # Polkadot
 polkadot-node-primitives = { path = "../../../polkadot/node/primitives" }
@@ -35,8 +37,14 @@ cumulus-relay-chain-interface = { path = "../relay-chain-interface" }
 async-trait = "0.1.79"
 
 [dev-dependencies]
+rstest = "0.18.2"
 tokio = { version = "1.32.0", features = ["macros"] }
 portpicker = "0.1.1"
+sp-blockchain = { path = "../../../substrate/primitives/blockchain" }
+cumulus-test-client = { path = "../../test/client" }
+sc-utils = { path = "../../../substrate/client/utils" }
+sp-tracing = { path = "../../../substrate/primitives/tracing" }
+assert_matches = "1.5"
 
 # Cumulus
 cumulus-test-service = { path = "../../test/service" }
diff --git a/cumulus/client/pov-recovery/src/active_candidate_recovery.rs b/cumulus/client/pov-recovery/src/active_candidate_recovery.rs
index c41c543f04d1f0d3f3a9fe7356dc7ab3ed5e5f11..50de98909ea471ea3de39a9edd85f4f9d71a8b3b 100644
--- a/cumulus/client/pov-recovery/src/active_candidate_recovery.rs
+++ b/cumulus/client/pov-recovery/src/active_candidate_recovery.rs
@@ -21,7 +21,7 @@ use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
 
 use futures::{channel::oneshot, stream::FuturesUnordered, Future, FutureExt, StreamExt};
 
-use std::{collections::HashSet, pin::Pin, sync::Arc};
+use std::{pin::Pin, sync::Arc};
 
 use crate::RecoveryHandle;
 
@@ -32,14 +32,12 @@ pub(crate) struct ActiveCandidateRecovery<Block: BlockT> {
 	/// The recoveries that are currently being executed.
 	recoveries:
 		FuturesUnordered<Pin<Box<dyn Future<Output = (Block::Hash, Option<Arc<PoV>>)> + Send>>>,
-	/// The block hashes of the candidates currently being recovered.
-	candidates: HashSet<Block::Hash>,
 	recovery_handle: Box<dyn RecoveryHandle>,
 }
 
 impl<Block: BlockT> ActiveCandidateRecovery<Block> {
 	pub fn new(recovery_handle: Box<dyn RecoveryHandle>) -> Self {
-		Self { recoveries: Default::default(), candidates: Default::default(), recovery_handle }
+		Self { recoveries: Default::default(), recovery_handle }
 	}
 
 	/// Recover the given `candidate`.
@@ -63,8 +61,6 @@ impl<Block: BlockT> ActiveCandidateRecovery<Block> {
 			)
 			.await;
 
-		self.candidates.insert(block_hash);
-
 		self.recoveries.push(
 			async move {
 				match rx.await {
@@ -97,7 +93,6 @@ impl<Block: BlockT> ActiveCandidateRecovery<Block> {
 	pub async fn wait_for_recovery(&mut self) -> (Block::Hash, Option<Arc<PoV>>) {
 		loop {
 			if let Some(res) = self.recoveries.next().await {
-				self.candidates.remove(&res.0);
 				return res
 			} else {
 				futures::pending!()
diff --git a/cumulus/client/pov-recovery/src/lib.rs b/cumulus/client/pov-recovery/src/lib.rs
index 0ca21749c3eb557a78f0996c703e86b35cab6f17..6ace18155e871d40baa2662c8c13b605b5768bbe 100644
--- a/cumulus/client/pov-recovery/src/lib.rs
+++ b/cumulus/client/pov-recovery/src/lib.rs
@@ -48,11 +48,12 @@
 
 use sc_client_api::{BlockBackend, BlockchainEvents, UsageProvider};
 use sc_consensus::import_queue::{ImportQueueService, IncomingBlock};
+use sp_api::RuntimeApiInfo;
 use sp_consensus::{BlockOrigin, BlockStatus, SyncOracle};
 use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
 
 use polkadot_node_primitives::{PoV, POV_BOMB_LIMIT};
-use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
+use polkadot_node_subsystem::messages::{AvailabilityRecoveryMessage, RuntimeApiRequest};
 use polkadot_overseer::Handle as OverseerHandle;
 use polkadot_primitives::{
 	CandidateReceipt, CommittedCandidateReceipt, Id as ParaId, SessionIndex,
@@ -75,6 +76,9 @@ use std::{
 	time::Duration,
 };
 
+#[cfg(test)]
+mod tests;
+
 mod active_candidate_recovery;
 use active_candidate_recovery::ActiveCandidateRecovery;
 
@@ -544,7 +548,7 @@ where
 		)
 		.await
 		{
-			Ok(pending_candidate_stream) => pending_candidate_stream.fuse(),
+			Ok(pending_candidates_stream) => pending_candidates_stream.fuse(),
 			Err(err) => {
 				tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve pending candidate stream.");
 				return
@@ -554,9 +558,11 @@ where
 		futures::pin_mut!(pending_candidates);
 		loop {
 			select! {
-				pending_candidate = pending_candidates.next() => {
-					if let Some((receipt, session_index)) = pending_candidate {
-						self.handle_pending_candidate(receipt, session_index);
+				next_pending_candidates = pending_candidates.next() => {
+					if let Some((candidates, session_index)) = next_pending_candidates {
+						for candidate in candidates {
+							self.handle_pending_candidate(candidate, session_index);
+						}
 					} else {
 						tracing::debug!(target: LOG_TARGET, "Pending candidates stream ended");
 						return;
@@ -615,7 +621,7 @@ async fn pending_candidates(
 	relay_chain_client: impl RelayChainInterface + Clone,
 	para_id: ParaId,
 	sync_service: Arc<dyn SyncOracle + Sync + Send>,
-) -> RelayChainResult<impl Stream<Item = (CommittedCandidateReceipt, SessionIndex)>> {
+) -> RelayChainResult<impl Stream<Item = (Vec<CommittedCandidateReceipt>, SessionIndex)>> {
 	let import_notification_stream = relay_chain_client.import_notification_stream().await?;
 
 	let filtered_stream = import_notification_stream.filter_map(move |n| {
@@ -632,16 +638,54 @@ async fn pending_candidates(
 				return None
 			}
 
-			let pending_availability_result = client_for_closure
-				.candidate_pending_availability(hash, para_id)
+			let runtime_api_version = client_for_closure
+				.version(hash)
 				.await
 				.map_err(|e| {
 					tracing::error!(
 						target: LOG_TARGET,
 						error = ?e,
-						"Failed to fetch pending candidates.",
+						"Failed to fetch relay chain runtime version.",
 					)
-				});
+				})
+				.ok()?;
+			let parachain_host_runtime_api_version = runtime_api_version
+				.api_version(
+					&<dyn polkadot_primitives::runtime_api::ParachainHost<
+						polkadot_primitives::Block,
+					>>::ID,
+				)
+				.unwrap_or_default();
+
+			// If the relay chain runtime does not support the new runtime API, fallback to the
+			// deprecated one.
+			let pending_availability_result = if parachain_host_runtime_api_version <
+				RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT
+			{
+				#[allow(deprecated)]
+				client_for_closure
+					.candidate_pending_availability(hash, para_id)
+					.await
+					.map_err(|e| {
+						tracing::error!(
+							target: LOG_TARGET,
+							error = ?e,
+							"Failed to fetch pending candidates.",
+						)
+					})
+					.map(|candidate| candidate.into_iter().collect::<Vec<_>>())
+			} else {
+				client_for_closure.candidates_pending_availability(hash, para_id).await.map_err(
+					|e| {
+						tracing::error!(
+							target: LOG_TARGET,
+							error = ?e,
+							"Failed to fetch pending candidates.",
+						)
+					},
+				)
+			};
+
 			let session_index_result =
 				client_for_closure.session_index_for_child(hash).await.map_err(|e| {
 					tracing::error!(
@@ -651,8 +695,8 @@ async fn pending_candidates(
 					)
 				});
 
-			if let Ok(Some(candidate)) = pending_availability_result {
-				session_index_result.map(|session_index| (candidate, session_index)).ok()
+			if let Ok(candidates) = pending_availability_result {
+				session_index_result.map(|session_index| (candidates, session_index)).ok()
 			} else {
 				None
 			}
diff --git a/cumulus/client/pov-recovery/src/tests.rs b/cumulus/client/pov-recovery/src/tests.rs
new file mode 100644
index 0000000000000000000000000000000000000000..75bf308ef27aa051e42806b7f65e7ab8b45a9c1a
--- /dev/null
+++ b/cumulus/client/pov-recovery/src/tests.rs
@@ -0,0 +1,1404 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+use super::*;
+use assert_matches::assert_matches;
+use codec::{Decode, Encode};
+use cumulus_primitives_core::relay_chain::{BlockId, CandidateCommitments, CandidateDescriptor};
+use cumulus_relay_chain_interface::{
+	InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, PHash, PHeader,
+	PersistedValidationData, StorageValue, ValidationCodeHash, ValidatorId,
+};
+use cumulus_test_client::{
+	runtime::{Block, Header},
+	Sr25519Keyring,
+};
+use futures::{channel::mpsc, SinkExt};
+use polkadot_node_primitives::AvailableData;
+use polkadot_node_subsystem::{messages::AvailabilityRecoveryMessage, RecoveryError, TimeoutExt};
+use rstest::rstest;
+use sc_client_api::{
+	BlockImportNotification, ClientInfo, CompactProof, FinalityNotification, FinalityNotifications,
+	FinalizeSummary, ImportNotifications, StorageEventStream, StorageKey,
+};
+use sc_consensus::import_queue::RuntimeOrigin;
+use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender};
+use sp_blockchain::Info;
+use sp_runtime::{generic::SignedBlock, Justifications};
+use sp_version::RuntimeVersion;
+use std::{
+	borrow::Cow,
+	collections::BTreeMap,
+	ops::Range,
+	sync::{Arc, Mutex},
+};
+use tokio::task;
+
+const GENESIS_HASH: PHash = PHash::zero();
+const TEST_SESSION_INDEX: SessionIndex = 0;
+
+struct AvailabilityRecoverySubsystemHandle {
+	tx: mpsc::Sender<AvailabilityRecoveryMessage>,
+}
+
+impl AvailabilityRecoverySubsystemHandle {
+	fn new() -> (Self, mpsc::Receiver<AvailabilityRecoveryMessage>) {
+		let (tx, rx) = mpsc::channel(10);
+
+		(Self { tx }, rx)
+	}
+}
+
+#[async_trait::async_trait]
+impl RecoveryHandle for AvailabilityRecoverySubsystemHandle {
+	async fn send_recovery_msg(
+		&mut self,
+		message: AvailabilityRecoveryMessage,
+		_origin: &'static str,
+	) {
+		self.tx.send(message).await.expect("Receiver dropped");
+	}
+}
+
+struct ParachainClientInner<Block: BlockT> {
+	import_notifications_rx: Option<TracingUnboundedReceiver<BlockImportNotification<Block>>>,
+	finality_notifications_rx: Option<TracingUnboundedReceiver<FinalityNotification<Block>>>,
+	usage_infos: Vec<ClientInfo<Block>>,
+	block_statuses: Arc<Mutex<HashMap<Block::Hash, BlockStatus>>>,
+}
+
+impl<Block: BlockT> ParachainClientInner<Block> {
+	fn new(
+		usage_infos: Vec<ClientInfo<Block>>,
+		block_statuses: Arc<Mutex<HashMap<Block::Hash, BlockStatus>>>,
+	) -> (
+		Self,
+		TracingUnboundedSender<BlockImportNotification<Block>>,
+		TracingUnboundedSender<FinalityNotification<Block>>,
+	) {
+		let (import_notifications_tx, import_notifications_rx) =
+			sc_utils::mpsc::tracing_unbounded("import_notif", 10);
+		let (finality_notifications_tx, finality_notifications_rx) =
+			sc_utils::mpsc::tracing_unbounded("finality_notif", 10);
+		(
+			Self {
+				import_notifications_rx: Some(import_notifications_rx),
+				finality_notifications_rx: Some(finality_notifications_rx),
+				usage_infos,
+				block_statuses,
+			},
+			import_notifications_tx,
+			finality_notifications_tx,
+		)
+	}
+}
+struct ParachainClient<Block: BlockT> {
+	inner: Arc<Mutex<ParachainClientInner<Block>>>,
+}
+
+impl<Block: BlockT> ParachainClient<Block> {
+	fn new(
+		usage_infos: Vec<ClientInfo<Block>>,
+		block_statuses: Arc<Mutex<HashMap<Block::Hash, BlockStatus>>>,
+	) -> (
+		Self,
+		TracingUnboundedSender<BlockImportNotification<Block>>,
+		TracingUnboundedSender<FinalityNotification<Block>>,
+	) {
+		let (inner, import_notifications_tx, finality_notifications_tx) =
+			ParachainClientInner::new(usage_infos, block_statuses);
+		(
+			Self { inner: Arc::new(Mutex::new(inner)) },
+			import_notifications_tx,
+			finality_notifications_tx,
+		)
+	}
+}
+
+impl<Block: BlockT> BlockchainEvents<Block> for ParachainClient<Block> {
+	fn import_notification_stream(&self) -> ImportNotifications<Block> {
+		self.inner
+			.lock()
+			.expect("poisoned lock")
+			.import_notifications_rx
+			.take()
+			.expect("Should only be taken once")
+	}
+
+	fn every_import_notification_stream(&self) -> ImportNotifications<Block> {
+		unimplemented!()
+	}
+
+	fn finality_notification_stream(&self) -> FinalityNotifications<Block> {
+		self.inner
+			.lock()
+			.expect("poisoned lock")
+			.finality_notifications_rx
+			.take()
+			.expect("Should only be taken once")
+	}
+
+	fn storage_changes_notification_stream(
+		&self,
+		_filter_keys: Option<&[StorageKey]>,
+		_child_filter_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
+	) -> sp_blockchain::Result<StorageEventStream<Block::Hash>> {
+		unimplemented!()
+	}
+}
+
+impl<Block: BlockT> BlockBackend<Block> for ParachainClient<Block> {
+	fn block_body(
+		&self,
+		_: Block::Hash,
+	) -> sp_blockchain::Result<Option<Vec<<Block as BlockT>::Extrinsic>>> {
+		unimplemented!()
+	}
+
+	fn block(&self, _: Block::Hash) -> sp_blockchain::Result<Option<SignedBlock<Block>>> {
+		unimplemented!()
+	}
+
+	fn block_status(&self, hash: Block::Hash) -> sp_blockchain::Result<sp_consensus::BlockStatus> {
+		Ok(self
+			.inner
+			.lock()
+			.expect("Poisoned lock")
+			.block_statuses
+			.lock()
+			.expect("Poisoned lock")
+			.get(&hash)
+			.cloned()
+			.unwrap_or(BlockStatus::Unknown))
+	}
+
+	fn justifications(&self, _: Block::Hash) -> sp_blockchain::Result<Option<Justifications>> {
+		unimplemented!()
+	}
+
+	fn block_hash(&self, _: NumberFor<Block>) -> sp_blockchain::Result<Option<Block::Hash>> {
+		unimplemented!()
+	}
+
+	fn indexed_transaction(&self, _: Block::Hash) -> sp_blockchain::Result<Option<Vec<u8>>> {
+		unimplemented!()
+	}
+
+	fn has_indexed_transaction(&self, _: Block::Hash) -> sp_blockchain::Result<bool> {
+		unimplemented!()
+	}
+
+	fn block_indexed_body(&self, _: Block::Hash) -> sp_blockchain::Result<Option<Vec<Vec<u8>>>> {
+		unimplemented!()
+	}
+
+	fn requires_full_sync(&self) -> bool {
+		unimplemented!()
+	}
+}
+
+impl<Block: BlockT> UsageProvider<Block> for ParachainClient<Block> {
+	fn usage_info(&self) -> ClientInfo<Block> {
+		let infos = &mut self.inner.lock().expect("Poisoned lock").usage_infos;
+		assert!(!infos.is_empty());
+
+		if infos.len() == 1 {
+			infos.last().unwrap().clone()
+		} else {
+			infos.remove(0)
+		}
+	}
+}
+
+struct ParachainImportQueue<Block: BlockT> {
+	import_requests_tx: TracingUnboundedSender<Vec<IncomingBlock<Block>>>,
+}
+
+impl<Block: BlockT> ParachainImportQueue<Block> {
+	fn new() -> (Self, TracingUnboundedReceiver<Vec<IncomingBlock<Block>>>) {
+		let (import_requests_tx, import_requests_rx) =
+			sc_utils::mpsc::tracing_unbounded("test_import_req_forwarding", 10);
+		(Self { import_requests_tx }, import_requests_rx)
+	}
+}
+
+impl<Block: BlockT> ImportQueueService<Block> for ParachainImportQueue<Block> {
+	fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<Block>>) {
+		assert_matches!(origin, BlockOrigin::ConsensusBroadcast);
+		self.import_requests_tx.unbounded_send(blocks).unwrap();
+	}
+
+	fn import_justifications(
+		&mut self,
+		_: RuntimeOrigin,
+		_: Block::Hash,
+		_: NumberFor<Block>,
+		_: Justifications,
+	) {
+		unimplemented!()
+	}
+}
+
+#[derive(Default)]
+struct DummySyncOracle {
+	is_major_syncing: bool,
+}
+
+impl DummySyncOracle {
+	fn new(is_major_syncing: bool) -> Self {
+		Self { is_major_syncing }
+	}
+}
+
+impl SyncOracle for DummySyncOracle {
+	fn is_major_syncing(&self) -> bool {
+		self.is_major_syncing
+	}
+
+	fn is_offline(&self) -> bool {
+		false
+	}
+}
+
+#[derive(Clone)]
+struct RelaychainInner {
+	runtime_version: u32,
+	import_notifications: Vec<PHeader>,
+	candidates_pending_availability: HashMap<PHash, Vec<CommittedCandidateReceipt>>,
+}
+
+#[derive(Clone)]
+struct Relaychain {
+	inner: Arc<Mutex<RelaychainInner>>,
+}
+
+impl Relaychain {
+	fn new(relay_chain_blocks: Vec<(PHeader, Vec<CommittedCandidateReceipt>)>) -> Self {
+		let (candidates_pending_availability, import_notifications) = relay_chain_blocks
+			.into_iter()
+			.map(|(header, receipt)| ((header.hash(), receipt), header))
+			.unzip();
+		Self {
+			inner: Arc::new(Mutex::new(RelaychainInner {
+				import_notifications,
+				candidates_pending_availability,
+				// The version that introduced candidates_pending_availability
+				runtime_version:
+					RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT,
+			})),
+		}
+	}
+
+	fn set_runtime_version(&self, version: u32) {
+		self.inner.lock().expect("Poisoned lock").runtime_version = version;
+	}
+}
+
+#[async_trait::async_trait]
+impl RelayChainInterface for Relaychain {
+	async fn version(&self, _: PHash) -> RelayChainResult<RuntimeVersion> {
+		let version = self.inner.lock().expect("Poisoned lock").runtime_version;
+
+		let apis = sp_version::create_apis_vec!([(
+			<dyn polkadot_primitives::runtime_api::ParachainHost<polkadot_primitives::Block>>::ID,
+			version
+		)])
+		.into_owned()
+		.to_vec();
+
+		Ok(RuntimeVersion {
+			spec_name: sp_version::create_runtime_str!("test"),
+			impl_name: sp_version::create_runtime_str!("test"),
+			authoring_version: 1,
+			spec_version: 1,
+			impl_version: 0,
+			apis: Cow::Owned(apis),
+			transaction_version: 5,
+			state_version: 1,
+		})
+	}
+
+	async fn validators(&self, _: PHash) -> RelayChainResult<Vec<ValidatorId>> {
+		unimplemented!("Not needed for test")
+	}
+
+	async fn best_block_hash(&self) -> RelayChainResult<PHash> {
+		unimplemented!("Not needed for test")
+	}
+
+	async fn finalized_block_hash(&self) -> RelayChainResult<PHash> {
+		unimplemented!("Not needed for test")
+	}
+
+	async fn retrieve_dmq_contents(
+		&self,
+		_: ParaId,
+		_: PHash,
+	) -> RelayChainResult<Vec<InboundDownwardMessage>> {
+		unimplemented!("Not needed for test")
+	}
+
+	async fn retrieve_all_inbound_hrmp_channel_contents(
+		&self,
+		_: ParaId,
+		_: PHash,
+	) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
+		unimplemented!("Not needed for test")
+	}
+
+	async fn persisted_validation_data(
+		&self,
+		_: PHash,
+		_: ParaId,
+		_: OccupiedCoreAssumption,
+	) -> RelayChainResult<Option<PersistedValidationData>> {
+		unimplemented!("Not needed for test")
+	}
+
+	async fn validation_code_hash(
+		&self,
+		_: PHash,
+		_: ParaId,
+		_: OccupiedCoreAssumption,
+	) -> RelayChainResult<Option<ValidationCodeHash>> {
+		unimplemented!("Not needed for test")
+	}
+
+	async fn candidate_pending_availability(
+		&self,
+		hash: PHash,
+		_: ParaId,
+	) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
+		if self.inner.lock().expect("Poisoned lock").runtime_version >=
+			RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT
+		{
+			panic!("Should have used candidates_pending_availability instead");
+		}
+
+		Ok(self
+			.inner
+			.lock()
+			.expect("Poisoned lock")
+			.candidates_pending_availability
+			.remove(&hash)
+			.map(|mut c| {
+				assert_eq!(c.len(), 1);
+				c.pop().unwrap()
+			}))
+	}
+
+	async fn candidates_pending_availability(
+		&self,
+		hash: PHash,
+		_: ParaId,
+	) -> RelayChainResult<Vec<CommittedCandidateReceipt>> {
+		if self.inner.lock().expect("Poisoned lock").runtime_version <
+			RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT
+		{
+			panic!("Should have used candidate_pending_availability instead");
+		}
+
+		Ok(self
+			.inner
+			.lock()
+			.expect("Poisoned lock")
+			.candidates_pending_availability
+			.remove(&hash)
+			.expect("Not found"))
+	}
+
+	async fn session_index_for_child(&self, _: PHash) -> RelayChainResult<SessionIndex> {
+		Ok(TEST_SESSION_INDEX)
+	}
+
+	async fn import_notification_stream(
+		&self,
+	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+		Ok(Box::pin(
+			futures::stream::iter(std::mem::take(
+				&mut self.inner.lock().expect("Poisoned lock").import_notifications,
+			))
+			.chain(futures::stream::pending()),
+		))
+	}
+
+	async fn finality_notification_stream(
+		&self,
+	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+		unimplemented!("Not needed for test")
+	}
+
+	async fn is_major_syncing(&self) -> RelayChainResult<bool> {
+		unimplemented!("Not needed for test");
+	}
+
+	fn overseer_handle(&self) -> RelayChainResult<OverseerHandle> {
+		unimplemented!("Not needed for test")
+	}
+
+	async fn get_storage_by_key(
+		&self,
+		_: PHash,
+		_: &[u8],
+	) -> RelayChainResult<Option<StorageValue>> {
+		unimplemented!("Not needed for test")
+	}
+
+	async fn prove_read(
+		&self,
+		_: PHash,
+		_: &Vec<Vec<u8>>,
+	) -> RelayChainResult<sc_client_api::StorageProof> {
+		unimplemented!("Not needed for test")
+	}
+
+	async fn wait_for_block(&self, _: PHash) -> RelayChainResult<()> {
+		unimplemented!("Not needed for test");
+	}
+
+	async fn new_best_notification_stream(
+		&self,
+	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+		unimplemented!("Not needed for test");
+	}
+
+	async fn header(&self, _: BlockId) -> RelayChainResult<Option<PHeader>> {
+		unimplemented!("Not needed for test");
+	}
+}
+
+fn make_candidate_chain(candidate_number_range: Range<u32>) -> Vec<CommittedCandidateReceipt> {
+	let collator = Sr25519Keyring::Ferdie;
+	let mut latest_parent_hash = GENESIS_HASH;
+	let mut candidates = vec![];
+
+	for number in candidate_number_range {
+		let head_data = Header {
+			number,
+			digest: Default::default(),
+			extrinsics_root: Default::default(),
+			parent_hash: latest_parent_hash,
+			state_root: Default::default(),
+		};
+
+		latest_parent_hash = head_data.hash();
+
+		candidates.push(CommittedCandidateReceipt {
+			descriptor: CandidateDescriptor {
+				para_id: ParaId::from(1000),
+				relay_parent: PHash::zero(),
+				collator: collator.public().into(),
+				persisted_validation_data_hash: PHash::zero(),
+				pov_hash: PHash::zero(),
+				erasure_root: PHash::zero(),
+				signature: collator.sign(&[0u8; 132]).into(),
+				para_head: PHash::zero(),
+				validation_code_hash: PHash::zero().into(),
+			},
+			commitments: CandidateCommitments {
+				head_data: head_data.encode().into(),
+				upward_messages: vec![].try_into().expect("empty vec fits within bounds"),
+				new_validation_code: None,
+				horizontal_messages: vec![].try_into().expect("empty vec fits within bounds"),
+				processed_downward_messages: 0,
+				hrmp_watermark: 0_u32,
+			},
+		});
+	}
+
+	candidates
+}
+
+fn dummy_usage_info(finalized_number: u32) -> ClientInfo<Block> {
+	ClientInfo {
+		chain: Info {
+			best_hash: PHash::zero(),
+			best_number: 0,
+			genesis_hash: PHash::zero(),
+			finalized_hash: PHash::zero(),
+			// Only this field is being used.
+			finalized_number,
+			finalized_state: None,
+			number_leaves: 0,
+			block_gap: None,
+		},
+		usage: None,
+	}
+}
+
+fn dummy_pvd() -> PersistedValidationData {
+	PersistedValidationData {
+		parent_head: vec![].into(),
+		relay_parent_number: 1,
+		relay_parent_storage_root: PHash::zero(),
+		max_pov_size: 100,
+	}
+}
+
+#[tokio::test]
+async fn pending_candidate_height_lower_than_latest_finalized() {
+	sp_tracing::init_for_tests();
+
+	for finalized_number in [3, 4, 5] {
+		let (recovery_subsystem_tx, mut recovery_subsystem_rx) =
+			AvailabilityRecoverySubsystemHandle::new();
+		let recovery_delay_range =
+			RecoveryDelayRange { min: Duration::from_millis(0), max: Duration::from_millis(10) };
+		let (_explicit_recovery_chan_tx, explicit_recovery_chan_rx) = mpsc::channel(10);
+		let candidates = make_candidate_chain(1..4);
+		let relay_chain_client = Relaychain::new(vec![(
+			PHeader {
+				parent_hash: PHash::from_low_u64_be(0),
+				number: 1,
+				state_root: PHash::random(),
+				extrinsics_root: PHash::random(),
+				digest: Default::default(),
+			},
+			candidates,
+		)]);
+		let (parachain_client, _import_notifications_tx, _finality_notifications_tx) =
+			ParachainClient::new(vec![dummy_usage_info(finalized_number)], Default::default());
+		let (parachain_import_queue, mut import_requests_rx) = ParachainImportQueue::new();
+
+		// If the latest finalized block has a larger height compared to the pending candidate, the
+		// new candidate won't be recovered. Candidates have heights is 1, 2 and 3. Latest finalized
+		// block is 3, 4 or 5.
+		let pov_recovery = PoVRecovery::<Block, _, _>::new(
+			Box::new(recovery_subsystem_tx),
+			recovery_delay_range,
+			Arc::new(parachain_client),
+			Box::new(parachain_import_queue),
+			relay_chain_client,
+			ParaId::new(1000),
+			explicit_recovery_chan_rx,
+			Arc::new(DummySyncOracle::default()),
+		);
+
+		task::spawn(pov_recovery.run());
+
+		// No recovery message received
+		assert_matches!(
+			recovery_subsystem_rx.next().timeout(Duration::from_millis(100)).await,
+			None
+		);
+
+		// No import request received
+		assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None);
+	}
+}
+
+#[rstest]
+#[case(RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT)]
+#[case(10)]
+#[tokio::test]
+async fn single_pending_candidate_recovery_success(#[case] runtime_version: u32) {
+	sp_tracing::init_for_tests();
+
+	let (recovery_subsystem_tx, mut recovery_subsystem_rx) =
+		AvailabilityRecoverySubsystemHandle::new();
+	let recovery_delay_range =
+		RecoveryDelayRange { min: Duration::from_millis(0), max: Duration::from_millis(10) };
+	let (_explicit_recovery_chan_tx, explicit_recovery_chan_rx) = mpsc::channel(10);
+	let candidates = make_candidate_chain(1..2);
+	let header = Header::decode(&mut &candidates[0].commitments.head_data.0[..]).unwrap();
+	let candidate_hash = candidates[0].hash();
+
+	let relay_chain_client = Relaychain::new(vec![(
+		PHeader {
+			parent_hash: PHash::from_low_u64_be(0),
+			number: 1,
+			state_root: PHash::random(),
+			extrinsics_root: PHash::random(),
+			digest: Default::default(),
+		},
+		candidates,
+	)]);
+	relay_chain_client.set_runtime_version(runtime_version);
+
+	let mut known_blocks = HashMap::new();
+	known_blocks.insert(GENESIS_HASH, BlockStatus::InChainWithState);
+	let (parachain_client, _import_notifications_tx, _finality_notifications_tx) =
+		ParachainClient::new(vec![dummy_usage_info(0)], Arc::new(Mutex::new(known_blocks)));
+	let (parachain_import_queue, mut import_requests_rx) = ParachainImportQueue::new();
+
+	let pov_recovery = PoVRecovery::<Block, _, _>::new(
+		Box::new(recovery_subsystem_tx),
+		recovery_delay_range,
+		Arc::new(parachain_client),
+		Box::new(parachain_import_queue),
+		relay_chain_client,
+		ParaId::new(1000),
+		explicit_recovery_chan_rx,
+		Arc::new(DummySyncOracle::default()),
+	);
+
+	task::spawn(pov_recovery.run());
+
+	assert_matches!(
+		recovery_subsystem_rx.next().await,
+		Some(AvailabilityRecoveryMessage::RecoverAvailableData(
+			receipt,
+			session_index,
+			None,
+			None,
+			response_tx
+		)) => {
+			assert_eq!(receipt.hash(), candidate_hash);
+			assert_eq!(session_index, TEST_SESSION_INDEX);
+			response_tx.send(
+				Ok(
+					AvailableData {
+						pov: Arc::new(PoV {
+							block_data: ParachainBlockData::<Block>::new(
+								header.clone(),
+								vec![],
+								CompactProof {encoded_nodes: vec![]}
+							).encode().into()
+						}),
+						validation_data: dummy_pvd(),
+					}
+				)
+			).unwrap()
+		}
+	);
+
+	// No more recovery messages received.
+	assert_matches!(recovery_subsystem_rx.next().timeout(Duration::from_millis(100)).await, None);
+
+	// Received import request for the recovered candidate
+	assert_matches!(import_requests_rx.next().await, Some(incoming_blocks) => {
+		assert_eq!(incoming_blocks.len(), 1);
+		assert_eq!(incoming_blocks[0].header, Some(header));
+	});
+
+	// No import request received
+	assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None);
+}
+
+#[tokio::test]
+async fn single_pending_candidate_recovery_retry_succeeds() {
+	sp_tracing::init_for_tests();
+
+	let (recovery_subsystem_tx, mut recovery_subsystem_rx) =
+		AvailabilityRecoverySubsystemHandle::new();
+	let recovery_delay_range =
+		RecoveryDelayRange { min: Duration::from_millis(0), max: Duration::from_millis(10) };
+	let (_explicit_recovery_chan_tx, explicit_recovery_chan_rx) = mpsc::channel(10);
+	let candidates = make_candidate_chain(1..2);
+	let header = Header::decode(&mut &candidates[0].commitments.head_data.0[..]).unwrap();
+	let candidate_hash = candidates[0].hash();
+
+	let relay_chain_client = Relaychain::new(vec![(
+		PHeader {
+			parent_hash: PHash::from_low_u64_be(0),
+			number: 1,
+			state_root: PHash::random(),
+			extrinsics_root: PHash::random(),
+			digest: Default::default(),
+		},
+		candidates,
+	)]);
+	let mut known_blocks = HashMap::new();
+	known_blocks.insert(GENESIS_HASH, BlockStatus::InChainWithState);
+	let (parachain_client, _import_notifications_tx, _finality_notifications_tx) =
+		ParachainClient::new(vec![dummy_usage_info(0)], Arc::new(Mutex::new(known_blocks)));
+	let (parachain_import_queue, mut import_requests_rx) = ParachainImportQueue::new();
+
+	let pov_recovery = PoVRecovery::<Block, _, _>::new(
+		Box::new(recovery_subsystem_tx),
+		recovery_delay_range,
+		Arc::new(parachain_client),
+		Box::new(parachain_import_queue),
+		relay_chain_client,
+		ParaId::new(1000),
+		explicit_recovery_chan_rx,
+		Arc::new(DummySyncOracle::default()),
+	);
+
+	task::spawn(pov_recovery.run());
+
+	// First recovery fails.
+	assert_matches!(
+		recovery_subsystem_rx.next().await,
+		Some(AvailabilityRecoveryMessage::RecoverAvailableData(
+			receipt,
+			session_index,
+			None,
+			None,
+			response_tx
+		)) => {
+			assert_eq!(receipt.hash(), candidate_hash);
+			assert_eq!(session_index, TEST_SESSION_INDEX);
+			response_tx.send(
+				Err(RecoveryError::Unavailable)
+			).unwrap()
+		}
+	);
+	// Candidate is not imported.
+	assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None);
+
+	// Recovery is retried and it succeeds now.
+	assert_matches!(
+		recovery_subsystem_rx.next().await,
+		Some(AvailabilityRecoveryMessage::RecoverAvailableData(
+			receipt,
+			session_index,
+			None,
+			None,
+			response_tx
+		)) => {
+			assert_eq!(receipt.hash(), candidate_hash);
+			assert_eq!(session_index, TEST_SESSION_INDEX);
+			response_tx.send(
+				Ok(
+					AvailableData {
+						pov: Arc::new(PoV {
+							block_data: ParachainBlockData::<Block>::new(
+								header.clone(),
+								vec![],
+								CompactProof {encoded_nodes: vec![]}
+							).encode().into()
+						}),
+						validation_data: dummy_pvd(),
+					}
+				)
+			).unwrap()
+		}
+	);
+
+	// No more recovery messages received.
+	assert_matches!(recovery_subsystem_rx.next().timeout(Duration::from_millis(100)).await, None);
+
+	// Received import request for the recovered candidate
+	assert_matches!(import_requests_rx.next().await, Some(incoming_blocks) => {
+		assert_eq!(incoming_blocks.len(), 1);
+		assert_eq!(incoming_blocks[0].header, Some(header));
+	});
+
+	// No import request received
+	assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None);
+}
+
+#[tokio::test]
+async fn single_pending_candidate_recovery_retry_fails() {
+	sp_tracing::init_for_tests();
+
+	let (recovery_subsystem_tx, mut recovery_subsystem_rx) =
+		AvailabilityRecoverySubsystemHandle::new();
+	let recovery_delay_range =
+		RecoveryDelayRange { min: Duration::from_millis(0), max: Duration::from_millis(10) };
+	let (_explicit_recovery_chan_tx, explicit_recovery_chan_rx) = mpsc::channel(10);
+	let candidates = make_candidate_chain(1..2);
+	let candidate_hash = candidates[0].hash();
+
+	let relay_chain_client = Relaychain::new(vec![(
+		PHeader {
+			parent_hash: PHash::from_low_u64_be(0),
+			number: 1,
+			state_root: PHash::random(),
+			extrinsics_root: PHash::random(),
+			digest: Default::default(),
+		},
+		candidates,
+	)]);
+	let mut known_blocks = HashMap::new();
+	known_blocks.insert(GENESIS_HASH, BlockStatus::InChainWithState);
+	let (parachain_client, _import_notifications_tx, _finality_notifications_tx) =
+		ParachainClient::new(vec![dummy_usage_info(0)], Arc::new(Mutex::new(known_blocks)));
+	let (parachain_import_queue, mut import_requests_rx) = ParachainImportQueue::new();
+
+	let pov_recovery = PoVRecovery::<Block, _, _>::new(
+		Box::new(recovery_subsystem_tx),
+		recovery_delay_range,
+		Arc::new(parachain_client),
+		Box::new(parachain_import_queue),
+		relay_chain_client,
+		ParaId::new(1000),
+		explicit_recovery_chan_rx,
+		Arc::new(DummySyncOracle::default()),
+	);
+
+	task::spawn(pov_recovery.run());
+
+	// First recovery fails.
+	assert_matches!(
+		recovery_subsystem_rx.next().await,
+		Some(AvailabilityRecoveryMessage::RecoverAvailableData(
+			receipt,
+			session_index,
+			None,
+			None,
+			response_tx
+		)) => {
+			assert_eq!(receipt.hash(), candidate_hash);
+			assert_eq!(session_index, TEST_SESSION_INDEX);
+			response_tx.send(
+				Err(RecoveryError::Unavailable)
+			).unwrap()
+		}
+	);
+	// Candidate is not imported.
+	assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None);
+
+	// Second retry fails.
+	assert_matches!(
+		recovery_subsystem_rx.next().await,
+		Some(AvailabilityRecoveryMessage::RecoverAvailableData(
+			receipt,
+			session_index,
+			None,
+			None,
+			response_tx
+		)) => {
+			assert_eq!(receipt.hash(), candidate_hash);
+			assert_eq!(session_index, TEST_SESSION_INDEX);
+			response_tx.send(
+				Err(RecoveryError::Unavailable)
+			).unwrap()
+		}
+	);
+	// Candidate is not imported.
+	assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None);
+
+	// After the second attempt, give up.
+	// No more recovery messages received.
+	assert_matches!(recovery_subsystem_rx.next().timeout(Duration::from_millis(100)).await, None);
+}
+
+#[tokio::test]
+async fn single_pending_candidate_recovery_irrecoverable_error() {
+	sp_tracing::init_for_tests();
+
+	let (recovery_subsystem_tx, mut recovery_subsystem_rx) =
+		AvailabilityRecoverySubsystemHandle::new();
+	let recovery_delay_range =
+		RecoveryDelayRange { min: Duration::from_millis(0), max: Duration::from_millis(10) };
+	let (_explicit_recovery_chan_tx, explicit_recovery_chan_rx) = mpsc::channel(10);
+	let candidates = make_candidate_chain(1..2);
+	let candidate_hash = candidates[0].hash();
+
+	let relay_chain_client = Relaychain::new(vec![(
+		PHeader {
+			parent_hash: PHash::from_low_u64_be(0),
+			number: 1,
+			state_root: PHash::random(),
+			extrinsics_root: PHash::random(),
+			digest: Default::default(),
+		},
+		candidates,
+	)]);
+	let mut known_blocks = HashMap::new();
+	known_blocks.insert(GENESIS_HASH, BlockStatus::InChainWithState);
+	let (parachain_client, _import_notifications_tx, _finality_notifications_tx) =
+		ParachainClient::new(vec![dummy_usage_info(0)], Arc::new(Mutex::new(known_blocks)));
+	let (parachain_import_queue, mut import_requests_rx) = ParachainImportQueue::new();
+
+	let pov_recovery = PoVRecovery::<Block, _, _>::new(
+		Box::new(recovery_subsystem_tx),
+		recovery_delay_range,
+		Arc::new(parachain_client),
+		Box::new(parachain_import_queue),
+		relay_chain_client,
+		ParaId::new(1000),
+		explicit_recovery_chan_rx,
+		Arc::new(DummySyncOracle::default()),
+	);
+
+	task::spawn(pov_recovery.run());
+
+	// Recovery succeeds but the block data is wrong. Will not be retried.
+	assert_matches!(
+		recovery_subsystem_rx.next().await,
+		Some(AvailabilityRecoveryMessage::RecoverAvailableData(
+			receipt,
+			session_index,
+			None,
+			None,
+			response_tx
+		)) => {
+			assert_eq!(receipt.hash(), candidate_hash);
+			assert_eq!(session_index, TEST_SESSION_INDEX);
+			response_tx.send(
+				Ok(
+					AvailableData {
+						pov: Arc::new(PoV {
+							// Empty block data. It will fail to decode.
+							block_data: vec![].into()
+						}),
+						validation_data: dummy_pvd(),
+					}
+				)
+			).unwrap()
+		}
+	);
+	// Candidate is not imported.
+	assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None);
+
+	// No more recovery messages received.
+	assert_matches!(recovery_subsystem_rx.next().timeout(Duration::from_millis(100)).await, None);
+}
+
+#[tokio::test]
+async fn pending_candidates_recovery_skipped_while_syncing() {
+	sp_tracing::init_for_tests();
+
+	let (recovery_subsystem_tx, mut recovery_subsystem_rx) =
+		AvailabilityRecoverySubsystemHandle::new();
+	let recovery_delay_range =
+		RecoveryDelayRange { min: Duration::from_millis(0), max: Duration::from_millis(10) };
+	let (_explicit_recovery_chan_tx, explicit_recovery_chan_rx) = mpsc::channel(10);
+	let candidates = make_candidate_chain(1..4);
+
+	let relay_chain_client = Relaychain::new(vec![(
+		PHeader {
+			parent_hash: PHash::from_low_u64_be(0),
+			number: 1,
+			state_root: PHash::random(),
+			extrinsics_root: PHash::random(),
+			digest: Default::default(),
+		},
+		candidates,
+	)]);
+	let mut known_blocks = HashMap::new();
+	known_blocks.insert(GENESIS_HASH, BlockStatus::InChainWithState);
+	let (parachain_client, _import_notifications_tx, _finality_notifications_tx) =
+		ParachainClient::new(vec![dummy_usage_info(0)], Arc::new(Mutex::new(known_blocks)));
+	let (parachain_import_queue, mut import_requests_rx) = ParachainImportQueue::new();
+
+	let pov_recovery = PoVRecovery::<Block, _, _>::new(
+		Box::new(recovery_subsystem_tx),
+		recovery_delay_range,
+		Arc::new(parachain_client),
+		Box::new(parachain_import_queue),
+		relay_chain_client,
+		ParaId::new(1000),
+		explicit_recovery_chan_rx,
+		Arc::new(DummySyncOracle::new(true)),
+	);
+
+	task::spawn(pov_recovery.run());
+
+	// No recovery messages received.
+	assert_matches!(recovery_subsystem_rx.next().timeout(Duration::from_millis(100)).await, None);
+
+	// No candidate is imported.
+	assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None);
+}
+
+#[tokio::test]
+async fn candidate_is_imported_while_awaiting_recovery() {
+	sp_tracing::init_for_tests();
+
+	let (recovery_subsystem_tx, mut recovery_subsystem_rx) =
+		AvailabilityRecoverySubsystemHandle::new();
+	let recovery_delay_range =
+		RecoveryDelayRange { min: Duration::from_millis(0), max: Duration::from_millis(10) };
+	let (_explicit_recovery_chan_tx, explicit_recovery_chan_rx) = mpsc::channel(10);
+	let candidates = make_candidate_chain(1..2);
+	let header = Header::decode(&mut &candidates[0].commitments.head_data.0[..]).unwrap();
+	let candidate_hash = candidates[0].hash();
+
+	let relay_chain_client = Relaychain::new(vec![(
+		PHeader {
+			parent_hash: PHash::from_low_u64_be(0),
+			number: 1,
+			state_root: PHash::random(),
+			extrinsics_root: PHash::random(),
+			digest: Default::default(),
+		},
+		candidates,
+	)]);
+	let mut known_blocks = HashMap::new();
+	known_blocks.insert(GENESIS_HASH, BlockStatus::InChainWithState);
+	let (parachain_client, import_notifications_tx, _finality_notifications_tx) =
+		ParachainClient::new(vec![dummy_usage_info(0)], Arc::new(Mutex::new(known_blocks)));
+	let (parachain_import_queue, mut import_requests_rx) = ParachainImportQueue::new();
+
+	let pov_recovery = PoVRecovery::<Block, _, _>::new(
+		Box::new(recovery_subsystem_tx),
+		recovery_delay_range,
+		Arc::new(parachain_client),
+		Box::new(parachain_import_queue),
+		relay_chain_client,
+		ParaId::new(1000),
+		explicit_recovery_chan_rx,
+		Arc::new(DummySyncOracle::default()),
+	);
+
+	task::spawn(pov_recovery.run());
+
+	let recovery_response_tx;
+
+	assert_matches!(
+		recovery_subsystem_rx.next().await,
+		Some(AvailabilityRecoveryMessage::RecoverAvailableData(
+			receipt,
+			session_index,
+			None,
+			None,
+			response_tx
+		)) => {
+			assert_eq!(receipt.hash(), candidate_hash);
+			assert_eq!(session_index, TEST_SESSION_INDEX);
+			recovery_response_tx = response_tx;
+		}
+	);
+
+	// While candidate is pending recovery, import the candidate from external source.
+	let (unpin_sender, _unpin_receiver) = sc_utils::mpsc::tracing_unbounded("test_unpin", 10);
+	import_notifications_tx
+		.unbounded_send(BlockImportNotification::new(
+			header.hash(),
+			BlockOrigin::ConsensusBroadcast,
+			header.clone(),
+			false,
+			None,
+			unpin_sender,
+		))
+		.unwrap();
+
+	recovery_response_tx
+		.send(Ok(AvailableData {
+			pov: Arc::new(PoV {
+				block_data: ParachainBlockData::<Block>::new(
+					header.clone(),
+					vec![],
+					CompactProof { encoded_nodes: vec![] },
+				)
+				.encode()
+				.into(),
+			}),
+			validation_data: dummy_pvd(),
+		}))
+		.unwrap();
+
+	// Received import request for the recovered candidate. This could be optimised to not trigger a
+	// reimport.
+	assert_matches!(import_requests_rx.next().await, Some(incoming_blocks) => {
+		assert_eq!(incoming_blocks.len(), 1);
+		assert_eq!(incoming_blocks[0].header, Some(header));
+	});
+
+	// No more recovery messages received.
+	assert_matches!(recovery_subsystem_rx.next().timeout(Duration::from_millis(100)).await, None);
+
+	// No more import requests received
+	assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None);
+}
+
+#[tokio::test]
+async fn candidate_is_finalized_while_awaiting_recovery() {
+	sp_tracing::init_for_tests();
+
+	let (recovery_subsystem_tx, mut recovery_subsystem_rx) =
+		AvailabilityRecoverySubsystemHandle::new();
+	let recovery_delay_range =
+		RecoveryDelayRange { min: Duration::from_millis(0), max: Duration::from_millis(10) };
+	let (_explicit_recovery_chan_tx, explicit_recovery_chan_rx) = mpsc::channel(10);
+	let candidates = make_candidate_chain(1..2);
+	let header = Header::decode(&mut &candidates[0].commitments.head_data.0[..]).unwrap();
+	let candidate_hash = candidates[0].hash();
+
+	let relay_chain_client = Relaychain::new(vec![(
+		PHeader {
+			parent_hash: PHash::from_low_u64_be(0),
+			number: 1,
+			state_root: PHash::random(),
+			extrinsics_root: PHash::random(),
+			digest: Default::default(),
+		},
+		candidates,
+	)]);
+	let mut known_blocks = HashMap::new();
+	known_blocks.insert(GENESIS_HASH, BlockStatus::InChainWithState);
+	let (parachain_client, _import_notifications_tx, finality_notifications_tx) =
+		ParachainClient::new(vec![dummy_usage_info(0)], Arc::new(Mutex::new(known_blocks)));
+	let (parachain_import_queue, mut import_requests_rx) = ParachainImportQueue::new();
+
+	let pov_recovery = PoVRecovery::<Block, _, _>::new(
+		Box::new(recovery_subsystem_tx),
+		recovery_delay_range,
+		Arc::new(parachain_client),
+		Box::new(parachain_import_queue),
+		relay_chain_client,
+		ParaId::new(1000),
+		explicit_recovery_chan_rx,
+		Arc::new(DummySyncOracle::default()),
+	);
+
+	task::spawn(pov_recovery.run());
+
+	let recovery_response_tx;
+
+	assert_matches!(
+		recovery_subsystem_rx.next().await,
+		Some(AvailabilityRecoveryMessage::RecoverAvailableData(
+			receipt,
+			session_index,
+			None,
+			None,
+			response_tx
+		)) => {
+			assert_eq!(receipt.hash(), candidate_hash);
+			assert_eq!(session_index, TEST_SESSION_INDEX);
+			// save it for later.
+			recovery_response_tx = response_tx;
+		}
+	);
+
+	// While candidate is pending recovery, it gets finalized.
+	let (unpin_sender, _unpin_receiver) = sc_utils::mpsc::tracing_unbounded("test_unpin", 10);
+	finality_notifications_tx
+		.unbounded_send(FinalityNotification::from_summary(
+			FinalizeSummary { header: header.clone(), finalized: vec![], stale_heads: vec![] },
+			unpin_sender,
+		))
+		.unwrap();
+
+	recovery_response_tx
+		.send(Ok(AvailableData {
+			pov: Arc::new(PoV {
+				block_data: ParachainBlockData::<Block>::new(
+					header.clone(),
+					vec![],
+					CompactProof { encoded_nodes: vec![] },
+				)
+				.encode()
+				.into(),
+			}),
+			validation_data: dummy_pvd(),
+		}))
+		.unwrap();
+
+	// No more recovery messages received.
+	assert_matches!(recovery_subsystem_rx.next().timeout(Duration::from_millis(100)).await, None);
+
+	// candidate is imported
+	assert_matches!(import_requests_rx.next().await, Some(incoming_blocks) => {
+		assert_eq!(incoming_blocks.len(), 1);
+		assert_eq!(incoming_blocks[0].header, Some(header));
+	});
+
+	// No more import requests received
+	assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None);
+}
+
+#[tokio::test]
+async fn chained_recovery_success() {
+	sp_tracing::init_for_tests();
+
+	let (recovery_subsystem_tx, mut recovery_subsystem_rx) =
+		AvailabilityRecoverySubsystemHandle::new();
+	let recovery_delay_range =
+		RecoveryDelayRange { min: Duration::from_millis(0), max: Duration::from_millis(0) };
+	let (_explicit_recovery_chan_tx, explicit_recovery_chan_rx) = mpsc::channel(10);
+	let candidates = make_candidate_chain(1..4);
+	let headers = candidates
+		.iter()
+		.map(|candidate| Header::decode(&mut &candidate.commitments.head_data.0[..]).unwrap())
+		.collect::<Vec<_>>();
+	let candidate_hashes = candidates.iter().map(|candidate| candidate.hash()).collect::<Vec<_>>();
+
+	let relay_chain_client = Relaychain::new(vec![(
+		PHeader {
+			parent_hash: PHash::from_low_u64_be(0),
+			number: 1,
+			state_root: PHash::random(),
+			extrinsics_root: PHash::random(),
+			digest: Default::default(),
+		},
+		// 3 pending candidates
+		candidates,
+	)]);
+	let mut known_blocks = HashMap::new();
+	known_blocks.insert(GENESIS_HASH, BlockStatus::InChainWithState);
+	let known_blocks = Arc::new(Mutex::new(known_blocks));
+	let (parachain_client, import_notifications_tx, _finality_notifications_tx) =
+		ParachainClient::new(vec![dummy_usage_info(0)], known_blocks.clone());
+	let (parachain_import_queue, mut import_requests_rx) = ParachainImportQueue::new();
+
+	let pov_recovery = PoVRecovery::<Block, _, _>::new(
+		Box::new(recovery_subsystem_tx),
+		recovery_delay_range,
+		Arc::new(parachain_client),
+		Box::new(parachain_import_queue),
+		relay_chain_client,
+		ParaId::new(1000),
+		explicit_recovery_chan_rx,
+		Arc::new(DummySyncOracle::default()),
+	);
+
+	task::spawn(pov_recovery.run());
+
+	// Candidates are recovered in the right order.
+	for (candidate_hash, header) in candidate_hashes.into_iter().zip(headers.into_iter()) {
+		assert_matches!(
+			recovery_subsystem_rx.next().await,
+			Some(AvailabilityRecoveryMessage::RecoverAvailableData(
+				receipt,
+				session_index,
+				None,
+				None,
+				response_tx
+			)) => {
+				assert_eq!(receipt.hash(), candidate_hash);
+				assert_eq!(session_index, TEST_SESSION_INDEX);
+				response_tx
+					.send(Ok(AvailableData {
+						pov: Arc::new(PoV {
+							block_data: ParachainBlockData::<Block>::new(
+								header.clone(),
+								vec![],
+								CompactProof { encoded_nodes: vec![] },
+							)
+							.encode()
+							.into(),
+						}),
+						validation_data: dummy_pvd(),
+						}))
+					.unwrap();
+			}
+		);
+
+		assert_matches!(import_requests_rx.next().await, Some(incoming_blocks) => {
+			assert_eq!(incoming_blocks.len(), 1);
+			assert_eq!(incoming_blocks[0].header, Some(header.clone()));
+		});
+
+		known_blocks
+			.lock()
+			.expect("Poisoned lock")
+			.insert(header.hash(), BlockStatus::InChainWithState);
+
+		let (unpin_sender, _unpin_receiver) = sc_utils::mpsc::tracing_unbounded("test_unpin", 10);
+		import_notifications_tx
+			.unbounded_send(BlockImportNotification::new(
+				header.hash(),
+				BlockOrigin::ConsensusBroadcast,
+				header,
+				false,
+				None,
+				unpin_sender,
+			))
+			.unwrap();
+	}
+
+	// No more recovery messages received.
+	assert_matches!(recovery_subsystem_rx.next().timeout(Duration::from_millis(100)).await, None);
+
+	// No more import requests received
+	assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None);
+}
+
+#[tokio::test]
+async fn chained_recovery_child_succeeds_before_parent() {
+	sp_tracing::init_for_tests();
+
+	let (recovery_subsystem_tx, mut recovery_subsystem_rx) =
+		AvailabilityRecoverySubsystemHandle::new();
+	let recovery_delay_range =
+		RecoveryDelayRange { min: Duration::from_millis(0), max: Duration::from_millis(0) };
+	let (_explicit_recovery_chan_tx, explicit_recovery_chan_rx) = mpsc::channel(10);
+	let candidates = make_candidate_chain(1..3);
+	let headers = candidates
+		.iter()
+		.map(|candidate| Header::decode(&mut &candidate.commitments.head_data.0[..]).unwrap())
+		.collect::<Vec<_>>();
+	let candidate_hashes = candidates.iter().map(|candidate| candidate.hash()).collect::<Vec<_>>();
+
+	let relay_chain_client = Relaychain::new(vec![(
+		PHeader {
+			parent_hash: PHash::from_low_u64_be(0),
+			number: 1,
+			state_root: PHash::random(),
+			extrinsics_root: PHash::random(),
+			digest: Default::default(),
+		},
+		// 2 pending candidates
+		candidates,
+	)]);
+	let mut known_blocks = HashMap::new();
+	known_blocks.insert(GENESIS_HASH, BlockStatus::InChainWithState);
+	let known_blocks = Arc::new(Mutex::new(known_blocks));
+	let (parachain_client, _import_notifications_tx, _finality_notifications_tx) =
+		ParachainClient::new(vec![dummy_usage_info(0)], known_blocks.clone());
+	let (parachain_import_queue, mut import_requests_rx) = ParachainImportQueue::new();
+
+	let pov_recovery = PoVRecovery::<Block, _, _>::new(
+		Box::new(recovery_subsystem_tx),
+		recovery_delay_range,
+		Arc::new(parachain_client),
+		Box::new(parachain_import_queue),
+		relay_chain_client,
+		ParaId::new(1000),
+		explicit_recovery_chan_rx,
+		Arc::new(DummySyncOracle::default()),
+	);
+
+	task::spawn(pov_recovery.run());
+
+	let mut recovery_responses_senders = vec![];
+
+	for candidate_hash in candidate_hashes.iter() {
+		assert_matches!(
+			recovery_subsystem_rx.next().await,
+			Some(AvailabilityRecoveryMessage::RecoverAvailableData(
+				receipt,
+				session_index,
+				None,
+				None,
+				response_tx
+			)) => {
+				assert_eq!(receipt.hash(), *candidate_hash);
+				assert_eq!(session_index, TEST_SESSION_INDEX);
+				recovery_responses_senders.push(response_tx);
+			}
+		);
+	}
+
+	// Send out the responses in reverse order.
+	for (recovery_response_sender, header) in
+		recovery_responses_senders.into_iter().zip(headers.iter()).rev()
+	{
+		recovery_response_sender
+			.send(Ok(AvailableData {
+				pov: Arc::new(PoV {
+					block_data: ParachainBlockData::<Block>::new(
+						header.clone(),
+						vec![],
+						CompactProof { encoded_nodes: vec![] },
+					)
+					.encode()
+					.into(),
+				}),
+				validation_data: dummy_pvd(),
+			}))
+			.unwrap();
+	}
+
+	assert_matches!(import_requests_rx.next().await, Some(incoming_blocks) => {
+		// The two import requests will be batched.
+		assert_eq!(incoming_blocks.len(), 2);
+		assert_eq!(incoming_blocks[0].header, Some(headers[0].clone()));
+		assert_eq!(incoming_blocks[1].header, Some(headers[1].clone()));
+	});
+
+	// No more recovery messages received.
+	assert_matches!(recovery_subsystem_rx.next().timeout(Duration::from_millis(100)).await, None);
+
+	// No more import requests received
+	assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None);
+}
diff --git a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs
index 578b942776dcde99553e0a5af513b03acb3a80a1..7871623e8447a2645ef772a495d7f698660f7dc5 100644
--- a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs
+++ b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs
@@ -30,7 +30,7 @@ use futures::{FutureExt, Stream, StreamExt};
 use polkadot_service::{
 	CollatorPair, Configuration, FullBackend, FullClient, Handle, NewFull, TaskManager,
 };
-use sc_cli::SubstrateCli;
+use sc_cli::{RuntimeVersion, SubstrateCli};
 use sc_client_api::{
 	blockchain::BlockStatus, Backend, BlockchainEvents, HeaderBackend, ImportNotifications,
 	StorageProof,
@@ -68,6 +68,10 @@ impl RelayChainInProcessInterface {
 
 #[async_trait]
 impl RelayChainInterface for RelayChainInProcessInterface {
+	async fn version(&self, relay_parent: PHash) -> RelayChainResult<RuntimeVersion> {
+		Ok(self.full_client.runtime_version_at(relay_parent)?)
+	}
+
 	async fn retrieve_dmq_contents(
 		&self,
 		para_id: ParaId,
@@ -251,6 +255,14 @@ impl RelayChainInterface for RelayChainInProcessInterface {
 				});
 		Ok(Box::pin(notifications_stream))
 	}
+
+	async fn candidates_pending_availability(
+		&self,
+		hash: PHash,
+		para_id: ParaId,
+	) -> RelayChainResult<Vec<CommittedCandidateReceipt>> {
+		Ok(self.full_client.runtime_api().candidates_pending_availability(hash, para_id)?)
+	}
 }
 
 pub enum BlockCheckStatus {
diff --git a/cumulus/client/relay-chain-interface/Cargo.toml b/cumulus/client/relay-chain-interface/Cargo.toml
index 5d612cdc0eef571b977dca8e907a02e8f9d876b1..e8603693ac8da957988afac9e98468b759476665 100644
--- a/cumulus/client/relay-chain-interface/Cargo.toml
+++ b/cumulus/client/relay-chain-interface/Cargo.toml
@@ -18,6 +18,7 @@ sp-api = { path = "../../../substrate/primitives/api" }
 sp-blockchain = { path = "../../../substrate/primitives/blockchain" }
 sp-state-machine = { path = "../../../substrate/primitives/state-machine" }
 sc-client-api = { path = "../../../substrate/client/api" }
+sp-version = { path = "../../../substrate/primitives/version", default-features = false }
 
 futures = "0.3.28"
 async-trait = "0.1.79"
diff --git a/cumulus/client/relay-chain-interface/src/lib.rs b/cumulus/client/relay-chain-interface/src/lib.rs
index 7c7796b468c0a6f458eceded955d47e7ac8cbbf6..46e19b40f010cce4342f54cbe08e975a52fee185 100644
--- a/cumulus/client/relay-chain-interface/src/lib.rs
+++ b/cumulus/client/relay-chain-interface/src/lib.rs
@@ -16,10 +16,10 @@
 
 use std::{collections::BTreeMap, pin::Pin, sync::Arc};
 
+use futures::Stream;
 use polkadot_overseer::prometheus::PrometheusError;
 use sc_client_api::StorageProof;
-
-use futures::Stream;
+use sp_version::RuntimeVersion;
 
 use async_trait::async_trait;
 use codec::Error as CodecError;
@@ -149,8 +149,12 @@ pub trait RelayChainInterface: Send + Sync {
 		_: OccupiedCoreAssumption,
 	) -> RelayChainResult<Option<PersistedValidationData>>;
 
-	/// Get the receipt of a candidate pending availability. This returns `Some` for any paras
-	/// assigned to occupied cores in `availability_cores` and `None` otherwise.
+	/// Get the receipt of the first candidate pending availability of this para_id. This returns
+	/// `Some` for any paras assigned to occupied cores in `availability_cores` and `None`
+	/// otherwise.
+	#[deprecated(
+		note = "`candidate_pending_availability` only returns one candidate and is deprecated. Use `candidates_pending_availability` instead."
+	)]
 	async fn candidate_pending_availability(
 		&self,
 		block_id: PHash,
@@ -203,6 +207,16 @@ pub trait RelayChainInterface: Send + Sync {
 		para_id: ParaId,
 		occupied_core_assumption: OccupiedCoreAssumption,
 	) -> RelayChainResult<Option<ValidationCodeHash>>;
+
+	/// Get the receipts of all candidates pending availability for this para_id.
+	async fn candidates_pending_availability(
+		&self,
+		block_id: PHash,
+		para_id: ParaId,
+	) -> RelayChainResult<Vec<CommittedCandidateReceipt>>;
+
+	/// Get the runtime version of the relay chain.
+	async fn version(&self, relay_parent: PHash) -> RelayChainResult<RuntimeVersion>;
 }
 
 #[async_trait]
@@ -237,6 +251,7 @@ where
 			.await
 	}
 
+	#[allow(deprecated)]
 	async fn candidate_pending_availability(
 		&self,
 		block_id: PHash,
@@ -321,4 +336,16 @@ where
 			.validation_code_hash(relay_parent, para_id, occupied_core_assumption)
 			.await
 	}
+
+	async fn candidates_pending_availability(
+		&self,
+		block_id: PHash,
+		para_id: ParaId,
+	) -> RelayChainResult<Vec<CommittedCandidateReceipt>> {
+		(**self).candidates_pending_availability(block_id, para_id).await
+	}
+
+	async fn version(&self, relay_parent: PHash) -> RelayChainResult<RuntimeVersion> {
+		(**self).version(relay_parent).await
+	}
 }
diff --git a/cumulus/client/relay-chain-rpc-interface/src/lib.rs b/cumulus/client/relay-chain-rpc-interface/src/lib.rs
index 3a4c186e301eab295aa3befbd3c5549636fdb2c0..bb7bfa5dc32268b87bfbe1788aad7b6604961276 100644
--- a/cumulus/client/relay-chain-rpc-interface/src/lib.rs
+++ b/cumulus/client/relay-chain-rpc-interface/src/lib.rs
@@ -33,6 +33,7 @@ use sc_client_api::StorageProof;
 use sp_core::sp_std::collections::btree_map::BTreeMap;
 use sp_state_machine::StorageValue;
 use sp_storage::StorageKey;
+use sp_version::RuntimeVersion;
 use std::pin::Pin;
 
 use cumulus_primitives_core::relay_chain::BlockId;
@@ -237,4 +238,18 @@ impl RelayChainInterface for RelayChainRpcInterface {
 		let imported_headers_stream = self.rpc_client.get_best_heads_stream()?;
 		Ok(imported_headers_stream.boxed())
 	}
+
+	async fn candidates_pending_availability(
+		&self,
+		hash: RelayHash,
+		para_id: ParaId,
+	) -> RelayChainResult<Vec<CommittedCandidateReceipt>> {
+		self.rpc_client
+			.parachain_host_candidates_pending_availability(hash, para_id)
+			.await
+	}
+
+	async fn version(&self, relay_parent: RelayHash) -> RelayChainResult<RuntimeVersion> {
+		self.rpc_client.runtime_version(relay_parent).await
+	}
 }
diff --git a/prdoc/pr_4733.prdoc b/prdoc/pr_4733.prdoc
new file mode 100644
index 0000000000000000000000000000000000000000..e633248398526c43fc0da478779c4fea86524e7a
--- /dev/null
+++ b/prdoc/pr_4733.prdoc
@@ -0,0 +1,27 @@
+title: Add pov-recovery unit tests and support for elastic scaling
+
+doc:
+  - audience: Node Dev
+    description: |
+      Adds unit tests for cumulus pov-recovery and support for elastic scaling (recovering multiple candidates in a single relay chain block).
+
+crates:
+  - name: cumulus-client-network
+    bump: patch
+  - name: cumulus-client-pov-recovery
+    bump: patch
+  - name: cumulus-relay-chain-interface
+    bump: major
+    validate: false
+  - name: cumulus-relay-chain-inprocess-interface
+    bump: minor
+  - name: cumulus-relay-chain-rpc-interface
+    bump: minor
+  - name: cumulus-client-consensus-common
+    bump: none
+  - name: sc-client-api
+    bump: minor
+  - name: sp-blockchain
+    bump: minor
+  - name: sp-consensus
+    bump: minor
diff --git a/substrate/client/api/src/client.rs b/substrate/client/api/src/client.rs
index 2de09840e4dfdc15e1f1d1acaa5f3e438de0caca..45cfafb258463a50c2ce0ce5b843346bd6cb9dcb 100644
--- a/substrate/client/api/src/client.rs
+++ b/substrate/client/api/src/client.rs
@@ -168,7 +168,7 @@ pub trait ProvideUncles<Block: BlockT> {
 }
 
 /// Client info
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ClientInfo<Block: BlockT> {
 	/// Best block hash.
 	pub chain: Info<Block>,
diff --git a/substrate/primitives/blockchain/src/backend.rs b/substrate/primitives/blockchain/src/backend.rs
index 06e5b682964a4e7672575c9e8b5460d7acb2de96..933e41e2ab453bb3593ba94dce6397a097f9ed46 100644
--- a/substrate/primitives/blockchain/src/backend.rs
+++ b/substrate/primitives/blockchain/src/backend.rs
@@ -284,7 +284,7 @@ impl<Block: BlockT> DisplacedLeavesAfterFinalization<Block> {
 }
 
 /// Blockchain info
-#[derive(Debug, Eq, PartialEq)]
+#[derive(Debug, Eq, PartialEq, Clone)]
 pub struct Info<Block: BlockT> {
 	/// Best block hash.
 	pub best_hash: Block::Hash,
diff --git a/substrate/primitives/consensus/common/src/lib.rs b/substrate/primitives/consensus/common/src/lib.rs
index 01d3b7a24f9c143201f07092c7484e72f513d8f0..37636b34b03df6221865eada59abbb11cb92519a 100644
--- a/substrate/primitives/consensus/common/src/lib.rs
+++ b/substrate/primitives/consensus/common/src/lib.rs
@@ -40,7 +40,7 @@ pub use sp_inherents::InherentData;
 pub use sp_state_machine::Backend as StateBackend;
 
 /// Block status.
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, PartialEq, Eq, Clone)]
 pub enum BlockStatus {
 	/// Added to the import queue.
 	Queued,