diff --git a/cumulus/Cargo.lock b/cumulus/Cargo.lock
index 5d82d317b84db4cc97f34737423f280e8a761f06..8519f2cbf2c1118104e9467f476bdb79f6d05a70 100644
--- a/cumulus/Cargo.lock
+++ b/cumulus/Cargo.lock
@@ -1673,11 +1673,14 @@ name = "cumulus-client-consensus-common"
 version = "0.1.0"
 dependencies = [
  "async-trait",
+ "cumulus-client-pov-recovery",
+ "cumulus-primitives-core",
  "cumulus-relay-chain-interface",
  "cumulus-test-client",
  "dyn-clone",
  "futures",
  "futures-timer",
+ "log",
  "parity-scale-codec",
  "polkadot-primitives",
  "sc-client-api",
@@ -1786,6 +1789,7 @@ dependencies = [
  "cumulus-relay-chain-inprocess-interface",
  "cumulus-relay-chain-interface",
  "cumulus-relay-chain-minimal-node",
+ "futures",
  "parking_lot 0.12.1",
  "polkadot-primitives",
  "sc-client-api",
diff --git a/cumulus/client/consensus/aura/src/import_queue.rs b/cumulus/client/consensus/aura/src/import_queue.rs
index 5cc7e9e065960863b5eda0e77d9b3c290d4fe7a0..862abfb349a05c1b3840ee3266d6c022ce249367 100644
--- a/cumulus/client/consensus/aura/src/import_queue.rs
+++ b/cumulus/client/consensus/aura/src/import_queue.rs
@@ -17,7 +17,7 @@
 //! Parachain specific wrapper for the AuRa import queue.
 
 use codec::Codec;
-use cumulus_client_consensus_common::ParachainBlockImport;
+use cumulus_client_consensus_common::ParachainBlockImportMarker;
 use sc_client_api::{backend::AuxStore, BlockOf, UsageProvider};
 use sc_consensus::{import_queue::DefaultImportQueue, BlockImport};
 use sc_consensus_aura::{AuraVerifier, CompatibilityMode};
@@ -37,7 +37,7 @@ use substrate_prometheus_endpoint::Registry;
 /// Parameters for [`import_queue`].
 pub struct ImportQueueParams<'a, I, C, CIDP, S> {
 	/// The block import to use.
-	pub block_import: ParachainBlockImport<I>,
+	pub block_import: I,
 	/// The client to interact with the chain.
 	pub client: Arc<C>,
 	/// The inherent data providers, to create the inherent data.
@@ -73,6 +73,7 @@ where
 		+ UsageProvider<Block>
 		+ HeaderBackend<Block>,
 	I: BlockImport<Block, Error = ConsensusError, Transaction = sp_api::TransactionFor<C, Block>>
+		+ ParachainBlockImportMarker
 		+ Send
 		+ Sync
 		+ 'static,
diff --git a/cumulus/client/consensus/aura/src/lib.rs b/cumulus/client/consensus/aura/src/lib.rs
index 93b475f6c74b6c476a31aa1c3abff4a9fcd34ace..965f8fe3baa80fcf9e37a475ee6b65897d63b91f 100644
--- a/cumulus/client/consensus/aura/src/lib.rs
+++ b/cumulus/client/consensus/aura/src/lib.rs
@@ -24,7 +24,7 @@
 
 use codec::{Decode, Encode};
 use cumulus_client_consensus_common::{
-	ParachainBlockImport, ParachainCandidate, ParachainConsensus,
+	ParachainBlockImportMarker, ParachainCandidate, ParachainConsensus,
 };
 use cumulus_primitives_core::{relay_chain::v2::Hash as PHash, PersistedValidationData};
 
@@ -75,7 +75,7 @@ impl<B, CIDP, W> Clone for AuraConsensus<B, CIDP, W> {
 pub struct BuildAuraConsensusParams<PF, BI, CIDP, Client, BS, SO> {
 	pub proposer_factory: PF,
 	pub create_inherent_data_providers: CIDP,
-	pub block_import: ParachainBlockImport<BI>,
+	pub block_import: BI,
 	pub para_client: Arc<Client>,
 	pub backoff_authoring_blocks: Option<BS>,
 	pub sync_oracle: SO,
@@ -114,7 +114,11 @@ where
 		Client:
 			ProvideRuntimeApi<B> + BlockOf + AuxStore + HeaderBackend<B> + Send + Sync + 'static,
 		Client::Api: AuraApi<B, P::Public>,
-		BI: BlockImport<B, Transaction = sp_api::TransactionFor<Client, B>> + Send + Sync + 'static,
+		BI: BlockImport<B, Transaction = sp_api::TransactionFor<Client, B>>
+			+ ParachainBlockImportMarker
+			+ Send
+			+ Sync
+			+ 'static,
 		SO: SyncOracle + Send + Sync + Clone + 'static,
 		BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync + 'static,
 		PF: Environment<B, Error = Error> + Send + Sync + 'static,
diff --git a/cumulus/client/consensus/common/Cargo.toml b/cumulus/client/consensus/common/Cargo.toml
index 84b73f0fef5eff43df0ebea91f1b79e681af301e..c06e28c8d058b4cfe002b014e94a9ee539104343 100644
--- a/cumulus/client/consensus/common/Cargo.toml
+++ b/cumulus/client/consensus/common/Cargo.toml
@@ -10,6 +10,7 @@ async-trait = "0.1.59"
 codec = { package = "parity-scale-codec", version = "3.0.0", features = [ "derive" ] }
 dyn-clone = "1.0.10"
 futures = "0.3.25"
+log = "0.4.17"
 tracing = "0.1.37"
 
 # Substrate
@@ -24,7 +25,9 @@ sp-trie = { git = "https://github.com/paritytech/substrate", branch = "master" }
 polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }
 
 # Cumulus
+cumulus-primitives-core = { path = "../../../primitives/core" }
 cumulus-relay-chain-interface = { path = "../../relay-chain-interface" }
+cumulus-client-pov-recovery = { path = "../../pov-recovery" }
 
 [dev-dependencies]
 futures-timer = "3.0.2"
diff --git a/cumulus/client/consensus/common/src/level_monitor.rs b/cumulus/client/consensus/common/src/level_monitor.rs
new file mode 100644
index 0000000000000000000000000000000000000000..294527f1f9fb0f6d8b8366b6df30af9f2380d664
--- /dev/null
+++ b/cumulus/client/consensus/common/src/level_monitor.rs
@@ -0,0 +1,378 @@
+// Copyright 2022 Parity Technologies (UK) Ltd.
+// This file is part of Cumulus.
+
+// Cumulus is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Cumulus is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Cumulus.  If not, see <http://www.gnu.org/licenses/>.
+
+use sc_client_api::{blockchain::Backend as _, Backend, HeaderBackend as _};
+use sp_blockchain::{HashAndNumber, TreeRoute};
+use sp_runtime::traits::{Block as BlockT, NumberFor, One, Saturating, UniqueSaturatedInto, Zero};
+use std::{
+	collections::{HashMap, HashSet},
+	sync::Arc,
+};
+
+/// A value good enough for parachains using the backend implementation that currently ships
+/// with Substrate. This value may change in the future.
+pub const MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT: usize = 32;
+
+// Counter threshold after which we eventually clean up our internal data.
+const CLEANUP_THRESHOLD: u32 = 32;
+
+/// Upper bound to the number of leaves allowed for each level of the blockchain.
+///
+/// If the limit is set and more leaves are detected on block import, then the older ones are
+/// dropped to make space for the fresh blocks.
+///
+/// In environments where block confirmations from the relay chain may be "slow", setting an
+/// upper bound helps keep the chain healthy by dropping old (presumably stale) leaves, and
+/// prevents new blocks from being discarded because the backend's maximum value has been reached.
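+///
+/// For example, `LevelLimit::Some(16)` keeps at most 16 leaves per level, while
+/// `LevelLimit::None` leaves any limiting to the backend implementation alone.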
+pub enum LevelLimit {
+	/// Limit set to [`MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT`].
+	Default,
+	/// No explicit limit, however a limit may be implicitly imposed by the backend implementation.
+	None,
+	/// Custom value.
+	Some(usize),
+}
+
+/// Support structure to constrain the number of leaves at each level.
+pub struct LevelMonitor<Block: BlockT, BE> {
+	// Max number of leaves for each level.
+	level_limit: usize,
+	// Monotonic counter used to keep track of block freshness.
+	pub(crate) import_counter: NumberFor<Block>,
+	// Map between blocks hashes and freshness.
+	pub(crate) freshness: HashMap<Block::Hash, NumberFor<Block>>,
+	// Blockchain levels cache.
+	pub(crate) levels: HashMap<NumberFor<Block>, HashSet<Block::Hash>>,
+	// Lowest level number stored by the levels map.
+	lowest_level: NumberFor<Block>,
+	// Backend reference to remove blocks on level saturation.
+	backend: Arc<BE>,
+}
+
+/// Contains information about the target scheduled for removal.
+struct TargetInfo<Block: BlockT> {
+	/// Index of freshest leaf in the leaves array.
+	freshest_leaf_idx: usize,
+	/// Route from target to its freshest leaf.
+	freshest_route: TreeRoute<Block>,
+}
+
+impl<Block, BE> LevelMonitor<Block, BE>
+where
+	Block: BlockT,
+	BE: Backend<Block>,
+{
+	/// Instantiate a new monitor structure.
+	pub fn new(level_limit: usize, backend: Arc<BE>) -> Self {
+		let mut monitor = LevelMonitor {
+			level_limit,
+			import_counter: Zero::zero(),
+			freshness: HashMap::new(),
+			levels: HashMap::new(),
+			lowest_level: Zero::zero(),
+			backend,
+		};
+		monitor.restore();
+		monitor
+	}
+
+	/// Restore the structure using the backend.
+	///
+	/// Block freshness values are inferred from the height and not from the effective import
+	/// moment. This is not accurate, but a "good-enough" best-effort solution.
+	///
+	/// Level limits are not enforced during this phase.
+	fn restore(&mut self) {
+		let info = self.backend.blockchain().info();
+		log::debug!(
+			target: "parachain",
+			"Restoring chain level monitor from last finalized block: {} {}",
+			info.finalized_number, info.finalized_hash
+		);
+
+		self.lowest_level = info.finalized_number;
+		self.import_counter = info.finalized_number;
+		self.block_imported(info.finalized_number, info.finalized_hash);
+
+		let mut counter_max = info.finalized_number;
+
+		for leaf in self.backend.blockchain().leaves().unwrap_or_default() {
+			let route =
+				sp_blockchain::tree_route(self.backend.blockchain(), info.finalized_hash, leaf)
+					.expect("Route from finalized to leaf should be available; qed");
+			if !route.retracted().is_empty() {
+				continue
+			}
+			route.enacted().iter().for_each(|elem| {
+				if !self.freshness.contains_key(&elem.hash) {
+					// Use the block height value as the freshness.
+					self.import_counter = elem.number;
+					self.block_imported(elem.number, elem.hash);
+				}
+			});
+			counter_max = std::cmp::max(self.import_counter, counter_max);
+		}
+
+		log::debug!(target: "parachain", "Restored chain level monitor up to height {}", counter_max);
+
+		self.import_counter = counter_max;
+	}
+
+	/// Check and enforce the limit bound at the given height.
+	///
+	/// In practice this ensures that the given height ends up with fewer blocks than the limit
+	/// passed to the constructor.
+	///
+	/// If the given level is found to have a number of blocks greater than or equal to the limit,
+	/// then the limit is enforced by choosing one (or more) blocks to remove.
+	///
+	/// The removal strategy is driven by block freshness.
+	///
+	/// A block's freshness is determined by the most recent leaf freshness descending from the
+	/// block itself. In other words, its freshness is equal to that of its freshest descendant.
+	///
+	/// The least "fresh" blocks are eventually removed.
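+	///
+	/// For example, with a limit of 3 and blocks {A, B, C} already present at this height, one of
+	/// them is removed together with its descendants: the one whose freshest descendant leaf is
+	/// the least fresh, provided its route does not contain the current best block.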
+	pub fn enforce_limit(&mut self, number: NumberFor<Block>) {
+		let level_len = self.levels.get(&number).map(|l| l.len()).unwrap_or_default();
+		if level_len < self.level_limit {
+			return
+		}
+
+		// Sort leaves by freshness only once (least fresh first) and keep track of
+		// leaves that were invalidated on removal.
+		let mut leaves = self.backend.blockchain().leaves().unwrap_or_default();
+		leaves.sort_unstable_by(|a, b| self.freshness.get(a).cmp(&self.freshness.get(b)));
+		let mut invalidated_leaves = HashSet::new();
+
+		// This may not be the most efficient way to remove **multiple** entries, but it is the
+		// easy one :-). Consider that in "normal" conditions the number of blocks to remove is
+		// 0 or 1; it is not worth complicating the code too much. One condition that may trigger
+		// multiple removals (2+) is restarting the node with an existing db and a smaller limit
+		// than the one previously used.
+		let remove_count = level_len - self.level_limit + 1;
+
+		log::debug!(
+			target: "parachain",
+			"Detected leaves overflow at height {number}, removing {remove_count} obsolete blocks",
+		);
+
+		(0..remove_count).all(|_| {
+			self.find_target(number, &leaves, &invalidated_leaves).map_or(false, |target| {
+				self.remove_target(target, number, &leaves, &mut invalidated_leaves);
+				true
+			})
+		});
+	}
+
+	// Helper function to find the best candidate to be removed.
+	//
+	// Given a set of blocks with height equal to `number` (the potential candidates):
+	// 1. For each candidate, fetch all the leaves descending from it.
+	// 2. Set the candidate freshness equal to that of the freshest of its descending leaves.
+	// 3. The target is the least fresh candidate.
+	//
+	// The input `leaves` is assumed to be already ordered by "freshness" (least fresh first).
+	//
+	// Returns the index of the target's freshest leaf within `leaves` and the route from the
+	// target to that leaf.
+	fn find_target(
+		&self,
+		number: NumberFor<Block>,
+		leaves: &[Block::Hash],
+		invalidated_leaves: &HashSet<usize>,
+	) -> Option<TargetInfo<Block>> {
+		let mut target_info: Option<TargetInfo<Block>> = None;
+		let blockchain = self.backend.blockchain();
+		let best_hash = blockchain.info().best_hash;
+
+		// Leaves that were already assigned to some node and thus can be skipped
+		// during the search.
+		let mut assigned_leaves = HashSet::new();
+
+		let level = self.levels.get(&number)?;
+
+		for blk_hash in level.iter().filter(|hash| **hash != best_hash) {
+			// Search for the freshest leaf information for this block.
+			let candidate_info = leaves
+				.iter()
+				.enumerate()
+				.filter(|(leaf_idx, _)| {
+					!assigned_leaves.contains(leaf_idx) && !invalidated_leaves.contains(leaf_idx)
+				})
+				.rev()
+				.find_map(|(leaf_idx, leaf_hash)| {
+					if blk_hash == leaf_hash {
+						let entry = HashAndNumber { number, hash: *blk_hash };
+						TreeRoute::new(vec![entry], 0).ok().map(|freshest_route| TargetInfo {
+							freshest_leaf_idx: leaf_idx,
+							freshest_route,
+						})
+					} else {
+						match sp_blockchain::tree_route(blockchain, *blk_hash, *leaf_hash) {
+							Ok(route) if route.retracted().is_empty() => Some(TargetInfo {
+								freshest_leaf_idx: leaf_idx,
+								freshest_route: route,
+							}),
+							Err(err) => {
+								log::warn!(
+									target: "parachain",
+									"(Lookup) Unable to get route from {:?} to {:?}: {}",
+									blk_hash, leaf_hash, err,
+								);
+								None
+							},
+							_ => None,
+						}
+					}
+				});
+
+			let candidate_info = match candidate_info {
+				Some(candidate_info) => {
+					assigned_leaves.insert(candidate_info.freshest_leaf_idx);
+					candidate_info
+				},
+				None => {
+					// This should never happen
+					log::error!(
+						target: "parachain",
+						"Unable to get route to any leaf from {:?} (this is a bug)",
+						blk_hash,
+					);
+					continue
+				},
+			};
+
+			// Found the freshest leaf for this candidate.
+			// This candidate is set as the new target if:
+			// 1. its freshest leaf is less fresh than the previous target's freshest leaf, AND
+			// 2. the best block is not in its route.
+
+			let is_less_fresh = || {
+				target_info
+					.as_ref()
+					.map(|ti| candidate_info.freshest_leaf_idx < ti.freshest_leaf_idx)
+					.unwrap_or(true)
+			};
+			let not_contains_best = || {
+				candidate_info
+					.freshest_route
+					.enacted()
+					.iter()
+					.all(|entry| entry.hash != best_hash)
+			};
+
+			if is_less_fresh() && not_contains_best() {
+				let early_stop = candidate_info.freshest_leaf_idx == 0;
+				target_info = Some(candidate_info);
+				if early_stop {
+					// We will never find a candidate whose freshest leaf is less fresh than this one.
+					break
+				}
+			}
+		}
+
+		target_info
+	}
+
+	// Remove the target block and all its descendants.
+	//
+	// Leaves should have already been ordered by "freshness" (least fresh first).
+	fn remove_target(
+		&mut self,
+		target: TargetInfo<Block>,
+		number: NumberFor<Block>,
+		leaves: &[Block::Hash],
+		invalidated_leaves: &mut HashSet<usize>,
+	) {
+		let mut remove_leaf = |number, hash| {
+			log::debug!(target: "parachain", "Removing block (@{}) {:?}", number, hash);
+			if let Err(err) = self.backend.remove_leaf_block(hash) {
+				log::debug!(target: "parachain", "Removal not possible for {}: {}", hash, err);
+				return false
+			}
+			self.levels.get_mut(&number).map(|level| level.remove(&hash));
+			self.freshness.remove(&hash);
+			true
+		};
+
+		invalidated_leaves.insert(target.freshest_leaf_idx);
+
+		// Takes care of route removal. Starts from the leaf and stops as soon as an error is
+		// encountered. In this case an error is interpreted as the block not being a leaf;
+		// it will be removed while removing another route from the same block to a
+		// different leaf.
+		let mut remove_route = |route: TreeRoute<Block>| {
+			route.enacted().iter().rev().all(|elem| remove_leaf(elem.number, elem.hash));
+		};
+
+		let target_hash = target.freshest_route.common_block().hash;
+		debug_assert_eq!(
+			target.freshest_route.common_block().number,
+			number,
+			"This is a bug in LevelMonitor::find_target() or the Backend is corrupted"
+		);
+
+		// Remove freshest (cached) route first.
+		remove_route(target.freshest_route);
+
+		// Don't bother trying with leaves we have already found not to be our descendants.
+		let to_skip = leaves.len() - target.freshest_leaf_idx;
+		leaves.iter().enumerate().rev().skip(to_skip).for_each(|(leaf_idx, leaf_hash)| {
+			if invalidated_leaves.contains(&leaf_idx) {
+				return
+			}
+			match sp_blockchain::tree_route(self.backend.blockchain(), target_hash, *leaf_hash) {
+				Ok(route) if route.retracted().is_empty() => {
+					invalidated_leaves.insert(leaf_idx);
+					remove_route(route);
+				},
+				Err(err) => {
+					log::warn!(
+						target: "parachain",
+						"(Removal) Unable to get route from {:?} to {:?}: {}",
+						target_hash, leaf_hash, err,
+					);
+				},
+				_ => (),
+			};
+		});
+
+		remove_leaf(number, target_hash);
+	}
+
+	/// Add a new imported block information to the monitor.
+	pub fn block_imported(&mut self, number: NumberFor<Block>, hash: Block::Hash) {
+		self.freshness.insert(hash, self.import_counter);
+		self.levels.entry(number).or_default().insert(hash);
+		self.import_counter += One::one();
+
+		// Do cleanup once in a while; we are allowed to keep some obsolete information around.
+		let finalized_num = self.backend.blockchain().info().finalized_number;
+		let delta: u32 = finalized_num.saturating_sub(self.lowest_level).unique_saturated_into();
+		if delta >= CLEANUP_THRESHOLD {
+			for i in 0..delta {
+				let number = self.lowest_level + i.unique_saturated_into();
+				self.levels.remove(&number).map(|level| {
+					level.iter().for_each(|hash| {
+						self.freshness.remove(hash);
+					})
+				});
+			}
+
+			self.lowest_level = finalized_num;
+		}
+	}
+}
diff --git a/cumulus/client/consensus/common/src/lib.rs b/cumulus/client/consensus/common/src/lib.rs
index d5d33585439d4c00200a32bc065be3ac8b683856..39119f345c298e8aebc5e292c643482a7ea33f1c 100644
--- a/cumulus/client/consensus/common/src/lib.rs
+++ b/cumulus/client/consensus/common/src/lib.rs
@@ -15,14 +15,23 @@
 // along with Cumulus.  If not, see <http://www.gnu.org/licenses/>.
 
 use polkadot_primitives::v2::{Hash as PHash, PersistedValidationData};
-use sc_consensus::BlockImport;
-use sp_runtime::traits::Block as BlockT;
 
+use sc_client_api::Backend;
+use sc_consensus::{shared_data::SharedData, BlockImport, ImportResult};
+use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
+
+use std::sync::Arc;
+
+mod level_monitor;
 mod parachain_consensus;
 #[cfg(test)]
 mod tests;
+
 pub use parachain_consensus::run_parachain_consensus;
 
+use level_monitor::LevelMonitor;
+pub use level_monitor::{LevelLimit, MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT};
+
 /// The result of [`ParachainConsensus::produce_candidate`].
 pub struct ParachainCandidate<B> {
 	/// The block that was built for this candidate.
@@ -74,47 +83,93 @@ impl<B: BlockT> ParachainConsensus<B> for Box<dyn ParachainConsensus<B> + Send +
 /// This is used to set `block_import_params.fork_choice` to `false` as long as the block origin is
 /// not `NetworkInitialSync`. The best block for parachains is determined by the relay chain. Meaning
 /// we will update the best block, as it is included by the relay-chain.
-pub struct ParachainBlockImport<I>(I);
+pub struct ParachainBlockImport<Block: BlockT, BI, BE> {
+	inner: BI,
+	monitor: Option<SharedData<LevelMonitor<Block, BE>>>,
+}
 
-impl<I> ParachainBlockImport<I> {
+impl<Block: BlockT, BI, BE: Backend<Block>> ParachainBlockImport<Block, BI, BE> {
 	/// Create a new instance.
-	pub fn new(inner: I) -> Self {
-		Self(inner)
+	///
+	/// The limit on the number of leaves per level is set to `LevelLimit::Default`.
+	pub fn new(inner: BI, backend: Arc<BE>) -> Self {
+		Self::new_with_limit(inner, backend, LevelLimit::Default)
+	}
+
+	/// Create a new instance with an explicit limit to the number of leaves per level.
+	///
+	/// This function alone doesn't enforce the limit on levels for previously imported blocks;
+	/// the limit is only enforced as new blocks are imported.
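+	///
+	/// A minimal usage sketch (assuming `client` also acts as the block import and `backend` is
+	/// the client backend):
+	/// ```ignore
+	/// let para_block_import =
+	/// 	ParachainBlockImport::new_with_limit(client.clone(), backend.clone(), LevelLimit::Some(16));
+	/// ```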
+	pub fn new_with_limit(inner: BI, backend: Arc<BE>, level_leaves_max: LevelLimit) -> Self {
+		let level_limit = match level_leaves_max {
+			LevelLimit::None => None,
+			LevelLimit::Some(limit) => Some(limit),
+			LevelLimit::Default => Some(MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT),
+		};
+
+		let monitor =
+			level_limit.map(|level_limit| SharedData::new(LevelMonitor::new(level_limit, backend)));
+
+		Self { inner, monitor }
 	}
 }
 
-impl<I: Clone> Clone for ParachainBlockImport<I> {
+impl<Block: BlockT, I: Clone, BE> Clone for ParachainBlockImport<Block, I, BE> {
 	fn clone(&self) -> Self {
-		ParachainBlockImport(self.0.clone())
+		ParachainBlockImport { inner: self.inner.clone(), monitor: self.monitor.clone() }
 	}
 }
 
 #[async_trait::async_trait]
-impl<Block, I> BlockImport<Block> for ParachainBlockImport<I>
+impl<Block, BI, BE> BlockImport<Block> for ParachainBlockImport<Block, BI, BE>
 where
 	Block: BlockT,
-	I: BlockImport<Block> + Send,
+	BI: BlockImport<Block> + Send,
+	BE: Backend<Block>,
 {
-	type Error = I::Error;
-	type Transaction = I::Transaction;
+	type Error = BI::Error;
+	type Transaction = BI::Transaction;
 
 	async fn check_block(
 		&mut self,
 		block: sc_consensus::BlockCheckParams<Block>,
 	) -> Result<sc_consensus::ImportResult, Self::Error> {
-		self.0.check_block(block).await
+		self.inner.check_block(block).await
 	}
 
 	async fn import_block(
 		&mut self,
-		mut block_import_params: sc_consensus::BlockImportParams<Block, Self::Transaction>,
+		mut params: sc_consensus::BlockImportParams<Block, Self::Transaction>,
 		cache: std::collections::HashMap<sp_consensus::CacheKeyId, Vec<u8>>,
 	) -> Result<sc_consensus::ImportResult, Self::Error> {
+		// Blocks are stored within the backend using their post hash.
+		let hash = params.post_hash();
+		let number = *params.header.number();
+
 		// Best block is determined by the relay chain, or if we are doing the initial sync
 		// we import all blocks as new best.
-		block_import_params.fork_choice = Some(sc_consensus::ForkChoiceStrategy::Custom(
-			block_import_params.origin == sp_consensus::BlockOrigin::NetworkInitialSync,
+		params.fork_choice = Some(sc_consensus::ForkChoiceStrategy::Custom(
+			params.origin == sp_consensus::BlockOrigin::NetworkInitialSync,
 		));
-		self.0.import_block(block_import_params, cache).await
+
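+		// Enforce the level limit under the monitor lock *before* importing the block; the
+		// mutex is released across the (async) inner import and upgraded again afterwards to
+		// register the newly imported block.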
+		let maybe_lock = self.monitor.as_ref().map(|monitor_lock| {
+			let mut monitor = monitor_lock.shared_data_locked();
+			monitor.enforce_limit(number);
+			monitor.release_mutex()
+		});
+
+		let res = self.inner.import_block(params, cache).await?;
+
+		if let (Some(mut monitor_lock), ImportResult::Imported(_)) = (maybe_lock, &res) {
+			let mut monitor = monitor_lock.upgrade();
+			monitor.block_imported(number, hash);
+		}
+
+		Ok(res)
 	}
 }
+
+/// Marker trait denoting a block import type that fits the parachain requirements.
+pub trait ParachainBlockImportMarker {}
+
+impl<B: BlockT, BI, BE> ParachainBlockImportMarker for ParachainBlockImport<B, BI, BE> {}
diff --git a/cumulus/client/consensus/common/src/parachain_consensus.rs b/cumulus/client/consensus/common/src/parachain_consensus.rs
index 860eb552c876d176c0ae94eef643eeb9e9e7c406..ffbbab5a2002db1abe1f4e0d9597b5938952524a 100644
--- a/cumulus/client/consensus/common/src/parachain_consensus.rs
+++ b/cumulus/client/consensus/common/src/parachain_consensus.rs
@@ -15,7 +15,6 @@
 // along with Cumulus.  If not, see <http://www.gnu.org/licenses/>.
 
 use async_trait::async_trait;
-use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
 use sc_client_api::{
 	Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
 };
@@ -27,15 +26,25 @@ use sp_runtime::{
 	traits::{Block as BlockT, Header as HeaderT},
 };
 
+use cumulus_client_pov_recovery::{RecoveryDelay, RecoveryKind, RecoveryRequest};
+use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
+
 use polkadot_primitives::v2::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption};
 
 use codec::Decode;
-use futures::{select, FutureExt, Stream, StreamExt};
+use futures::{channel::mpsc::Sender, select, FutureExt, Stream, StreamExt};
 
-use std::{pin::Pin, sync::Arc};
+use std::{pin::Pin, sync::Arc, time::Duration};
 
 const LOG_TARGET: &str = "cumulus-consensus";
 
+// Delay range to trigger explicit requests.
+// The chosen value doesn't have any special meaning; in practice, a random delay on the order
+// of seconds should be good enough to allow a quick recovery without DoSing
+// the relay chain.
+const RECOVERY_DELAY: RecoveryDelay =
+	RecoveryDelay { min: Duration::ZERO, max: Duration::from_secs(30) };
+
 /// Helper for the relay chain client. This is expected to be a lightweight handle like an `Arc`.
 #[async_trait]
 pub trait RelaychainClient: Clone + 'static {
@@ -82,7 +91,7 @@ where
 		let finalized_head = if let Some(h) = finalized_heads.next().await {
 			h
 		} else {
-			tracing::debug!(target: "cumulus-consensus", "Stopping following finalized head.");
+			tracing::debug!(target: LOG_TARGET, "Stopping following finalized head.");
 			return
 		};
 
@@ -90,7 +99,7 @@ where
 			Ok(header) => header,
 			Err(err) => {
 				tracing::debug!(
-					target: "cumulus-consensus",
+					target: LOG_TARGET,
 					error = ?err,
 					"Could not decode parachain header while following finalized heads.",
 				);
@@ -105,12 +114,12 @@ where
 			if let Err(e) = parachain.finalize_block(hash, None, true) {
 				match e {
 					ClientError::UnknownBlock(_) => tracing::debug!(
-						target: "cumulus-consensus",
+						target: LOG_TARGET,
 						block_hash = ?hash,
 						"Could not finalize block because it is unknown.",
 					),
 					_ => tracing::warn!(
-						target: "cumulus-consensus",
+						target: LOG_TARGET,
 						error = ?e,
 						block_hash = ?hash,
 						"Failed to finalize block",
@@ -136,6 +145,7 @@ pub async fn run_parachain_consensus<P, R, Block, B>(
 	parachain: Arc<P>,
 	relay_chain: R,
 	announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
+	recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
 ) where
 	Block: BlockT,
 	P: Finalizer<Block, B>
@@ -148,8 +158,13 @@ pub async fn run_parachain_consensus<P, R, Block, B>(
 	R: RelaychainClient,
 	B: Backend<Block>,
 {
-	let follow_new_best =
-		follow_new_best(para_id, parachain.clone(), relay_chain.clone(), announce_block);
+	let follow_new_best = follow_new_best(
+		para_id,
+		parachain.clone(),
+		relay_chain.clone(),
+		announce_block,
+		recovery_chan_tx,
+	);
 	let follow_finalized_head = follow_finalized_head(para_id, parachain, relay_chain);
 	select! {
 		_ = follow_new_best.fuse() => {},
@@ -163,6 +178,7 @@ async fn follow_new_best<P, R, Block, B>(
 	parachain: Arc<P>,
 	relay_chain: R,
 	announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
+	recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
 ) where
 	Block: BlockT,
 	P: Finalizer<Block, B>
@@ -197,10 +213,11 @@ async fn follow_new_best<P, R, Block, B>(
 						h,
 						&*parachain,
 						&mut unset_best_header,
+						recovery_chan_tx.clone(),
 					).await,
 					None => {
 						tracing::debug!(
-							target: "cumulus-consensus",
+							target: LOG_TARGET,
 							"Stopping following new best.",
 						);
 						return
@@ -217,7 +234,7 @@ async fn follow_new_best<P, R, Block, B>(
 					).await,
 					None => {
 						tracing::debug!(
-							target: "cumulus-consensus",
+							target: LOG_TARGET,
 							"Stopping following imported blocks.",
 						);
 						return
@@ -276,7 +293,7 @@ async fn handle_new_block_imported<Block, P>(
 			import_block_as_new_best(unset_hash, unset_best_header, parachain).await;
 		},
 		state => tracing::debug!(
-			target: "cumulus-consensus",
+			target: LOG_TARGET,
 			?unset_best_header,
 			?notification.header,
 			?state,
@@ -290,6 +307,7 @@ async fn handle_new_best_parachain_head<Block, P>(
 	head: Vec<u8>,
 	parachain: &P,
 	unset_best_header: &mut Option<Block::Header>,
+	mut recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
 ) where
 	Block: BlockT,
 	P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
@@ -299,7 +317,7 @@ async fn handle_new_best_parachain_head<Block, P>(
 		Ok(header) => header,
 		Err(err) => {
 			tracing::debug!(
-				target: "cumulus-consensus",
+				target: LOG_TARGET,
 				error = ?err,
 				"Could not decode Parachain header while following best heads.",
 			);
@@ -311,7 +329,7 @@ async fn handle_new_best_parachain_head<Block, P>(
 
 	if parachain.usage_info().chain.best_hash == hash {
 		tracing::debug!(
-			target: "cumulus-consensus",
+			target: LOG_TARGET,
 			block_hash = ?hash,
 			"Skipping set new best block, because block is already the best.",
 		)
@@ -325,7 +343,7 @@ async fn handle_new_best_parachain_head<Block, P>(
 			},
 			Ok(BlockStatus::InChainPruned) => {
 				tracing::error!(
-					target: "cumulus-collator",
+					target: LOG_TARGET,
 					block_hash = ?hash,
 					"Trying to set pruned block as new best!",
 				);
@@ -334,14 +352,30 @@ async fn handle_new_best_parachain_head<Block, P>(
 				*unset_best_header = Some(parachain_head);
 
 				tracing::debug!(
-					target: "cumulus-collator",
+					target: LOG_TARGET,
 					block_hash = ?hash,
 					"Parachain block not yet imported, waiting for import to enact as best block.",
 				);
+
+				if let Some(ref mut recovery_chan_tx) = recovery_chan_tx {
+					// Best effort channel to actively encourage block recovery.
+					// An error here is not fatal; the relay chain continuously re-announces
+					// the best block, thus we will have other opportunities to retry.
+					let req =
+						RecoveryRequest { hash, delay: RECOVERY_DELAY, kind: RecoveryKind::Full };
+					if let Err(err) = recovery_chan_tx.try_send(req) {
+						tracing::warn!(
+							target: LOG_TARGET,
+							block_hash = ?hash,
+							error = ?err,
+							"Unable to notify block recovery subsystem"
+						)
+					}
+				}
 			},
 			Err(e) => {
 				tracing::error!(
-					target: "cumulus-collator",
+					target: LOG_TARGET,
 					block_hash = ?hash,
 					error = ?e,
 					"Failed to get block status of block.",
@@ -361,7 +395,7 @@ where
 	let best_number = parachain.usage_info().chain.best_number;
 	if *header.number() < best_number {
 		tracing::debug!(
-			target: "cumulus-consensus",
+			target: LOG_TARGET,
 			%best_number,
 			block_number = %header.number(),
 			"Skipping importing block as new best block, because there already exists a \
@@ -377,7 +411,7 @@ where
 
 	if let Err(err) = (&*parachain).import_block(block_import_params, Default::default()).await {
 		tracing::warn!(
-			target: "cumulus-consensus",
+			target: LOG_TARGET,
 			block_hash = ?hash,
 			error = ?err,
 			"Failed to set new best block.",
diff --git a/cumulus/client/consensus/common/src/tests.rs b/cumulus/client/consensus/common/src/tests.rs
index 23729abebb467182025e061467bb11c1b1816f64..92cecc37d2946c6451cb8281278c573fe856508e 100644
--- a/cumulus/client/consensus/common/src/tests.rs
+++ b/cumulus/client/consensus/common/src/tests.rs
@@ -18,6 +18,7 @@ use crate::*;
 
 use async_trait::async_trait;
 use codec::Encode;
+use cumulus_client_pov_recovery::RecoveryKind;
 use cumulus_relay_chain_interface::RelayChainResult;
 use cumulus_test_client::{
 	runtime::{Block, Header},
@@ -26,10 +27,10 @@ use cumulus_test_client::{
 use futures::{channel::mpsc, executor::block_on, select, FutureExt, Stream, StreamExt};
 use futures_timer::Delay;
 use polkadot_primitives::v2::Id as ParaId;
-use sc_client_api::UsageProvider;
+use sc_client_api::{blockchain::Backend as _, Backend as _, UsageProvider};
 use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
 use sp_blockchain::Error as ClientError;
-use sp_consensus::BlockOrigin;
+use sp_consensus::{BlockOrigin, BlockStatus};
 use sp_runtime::generic::BlockId;
 use std::{
 	sync::{Arc, Mutex},
@@ -103,21 +104,82 @@ impl crate::parachain_consensus::RelaychainClient for Relaychain {
 	}
 }
 
-fn build_and_import_block(mut client: Arc<Client>, import_as_best: bool) -> Block {
-	let builder = client.init_block_builder(None, Default::default());
+fn build_block<B: InitBlockBuilder>(
+	builder: &B,
+	at: Option<BlockId<Block>>,
+	timestamp: Option<u64>,
+) -> Block {
+	let builder = match at {
+		Some(at) => match timestamp {
+			Some(ts) =>
+				builder.init_block_builder_with_timestamp(&at, None, Default::default(), ts),
+			None => builder.init_block_builder_at(&at, None, Default::default()),
+		},
+		None => builder.init_block_builder(None, Default::default()),
+	};
+
+	let mut block = builder.build().unwrap().block;
 
-	let block = builder.build().unwrap().block;
-	let (header, body) = block.clone().deconstruct();
+	// Simulate some form of post-processing activity (like a `Seal` or other generic digest items).
+	// This is mostly used to exercise the `LevelMonitor`'s correct behavior.
+	// (In practice we want the header post-hash to differ from the pre-hash.)
+	block.header.digest.push(sp_runtime::DigestItem::Other(vec![1, 2, 3]));
 
-	let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, header);
+	block
+}
+
+async fn import_block<I: BlockImport<Block>>(
+	importer: &mut I,
+	block: Block,
+	origin: BlockOrigin,
+	import_as_best: bool,
+) {
+	let (mut header, body) = block.deconstruct();
+
+	let post_digest =
+		header.digest.pop().expect("post digest is present in manually crafted block");
+
+	let mut block_import_params = BlockImportParams::new(origin, header);
 	block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(import_as_best));
 	block_import_params.body = Some(body);
+	block_import_params.post_digests.push(post_digest);
 
-	block_on(client.import_block(block_import_params, Default::default())).unwrap();
+	importer.import_block(block_import_params, Default::default()).await.unwrap();
+}
 
+fn import_block_sync<I: BlockImport<Block>>(
+	importer: &mut I,
+	block: Block,
+	origin: BlockOrigin,
+	import_as_best: bool,
+) {
+	block_on(import_block(importer, block, origin, import_as_best));
+}
+
+fn build_and_import_block_ext<B: InitBlockBuilder, I: BlockImport<Block>>(
+	builder: &B,
+	origin: BlockOrigin,
+	import_as_best: bool,
+	importer: &mut I,
+	at: Option<BlockId<Block>>,
+	timestamp: Option<u64>,
+) -> Block {
+	let block = build_block(builder, at, timestamp);
+	import_block_sync(importer, block.clone(), origin, import_as_best);
 	block
 }
 
+fn build_and_import_block(mut client: Arc<Client>, import_as_best: bool) -> Block {
+	build_and_import_block_ext(
+		&*client.clone(),
+		BlockOrigin::Own,
+		import_as_best,
+		&mut client,
+		None,
+		None,
+	)
+}
+
 #[test]
 fn follow_new_best_works() {
 	sp_tracing::try_init_simple();
@@ -129,7 +191,7 @@ fn follow_new_best_works() {
 	let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone();
 
 	let consensus =
-		run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
+		run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);
 
 	let work = async move {
 		new_best_heads_sender.unbounded_send(block.header().clone()).unwrap();
@@ -152,6 +214,68 @@ fn follow_new_best_works() {
 	});
 }
 
+#[test]
+fn follow_new_best_with_dummy_recovery_works() {
+	sp_tracing::try_init_simple();
+
+	let client = Arc::new(TestClientBuilder::default().build());
+
+	let relay_chain = Relaychain::new();
+	let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone();
+
+	let (recovery_chan_tx, mut recovery_chan_rx) = futures::channel::mpsc::channel(3);
+
+	let consensus = run_parachain_consensus(
+		100.into(),
+		client.clone(),
+		relay_chain,
+		Arc::new(|_, _| {}),
+		Some(recovery_chan_tx),
+	);
+
+	let block = build_block(&*client.clone(), None, None);
+	let block_clone = block.clone();
+	let client_clone = client.clone();
+
+	let work = async move {
+		new_best_heads_sender.unbounded_send(block.header().clone()).unwrap();
+		loop {
+			Delay::new(Duration::from_millis(100)).await;
+			match client.block_status(&BlockId::Hash(block.hash())).unwrap() {
+				BlockStatus::Unknown => {},
+				status => {
+					assert_eq!(block.hash(), client.usage_info().chain.best_hash);
+					assert_eq!(status, BlockStatus::InChainWithState);
+					break
+				},
+			}
+		}
+	};
+
+	let dummy_block_recovery = async move {
+		loop {
+			if let Some(req) = recovery_chan_rx.next().await {
+				assert_eq!(req.hash, block_clone.hash());
+				assert_eq!(req.kind, RecoveryKind::Full);
+				Delay::new(Duration::from_millis(500)).await;
+				import_block(&mut &*client_clone, block_clone.clone(), BlockOrigin::Own, true)
+					.await;
+			}
+		}
+	};
+
+	block_on(async move {
+		futures::pin_mut!(consensus);
+		futures::pin_mut!(work);
+
+		select! {
+			r = consensus.fuse() => panic!("Consensus should not end: {:?}", r),
+			_ = dummy_block_recovery.fuse() => {},
+			_ = work.fuse() => {},
+		}
+	});
+}
+
 #[test]
 fn follow_finalized_works() {
 	sp_tracing::try_init_simple();
@@ -163,7 +287,7 @@ fn follow_finalized_works() {
 	let finalized_sender = relay_chain.inner.lock().unwrap().finalized_heads_sender.clone();
 
 	let consensus =
-		run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
+		run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);
 
 	let work = async move {
 		finalized_sender.unbounded_send(block.header().clone()).unwrap();
@@ -204,7 +328,7 @@ fn follow_finalized_does_not_stop_on_unknown_block() {
 	let finalized_sender = relay_chain.inner.lock().unwrap().finalized_heads_sender.clone();
 
 	let consensus =
-		run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
+		run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);
 
 	let work = async move {
 		for _ in 0..3usize {
@@ -254,7 +378,7 @@ fn follow_new_best_sets_best_after_it_is_imported() {
 	let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone();
 
 	let consensus =
-		run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
+		run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);
 
 	let work = async move {
 		new_best_heads_sender.unbounded_send(block.header().clone()).unwrap();
@@ -331,7 +455,7 @@ fn do_not_set_best_block_to_older_block() {
 	let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone();
 
 	let consensus =
-		run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
+		run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);
 
 	let client2 = client.clone();
 	let work = async move {
@@ -355,3 +479,216 @@ fn do_not_set_best_block_to_older_block() {
 	// Build and import a new best block.
 	build_and_import_block(client2.clone(), true);
 }
+
+#[test]
+fn prune_blocks_on_level_overflow() {
+	// Here we are using the timestamp value to generate blocks with different hashes.
+	const LEVEL_LIMIT: usize = 3;
+	const TIMESTAMP_MULTIPLIER: u64 = 60000;
+
+	let backend = Arc::new(Backend::new_test(1000, 3));
+	let client = Arc::new(TestClientBuilder::with_backend(backend.clone()).build());
+	let mut para_import = ParachainBlockImport::new_with_limit(
+		client.clone(),
+		backend.clone(),
+		LevelLimit::Some(LEVEL_LIMIT),
+	);
+
+	let block0 = build_and_import_block_ext(
+		&*client,
+		BlockOrigin::NetworkInitialSync,
+		true,
+		&mut para_import,
+		None,
+		None,
+	);
+	let id0 = BlockId::Hash(block0.header.hash());
+
+	let blocks1 = (0..LEVEL_LIMIT)
+		.into_iter()
+		.map(|i| {
+			build_and_import_block_ext(
+				&*client,
+				if i == 1 { BlockOrigin::NetworkInitialSync } else { BlockOrigin::Own },
+				i == 1,
+				&mut para_import,
+				Some(id0),
+				Some(i as u64 * TIMESTAMP_MULTIPLIER),
+			)
+		})
+		.collect::<Vec<_>>();
+	let id10 = BlockId::Hash(blocks1[0].header.hash());
+
+	let blocks2 = (0..2)
+		.into_iter()
+		.map(|i| {
+			build_and_import_block_ext(
+				&*client,
+				BlockOrigin::Own,
+				false,
+				&mut para_import,
+				Some(id10),
+				Some(i as u64 * TIMESTAMP_MULTIPLIER),
+			)
+		})
+		.collect::<Vec<_>>();
+
+	// Initial scenario (with B11 imported as best)
+	//
+	//   B0 --+-- B10 --+-- B20
+	//        +-- B11   +-- B21
+	//        +-- B12
+
+	let leaves = backend.blockchain().leaves().unwrap();
+	let mut expected = vec![
+		blocks2[0].header.hash(),
+		blocks2[1].header.hash(),
+		blocks1[1].header.hash(),
+		blocks1[2].header.hash(),
+	];
+	assert_eq!(leaves, expected);
+	let best = client.usage_info().chain.best_hash;
+	assert_eq!(best, blocks1[1].header.hash());
+
+	let block13 = build_and_import_block_ext(
+		&*client,
+		BlockOrigin::Own,
+		false,
+		&mut para_import,
+		Some(id0),
+		Some(LEVEL_LIMIT as u64 * TIMESTAMP_MULTIPLIER),
+	);
+
+	// Expected scenario
+	//
+	//   B0 --+-- B10 --+-- B20
+	//        +-- B11   +-- B21
+	//        +--(B13)              <-- B12 has been replaced
+
+	let leaves = backend.blockchain().leaves().unwrap();
+	expected[3] = block13.header.hash();
+	assert_eq!(leaves, expected);
+
+	let block14 = build_and_import_block_ext(
+		&*client,
+		BlockOrigin::Own,
+		false,
+		&mut para_import,
+		Some(id0),
+		Some(2 * LEVEL_LIMIT as u64 * TIMESTAMP_MULTIPLIER),
+	);
+
+	// Expected scenario
+	//
+	//   B0 --+--(B14)              <-- B10 has been replaced
+	//        +-- B11
+	//        +--(B13)
+
+	let leaves = backend.blockchain().leaves().unwrap();
+	expected.remove(0);
+	expected.remove(0);
+	expected.push(block14.header.hash());
+	assert_eq!(leaves, expected);
+}
+
+#[test]
+fn restore_limit_monitor() {
+	// Here we are using the timestamp value to generate blocks with different hashes.
+	const LEVEL_LIMIT: usize = 2;
+	const TIMESTAMP_MULTIPLIER: u64 = 60000;
+
+	let backend = Arc::new(Backend::new_test(1000, 3));
+	let client = Arc::new(TestClientBuilder::with_backend(backend.clone()).build());
+
+	// Start with a block import not enforcing any limit...
+	let mut para_import = ParachainBlockImport::new_with_limit(
+		client.clone(),
+		backend.clone(),
+		LevelLimit::Some(usize::MAX),
+	);
+
+	let block00 = build_and_import_block_ext(
+		&*client,
+		BlockOrigin::NetworkInitialSync,
+		true,
+		&mut para_import,
+		None,
+		None,
+	);
+	let id00 = BlockId::Hash(block00.header.hash());
+
+	let blocks1 = (0..LEVEL_LIMIT + 1)
+		.into_iter()
+		.map(|i| {
+			build_and_import_block_ext(
+				&*client,
+				if i == 1 { BlockOrigin::NetworkInitialSync } else { BlockOrigin::Own },
+				i == 1,
+				&mut para_import,
+				Some(id00),
+				Some(i as u64 * TIMESTAMP_MULTIPLIER),
+			)
+		})
+		.collect::<Vec<_>>();
+	let id10 = BlockId::Hash(blocks1[0].header.hash());
+
+	let _ = (0..LEVEL_LIMIT)
+		.into_iter()
+		.map(|i| {
+			build_and_import_block_ext(
+				&*client,
+				BlockOrigin::Own,
+				false,
+				&mut para_import,
+				Some(id10),
+				Some(i as u64 * TIMESTAMP_MULTIPLIER),
+			)
+		})
+		.collect::<Vec<_>>();
+
+	// Scenario before limit application (with B11 imported as best)
+	// Import order (freshness): B00, B10, B11, B12, B20, B21
+	//
+	//   B00 --+-- B10 --+-- B20
+	//         |         +-- B21
+	//         +-- B11
+	//         |
+	//         +-- B12
+
+	// Simulate a restart by forcing a new monitor structure instance
+
+	let mut para_import = ParachainBlockImport::new_with_limit(
+		client.clone(),
+		backend.clone(),
+		LevelLimit::Some(LEVEL_LIMIT),
+	);
+
+	let block13 = build_and_import_block_ext(
+		&*client,
+		BlockOrigin::Own,
+		false,
+		&mut para_import,
+		Some(id00),
+		Some(LEVEL_LIMIT as u64 * TIMESTAMP_MULTIPLIER),
+	);
+
+	// Expected scenario
+	//
+	//   B00 --+-- B11
+	//         +--(B13)
+
+	let leaves = backend.blockchain().leaves().unwrap();
+	let expected = vec![blocks1[1].header.hash(), block13.header.hash()];
+	assert_eq!(leaves, expected);
+
+	let monitor = para_import.monitor.unwrap();
+	let monitor = monitor.shared_data();
+	assert_eq!(monitor.import_counter, 5);
+	assert!(monitor.levels.iter().all(|(number, hashes)| {
+		hashes
+			.iter()
+			.filter(|hash| **hash != block13.header.hash())
+			.all(|hash| *number == *monitor.freshness.get(hash).unwrap())
+	}));
+	assert_eq!(*monitor.freshness.get(&block13.header.hash()).unwrap(), monitor.import_counter - 1);
+}
diff --git a/cumulus/client/consensus/relay-chain/src/import_queue.rs b/cumulus/client/consensus/relay-chain/src/import_queue.rs
index 0460ab8d58230bd270fdfa5650e6a01f57a13783..31004c0005ea88cd432b5abb527790fafbdc0ac6 100644
--- a/cumulus/client/consensus/relay-chain/src/import_queue.rs
+++ b/cumulus/client/consensus/relay-chain/src/import_queue.rs
@@ -16,7 +16,7 @@
 
 use std::{marker::PhantomData, sync::Arc};
 
-use cumulus_client_consensus_common::ParachainBlockImport;
+use cumulus_client_consensus_common::ParachainBlockImportMarker;
 
 use sc_consensus::{
 	import_queue::{BasicQueue, Verifier as VerifierT},
@@ -107,13 +107,17 @@ where
 /// Start an import queue for a Cumulus collator that does not uses any special authoring logic.
 pub fn import_queue<Client, Block: BlockT, I, CIDP>(
 	client: Arc<Client>,
-	block_import: ParachainBlockImport<I>,
+	block_import: I,
 	create_inherent_data_providers: CIDP,
 	spawner: &impl sp_core::traits::SpawnEssentialNamed,
 	registry: Option<&substrate_prometheus_endpoint::Registry>,
 ) -> ClientResult<BasicQueue<Block, I::Transaction>>
 where
-	I: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
+	I: BlockImport<Block, Error = ConsensusError>
+		+ ParachainBlockImportMarker
+		+ Send
+		+ Sync
+		+ 'static,
 	I::Transaction: Send,
 	Client: ProvideRuntimeApi<Block> + Send + Sync + 'static,
 	<Client as ProvideRuntimeApi<Block>>::Api: BlockBuilderApi<Block>,
diff --git a/cumulus/client/consensus/relay-chain/src/lib.rs b/cumulus/client/consensus/relay-chain/src/lib.rs
index efcdc1e4c3bb4b2a09ff199cb9a12cd04c457111..4cd0ab24beba130e58ca09c8f4fdf10b6d08c617 100644
--- a/cumulus/client/consensus/relay-chain/src/lib.rs
+++ b/cumulus/client/consensus/relay-chain/src/lib.rs
@@ -34,11 +34,10 @@
 //! 5. After the parachain candidate got backed and included, all collators start at 1.
 
 use cumulus_client_consensus_common::{
-	ParachainBlockImport, ParachainCandidate, ParachainConsensus,
+	ParachainBlockImportMarker, ParachainCandidate, ParachainConsensus,
 };
 use cumulus_primitives_core::{relay_chain::v2::Hash as PHash, ParaId, PersistedValidationData};
 use cumulus_relay_chain_interface::RelayChainInterface;
-use parking_lot::Mutex;
 
 use sc_consensus::{BlockImport, BlockImportParams};
 use sp_consensus::{
@@ -46,6 +45,8 @@ use sp_consensus::{
 };
 use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvider};
 use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
+
+use parking_lot::Mutex;
 use std::{marker::PhantomData, sync::Arc, time::Duration};
 
 mod import_queue;
@@ -56,11 +57,11 @@ const LOG_TARGET: &str = "cumulus-consensus-relay-chain";
 /// The implementation of the relay-chain provided consensus for parachains.
 pub struct RelayChainConsensus<B, PF, BI, RCInterface, CIDP> {
 	para_id: ParaId,
-	_phantom: PhantomData<B>,
 	proposer_factory: Arc<Mutex<PF>>,
 	create_inherent_data_providers: Arc<CIDP>,
-	block_import: Arc<futures::lock::Mutex<ParachainBlockImport<BI>>>,
+	block_import: Arc<futures::lock::Mutex<BI>>,
 	relay_chain_interface: RCInterface,
+	_phantom: PhantomData<B>,
 }
 
 impl<B, PF, BI, RCInterface, CIDP> Clone for RelayChainConsensus<B, PF, BI, RCInterface, CIDP>
@@ -70,11 +71,11 @@ where
 	fn clone(&self) -> Self {
 		Self {
 			para_id: self.para_id,
-			_phantom: PhantomData,
 			proposer_factory: self.proposer_factory.clone(),
 			create_inherent_data_providers: self.create_inherent_data_providers.clone(),
 			block_import: self.block_import.clone(),
 			relay_chain_interface: self.relay_chain_interface.clone(),
+			_phantom: PhantomData,
 		}
 	}
 }
@@ -82,6 +83,7 @@ where
 impl<B, PF, BI, RCInterface, CIDP> RelayChainConsensus<B, PF, BI, RCInterface, CIDP>
 where
 	B: BlockT,
+	BI: ParachainBlockImportMarker,
 	RCInterface: RelayChainInterface,
 	CIDP: CreateInherentDataProviders<B, (PHash, PersistedValidationData)>,
 {
@@ -90,7 +92,7 @@ where
 		para_id: ParaId,
 		proposer_factory: PF,
 		create_inherent_data_providers: CIDP,
-		block_import: ParachainBlockImport<BI>,
+		block_import: BI,
 		relay_chain_interface: RCInterface,
 	) -> Self {
 		Self {
@@ -143,7 +145,7 @@ impl<B, PF, BI, RCInterface, CIDP> ParachainConsensus<B>
 where
 	B: BlockT,
 	RCInterface: RelayChainInterface + Clone,
-	BI: BlockImport<B> + Send + Sync,
+	BI: BlockImport<B> + ParachainBlockImportMarker + Send + Sync,
 	PF: Environment<B> + Send + Sync,
 	PF::Proposer: Proposer<
 		B,
@@ -221,7 +223,7 @@ pub struct BuildRelayChainConsensusParams<PF, BI, CIDP, RCInterface> {
 	pub para_id: ParaId,
 	pub proposer_factory: PF,
 	pub create_inherent_data_providers: CIDP,
-	pub block_import: ParachainBlockImport<BI>,
+	pub block_import: BI,
 	pub relay_chain_interface: RCInterface,
 }
 
@@ -246,7 +248,7 @@ where
 		ProofRecording = EnableProofRecording,
 		Proof = <EnableProofRecording as ProofRecording>::Proof,
 	>,
-	BI: BlockImport<Block> + Send + Sync + 'static,
+	BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
 	CIDP: CreateInherentDataProviders<Block, (PHash, PersistedValidationData)> + 'static,
 	RCInterface: RelayChainInterface + Clone + 'static,
 {
diff --git a/cumulus/client/pov-recovery/src/active_candidate_recovery.rs b/cumulus/client/pov-recovery/src/active_candidate_recovery.rs
index a269a26f821fc9b389639f3c123770d9b3d09af0..caae3615a853e5cd82d926d00465f127233025de 100644
--- a/cumulus/client/pov-recovery/src/active_candidate_recovery.rs
+++ b/cumulus/client/pov-recovery/src/active_candidate_recovery.rs
@@ -42,19 +42,19 @@ impl<Block: BlockT> ActiveCandidateRecovery<Block> {
 		Self { recoveries: Default::default(), candidates: Default::default(), overseer_handle }
 	}
 
-	/// Recover the given `pending_candidate`.
+	/// Recover the given `candidate`.
 	pub async fn recover_candidate(
 		&mut self,
 		block_hash: Block::Hash,
-		pending_candidate: crate::PendingCandidate<Block>,
+		candidate: &crate::Candidate<Block>,
 	) {
 		let (tx, rx) = oneshot::channel();
 
 		self.overseer_handle
 			.send_msg(
 				AvailabilityRecoveryMessage::RecoverAvailableData(
-					pending_candidate.receipt,
-					pending_candidate.session_index,
+					candidate.receipt.clone(),
+					candidate.session_index,
 					None,
 					tx,
 				),
diff --git a/cumulus/client/pov-recovery/src/lib.rs b/cumulus/client/pov-recovery/src/lib.rs
index e1f59677423df1a4a1f08005eb59320a05670c90..90c0a853214975c987aa1a0c159d32829ed1c3d5 100644
--- a/cumulus/client/pov-recovery/src/lib.rs
+++ b/cumulus/client/pov-recovery/src/lib.rs
@@ -59,7 +59,9 @@ use cumulus_primitives_core::ParachainBlockData;
 use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
 
 use codec::Decode;
-use futures::{select, stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
+use futures::{
+	channel::mpsc::Receiver, select, stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt,
+};
 use futures_timer::Delay;
 use rand::{thread_rng, Rng};
 
@@ -75,38 +77,52 @@ use active_candidate_recovery::ActiveCandidateRecovery;
 
 const LOG_TARGET: &str = "cumulus-pov-recovery";
 
-/// Represents a pending candidate.
-struct PendingCandidate<Block: BlockT> {
-	receipt: CandidateReceipt,
-	session_index: SessionIndex,
-	block_number: NumberFor<Block>,
+/// Type of recovery to trigger.
+#[derive(Debug, PartialEq)]
+pub enum RecoveryKind {
+	/// Single block recovery.
+	Simple,
+	/// Full ancestry recovery.
+	Full,
 }
 
-/// The delay between observing an unknown block and recovering this block.
+/// Structure used to trigger an explicit recovery request via `PoVRecovery`.
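+///
+/// A minimal usage sketch (assuming a `recovery_chan_tx` sender wired to [`PoVRecovery`] and a
+/// block `hash` to recover):
+/// ```ignore
+/// let delay = RecoveryDelay { min: Duration::ZERO, max: Duration::from_secs(30) };
+/// let req = RecoveryRequest { hash, delay, kind: RecoveryKind::Full };
+/// let _ = recovery_chan_tx.try_send(req);
+/// ```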
+pub struct RecoveryRequest<Block: BlockT> {
+	/// Hash of the last block to recover.
+	pub hash: Block::Hash,
+	/// Recovery delay range. Randomizing the start of the recovery within this interval
+	/// can be used to prevent self-DoSing if the recovery request is part of a
+	/// distributed protocol and multiple actors may need to perform the recovery
+	/// action at approximately the same time.
+	pub delay: RecoveryDelay,
+	/// Recovery type.
+	pub kind: RecoveryKind,
+}
+
+/// The delay between observing an unknown block and triggering the recovery of a block.
 #[derive(Clone, Copy)]
-pub enum RecoveryDelay {
-	/// Start recovering the block in maximum of the given delay.
-	WithMax { max: Duration },
-	/// Start recovering the block after at least `min` delay and in maximum `max` delay.
-	WithMinAndMax { min: Duration, max: Duration },
+pub struct RecoveryDelay {
+	/// Start recovering after `min` delay.
+	pub min: Duration,
+	/// Start recovering before `max` delay.
+	pub max: Duration,
 }
 
-impl RecoveryDelay {
-	/// Return as [`Delay`].
-	fn as_delay(self) -> Delay {
-		match self {
-			Self::WithMax { max } => Delay::new(max.mul_f64(thread_rng().gen())),
-			Self::WithMinAndMax { min, max } =>
-				Delay::new(min + max.saturating_sub(min).mul_f64(thread_rng().gen())),
-		}
-	}
+/// Represents an outstanding block candidate.
+struct Candidate<Block: BlockT> {
+	receipt: CandidateReceipt,
+	session_index: SessionIndex,
+	block_number: NumberFor<Block>,
+	parent_hash: Block::Hash,
+	// Whether a lazy recovery request has been submitted.
+	waiting_recovery: bool,
 }
 
 /// Encapsulates the logic of the pov recovery.
 pub struct PoVRecovery<Block: BlockT, PC, RC> {
 	/// All the pending candidates that we are waiting for to be imported or that need to be
 	/// recovered when `next_candidate_to_recover` tells us to do so.
-	pending_candidates: HashMap<Block::Hash, PendingCandidate<Block>>,
+	candidates: HashMap<Block::Hash, Candidate<Block>>,
 	/// A stream of futures that resolve to hashes of candidates that need to be recovered.
 	///
 	/// The candidates to the hashes are stored in `pending_candidates`. If a candidate is not
@@ -122,6 +138,8 @@ pub struct PoVRecovery<Block: BlockT, PC, RC> {
 	parachain_import_queue: Box<dyn ImportQueueService<Block>>,
 	relay_chain_interface: RC,
 	para_id: ParaId,
+	/// Explicit block recovery requests channel.
+	recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
 }
 
 impl<Block: BlockT, PC, RCInterface> PoVRecovery<Block, PC, RCInterface>
@@ -137,9 +155,10 @@ where
 		parachain_import_queue: Box<dyn ImportQueueService<Block>>,
 		relay_chain_interface: RCInterface,
 		para_id: ParaId,
+		recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
 	) -> Self {
 		Self {
-			pending_candidates: HashMap::new(),
+			candidates: HashMap::new(),
 			next_candidate_to_recover: Default::default(),
 			active_candidate_recovery: ActiveCandidateRecovery::new(overseer_handle),
 			recovery_delay,
@@ -148,6 +167,7 @@ where
 			parachain_import_queue,
 			relay_chain_interface,
 			para_id,
+			recovery_chan_rx,
 		}
 	}
 
@@ -174,69 +194,54 @@ where
 		}
 
 		let hash = header.hash();
-		match self.parachain_client.block_status(&BlockId::Hash(hash)) {
-			Ok(BlockStatus::Unknown) => (),
-			// Any other state means, we should ignore it.
-			Ok(_) => return,
-			Err(e) => {
-				tracing::debug!(
-					target: LOG_TARGET,
-					error = ?e,
-					block_hash = ?hash,
-					"Failed to get block status",
-				);
-				return
-			},
-		}
 
-		tracing::debug!(target: LOG_TARGET, ?hash, "Adding pending candidate");
-		if self
-			.pending_candidates
-			.insert(
-				hash,
-				PendingCandidate {
-					block_number: *header.number(),
-					receipt: receipt.to_plain(),
-					session_index,
-				},
-			)
-			.is_some()
-		{
+		if self.candidates.contains_key(&hash) {
 			return
 		}
 
-		// Delay the recovery by some random time to not spam the relay chain.
-		let delay = self.recovery_delay.as_delay();
-		self.next_candidate_to_recover.push(
-			async move {
-				delay.await;
-				hash
-			}
-			.boxed(),
+		tracing::debug!(target: LOG_TARGET, block_hash = ?hash, "Adding outstanding candidate");
+		self.candidates.insert(
+			hash,
+			Candidate {
+				block_number: *header.number(),
+				receipt: receipt.to_plain(),
+				session_index,
+				parent_hash: *header.parent_hash(),
+				waiting_recovery: false,
+			},
 		);
+
+		// If required, this enqueues a lazy recovery request that will be skipped
+		// if the block is imported in the meantime.
+		self.recover(RecoveryRequest {
+			hash,
+			delay: self.recovery_delay,
+			kind: RecoveryKind::Simple,
+		});
 	}
 
 	/// Handle an imported block.
-	fn handle_block_imported(&mut self, hash: &Block::Hash) {
-		self.pending_candidates.remove(hash);
+	fn handle_block_imported(&mut self, block_hash: &Block::Hash) {
+		if let Some(candidate) = self.candidates.get_mut(block_hash) {
+			// Make any already enqueued recovery request a no-op when it fires.
+			candidate.waiting_recovery = false;
+		}
 	}
 
 	/// Handle a finalized block with the given `block_number`.
 	fn handle_block_finalized(&mut self, block_number: NumberFor<Block>) {
-		self.pending_candidates.retain(|_, pc| pc.block_number > block_number);
+		self.candidates.retain(|_, candidate| candidate.block_number > block_number);
 	}
 
 	/// Recover the candidate for the given `block_hash`.
 	async fn recover_candidate(&mut self, block_hash: Block::Hash) {
-		let pending_candidate = match self.pending_candidates.remove(&block_hash) {
-			Some(pending_candidate) => pending_candidate,
-			None => return,
-		};
-
-		tracing::debug!(target: LOG_TARGET, ?block_hash, "Issuing recovery request");
-		self.active_candidate_recovery
-			.recover_candidate(block_hash, pending_candidate)
-			.await;
+		match self.candidates.get(&block_hash) {
+			Some(candidate) if candidate.waiting_recovery => {
+				tracing::debug!(target: LOG_TARGET, ?block_hash, "Issuing recovery request");
+				self.active_candidate_recovery.recover_candidate(block_hash, candidate).await;
+			},
+			_ => (),
+		}
 	}
 
 	/// Clear `waiting_for_parent` from the given `hash` and do this recursively for all child
@@ -348,7 +353,7 @@ where
 	async fn import_block(&mut self, block: Block) {
 		let mut blocks = VecDeque::new();
 
-		tracing::debug!(target: LOG_TARGET, hash = ?block.hash(), "Importing block retrieved using pov_recovery");
+		tracing::debug!(target: LOG_TARGET, block_hash = ?block.hash(), "Importing block retrieved using pov_recovery");
 		blocks.push_back(block);
 
 		let mut incoming_blocks = Vec::new();
@@ -379,6 +384,70 @@ where
 			.import_blocks(BlockOrigin::ConsensusBroadcast, incoming_blocks);
 	}
 
+	/// Attempts an explicit recovery of one or more blocks.
+	pub fn recover(&mut self, req: RecoveryRequest<Block>) {
+		let RecoveryRequest { mut hash, delay, kind } = req;
+		let mut to_recover = Vec::new();
+
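+		// Walk back from the requested block, collecting every candidate that still has to
+		// be recovered. The walk stops at the first block that is already known locally, at
+		// the first block that was never announced as a candidate, or right after the
+		// requested block for a `Simple` recovery.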
+		let do_recover = loop {
+			let candidate = match self.candidates.get_mut(&hash) {
+				Some(candidate) => candidate,
+				None => {
+					tracing::debug!(
+						target: LOG_TARGET,
+						block_hash = ?hash,
+						"Could not recover. Block was never announced as a candidate"
+					);
+					break false
+				},
+			};
+
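+			// Only schedule recovery for blocks that are still unknown to the client and
+			// are not already waiting for a recovery to fire.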
+			match self.parachain_client.block_status(&BlockId::Hash(hash)) {
+				Ok(BlockStatus::Unknown) if !candidate.waiting_recovery => {
+					candidate.waiting_recovery = true;
+					to_recover.push(hash);
+				},
+				Ok(_) => break true,
+				Err(e) => {
+					tracing::error!(
+						target: LOG_TARGET,
+						error = ?e,
+						block_hash = ?hash,
+						"Failed to get block status",
+					);
+					break false
+				},
+			}
+
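+			// A `Simple` recovery only targets the requested block; a `Full` recovery keeps
+			// walking through the ancestors.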
+			if kind == RecoveryKind::Simple {
+				break true
+			}
+
+			hash = candidate.parent_hash;
+		};
+
+		if do_recover {
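+			// Schedule the recoveries from the oldest collected block to the newest, each
+			// starting after a random delay within the requested window to avoid
+			// overloading the relay chain.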
+			for hash in to_recover.into_iter().rev() {
+				let delay =
+					delay.min + delay.max.saturating_sub(delay.min).mul_f64(thread_rng().gen());
+				tracing::debug!(
+					target: LOG_TARGET,
+					block_hash = ?hash,
+					"Starting {:?} block recovery in {:?} sec",
+					kind,
+					delay.as_secs(),
+				);
+				self.next_candidate_to_recover.push(
+					async move {
+						Delay::new(delay).await;
+						hash
+					}
+					.boxed(),
+				);
+			}
+		}
+	}
+
 	/// Run the pov-recovery.
 	pub async fn run(mut self) {
 		let mut imported_blocks = self.parachain_client.import_notification_stream().fuse();
@@ -400,10 +469,15 @@ where
 					if let Some((receipt, session_index)) = pending_candidate {
 						self.handle_pending_candidate(receipt, session_index);
 					} else {
-						tracing::debug!(
-							target: LOG_TARGET,
-							"Pending candidates stream ended",
-						);
+						tracing::debug!(target: LOG_TARGET, "Pending candidates stream ended");
+						return;
+					}
+				},
+				recovery_req = self.recovery_chan_rx.next() => {
+					if let Some(req) = recovery_req {
+						self.recover(req);
+					} else {
+						tracing::debug!(target: LOG_TARGET, "Recovery channel stream ended");
 						return;
 					}
 				},
@@ -411,10 +485,7 @@ where
 					if let Some(imported) = imported {
 						self.handle_block_imported(&imported.hash);
 					} else {
-						tracing::debug!(
-							target: LOG_TARGET,
-							"Imported blocks stream ended",
-						);
+						tracing::debug!(target: LOG_TARGET, "Imported blocks stream ended");
 						return;
 					}
 				},
@@ -422,10 +493,7 @@ where
 					if let Some(finalized) = finalized {
 						self.handle_block_finalized(*finalized.header.number());
 					} else {
-						tracing::debug!(
-							target: LOG_TARGET,
-							"Finalized blocks stream ended",
-						);
+						tracing::debug!(target: LOG_TARGET, "Finalized blocks stream ended");
 						return;
 					}
 				},
diff --git a/cumulus/client/service/Cargo.toml b/cumulus/client/service/Cargo.toml
index 6642deef06ec426389c29ec6e15b68f2709f0ed0..3c9108c8c71daa4656172e64a4cc8f3cdb3d4374 100644
--- a/cumulus/client/service/Cargo.toml
+++ b/cumulus/client/service/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2021"
 
 [dependencies]
 parking_lot = "0.12.1"
+futures = "0.3.24"
 
 # Substrate
 sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
diff --git a/cumulus/client/service/src/lib.rs b/cumulus/client/service/src/lib.rs
index 82ce3ce487209ca00993a1830a18a1ae49e476a2..4e091c033a71a0d3d9f45669e7bed583af1cee33 100644
--- a/cumulus/client/service/src/lib.rs
+++ b/cumulus/client/service/src/lib.rs
@@ -20,11 +20,13 @@
 
 use cumulus_client_cli::CollatorOptions;
 use cumulus_client_consensus_common::ParachainConsensus;
+use cumulus_client_pov_recovery::{PoVRecovery, RecoveryDelay};
 use cumulus_primitives_core::{CollectCollationInfo, ParaId};
 use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain;
 use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
 use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node;
 use polkadot_primitives::v2::CollatorPair;
+
 use sc_client_api::{
 	Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, UsageProvider,
 };
@@ -35,8 +37,15 @@ use sp_api::ProvideRuntimeApi;
 use sp_blockchain::HeaderBackend;
 use sp_core::traits::SpawnNamed;
 use sp_runtime::traits::Block as BlockT;
+
+use futures::channel::mpsc;
 use std::{sync::Arc, time::Duration};
 
+// Given the sporadic nature of the explicit recovery operation and the
+// ability to retry it an unbounded number of times, this value is more than enough.
+// In practice we expect no more than one queued message at a time.
+const RECOVERY_CHAN_SIZE: usize = 8;
+
 /// Parameters given to [`start_collator`].
 pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawner> {
 	pub block_status: Arc<BS>,
@@ -90,11 +99,14 @@ where
 	RCInterface: RelayChainInterface + Clone + 'static,
 	Backend: BackendT<Block> + 'static,
 {
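+	// The sender is handed to the parachain consensus task so that it can explicitly
+	// request PoV recovery of blocks, while the receiver is polled by the `PoVRecovery`
+	// task created below.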
+	let (recovery_chan_tx, recovery_chan_rx) = mpsc::channel(RECOVERY_CHAN_SIZE);
+
 	let consensus = cumulus_client_consensus_common::run_parachain_consensus(
 		para_id,
 		client.clone(),
 		relay_chain_interface.clone(),
 		announce_block.clone(),
+		Some(recovery_chan_tx),
 	);
 
 	task_manager
@@ -105,15 +117,16 @@ where
 		.overseer_handle()
 		.map_err(|e| sc_service::Error::Application(Box::new(e)))?;
 
-	let pov_recovery = cumulus_client_pov_recovery::PoVRecovery::new(
+	let pov_recovery = PoVRecovery::new(
 		overseer_handle.clone(),
 		// We want that collators wait at maximum the relay chain slot duration before starting
 		// to recover blocks.
-		cumulus_client_pov_recovery::RecoveryDelay::WithMax { max: relay_chain_slot_duration },
+		RecoveryDelay { min: Duration::ZERO, max: relay_chain_slot_duration },
 		client.clone(),
 		import_queue,
 		relay_chain_interface.clone(),
 		para_id,
+		recovery_chan_rx,
 	);
 
 	task_manager
@@ -173,11 +186,14 @@ where
 	Backend: BackendT<Block> + 'static,
 	RCInterface: RelayChainInterface + Clone + 'static,
 {
+	let (recovery_chan_tx, recovery_chan_rx) = mpsc::channel(RECOVERY_CHAN_SIZE);
+
 	let consensus = cumulus_client_consensus_common::run_parachain_consensus(
 		para_id,
 		client.clone(),
 		relay_chain_interface.clone(),
 		announce_block,
+		Some(recovery_chan_tx),
 	);
 
 	task_manager
@@ -188,21 +204,19 @@ where
 		.overseer_handle()
 		.map_err(|e| sc_service::Error::Application(Box::new(e)))?;
 
-	let pov_recovery = cumulus_client_pov_recovery::PoVRecovery::new(
+	let pov_recovery = PoVRecovery::new(
 		overseer_handle,
 		// Full nodes should at least wait 2.5 minutes (assuming 6 seconds slot duration) and
 		// in maximum 5 minutes before starting to recover blocks. Collators should already start
 		// the recovery way before full nodes try to recover a certain block and then share the
 		// block with the network using "the normal way". Full nodes are just the "last resort"
 		// for block recovery.
-		cumulus_client_pov_recovery::RecoveryDelay::WithMinAndMax {
-			min: relay_chain_slot_duration * 25,
-			max: relay_chain_slot_duration * 50,
-		},
+		RecoveryDelay { min: relay_chain_slot_duration * 25, max: relay_chain_slot_duration * 50 },
 		client,
 		import_queue,
 		relay_chain_interface,
 		para_id,
+		recovery_chan_rx,
 	);
 
 	task_manager
diff --git a/cumulus/parachain-template/node/src/service.rs b/cumulus/parachain-template/node/src/service.rs
index 2df17e2eb89e6487efbb1b0fa60b74255111afa2..a8ee57cadbf9b304cb7cfc2b8ced15e2d05a6de3 100644
--- a/cumulus/parachain-template/node/src/service.rs
+++ b/cumulus/parachain-template/node/src/service.rs
@@ -51,7 +51,7 @@ type ParachainClient = TFullClient<Block, RuntimeApi, ParachainExecutor>;
 
 type ParachainBackend = TFullBackend<Block>;
 
-type ParachainBlockImport = TParachainBlockImport<Arc<ParachainClient>>;
+type ParachainBlockImport = TParachainBlockImport<Block, Arc<ParachainClient>, ParachainBackend>;
 
 /// Starts a `ServiceBuilder` for a full service.
 ///
@@ -111,7 +111,7 @@ pub fn new_partial(
 		client.clone(),
 	);
 
-	let block_import = ParachainBlockImport::new(client.clone());
+	let block_import = ParachainBlockImport::new(client.clone(), backend.clone());
 
 	let import_queue = build_import_queue(
 		client.clone(),
@@ -141,7 +141,7 @@ async fn start_node_impl(
 	parachain_config: Configuration,
 	polkadot_config: Configuration,
 	collator_options: CollatorOptions,
-	id: ParaId,
+	para_id: ParaId,
 	hwbench: Option<sc_sysinfo::HwBench>,
 ) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)> {
 	let parachain_config = prepare_node_config(parachain_config);
@@ -167,7 +167,8 @@ async fn start_node_impl(
 		s => s.to_string().into(),
 	})?;
 
-	let block_announce_validator = BlockAnnounceValidator::new(relay_chain_interface.clone(), id);
+	let block_announce_validator =
+		BlockAnnounceValidator::new(relay_chain_interface.clone(), para_id);
 
 	let force_authoring = parachain_config.force_authoring;
 	let validator = parachain_config.role.is_authority();
@@ -219,7 +220,7 @@ async fn start_node_impl(
 		task_manager: &mut task_manager,
 		config: parachain_config,
 		keystore: params.keystore_container.sync_keystore(),
-		backend: backend.clone(),
+		backend,
 		network: network.clone(),
 		system_rpc_tx,
 		tx_handler_controller,
@@ -258,12 +259,12 @@ async fn start_node_impl(
 			network,
 			params.keystore_container.sync_keystore(),
 			force_authoring,
-			id,
+			para_id,
 		)?;
 
 		let spawner = task_manager.spawn_handle();
 		let params = StartCollatorParams {
-			para_id: id,
+			para_id,
 			block_status: client.clone(),
 			announce_block,
 			client: client.clone(),
@@ -282,7 +283,7 @@ async fn start_node_impl(
 			client: client.clone(),
 			announce_block,
 			task_manager: &mut task_manager,
-			para_id: id,
+			para_id,
 			relay_chain_interface,
 			relay_chain_slot_duration,
 			import_queue: import_queue_service,
@@ -345,7 +346,7 @@ fn build_consensus(
 	sync_oracle: Arc<NetworkService<Block, Hash>>,
 	keystore: SyncCryptoStorePtr,
 	force_authoring: bool,
-	id: ParaId,
+	para_id: ParaId,
 ) -> Result<Box<dyn ParachainConsensus<Block>>, sc_service::Error> {
 	let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client)?;
 
@@ -367,7 +368,7 @@ fn build_consensus(
 						relay_parent,
 						&relay_chain_interface,
 						&validation_data,
-						id,
+						para_id,
 					)
 					.await;
 				let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
@@ -408,8 +409,8 @@ pub async fn start_parachain_node(
 	parachain_config: Configuration,
 	polkadot_config: Configuration,
 	collator_options: CollatorOptions,
-	id: ParaId,
+	para_id: ParaId,
 	hwbench: Option<sc_sysinfo::HwBench>,
 ) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)> {
-	start_node_impl(parachain_config, polkadot_config, collator_options, id, hwbench).await
+	start_node_impl(parachain_config, polkadot_config, collator_options, para_id, hwbench).await
 }
diff --git a/cumulus/polkadot-parachain/src/service.rs b/cumulus/polkadot-parachain/src/service.rs
index b38858c8f3e8a7f3d88a3cd0a6b20aabf42e4331..7fab9065e3a83d9bcf687681e533c7b742388b23 100644
--- a/cumulus/polkadot-parachain/src/service.rs
+++ b/cumulus/polkadot-parachain/src/service.rs
@@ -71,7 +71,8 @@ type ParachainClient<RuntimeApi> = TFullClient<Block, RuntimeApi, WasmExecutor<H
 
 type ParachainBackend = TFullBackend<Block>;
 
-type ParachainBlockImport<RuntimeApi> = TParachainBlockImport<Arc<ParachainClient<RuntimeApi>>>;
+type ParachainBlockImport<RuntimeApi> =
+	TParachainBlockImport<Block, Arc<ParachainClient<RuntimeApi>>, ParachainBackend>;
 
 /// Native executor instance.
 pub struct ShellRuntimeExecutor;
@@ -275,7 +276,7 @@ where
 		client.clone(),
 	);
 
-	let block_import = ParachainBlockImport::new(client.clone());
+	let block_import = ParachainBlockImport::new(client.clone(), backend.clone());
 
 	let import_queue = build_import_queue(
 		client.clone(),
@@ -1142,6 +1143,7 @@ where
 			let telemetry2 = telemetry.clone();
 			let prometheus_registry2 = prometheus_registry.map(|r| (*r).clone());
 			let relay_chain_for_aura = relay_chain_interface.clone();
+
 			let aura_consensus = BuildOnAccess::Uninitialized(Some(Box::new(move || {
 				let slot_duration =
 					cumulus_client_consensus_aura::slot_duration(&*client2).unwrap();
diff --git a/cumulus/test/service/src/lib.rs b/cumulus/test/service/src/lib.rs
index 76d568ffebadeb6ebaf3d727ae4fb30e84a4e6c7..2b7f17ce14be36264459eca01cb1b3c431974f85 100644
--- a/cumulus/test/service/src/lib.rs
+++ b/cumulus/test/service/src/lib.rs
@@ -117,11 +117,15 @@ pub type Client = TFullClient<
 	sc_executor::NativeElseWasmExecutor<RuntimeExecutor>,
 >;
 
+/// The backend type being used by the test service.
+pub type Backend = TFullBackend<Block>;
+
+/// The block-import type being used by the test service.
+pub type ParachainBlockImport = TParachainBlockImport<Block, Arc<Client>, Backend>;
+
 /// Transaction pool type used by the test service
 pub type TransactionPool = Arc<sc_transaction_pool::FullPool<Block, Client>>;
 
-type ParachainBlockImport = TParachainBlockImport<Arc<Client>>;
-
 /// Starts a `ServiceBuilder` for a full service.
 ///
 /// Use this macro if you don't actually need the full service, but just the builder in order to
@@ -131,7 +135,7 @@ pub fn new_partial(
 ) -> Result<
 	PartialComponents<
 		Client,
-		TFullBackend<Block>,
+		Backend,
 		(),
 		sc_consensus::import_queue::BasicQueue<Block, PrefixedMemoryDB<BlakeTwo256>>,
 		sc_transaction_pool::FullPool<Block, Client>,
@@ -150,7 +154,7 @@ pub fn new_partial(
 		sc_service::new_full_parts::<Block, RuntimeApi, _>(config, None, executor)?;
 	let client = Arc::new(client);
 
-	let block_import = ParachainBlockImport::new(client.clone());
+	let block_import = ParachainBlockImport::new(client.clone(), backend.clone());
 
 	let registry = config.prometheus_registry();
 
@@ -299,7 +303,7 @@ where
 		task_manager: &mut task_manager,
 		config: parachain_config,
 		keystore: params.keystore_container.sync_keystore(),
-		backend,
+		backend: backend.clone(),
 		network: network.clone(),
 		system_rpc_tx,
 		tx_handler_controller,