From dacd0aed5ef84e20d634b2158562ead4bc3e9089 Mon Sep 17 00:00:00 2001
From: Sebastian Kunert <skunert49@gmail.com>
Date: Tue, 25 Jan 2022 18:27:54 +0100
Subject: [PATCH] Unify RelayChainInterface error handling and introduce async
 (#909)

---
 cumulus/Cargo.lock                            |   3 +
 cumulus/client/consensus/common/Cargo.toml    |   2 +-
 .../common/src/parachain_consensus.rs         |  84 +++++---
 cumulus/client/consensus/common/src/tests.rs  |  19 +-
 .../client/consensus/relay-chain/src/lib.rs   |   2 +-
 cumulus/client/network/src/lib.rs             | 134 ++++++------
 cumulus/client/network/src/tests.rs           | 129 +++++++-----
 cumulus/client/pov-recovery/src/lib.rs        |  70 +++---
 .../client/relay-chain-interface/Cargo.toml   |   3 +
 .../client/relay-chain-interface/src/lib.rs   | 184 ++++++++--------
 cumulus/client/relay-chain-local/src/lib.rs   | 199 ++++++++----------
 cumulus/client/service/src/lib.rs             |  22 +-
 cumulus/pallets/parachain-system/src/lib.rs   |   2 +-
 .../parachain-template/node/src/service.rs    |   7 +-
 cumulus/polkadot-parachains/src/service.rs    |  26 ++-
 .../parachain-inherent/src/client_side.rs     |  49 ++++-
 cumulus/test/service/src/lib.rs               |  11 +-
 17 files changed, 532 insertions(+), 414 deletions(-)

diff --git a/cumulus/Cargo.lock b/cumulus/Cargo.lock
index 01a0ad13299..eb4ca303306 100644
--- a/cumulus/Cargo.lock
+++ b/cumulus/Cargo.lock
@@ -1895,14 +1895,17 @@ dependencies = [
  "async-trait",
  "cumulus-primitives-core",
  "derive_more",
+ "futures 0.3.19",
  "parking_lot 0.11.2",
  "polkadot-overseer",
  "sc-client-api",
+ "sc-service",
  "sp-api",
  "sp-blockchain",
  "sp-core",
  "sp-runtime",
  "sp-state-machine",
+ "thiserror",
 ]
 
 [[package]]
diff --git a/cumulus/client/consensus/common/Cargo.toml b/cumulus/client/consensus/common/Cargo.toml
index 057f0b34966..379f73740f2 100644
--- a/cumulus/client/consensus/common/Cargo.toml
+++ b/cumulus/client/consensus/common/Cargo.toml
@@ -25,7 +25,7 @@ cumulus-relay-chain-interface = { path = "../../relay-chain-interface" }
 futures = { version = "0.3.8", features = ["compat"] }
 codec = { package = "parity-scale-codec", version = "2.3.0", features = [ "derive" ] }
 tracing = "0.1.25"
-async-trait = "0.1.42"
+async-trait = "0.1.52"
 dyn-clone = "1.0.4"
 
 [dev-dependencies]
diff --git a/cumulus/client/consensus/common/src/parachain_consensus.rs b/cumulus/client/consensus/common/src/parachain_consensus.rs
index 224e3e5fd9b..6328681fd9f 100644
--- a/cumulus/client/consensus/common/src/parachain_consensus.rs
+++ b/cumulus/client/consensus/common/src/parachain_consensus.rs
@@ -14,12 +14,13 @@
 // You should have received a copy of the GNU General Public License
 // along with Cumulus.  If not, see <http://www.gnu.org/licenses/>.
 
-use cumulus_relay_chain_interface::RelayChainInterface;
+use async_trait::async_trait;
+use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
 use sc_client_api::{
 	Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
 };
 use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
-use sp_blockchain::{Error as ClientError, Result as ClientResult};
+use sp_blockchain::Error as ClientError;
 use sp_consensus::{BlockOrigin, BlockStatus};
 use sp_runtime::{
 	generic::BlockId,
@@ -29,11 +30,14 @@ use sp_runtime::{
 use polkadot_primitives::v1::{Block as PBlock, Id as ParaId, OccupiedCoreAssumption};
 
 use codec::Decode;
-use futures::{future, select, FutureExt, Stream, StreamExt};
+use futures::{select, FutureExt, Stream, StreamExt};
 
 use std::{pin::Pin, sync::Arc};
 
+const LOG_TARGET: &str = "cumulus-consensus";
+
 /// Helper for the relay chain client. This is expected to be a lightweight handle like an `Arc`.
+#[async_trait]
 pub trait RelaychainClient: Clone + 'static {
 	/// The error type for interacting with the Polkadot client.
 	type Error: std::fmt::Debug + Send;
@@ -42,17 +46,17 @@ pub trait RelaychainClient: Clone + 'static {
 	type HeadStream: Stream<Item = Vec<u8>> + Send + Unpin;
 
 	/// Get a stream of new best heads for the given parachain.
-	fn new_best_heads(&self, para_id: ParaId) -> Self::HeadStream;
+	async fn new_best_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream>;
 
 	/// Get a stream of finalized heads for the given parachain.
-	fn finalized_heads(&self, para_id: ParaId) -> Self::HeadStream;
+	async fn finalized_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream>;
 
 	/// Returns the parachain head for the given `para_id` at the given block id.
-	fn parachain_head_at(
+	async fn parachain_head_at(
 		&self,
 		at: &BlockId<PBlock>,
 		para_id: ParaId,
-	) -> ClientResult<Option<Vec<u8>>>;
+	) -> RelayChainResult<Option<Vec<u8>>>;
 }
 
 /// Follow the finalized head of the given parachain.
@@ -66,7 +70,13 @@ where
 	R: RelaychainClient,
 	B: Backend<Block>,
 {
-	let mut finalized_heads = relay_chain.finalized_heads(para_id);
+	let mut finalized_heads = match relay_chain.finalized_heads(para_id).await {
+		Ok(finalized_heads_stream) => finalized_heads_stream,
+		Err(err) => {
+			tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream.");
+			return
+		},
+	};
 
 	loop {
 		let finalized_head = if let Some(h) = finalized_heads.next().await {
@@ -165,7 +175,14 @@ async fn follow_new_best<P, R, Block, B>(
 	R: RelaychainClient,
 	B: Backend<Block>,
 {
-	let mut new_best_heads = relay_chain.new_best_heads(para_id).fuse();
+	let mut new_best_heads = match relay_chain.new_best_heads(para_id).await {
+		Ok(best_heads_stream) => best_heads_stream.fuse(),
+		Err(err) => {
+			tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve best heads stream.");
+			return
+		},
+	};
+
 	let mut imported_blocks = parachain.import_notification_stream().fuse();
 	// The unset best header of the parachain. Will be `Some(_)` when we have imported a relay chain
 	// block before the parachain block it included. In this case we need to wait for this block to
@@ -368,6 +385,7 @@ where
 	}
 }
 
+#[async_trait]
 impl<RCInterface> RelaychainClient for RCInterface
 where
 	RCInterface: RelayChainInterface + Clone + 'static,
@@ -376,39 +394,53 @@ where
 
 	type HeadStream = Pin<Box<dyn Stream<Item = Vec<u8>> + Send>>;
 
-	fn new_best_heads(&self, para_id: ParaId) -> Self::HeadStream {
+	async fn new_best_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream> {
 		let relay_chain = self.clone();
 
-		self.import_notification_stream()
+		let new_best_notification_stream = self
+			.new_best_notification_stream()
+			.await?
 			.filter_map(move |n| {
-				future::ready(if n.is_new_best {
-					relay_chain.parachain_head_at(&BlockId::hash(n.hash), para_id).ok().flatten()
-				} else {
-					None
-				})
+				let relay_chain = relay_chain.clone();
+				async move {
+					relay_chain
+						.parachain_head_at(&BlockId::hash(n.hash()), para_id)
+						.await
+						.ok()
+						.flatten()
+				}
 			})
-			.boxed()
+			.boxed();
+		Ok(new_best_notification_stream)
 	}
 
-	fn finalized_heads(&self, para_id: ParaId) -> Self::HeadStream {
+	async fn finalized_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream> {
 		let relay_chain = self.clone();
 
-		self.finality_notification_stream()
+		let finality_notification_stream = self
+			.finality_notification_stream()
+			.await?
 			.filter_map(move |n| {
-				future::ready(
-					relay_chain.parachain_head_at(&BlockId::hash(n.hash), para_id).ok().flatten(),
-				)
+				let relay_chain = relay_chain.clone();
+				async move {
+					relay_chain
+						.parachain_head_at(&BlockId::hash(n.hash()), para_id)
+						.await
+						.ok()
+						.flatten()
+				}
 			})
-			.boxed()
+			.boxed();
+		Ok(finality_notification_stream)
 	}
 
-	fn parachain_head_at(
+	async fn parachain_head_at(
 		&self,
 		at: &BlockId<PBlock>,
 		para_id: ParaId,
-	) -> ClientResult<Option<Vec<u8>>> {
+	) -> RelayChainResult<Option<Vec<u8>>> {
 		self.persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut)
+			.await
 			.map(|s| s.map(|s| s.parent_head.0))
-			.map_err(Into::into)
 	}
 }
diff --git a/cumulus/client/consensus/common/src/tests.rs b/cumulus/client/consensus/common/src/tests.rs
index 4340b7b681e..ceb60aa501e 100644
--- a/cumulus/client/consensus/common/src/tests.rs
+++ b/cumulus/client/consensus/common/src/tests.rs
@@ -16,7 +16,9 @@
 
 use crate::*;
 
+use async_trait::async_trait;
 use codec::Encode;
+use cumulus_relay_chain_interface::RelayChainResult;
 use cumulus_test_client::{
 	runtime::{Block, Header},
 	Backend, Client, InitBlockBuilder, TestClientBuilder, TestClientBuilderExt,
@@ -26,7 +28,7 @@ use futures_timer::Delay;
 use polkadot_primitives::v1::{Block as PBlock, Id as ParaId};
 use sc_client_api::UsageProvider;
 use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
-use sp_blockchain::{Error as ClientError, Result as ClientResult};
+use sp_blockchain::Error as ClientError;
 use sp_consensus::BlockOrigin;
 use sp_runtime::generic::BlockId;
 use std::{
@@ -66,12 +68,13 @@ impl Relaychain {
 	}
 }
 
+#[async_trait]
 impl crate::parachain_consensus::RelaychainClient for Relaychain {
 	type Error = ClientError;
 
 	type HeadStream = Box<dyn Stream<Item = Vec<u8>> + Send + Unpin>;
 
-	fn new_best_heads(&self, _: ParaId) -> Self::HeadStream {
+	async fn new_best_heads(&self, _: ParaId) -> RelayChainResult<Self::HeadStream> {
 		let stream = self
 			.inner
 			.lock()
@@ -80,10 +83,10 @@ impl crate::parachain_consensus::RelaychainClient for Relaychain {
 			.take()
 			.expect("Should only be called once");
 
-		Box::new(stream.map(|v| v.encode()))
+		Ok(Box::new(stream.map(|v| v.encode())))
 	}
 
-	fn finalized_heads(&self, _: ParaId) -> Self::HeadStream {
+	async fn finalized_heads(&self, _: ParaId) -> RelayChainResult<Self::HeadStream> {
 		let stream = self
 			.inner
 			.lock()
@@ -92,10 +95,14 @@ impl crate::parachain_consensus::RelaychainClient for Relaychain {
 			.take()
 			.expect("Should only be called once");
 
-		Box::new(stream.map(|v| v.encode()))
+		Ok(Box::new(stream.map(|v| v.encode())))
 	}
 
-	fn parachain_head_at(&self, _: &BlockId<PBlock>, _: ParaId) -> ClientResult<Option<Vec<u8>>> {
+	async fn parachain_head_at(
+		&self,
+		_: &BlockId<PBlock>,
+		_: ParaId,
+	) -> RelayChainResult<Option<Vec<u8>>> {
 		unimplemented!("Not required for tests")
 	}
 }
diff --git a/cumulus/client/consensus/relay-chain/src/lib.rs b/cumulus/client/consensus/relay-chain/src/lib.rs
index 7ab3ef28619..69a92175da1 100644
--- a/cumulus/client/consensus/relay-chain/src/lib.rs
+++ b/cumulus/client/consensus/relay-chain/src/lib.rs
@@ -176,7 +176,7 @@ where
 			.propose(
 				inherent_data,
 				Default::default(),
-				//TODO: Fix this.
+				// TODO: Fix this.
 				Duration::from_millis(500),
 				// Set the block limit to 50% of the maximum PoV size.
 				//
diff --git a/cumulus/client/network/src/lib.rs b/cumulus/client/network/src/lib.rs
index 79e4f7c1b79..2010803d384 100644
--- a/cumulus/client/network/src/lib.rs
+++ b/cumulus/client/network/src/lib.rs
@@ -38,11 +38,7 @@ use polkadot_primitives::v1::{
 };
 
 use codec::{Decode, DecodeAll, Encode};
-use futures::{
-	channel::oneshot,
-	future::{ready, FutureExt},
-	Future,
-};
+use futures::{channel::oneshot, future::FutureExt, Future};
 
 use std::{convert::TryFrom, fmt, marker::PhantomData, pin::Pin, sync::Arc};
 
@@ -128,7 +124,7 @@ impl BlockAnnounceData {
 	/// Check the signature of the statement.
 	///
 	/// Returns an `Err(_)` if it failed.
-	fn check_signature<RCInterface>(
+	async fn check_signature<RCInterface>(
 		self,
 		relay_chain_client: &RCInterface,
 	) -> Result<Validation, BlockAnnounceError>
@@ -138,16 +134,16 @@ impl BlockAnnounceData {
 		let validator_index = self.statement.unchecked_validator_index();
 
 		let runtime_api_block_id = BlockId::Hash(self.relay_parent);
-		let session_index = match relay_chain_client.session_index_for_child(&runtime_api_block_id)
-		{
-			Ok(r) => r,
-			Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
-		};
+		let session_index =
+			match relay_chain_client.session_index_for_child(&runtime_api_block_id).await {
+				Ok(r) => r,
+				Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
+			};
 
 		let signing_context = SigningContext { parent_hash: self.relay_parent, session_index };
 
 		// Check that the signer is a legit validator.
-		let authorities = match relay_chain_client.validators(&runtime_api_block_id) {
+		let authorities = match relay_chain_client.validators(&runtime_api_block_id).await {
 			Ok(r) => r,
 			Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
 		};
@@ -222,6 +218,7 @@ impl TryFrom<&'_ CollationSecondedSignal> for BlockAnnounceData {
 /// chain. If it is at the tip, it is required to provide a justification or otherwise we reject
 /// it. However, if the announcement is for a block below the tip the announcement is accepted
 /// as it probably comes from a node that is currently syncing the chain.
+#[derive(Clone)]
 pub struct BlockAnnounceValidator<Block, RCInterface> {
 	phantom: PhantomData<Block>,
 	relay_chain_interface: RCInterface,
@@ -247,13 +244,14 @@ where
 	RCInterface: RelayChainInterface + Clone,
 {
 	/// Get the included block of the given parachain in the relay chain.
-	fn included_block(
+	async fn included_block(
 		relay_chain_interface: &RCInterface,
 		block_id: &BlockId<PBlock>,
 		para_id: ParaId,
 	) -> Result<Block::Header, BoxedError> {
 		let validation_data = relay_chain_interface
 			.persisted_validation_data(block_id, para_id, OccupiedCoreAssumption::TimedOut)
+			.await
 			.map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?
 			.ok_or_else(|| {
 				Box::new(BlockAnnounceError("Could not find parachain head in relay chain".into()))
@@ -269,56 +267,59 @@ where
 	}
 
 	/// Get the backed block hash of the given parachain in the relay chain.
-	fn backed_block_hash(
+	async fn backed_block_hash(
 		relay_chain_interface: &RCInterface,
 		block_id: &BlockId<PBlock>,
 		para_id: ParaId,
 	) -> Result<Option<PHash>, BoxedError> {
 		let candidate_receipt = relay_chain_interface
 			.candidate_pending_availability(block_id, para_id)
+			.await
 			.map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?;
 
 		Ok(candidate_receipt.map(|cr| cr.descriptor.para_head))
 	}
 
 	/// Handle a block announcement with empty data (no statement) attached to it.
-	fn handle_empty_block_announce_data(
+	async fn handle_empty_block_announce_data(
 		&self,
 		header: Block::Header,
-	) -> impl Future<Output = Result<Validation, BoxedError>> {
+	) -> Result<Validation, BoxedError> {
 		let relay_chain_interface = self.relay_chain_interface.clone();
 		let para_id = self.para_id;
 
-		async move {
-			// Check if block is equal or higher than best (this requires a justification)
-			let relay_chain_best_hash = relay_chain_interface.best_block_hash();
-			let runtime_api_block_id = BlockId::Hash(relay_chain_best_hash);
-			let block_number = header.number();
-
-			let best_head =
-				Self::included_block(&relay_chain_interface, &runtime_api_block_id, para_id)?;
-			let known_best_number = best_head.number();
-			let backed_block =
-				|| Self::backed_block_hash(&relay_chain_interface, &runtime_api_block_id, para_id);
-
-			if best_head == header {
-				tracing::debug!(target: LOG_TARGET, "Announced block matches best block.",);
-
-				Ok(Validation::Success { is_new_best: true })
-			} else if Some(HeadData(header.encode()).hash()) == backed_block()? {
-				tracing::debug!(target: LOG_TARGET, "Announced block matches latest backed block.",);
-
-				Ok(Validation::Success { is_new_best: true })
-			} else if block_number >= known_best_number {
-				tracing::debug!(
+		// Check if block is equal or higher than best (this requires a justification)
+		let relay_chain_best_hash = relay_chain_interface
+			.best_block_hash()
+			.await
+			.map_err(|e| Box::new(e) as Box<_>)?;
+		let runtime_api_block_id = BlockId::Hash(relay_chain_best_hash);
+		let block_number = header.number();
+
+		let best_head =
+			Self::included_block(&relay_chain_interface, &runtime_api_block_id, para_id).await?;
+		let known_best_number = best_head.number();
+		let backed_block = || async {
+			Self::backed_block_hash(&relay_chain_interface, &runtime_api_block_id, para_id).await
+		};
+
+		if best_head == header {
+			tracing::debug!(target: LOG_TARGET, "Announced block matches best block.",);
+
+			Ok(Validation::Success { is_new_best: true })
+		} else if Some(HeadData(header.encode()).hash()) == backed_block().await? {
+			tracing::debug!(target: LOG_TARGET, "Announced block matches latest backed block.",);
+
+			Ok(Validation::Success { is_new_best: true })
+		} else if block_number >= known_best_number {
+			tracing::debug!(
 					target: LOG_TARGET,
 					"Validation failed because a justification is needed if the block at the top of the chain."
 				);
 
-				Ok(Validation::Failure { disconnect: false })
-			} else {
-				Ok(Validation::Success { is_new_best: false })
-			}
+			Ok(Validation::Failure { disconnect: false })
+		} else {
+			Ok(Validation::Success { is_new_best: false })
 		}
 	}
 }
@@ -331,32 +332,40 @@ where
 	fn validate(
 		&mut self,
 		header: &Block::Header,
-		mut data: &[u8],
+		data: &[u8],
 	) -> Pin<Box<dyn Future<Output = Result<Validation, BoxedError>> + Send>> {
-		if self.relay_chain_interface.is_major_syncing() {
-			return ready(Ok(Validation::Success { is_new_best: false })).boxed()
-		}
+		let relay_chain_interface = self.relay_chain_interface.clone();
+		let mut data = data.to_vec();
+		let header = header.clone();
+		let header_encoded = header.encode();
+		let block_announce_validator = self.clone();
 
-		if data.is_empty() {
-			return self.handle_empty_block_announce_data(header.clone()).boxed()
-		}
+		async move {
+			let relay_chain_is_syncing = relay_chain_interface
+				.is_major_syncing()
+				.await
+				.map_err(|e| {
+					tracing::error!(target: LOG_TARGET, "Unable to determine sync status. {}", e)
+				})
+				.unwrap_or(false);
 
-		let block_announce_data = match BlockAnnounceData::decode_all(&mut data) {
-			Ok(r) => r,
-			Err(err) =>
-				return async move {
-					Err(Box::new(BlockAnnounceError(format!(
+			if relay_chain_is_syncing {
+				return Ok(Validation::Success { is_new_best: false })
+			}
+
+			if data.is_empty() {
+				return block_announce_validator.handle_empty_block_announce_data(header).await
+			}
+
+			let block_announce_data = match BlockAnnounceData::decode_all(&mut data) {
+				Ok(r) => r,
+				Err(err) =>
+					return Err(Box::new(BlockAnnounceError(format!(
 						"Can not decode the `BlockAnnounceData`: {:?}",
 						err
-					))) as Box<_>)
-				}
-				.boxed(),
-		};
+					))) as Box<_>),
+			};
 
-		let relay_chain_interface = self.relay_chain_interface.clone();
-		let header_encoded = header.encode();
-
-		async move {
 			if let Err(e) = block_announce_data.validate(header_encoded) {
 				return Ok(e)
 			}
@@ -370,6 +379,7 @@ where
 
 			block_announce_data
 				.check_signature(&relay_chain_interface)
+				.await
 				.map_err(|e| Box::new(e) as Box<_>)
 		}
 		.boxed()
diff --git a/cumulus/client/network/src/tests.rs b/cumulus/client/network/src/tests.rs
index 34584edd69d..bd52fc0b93b 100644
--- a/cumulus/client/network/src/tests.rs
+++ b/cumulus/client/network/src/tests.rs
@@ -16,15 +16,15 @@
 
 use super::*;
 use async_trait::async_trait;
-use cumulus_relay_chain_interface::WaitError;
+use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult};
 use cumulus_relay_chain_local::{check_block_in_chain, BlockCheckStatus};
 use cumulus_test_service::runtime::{Block, Hash, Header};
-use futures::{executor::block_on, poll, task::Poll, FutureExt, StreamExt};
+use futures::{executor::block_on, poll, task::Poll, FutureExt, Stream, StreamExt};
 use parking_lot::Mutex;
 use polkadot_node_primitives::{SignedFullStatement, Statement};
 use polkadot_primitives::v1::{
-	Block as PBlock, CandidateCommitments, CandidateDescriptor, CollatorPair,
-	CommittedCandidateReceipt, Hash as PHash, HeadData, Id as ParaId, InboundDownwardMessage,
+	CandidateCommitments, CandidateDescriptor, CollatorPair, CommittedCandidateReceipt,
+	Hash as PHash, HeadData, Header as PHeader, Id as ParaId, InboundDownwardMessage,
 	InboundHrmpMessage, OccupiedCoreAssumption, PersistedValidationData, SessionIndex,
 	SigningContext, ValidationCodeHash, ValidatorId,
 };
@@ -77,53 +77,60 @@ impl DummyRelayChainInterface {
 
 #[async_trait]
 impl RelayChainInterface for DummyRelayChainInterface {
-	fn validators(
+	async fn validators(
 		&self,
 		_: &cumulus_primitives_core::relay_chain::BlockId,
-	) -> Result<Vec<ValidatorId>, sp_api::ApiError> {
+	) -> RelayChainResult<Vec<ValidatorId>> {
 		Ok(self.data.lock().validators.clone())
 	}
 
-	fn block_status(
+	async fn block_status(
 		&self,
 		block_id: cumulus_primitives_core::relay_chain::BlockId,
-	) -> Result<sp_blockchain::BlockStatus, sp_blockchain::Error> {
-		self.relay_backend.blockchain().status(block_id)
+	) -> RelayChainResult<sp_blockchain::BlockStatus> {
+		self.relay_backend
+			.blockchain()
+			.status(block_id)
+			.map_err(RelayChainError::BlockchainError)
 	}
 
-	fn best_block_hash(&self) -> PHash {
-		self.relay_backend.blockchain().info().best_hash
+	async fn best_block_hash(&self) -> RelayChainResult<PHash> {
+		Ok(self.relay_backend.blockchain().info().best_hash)
 	}
 
-	fn retrieve_dmq_contents(&self, _: ParaId, _: PHash) -> Option<Vec<InboundDownwardMessage>> {
+	async fn retrieve_dmq_contents(
+		&self,
+		_: ParaId,
+		_: PHash,
+	) -> RelayChainResult<Vec<InboundDownwardMessage>> {
 		unimplemented!("Not needed for test")
 	}
 
-	fn retrieve_all_inbound_hrmp_channel_contents(
+	async fn retrieve_all_inbound_hrmp_channel_contents(
 		&self,
 		_: ParaId,
 		_: PHash,
-	) -> Option<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
-		Some(BTreeMap::new())
+	) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
+		Ok(BTreeMap::new())
 	}
 
-	fn persisted_validation_data(
+	async fn persisted_validation_data(
 		&self,
 		_: &cumulus_primitives_core::relay_chain::BlockId,
 		_: ParaId,
 		_: OccupiedCoreAssumption,
-	) -> Result<Option<PersistedValidationData>, sp_api::ApiError> {
+	) -> RelayChainResult<Option<PersistedValidationData>> {
 		Ok(Some(PersistedValidationData {
 			parent_head: HeadData(default_header().encode()),
 			..Default::default()
 		}))
 	}
 
-	fn candidate_pending_availability(
+	async fn candidate_pending_availability(
 		&self,
 		_: &cumulus_primitives_core::relay_chain::BlockId,
 		_: ParaId,
-	) -> Result<Option<CommittedCandidateReceipt>, sp_api::ApiError> {
+	) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
 		if self.data.lock().has_pending_availability {
 			Ok(Some(CommittedCandidateReceipt {
 				descriptor: CandidateDescriptor {
@@ -152,60 +159,58 @@ impl RelayChainInterface for DummyRelayChainInterface {
 		}
 	}
 
-	fn session_index_for_child(
+	async fn session_index_for_child(
 		&self,
 		_: &cumulus_primitives_core::relay_chain::BlockId,
-	) -> Result<SessionIndex, sp_api::ApiError> {
+	) -> RelayChainResult<SessionIndex> {
 		Ok(0)
 	}
 
-	fn import_notification_stream(&self) -> sc_client_api::ImportNotifications<PBlock> {
-		self.relay_client.import_notification_stream()
-	}
-
-	fn finality_notification_stream(&self) -> sc_client_api::FinalityNotifications<PBlock> {
-		self.relay_client.finality_notification_stream()
+	async fn import_notification_stream(
+		&self,
+	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+		Ok(Box::pin(
+			self.relay_client
+				.import_notification_stream()
+				.map(|notification| notification.header),
+		))
 	}
 
-	fn storage_changes_notification_stream(
+	async fn finality_notification_stream(
 		&self,
-		filter_keys: Option<&[sc_client_api::StorageKey]>,
-		child_filter_keys: Option<
-			&[(sc_client_api::StorageKey, Option<Vec<sc_client_api::StorageKey>>)],
-		>,
-	) -> sc_client_api::blockchain::Result<sc_client_api::StorageEventStream<PHash>> {
-		self.relay_client
-			.storage_changes_notification_stream(filter_keys, child_filter_keys)
+	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+		Ok(Box::pin(
+			self.relay_client
+				.finality_notification_stream()
+				.map(|notification| notification.header),
+		))
 	}
 
-	fn is_major_syncing(&self) -> bool {
-		false
+	async fn is_major_syncing(&self) -> RelayChainResult<bool> {
+		Ok(false)
 	}
 
-	fn overseer_handle(&self) -> Option<Handle> {
+	fn overseer_handle(&self) -> RelayChainResult<Option<Handle>> {
 		unimplemented!("Not needed for test")
 	}
 
-	fn get_storage_by_key(
+	async fn get_storage_by_key(
 		&self,
 		_: &polkadot_service::BlockId,
 		_: &[u8],
-	) -> Result<Option<StorageValue>, sp_blockchain::Error> {
+	) -> RelayChainResult<Option<StorageValue>> {
 		unimplemented!("Not needed for test")
 	}
 
-	fn prove_read(
+	async fn prove_read(
 		&self,
 		_: &polkadot_service::BlockId,
 		_: &Vec<Vec<u8>>,
-	) -> Result<Option<sc_client_api::StorageProof>, Box<dyn sp_state_machine::Error>> {
+	) -> RelayChainResult<sc_client_api::StorageProof> {
 		unimplemented!("Not needed for test")
 	}
 
-	async fn wait_for_block(
-		&self,
-		hash: PHash,
-	) -> Result<(), cumulus_relay_chain_interface::WaitError> {
+	async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()> {
 		let mut listener = match check_block_in_chain(
 			self.relay_backend.clone(),
 			self.relay_client.clone(),
@@ -219,16 +224,32 @@ impl RelayChainInterface for DummyRelayChainInterface {
 
 		loop {
 			futures::select! {
-				_ = timeout => return Err(WaitError::Timeout(hash)),
+				_ = timeout => return Err(RelayChainError::WaitTimeout(hash)),
 				evt = listener.next() => match evt {
 					Some(evt) if evt.hash == hash => return Ok(()),
 					// Not the event we waited on.
 					Some(_) => continue,
-					None => return Err(WaitError::ImportListenerClosed(hash)),
+					None => return Err(RelayChainError::ImportListenerClosed(hash)),
 				}
 			}
 		}
 	}
+
+	async fn new_best_notification_stream(
+		&self,
+	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+		let notifications_stream =
+			self.relay_client
+				.import_notification_stream()
+				.filter_map(|notification| async move {
+					if notification.is_new_best {
+						Some(notification.header)
+					} else {
+						None
+					}
+				});
+		Ok(Box::pin(notifications_stream))
+	}
 }
 
 fn make_validator_and_api(
@@ -274,6 +295,7 @@ async fn make_gossip_message_and_header(
 	.unwrap();
 	let session_index = relay_chain_interface
 		.session_index_for_child(&BlockId::Hash(relay_parent))
+		.await
 		.unwrap();
 	let signing_context = SigningContext { parent_hash: relay_parent, session_index };
 
@@ -442,9 +464,9 @@ fn check_statement_is_correctly_signed() {
 	assert_eq!(Validation::Failure { disconnect: true }, res.unwrap());
 }
 
-#[test]
-fn check_statement_seconded() {
-	let (mut validator, api) = make_validator_and_api();
+#[tokio::test]
+async fn check_statement_seconded() {
+	let (mut validator, relay_chain_interface) = make_validator_and_api();
 	let header = default_header();
 	let relay_parent = H256::from_low_u64_be(1);
 
@@ -455,7 +477,10 @@ fn check_statement_seconded() {
 		Some(&Sr25519Keyring::Alice.to_seed()),
 	)
 	.unwrap();
-	let session_index = api.session_index_for_child(&BlockId::Hash(relay_parent)).unwrap();
+	let session_index = relay_chain_interface
+		.session_index_for_child(&BlockId::Hash(relay_parent))
+		.await
+		.unwrap();
 	let signing_context = SigningContext { parent_hash: relay_parent, session_index };
 
 	let statement = Statement::Valid(Default::default());
diff --git a/cumulus/client/pov-recovery/src/lib.rs b/cumulus/client/pov-recovery/src/lib.rs
index 4d3f67ea06e..d5d1a19b1d9 100644
--- a/cumulus/client/pov-recovery/src/lib.rs
+++ b/cumulus/client/pov-recovery/src/lib.rs
@@ -56,7 +56,7 @@ use polkadot_primitives::v1::{
 };
 
 use cumulus_primitives_core::ParachainBlockData;
-use cumulus_relay_chain_interface::RelayChainInterface;
+use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
 
 use codec::Decode;
 use futures::{select, stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
@@ -381,7 +381,14 @@ where
 		let mut imported_blocks = self.parachain_client.import_notification_stream().fuse();
 		let mut finalized_blocks = self.parachain_client.finality_notification_stream().fuse();
 		let pending_candidates =
-			pending_candidates(self.relay_chain_interface.clone(), self.para_id).fuse();
+			match pending_candidates(self.relay_chain_interface.clone(), self.para_id).await {
+				Ok(pending_candidate_stream) => pending_candidate_stream.fuse(),
+				Err(err) => {
+					tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve pending candidate stream.");
+					return
+				},
+			};
+
 		futures::pin_mut!(pending_candidates);
 
 		loop {
@@ -435,28 +442,41 @@ where
 }
 
 /// Returns a stream over pending candidates for the parachain corresponding to `para_id`.
-fn pending_candidates(
-	relay_chain_client: impl RelayChainInterface,
+async fn pending_candidates(
+	relay_chain_client: impl RelayChainInterface + Clone,
 	para_id: ParaId,
-) -> impl Stream<Item = (CommittedCandidateReceipt, SessionIndex)> {
-	relay_chain_client.import_notification_stream().filter_map(move |n| {
-		let res = relay_chain_client
-			.candidate_pending_availability(&BlockId::hash(n.hash), para_id)
-			.and_then(|pa| {
-				relay_chain_client
-					.session_index_for_child(&BlockId::hash(n.hash))
-					.map(|v| pa.map(|pa| (pa, v)))
-			})
-			.map_err(|e| {
-				tracing::error!(
-					target: LOG_TARGET,
-					error = ?e,
-					"Failed fetch pending candidates.",
-				)
-			})
-			.ok()
-			.flatten();
-
-		async move { res }
-	})
+) -> RelayChainResult<impl Stream<Item = (CommittedCandidateReceipt, SessionIndex)>> {
+	let import_notification_stream = relay_chain_client.import_notification_stream().await?;
+
+	let filtered_stream = import_notification_stream.filter_map(move |n| {
+		let client_for_closure = relay_chain_client.clone();
+		async move {
+			let block_id = BlockId::hash(n.hash());
+			let pending_availability_result = client_for_closure
+				.candidate_pending_availability(&block_id, para_id)
+				.await
+				.map_err(|e| {
+					tracing::error!(
+						target: LOG_TARGET,
+						error = ?e,
+						"Failed to fetch pending candidates.",
+					)
+				});
+			let session_index_result =
+				client_for_closure.session_index_for_child(&block_id).await.map_err(|e| {
+					tracing::error!(
+						target: LOG_TARGET,
+						error = ?e,
+						"Failed to fetch session index.",
+					)
+				});
+
+			if let Ok(Some(candidate)) = pending_availability_result {
+				session_index_result.map(|session_index| (candidate, session_index)).ok()
+			} else {
+				None
+			}
+		}
+	});
+	Ok(filtered_stream)
 }
diff --git a/cumulus/client/relay-chain-interface/Cargo.toml b/cumulus/client/relay-chain-interface/Cargo.toml
index a962155ed1e..b76ebcc3137 100644
--- a/cumulus/client/relay-chain-interface/Cargo.toml
+++ b/cumulus/client/relay-chain-interface/Cargo.toml
@@ -15,7 +15,10 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master
 sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" }
 
+futures = "0.3.1"
 parking_lot = "0.11.1"
 derive_more = "0.99.2"
 async-trait = "0.1.52"
+thiserror = "1.0.30"
diff --git a/cumulus/client/relay-chain-interface/src/lib.rs b/cumulus/client/relay-chain-interface/src/lib.rs
index 185e9a6f0a3..13b0551b38c 100644
--- a/cumulus/client/relay-chain-interface/src/lib.rs
+++ b/cumulus/client/relay-chain-interface/src/lib.rs
@@ -14,136 +14,140 @@
 // You should have received a copy of the GNU General Public License
 // along with Cumulus.  If not, see <http://www.gnu.org/licenses/>.
 
-use std::{collections::BTreeMap, sync::Arc};
+use std::{collections::BTreeMap, pin::Pin, sync::Arc};
 
 use cumulus_primitives_core::{
 	relay_chain::{
 		v1::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId},
-		Block as PBlock, BlockId, Hash as PHash, InboundHrmpMessage,
+		BlockId, Hash as PHash, Header as PHeader, InboundHrmpMessage,
 	},
 	InboundDownwardMessage, ParaId, PersistedValidationData,
 };
 use polkadot_overseer::Handle as OverseerHandle;
 use sc_client_api::{blockchain::BlockStatus, StorageProof};
 
+use futures::Stream;
+
+use async_trait::async_trait;
 use sp_api::ApiError;
 use sp_state_machine::StorageValue;
 
-use async_trait::async_trait;
+pub type RelayChainResult<T> = Result<T, RelayChainError>;
 
-#[derive(Debug, derive_more::Display)]
-pub enum WaitError {
-	#[display(fmt = "Timeout while waiting for relay-chain block `{}` to be imported.", _0)]
-	Timeout(PHash),
-	#[display(
-		fmt = "Import listener closed while waiting for relay-chain block `{}` to be imported.",
-		_0
-	)]
+#[derive(thiserror::Error, Debug)]
+pub enum RelayChainError {
+	#[error("Error occured while calling relay chain runtime: {0:?}")]
+	ApiError(#[from] ApiError),
+	#[error("Timeout while waiting for relay-chain block `{0}` to be imported.")]
+	WaitTimeout(PHash),
+	#[error("Import listener closed while waiting for relay-chain block `{0}` to be imported.")]
 	ImportListenerClosed(PHash),
-	#[display(
-		fmt = "Blockchain returned an error while waiting for relay-chain block `{}` to be imported: {:?}",
-		_0,
-		_1
-	)]
-	BlockchainError(PHash, sp_blockchain::Error),
+	#[error("Blockchain returned an error while waiting for relay-chain block `{0}` to be imported: {1:?}")]
+	WaitBlockchainError(PHash, sp_blockchain::Error),
+	#[error("Blockchain returned an error: {0:?}")]
+	BlockchainError(#[from] sp_blockchain::Error),
+	#[error("State machine error occured: {0:?}")]
+	StateMachineError(Box<dyn sp_state_machine::Error>),
+	#[error("Unspecified error occured: {0:?}")]
+	GenericError(String),
 }
 
 /// Trait that provides all necessary methods for interaction between collator and relay chain.
 #[async_trait]
 pub trait RelayChainInterface: Send + Sync {
 	/// Fetch a storage item by key.
-	fn get_storage_by_key(
+	async fn get_storage_by_key(
 		&self,
 		block_id: &BlockId,
 		key: &[u8],
-	) -> Result<Option<StorageValue>, sp_blockchain::Error>;
+	) -> RelayChainResult<Option<StorageValue>>;
 
 	/// Fetch a vector of current validators.
-	fn validators(&self, block_id: &BlockId) -> Result<Vec<ValidatorId>, ApiError>;
+	async fn validators(&self, block_id: &BlockId) -> RelayChainResult<Vec<ValidatorId>>;
 
 	/// Get the status of a given block.
-	fn block_status(&self, block_id: BlockId) -> Result<BlockStatus, sp_blockchain::Error>;
+	async fn block_status(&self, block_id: BlockId) -> RelayChainResult<BlockStatus>;
 
 	/// Get the hash of the current best block.
-	fn best_block_hash(&self) -> PHash;
+	async fn best_block_hash(&self) -> RelayChainResult<PHash>;
 
 	/// Returns the whole contents of the downward message queue for the parachain we are collating
 	/// for.
 	///
 	/// Returns `None` in case of an error.
-	fn retrieve_dmq_contents(
+	async fn retrieve_dmq_contents(
 		&self,
 		para_id: ParaId,
 		relay_parent: PHash,
-	) -> Option<Vec<InboundDownwardMessage>>;
+	) -> RelayChainResult<Vec<InboundDownwardMessage>>;
 
 	/// Returns channels contents for each inbound HRMP channel addressed to the parachain we are
 	/// collating for.
 	///
 	/// Empty channels are also included.
-	fn retrieve_all_inbound_hrmp_channel_contents(
+	async fn retrieve_all_inbound_hrmp_channel_contents(
 		&self,
 		para_id: ParaId,
 		relay_parent: PHash,
-	) -> Option<BTreeMap<ParaId, Vec<InboundHrmpMessage>>>;
+	) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>>;
 
 	/// Yields the persisted validation data for the given `ParaId` along with an assumption that
 	/// should be used if the para currently occupies a core.
 	///
 	/// Returns `None` if either the para is not registered or the assumption is `Freed`
 	/// and the para already occupies a core.
-	fn persisted_validation_data(
+	async fn persisted_validation_data(
 		&self,
 		block_id: &BlockId,
 		para_id: ParaId,
 		_: OccupiedCoreAssumption,
-	) -> Result<Option<PersistedValidationData>, ApiError>;
+	) -> RelayChainResult<Option<PersistedValidationData>>;
 
 	/// Get the receipt of a candidate pending availability. This returns `Some` for any paras
 	/// assigned to occupied cores in `availability_cores` and `None` otherwise.
-	fn candidate_pending_availability(
+	async fn candidate_pending_availability(
 		&self,
 		block_id: &BlockId,
 		para_id: ParaId,
-	) -> Result<Option<CommittedCandidateReceipt>, ApiError>;
+	) -> RelayChainResult<Option<CommittedCandidateReceipt>>;
 
 	/// Returns the session index expected at a child of the block.
-	fn session_index_for_child(&self, block_id: &BlockId) -> Result<SessionIndex, ApiError>;
+	async fn session_index_for_child(&self, block_id: &BlockId) -> RelayChainResult<SessionIndex>;
 
 	/// Get a stream of import block notifications.
-	fn import_notification_stream(&self) -> sc_client_api::ImportNotifications<PBlock>;
+	async fn import_notification_stream(
+		&self,
+	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>>;
+
+	/// Get a stream of new best block notifications.
+	async fn new_best_notification_stream(
+		&self,
+	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>>;
 
 	/// Wait for a block with a given hash in the relay chain.
 	///
 	/// This method returns immediately on error or if the block is already
 	/// reported to be in chain. Otherwise, it waits for the block to arrive.
-	async fn wait_for_block(&self, hash: PHash) -> Result<(), WaitError>;
+	async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()>;
 
 	/// Get a stream of finality notifications.
-	fn finality_notification_stream(&self) -> sc_client_api::FinalityNotifications<PBlock>;
-
-	/// Get a stream of storage change notifications.
-	fn storage_changes_notification_stream(
+	async fn finality_notification_stream(
 		&self,
-		filter_keys: Option<&[sc_client_api::StorageKey]>,
-		child_filter_keys: Option<
-			&[(sc_client_api::StorageKey, Option<Vec<sc_client_api::StorageKey>>)],
-		>,
-	) -> sc_client_api::blockchain::Result<sc_client_api::StorageEventStream<PHash>>;
+	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>>;
 
 	/// Whether the synchronization service is undergoing major sync.
 	/// Returns true if so.
-	fn is_major_syncing(&self) -> bool;
+	async fn is_major_syncing(&self) -> RelayChainResult<bool>;
 
 	/// Get a handle to the overseer.
-	fn overseer_handle(&self) -> Option<OverseerHandle>;
+	fn overseer_handle(&self) -> RelayChainResult<Option<OverseerHandle>>;
 
 	/// Generate a storage read proof.
-	fn prove_read(
+	async fn prove_read(
 		&self,
 		block_id: &BlockId,
 		relevant_keys: &Vec<Vec<u8>>,
-	) -> Result<Option<StorageProof>, Box<dyn sp_state_machine::Error>>;
+	) -> RelayChainResult<StorageProof>;
 }
 
 #[async_trait]
@@ -151,98 +155,100 @@ impl<T> RelayChainInterface for Arc<T>
 where
 	T: RelayChainInterface + ?Sized,
 {
-	fn retrieve_dmq_contents(
+	async fn retrieve_dmq_contents(
 		&self,
 		para_id: ParaId,
 		relay_parent: PHash,
-	) -> Option<Vec<InboundDownwardMessage>> {
-		(**self).retrieve_dmq_contents(para_id, relay_parent)
+	) -> RelayChainResult<Vec<InboundDownwardMessage>> {
+		(**self).retrieve_dmq_contents(para_id, relay_parent).await
 	}
 
-	fn retrieve_all_inbound_hrmp_channel_contents(
+	async fn retrieve_all_inbound_hrmp_channel_contents(
 		&self,
 		para_id: ParaId,
 		relay_parent: PHash,
-	) -> Option<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
-		(**self).retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent)
+	) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
+		(**self).retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent).await
 	}
 
-	fn persisted_validation_data(
+	async fn persisted_validation_data(
 		&self,
 		block_id: &BlockId,
 		para_id: ParaId,
 		occupied_core_assumption: OccupiedCoreAssumption,
-	) -> Result<Option<PersistedValidationData>, ApiError> {
-		(**self).persisted_validation_data(block_id, para_id, occupied_core_assumption)
+	) -> RelayChainResult<Option<PersistedValidationData>> {
+		(**self)
+			.persisted_validation_data(block_id, para_id, occupied_core_assumption)
+			.await
 	}
 
-	fn candidate_pending_availability(
+	async fn candidate_pending_availability(
 		&self,
 		block_id: &BlockId,
 		para_id: ParaId,
-	) -> Result<Option<CommittedCandidateReceipt>, ApiError> {
-		(**self).candidate_pending_availability(block_id, para_id)
+	) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
+		(**self).candidate_pending_availability(block_id, para_id).await
 	}
 
-	fn session_index_for_child(&self, block_id: &BlockId) -> Result<SessionIndex, ApiError> {
-		(**self).session_index_for_child(block_id)
+	async fn session_index_for_child(&self, block_id: &BlockId) -> RelayChainResult<SessionIndex> {
+		(**self).session_index_for_child(block_id).await
 	}
 
-	fn validators(&self, block_id: &BlockId) -> Result<Vec<ValidatorId>, ApiError> {
-		(**self).validators(block_id)
+	async fn validators(&self, block_id: &BlockId) -> RelayChainResult<Vec<ValidatorId>> {
+		(**self).validators(block_id).await
 	}
 
-	fn import_notification_stream(&self) -> sc_client_api::ImportNotifications<PBlock> {
-		(**self).import_notification_stream()
-	}
-
-	fn finality_notification_stream(&self) -> sc_client_api::FinalityNotifications<PBlock> {
-		(**self).finality_notification_stream()
+	async fn import_notification_stream(
+		&self,
+	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+		(**self).import_notification_stream().await
 	}
 
-	fn storage_changes_notification_stream(
+	async fn finality_notification_stream(
 		&self,
-		filter_keys: Option<&[sc_client_api::StorageKey]>,
-		child_filter_keys: Option<
-			&[(sc_client_api::StorageKey, Option<Vec<sc_client_api::StorageKey>>)],
-		>,
-	) -> sc_client_api::blockchain::Result<sc_client_api::StorageEventStream<PHash>> {
-		(**self).storage_changes_notification_stream(filter_keys, child_filter_keys)
+	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+		(**self).finality_notification_stream().await
 	}
 
-	fn best_block_hash(&self) -> PHash {
-		(**self).best_block_hash()
+	async fn best_block_hash(&self) -> RelayChainResult<PHash> {
+		(**self).best_block_hash().await
 	}
 
-	fn block_status(&self, block_id: BlockId) -> Result<BlockStatus, sp_blockchain::Error> {
-		(**self).block_status(block_id)
+	async fn block_status(&self, block_id: BlockId) -> RelayChainResult<BlockStatus> {
+		(**self).block_status(block_id).await
 	}
 
-	fn is_major_syncing(&self) -> bool {
-		(**self).is_major_syncing()
+	async fn is_major_syncing(&self) -> RelayChainResult<bool> {
+		(**self).is_major_syncing().await
 	}
 
-	fn overseer_handle(&self) -> Option<OverseerHandle> {
+	fn overseer_handle(&self) -> RelayChainResult<Option<OverseerHandle>> {
 		(**self).overseer_handle()
 	}
 
-	fn get_storage_by_key(
+	async fn get_storage_by_key(
 		&self,
 		block_id: &BlockId,
 		key: &[u8],
-	) -> Result<Option<StorageValue>, sp_blockchain::Error> {
-		(**self).get_storage_by_key(block_id, key)
+	) -> RelayChainResult<Option<StorageValue>> {
+		(**self).get_storage_by_key(block_id, key).await
 	}
 
-	fn prove_read(
+	async fn prove_read(
 		&self,
 		block_id: &BlockId,
 		relevant_keys: &Vec<Vec<u8>>,
-	) -> Result<Option<StorageProof>, Box<dyn sp_state_machine::Error>> {
-		(**self).prove_read(block_id, relevant_keys)
+	) -> RelayChainResult<StorageProof> {
+		(**self).prove_read(block_id, relevant_keys).await
 	}
 
-	async fn wait_for_block(&self, hash: PHash) -> Result<(), WaitError> {
+	async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()> {
 		(**self).wait_for_block(hash).await
 	}
+
+	async fn new_best_notification_stream(
+		&self,
+	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+		(**self).new_best_notification_stream().await
+	}
 }
diff --git a/cumulus/client/relay-chain-local/src/lib.rs b/cumulus/client/relay-chain-local/src/lib.rs
index 5177d1f4afb..903a8ff3c66 100644
--- a/cumulus/client/relay-chain-local/src/lib.rs
+++ b/cumulus/client/relay-chain-local/src/lib.rs
@@ -14,19 +14,19 @@
 // You should have received a copy of the GNU General Public License
 // along with Cumulus.  If not, see <http://www.gnu.org/licenses/>.
 
-use std::{sync::Arc, time::Duration};
+use std::{pin::Pin, sync::Arc, time::Duration};
 
 use async_trait::async_trait;
 use cumulus_primitives_core::{
 	relay_chain::{
 		v1::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId},
 		v2::ParachainHost,
-		Block as PBlock, BlockId, Hash as PHash, InboundHrmpMessage,
+		Block as PBlock, BlockId, Hash as PHash, Header as PHeader, InboundHrmpMessage,
 	},
 	InboundDownwardMessage, ParaId, PersistedValidationData,
 };
-use cumulus_relay_chain_interface::{RelayChainInterface, WaitError};
-use futures::{FutureExt, StreamExt};
+use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult};
+use futures::{FutureExt, Stream, StreamExt};
 use parking_lot::Mutex;
 use polkadot_client::{ClientHandle, ExecuteWithClient, FullBackend};
 use polkadot_service::{
@@ -37,12 +37,11 @@ use sc_client_api::{
 	StorageProof, UsageProvider,
 };
 use sc_telemetry::TelemetryWorkerHandle;
-use sp_api::{ApiError, ProvideRuntimeApi};
+use sp_api::ProvideRuntimeApi;
 use sp_consensus::SyncOracle;
 use sp_core::{sp_std::collections::btree_map::BTreeMap, Pair};
 use sp_state_machine::{Backend as StateBackend, StorageValue};
 
-const LOG_TARGET: &str = "relay-chain-local";
 /// The timeout in seconds after that the waiting for a block should be aborted.
 const TIMEOUT_IN_SECONDS: u64 = 6;
 
@@ -88,158 +87,117 @@ where
 		+ Send,
 	Client::Api: ParachainHost<PBlock> + BabeApi<PBlock>,
 {
-	fn retrieve_dmq_contents(
+	async fn retrieve_dmq_contents(
 		&self,
 		para_id: ParaId,
 		relay_parent: PHash,
-	) -> Option<Vec<InboundDownwardMessage>> {
-		self.full_client
-			.runtime_api()
-			.dmq_contents_with_context(
-				&BlockId::hash(relay_parent),
-				sp_core::ExecutionContext::Importing,
-				para_id,
-			)
-			.map_err(|e| {
-				tracing::error!(
-					target: LOG_TARGET,
-					relay_parent = ?relay_parent,
-					error = ?e,
-					"An error occured during requesting the downward messages.",
-				);
-			})
-			.ok()
+	) -> RelayChainResult<Vec<InboundDownwardMessage>> {
+		Ok(self.full_client.runtime_api().dmq_contents_with_context(
+			&BlockId::hash(relay_parent),
+			sp_core::ExecutionContext::Importing,
+			para_id,
+		)?)
 	}
 
-	fn retrieve_all_inbound_hrmp_channel_contents(
+	async fn retrieve_all_inbound_hrmp_channel_contents(
 		&self,
 		para_id: ParaId,
 		relay_parent: PHash,
-	) -> Option<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
-		self.full_client
-			.runtime_api()
-			.inbound_hrmp_channels_contents_with_context(
-				&BlockId::hash(relay_parent),
-				sp_core::ExecutionContext::Importing,
-				para_id,
-			)
-			.map_err(|e| {
-				tracing::error!(
-					target: LOG_TARGET,
-					relay_parent = ?relay_parent,
-					error = ?e,
-					"An error occured during requesting the inbound HRMP messages.",
-				);
-			})
-			.ok()
+	) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
+		Ok(self.full_client.runtime_api().inbound_hrmp_channels_contents_with_context(
+			&BlockId::hash(relay_parent),
+			sp_core::ExecutionContext::Importing,
+			para_id,
+		)?)
 	}
 
-	fn persisted_validation_data(
+	async fn persisted_validation_data(
 		&self,
 		block_id: &BlockId,
 		para_id: ParaId,
 		occupied_core_assumption: OccupiedCoreAssumption,
-	) -> Result<Option<PersistedValidationData>, ApiError> {
-		self.full_client.runtime_api().persisted_validation_data(
+	) -> RelayChainResult<Option<PersistedValidationData>> {
+		Ok(self.full_client.runtime_api().persisted_validation_data(
 			block_id,
 			para_id,
 			occupied_core_assumption,
-		)
+		)?)
 	}
 
-	fn candidate_pending_availability(
+	async fn candidate_pending_availability(
 		&self,
 		block_id: &BlockId,
 		para_id: ParaId,
-	) -> Result<Option<CommittedCandidateReceipt>, ApiError> {
-		self.full_client.runtime_api().candidate_pending_availability(block_id, para_id)
-	}
-
-	fn session_index_for_child(&self, block_id: &BlockId) -> Result<SessionIndex, ApiError> {
-		self.full_client.runtime_api().session_index_for_child(block_id)
+	) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
+		Ok(self
+			.full_client
+			.runtime_api()
+			.candidate_pending_availability(block_id, para_id)?)
 	}
 
-	fn validators(&self, block_id: &BlockId) -> Result<Vec<ValidatorId>, ApiError> {
-		self.full_client.runtime_api().validators(block_id)
+	async fn session_index_for_child(&self, block_id: &BlockId) -> RelayChainResult<SessionIndex> {
+		Ok(self.full_client.runtime_api().session_index_for_child(block_id)?)
 	}
 
-	fn import_notification_stream(&self) -> sc_client_api::ImportNotifications<PBlock> {
-		self.full_client.import_notification_stream()
+	async fn validators(&self, block_id: &BlockId) -> RelayChainResult<Vec<ValidatorId>> {
+		Ok(self.full_client.runtime_api().validators(block_id)?)
 	}
 
-	fn finality_notification_stream(&self) -> sc_client_api::FinalityNotifications<PBlock> {
-		self.full_client.finality_notification_stream()
+	async fn import_notification_stream(
+		&self,
+	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+		let notification_stream = self
+			.full_client
+			.import_notification_stream()
+			.map(|notification| notification.header);
+		Ok(Box::pin(notification_stream))
 	}
 
-	fn storage_changes_notification_stream(
+	async fn finality_notification_stream(
 		&self,
-		filter_keys: Option<&[sc_client_api::StorageKey]>,
-		child_filter_keys: Option<
-			&[(sc_client_api::StorageKey, Option<Vec<sc_client_api::StorageKey>>)],
-		>,
-	) -> sc_client_api::blockchain::Result<sc_client_api::StorageEventStream<PHash>> {
-		self.full_client
-			.storage_changes_notification_stream(filter_keys, child_filter_keys)
+	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+		let notification_stream = self
+			.full_client
+			.finality_notification_stream()
+			.map(|notification| notification.header);
+		Ok(Box::pin(notification_stream))
 	}
 
-	fn best_block_hash(&self) -> PHash {
-		self.backend.blockchain().info().best_hash
+	async fn best_block_hash(&self) -> RelayChainResult<PHash> {
+		Ok(self.backend.blockchain().info().best_hash)
 	}
 
-	fn block_status(&self, block_id: BlockId) -> Result<BlockStatus, sp_blockchain::Error> {
-		self.backend.blockchain().status(block_id)
+	async fn block_status(&self, block_id: BlockId) -> RelayChainResult<BlockStatus> {
+		Ok(self.backend.blockchain().status(block_id)?)
 	}
 
-	fn is_major_syncing(&self) -> bool {
+	async fn is_major_syncing(&self) -> RelayChainResult<bool> {
 		let mut network = self.sync_oracle.lock();
-		network.is_major_syncing()
+		Ok(network.is_major_syncing())
 	}
 
-	fn overseer_handle(&self) -> Option<Handle> {
-		self.overseer_handle.clone()
+	fn overseer_handle(&self) -> RelayChainResult<Option<Handle>> {
+		Ok(self.overseer_handle.clone())
 	}
 
-	fn get_storage_by_key(
+	async fn get_storage_by_key(
 		&self,
 		block_id: &BlockId,
 		key: &[u8],
-	) -> Result<Option<StorageValue>, sp_blockchain::Error> {
+	) -> RelayChainResult<Option<StorageValue>> {
 		let state = self.backend.state_at(*block_id)?;
-		state.storage(key).map_err(sp_blockchain::Error::Storage)
+		state.storage(key).map_err(RelayChainError::GenericError)
 	}
 
-	fn prove_read(
+	async fn prove_read(
 		&self,
 		block_id: &BlockId,
 		relevant_keys: &Vec<Vec<u8>>,
-	) -> Result<Option<StorageProof>, Box<dyn sp_state_machine::Error>> {
-		let state_backend = self
-			.backend
-			.state_at(*block_id)
-			.map_err(|e| {
-				tracing::error!(
-					target: LOG_TARGET,
-					relay_parent = ?block_id,
-					error = ?e,
-					"Cannot obtain the state of the relay chain.",
-				);
-			})
-			.ok();
-
-		match state_backend {
-			Some(state) => sp_state_machine::prove_read(state, relevant_keys)
-				.map_err(|e| {
-					tracing::error!(
-						target: LOG_TARGET,
-						relay_parent = ?block_id,
-						error = ?e,
-						"Failed to collect required relay chain state storage proof.",
-					);
-					e
-				})
-				.map(Some),
-			None => Ok(None),
-		}
+	) -> RelayChainResult<StorageProof> {
+		let state_backend = self.backend.state_at(*block_id)?;
+
+		sp_state_machine::prove_read(state_backend, relevant_keys)
+			.map_err(RelayChainError::StateMachineError)
 	}
 
 	/// Wait for a given relay chain block in an async way.
@@ -259,7 +217,7 @@ where
 	///
 	/// The timeout is set to 6 seconds. This should be enough time to import the block in the current
 	/// round and if not, the new round of the relay chain already started anyway.
-	async fn wait_for_block(&self, hash: PHash) -> Result<(), WaitError> {
+	async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()> {
 		let mut listener =
 			match check_block_in_chain(self.backend.clone(), self.full_client.clone(), hash)? {
 				BlockCheckStatus::InChain => return Ok(()),
@@ -270,16 +228,28 @@ where
 
 		loop {
 			futures::select! {
-				_ = timeout => return Err(WaitError::Timeout(hash)),
+				_ = timeout => return Err(RelayChainError::WaitTimeout(hash)),
 				evt = listener.next() => match evt {
 					Some(evt) if evt.hash == hash => return Ok(()),
 					// Not the event we waited on.
 					Some(_) => continue,
-					None => return Err(WaitError::ImportListenerClosed(hash)),
+					None => return Err(RelayChainError::ImportListenerClosed(hash)),
 				}
 			}
 		}
 	}
+
+	async fn new_best_notification_stream(
+		&self,
+	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+		let notifications_stream =
+			self.full_client
+				.import_notification_stream()
+				.filter_map(|notification| async move {
+					notification.is_new_best.then(|| notification.header)
+				});
+		Ok(Box::pin(notifications_stream))
+	}
 }
 
 pub enum BlockCheckStatus {
@@ -294,16 +264,15 @@ pub fn check_block_in_chain<Client>(
 	backend: Arc<FullBackend>,
 	client: Arc<Client>,
 	hash: PHash,
-) -> Result<BlockCheckStatus, WaitError>
+) -> RelayChainResult<BlockCheckStatus>
 where
 	Client: BlockchainEvents<PBlock>,
 {
 	let _lock = backend.get_import_lock().read();
 
 	let block_id = BlockId::Hash(hash);
-	match backend.blockchain().status(block_id) {
-		Ok(BlockStatus::InChain) => return Ok(BlockCheckStatus::InChain),
-		Err(err) => return Err(WaitError::BlockchainError(hash, err)),
+	match backend.blockchain().status(block_id)? {
+		BlockStatus::InChain => return Ok(BlockCheckStatus::InChain),
 		_ => {},
 	}
 
@@ -495,7 +464,7 @@ mod tests {
 
 		assert!(matches!(
 			block_on(relay_chain_interface.wait_for_block(hash)),
-			Err(WaitError::Timeout(_))
+			Err(RelayChainError::WaitTimeout(_))
 		));
 	}
 
diff --git a/cumulus/client/service/src/lib.rs b/cumulus/client/service/src/lib.rs
index 5b050e75aa5..08cd8584f22 100644
--- a/cumulus/client/service/src/lib.rs
+++ b/cumulus/client/service/src/lib.rs
@@ -107,10 +107,13 @@ where
 		.spawn_essential_handle()
 		.spawn("cumulus-consensus", None, consensus);
 
+	let overseer_handle = relay_chain_interface
+		.overseer_handle()
+		.map_err(|e| sc_service::Error::Application(Box::new(e)))?
+		.ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?;
+
 	let pov_recovery = cumulus_client_pov_recovery::PoVRecovery::new(
-		relay_chain_interface
-			.overseer_handle()
-			.ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?,
+		overseer_handle.clone(),
 		// We want that collators wait at maximum the relay chain slot duration before starting
 		// to recover blocks.
 		cumulus_client_pov_recovery::RecoveryDelay::WithMax { max: relay_chain_slot_duration },
@@ -128,9 +131,7 @@ where
 		runtime_api: client.clone(),
 		block_status,
 		announce_block,
-		overseer_handle: relay_chain_interface
-			.overseer_handle()
-			.ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?,
+		overseer_handle,
 		spawner,
 		para_id,
 		key: collator_key,
@@ -192,10 +193,13 @@ where
 		.spawn_essential_handle()
 		.spawn("cumulus-consensus", None, consensus);
 
+	let overseer_handle = relay_chain_interface
+		.overseer_handle()
+		.map_err(|e| sc_service::Error::Application(Box::new(e)))?
+		.ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?;
+
 	let pov_recovery = cumulus_client_pov_recovery::PoVRecovery::new(
-		relay_chain_interface
-			.overseer_handle()
-			.ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?,
+		overseer_handle,
 		// Full nodes should at least wait 2.5 minutes (assuming 6 seconds slot duration) and
 		// in maximum 5 minutes before starting to recover blocks. Collators should already start
 		// the recovery way before full nodes try to recover a certain block and then share the
diff --git a/cumulus/pallets/parachain-system/src/lib.rs b/cumulus/pallets/parachain-system/src/lib.rs
index b30d0777076..7c261e8583c 100644
--- a/cumulus/pallets/parachain-system/src/lib.rs
+++ b/cumulus/pallets/parachain-system/src/lib.rs
@@ -605,7 +605,7 @@ pub mod pallet {
 	#[pallet::genesis_build]
 	impl<T: Config> GenesisBuild<T> for GenesisConfig {
 		fn build(&self) {
-			//TODO: Remove after https://github.com/paritytech/cumulus/issues/479
+			// TODO: Remove after https://github.com/paritytech/cumulus/issues/479
 			sp_io::storage::set(b":c", &[]);
 		}
 	}
diff --git a/cumulus/parachain-template/node/src/service.rs b/cumulus/parachain-template/node/src/service.rs
index d0c0826e32c..400daeaa67a 100644
--- a/cumulus/parachain-template/node/src/service.rs
+++ b/cumulus/parachain-template/node/src/service.rs
@@ -436,14 +436,15 @@ pub async fn start_parachain_node(
 				BuildAuraConsensusParams {
 					proposer_factory,
 					create_inherent_data_providers: move |_, (relay_parent, validation_data)| {
-						let parachain_inherent =
+						let relay_chain_interface = relay_chain_interface.clone();
+						async move {
+							let parachain_inherent =
 							cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
 								relay_parent,
 								&relay_chain_interface,
 								&validation_data,
 								id,
-							);
-						async move {
+							).await;
 							let time = sp_timestamp::InherentDataProvider::from_system_time();
 
 							let slot =
diff --git a/cumulus/polkadot-parachains/src/service.rs b/cumulus/polkadot-parachains/src/service.rs
index 9f81b9af0d7..a67b20c11e5 100644
--- a/cumulus/polkadot-parachains/src/service.rs
+++ b/cumulus/polkadot-parachains/src/service.rs
@@ -738,14 +738,15 @@ pub async fn start_rococo_parachain_node(
 			>(BuildAuraConsensusParams {
 				proposer_factory,
 				create_inherent_data_providers: move |_, (relay_parent, validation_data)| {
+						let relay_chain_interface = relay_chain_interface.clone();
+					async move {
 					let parachain_inherent =
 					cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
 						relay_parent,
 						&relay_chain_interface,
 						&validation_data,
 						id,
-					);
-					async move {
+					).await;
 						let time = sp_timestamp::InherentDataProvider::from_system_time();
 
 						let slot =
@@ -875,14 +876,15 @@ where
 					block_import: client.clone(),
 					relay_chain_interface: relay_chain_interface.clone(),
 					create_inherent_data_providers: move |_, (relay_parent, validation_data)| {
-						let parachain_inherent =
+						let relay_chain_interface = relay_chain_interface.clone();
+						async move {
+							let parachain_inherent =
 							cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
 								relay_parent,
 								&relay_chain_interface,
 								&validation_data,
 								id,
-							);
-						async move {
+							).await;
 							let parachain_inherent = parachain_inherent.ok_or_else(|| {
 								Box::<dyn std::error::Error + Send + Sync>::from(
 									"Failed to create parachain inherent",
@@ -1157,14 +1159,15 @@ where
 						proposer_factory,
 						create_inherent_data_providers:
 							move |_, (relay_parent, validation_data)| {
-								let parachain_inherent =
+								let relay_chain_for_aura = relay_chain_for_aura.clone();
+								async move {
+									let parachain_inherent =
 							cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
 								relay_parent,
 								&relay_chain_for_aura,
 								&validation_data,
 								id,
-							);
-								async move {
+							).await;
 									let time =
 										sp_timestamp::InherentDataProvider::from_system_time();
 
@@ -1216,14 +1219,15 @@ where
 						relay_chain_interface: relay_chain_interface.clone(),
 						create_inherent_data_providers:
 							move |_, (relay_parent, validation_data)| {
-								let parachain_inherent =
+								let relay_chain_interface = relay_chain_interface.clone();
+								async move {
+									let parachain_inherent =
 									cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
 										relay_parent,
 										&relay_chain_interface,
 										&validation_data,
 										id,
-									);
-								async move {
+									).await;
 									let parachain_inherent =
 										parachain_inherent.ok_or_else(|| {
 											Box::<dyn std::error::Error + Send + Sync>::from(
diff --git a/cumulus/primitives/parachain-inherent/src/client_side.rs b/cumulus/primitives/parachain-inherent/src/client_side.rs
index dab368dc6cd..b14c2257654 100644
--- a/cumulus/primitives/parachain-inherent/src/client_side.rs
+++ b/cumulus/primitives/parachain-inherent/src/client_side.rs
@@ -29,7 +29,7 @@ const LOG_TARGET: &str = "parachain-inherent";
 
 /// Collect the relevant relay chain state in form of a proof for putting it into the validation
 /// data inherent.
-fn collect_relay_storage_proof(
+async fn collect_relay_storage_proof(
 	relay_chain_interface: &impl RelayChainInterface,
 	para_id: ParaId,
 	relay_parent: PHash,
@@ -42,6 +42,7 @@ fn collect_relay_storage_proof(
 			&relay_parent_block_id,
 			&relay_well_known_keys::hrmp_ingress_channel_index(para_id),
 		)
+		.await
 		.map_err(|e| {
 			tracing::error!(
 				target: LOG_TARGET,
@@ -70,6 +71,7 @@ fn collect_relay_storage_proof(
 			&relay_parent_block_id,
 			&relay_well_known_keys::hrmp_egress_channel_index(para_id),
 		)
+		.await
 		.map_err(|e| {
 			tracing::error!(
 				target: LOG_TARGET,
@@ -108,26 +110,57 @@ fn collect_relay_storage_proof(
 		relay_well_known_keys::hrmp_channels(HrmpChannelId { sender: para_id, recipient })
 	}));
 
-	relay_chain_interface.prove_read(&relay_parent_block_id, &relevant_keys).ok()?
+	relay_chain_interface
+		.prove_read(&relay_parent_block_id, &relevant_keys)
+		.await
+		.map_err(|e| {
+			tracing::error!(
+				target: LOG_TARGET,
+				relay_parent = ?relay_parent_block_id,
+				error = ?e,
+				"Cannot obtain read proof from relay chain.",
+			);
+		})
+		.ok()
 }
 
 impl ParachainInherentData {
 	/// Create the [`ParachainInherentData`] at the given `relay_parent`.
 	///
 	/// Returns `None` if the creation failed.
-	pub fn create_at(
+	pub async fn create_at(
 		relay_parent: PHash,
 		relay_chain_interface: &impl RelayChainInterface,
 		validation_data: &PersistedValidationData,
 		para_id: ParaId,
 	) -> Option<ParachainInherentData> {
 		let relay_chain_state =
-			collect_relay_storage_proof(relay_chain_interface, para_id, relay_parent)?;
-
-		let downward_messages =
-			relay_chain_interface.retrieve_dmq_contents(para_id, relay_parent)?;
+			collect_relay_storage_proof(relay_chain_interface, para_id, relay_parent).await?;
+
+		let downward_messages = relay_chain_interface
+			.retrieve_dmq_contents(para_id, relay_parent)
+			.await
+			.map_err(|e| {
+				tracing::error!(
+					target: LOG_TARGET,
+					relay_parent = ?relay_parent,
+					error = ?e,
+					"An error occured during requesting the downward messages.",
+				);
+			})
+			.ok()?;
 		let horizontal_messages = relay_chain_interface
-			.retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent)?;
+			.retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent)
+			.await
+			.map_err(|e| {
+				tracing::error!(
+					target: LOG_TARGET,
+					relay_parent = ?relay_parent,
+					error = ?e,
+					"An error occured during requesting the inbound HRMP messages.",
+				);
+			})
+			.ok()?;
 
 		Some(ParachainInherentData {
 			downward_messages,
diff --git a/cumulus/test/service/src/lib.rs b/cumulus/test/service/src/lib.rs
index 319076be306..750cb7d881f 100644
--- a/cumulus/test/service/src/lib.rs
+++ b/cumulus/test/service/src/lib.rs
@@ -31,9 +31,9 @@ use cumulus_client_service::{
 use cumulus_primitives_core::ParaId;
 use cumulus_relay_chain_local::RelayChainLocal;
 use cumulus_test_runtime::{Hash, Header, NodeBlock as Block, RuntimeApi};
+use parking_lot::Mutex;
 
 use frame_system_rpc_runtime_api::AccountNonceApi;
-use parking_lot::Mutex;
 use polkadot_primitives::v1::{CollatorPair, Hash as PHash, PersistedValidationData};
 use polkadot_service::ProvideRuntimeApi;
 use sc_client_api::execution_extensions::ExecutionStrategies;
@@ -288,15 +288,16 @@ where
 					para_id,
 					proposer_factory,
 					move |_, (relay_parent, validation_data)| {
-						let parachain_inherent =
+						let relay_chain_interface = relay_chain_interface_for_closure.clone();
+						async move {
+							let parachain_inherent =
 							cumulus_primitives_parachain_inherent::ParachainInherentData::create_at(
 								relay_parent,
-								&relay_chain_interface_for_closure,
+								&relay_chain_interface,
 								&validation_data,
 								para_id,
-							);
+							).await;
 
-						async move {
 							let time = sp_timestamp::InherentDataProvider::from_system_time();
 
 							let parachain_inherent = parachain_inherent.ok_or_else(|| {
-- 
GitLab