parachains_loop.rs 36.1 KiB
Newer Older
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.

// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.

use crate::{parachains_loop_metrics::ParachainsLoopMetrics, ParachainsPipeline};

use async_trait::async_trait;
use bp_polkadot_core::{
	parachains::{ParaHash, ParaHeadsProof, ParaId},
	BlockNumber as RelayBlockNumber,
};
use futures::{
	future::{FutureExt, Shared},
	poll, select_biased,
use relay_substrate_client::{BlockNumberOf, Chain, HeaderIdOf, ParachainBase};
use relay_utils::{
	metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient,
	TrackedTransactionStatus, TransactionTracker,
};
use std::{future::Future, pin::Pin, task::Poll};
Serban Iorga's avatar
Serban Iorga committed
/// Parachain header availability at a certain chain.
///
/// The derives are relied upon elsewhere in this file: values of this type are copied out of
/// `self`/arguments while matching, cloned as part of test client data and printed with `{:?}`
/// in log messages.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AvailableHeader<T> {
	/// The client can not report actual parachain head at this moment.
	///
	/// It is a "mild" error, which may appear when e.g. on-demand parachains relay is used.
	/// This variant must be treated as "we don't want to update parachain head value at the
	/// target chain at this moment".
	Unavailable,
	/// There's no parachain header at the relay chain.
	///
	/// Normally it means that the parachain is not registered there.
	Missing,
	/// Parachain head with given hash is available at the source chain.
	Available(T),
}

impl<T> AvailableHeader<T> {
	/// Borrow the inner header if this is the `Available` variant.
	///
	/// Both `Unavailable` and `Missing` yield `None`.
	pub fn as_available(&self) -> Option<&T> {
		if let AvailableHeader::Available(header) = self {
			Some(header)
		} else {
			None
		}
	}
}

impl<T> From<Option<T>> for AvailableHeader<T> {
	fn from(maybe_header: Option<T>) -> AvailableHeader<T> {
		match maybe_header {
			Some(header) => AvailableHeader::Available(header),
			None => AvailableHeader::Missing,
/// Source client used in parachain heads synchronization loop.
#[async_trait]
pub trait SourceClient<P: ParachainsPipeline>: RelayClient {
	/// Returns `Ok(true)` if client is in synced state.
	async fn ensure_synced(&self) -> Result<bool, Self::Error>;

	/// Get parachain head id at given block.
	async fn parachain_head(
		&self,
		at_block: HeaderIdOf<P::SourceRelayChain>,
	) -> Result<AvailableHeader<HeaderIdOf<P::SourceParachain>>, Self::Error>;

	/// Get parachain head proof at given block.
	///
	/// Returns the proof together with the hash of the proved parachain head.
	async fn prove_parachain_head(
		// the receiver is required: the sync loop calls this as a method on the client
		&self,
		at_block: HeaderIdOf<P::SourceRelayChain>,
	) -> Result<(ParaHeadsProof, ParaHash), Self::Error>;
}

/// Target client used in parachain heads synchronization loop.
#[async_trait]
pub trait TargetClient<P: ParachainsPipeline>: RelayClient {
	/// Transaction tracker to track submitted transactions.
	type TransactionTracker: TransactionTracker<HeaderId = HeaderIdOf<P::TargetChain>>;
	/// Get best block id.
	async fn best_block(&self) -> Result<HeaderIdOf<P::TargetChain>, Self::Error>;

	/// Get best finalized source relay chain block id. If `free_source_relay_headers_interval`
	/// is `Some(_)`, the returned
	async fn best_finalized_source_relay_chain_block(
		&self,
		at_block: &HeaderIdOf<P::TargetChain>,
	) -> Result<HeaderIdOf<P::SourceRelayChain>, Self::Error>;
	/// Get free source **relay** headers submission interval, if it is configured in the
	/// target runtime. We assume that the target chain will accept parachain header, proved
	/// at such relay header for free.
	async fn free_source_relay_headers_interval(
		&self,
	) -> Result<Option<BlockNumberOf<P::SourceRelayChain>>, Self::Error>;
	async fn parachain_head(
		&self,
		at_block: HeaderIdOf<P::TargetChain>,
	) -> Result<
		Option<(HeaderIdOf<P::SourceRelayChain>, HeaderIdOf<P::SourceParachain>)>,
		Self::Error,
	>;

	/// Submit parachain heads proof.
		at_source_block: HeaderIdOf<P::SourceRelayChain>,
		para_head_hash: ParaHash,
		proof: ParaHeadsProof,
	) -> Result<Self::TransactionTracker, Self::Error>;
}

/// Build the default prefix under which Prometheus metrics of the parachains sync loop are
/// exposed.
///
/// The prefix is made of the source relay chain name, the target chain name and the id of the
/// synced parachain.
pub fn metrics_prefix<P: ParachainsPipeline>() -> String {
	let source_relay_chain = P::SourceRelayChain::NAME;
	let target_chain = P::TargetChain::NAME;
	let parachain_id = P::SourceParachain::PARACHAIN_ID;

	format!("{}_to_{}_Parachains_{}", source_relay_chain, target_chain, parachain_id)
}

/// Run parachain heads synchronization.
///
/// Sets up the relay loop with metrics exposed under [`metrics_prefix`] and then keeps calling
/// `run_until_connection_lost`.
///
/// - `metrics_params`: parameters of the metrics that are exposed by the loop;
/// - `only_free_headers`: forwarded to `run_until_connection_lost`; when `true`, the loop only
///   submits parachain heads once enough new source relay headers are known to the target
///   chain (see the free headers interval handling there);
/// - `exit_signal`: shared between loop restarts; once it resolves, the loop stops.
pub async fn run<P: ParachainsPipeline>(
	source_client: impl SourceClient<P>,
	target_client: impl TargetClient<P>,
	metrics_params: MetricsParams,
	only_free_headers: bool,
	exit_signal: impl Future<Output = ()> + 'static + Send,
) -> Result<(), relay_utils::Error>
where
	P::SourceRelayChain: Chain<BlockNumber = RelayBlockNumber>,
{
	// the closure below may be called more than once, so it needs a cloneable exit signal
	let exit_signal = exit_signal.shared();
	relay_utils::relay_loop(source_client, target_client)
		.with_metrics(metrics_params)
		.loop_metric(ParachainsLoopMetrics::new(Some(&metrics_prefix::<P>()))?)?
		.expose()
		.await?
		.run(metrics_prefix::<P>(), move |source_client, target_client, metrics| {
			run_until_connection_lost(
				source_client,
				target_client,
				metrics,
				only_free_headers,
				exit_signal.clone(),
			)
		})
		.await
}

/// Run parachain heads synchronization.
async fn run_until_connection_lost<P: ParachainsPipeline>(
	source_client: impl SourceClient<P>,
	target_client: impl TargetClient<P>,
	metrics: Option<ParachainsLoopMetrics>,
	exit_signal: impl Future<Output = ()> + Send,
) -> Result<(), FailedClient>
where
	P::SourceRelayChain: Chain<BlockNumber = RelayBlockNumber>,
{
	let exit_signal = exit_signal.fuse();
	let min_block_interval = std::cmp::min(
		P::SourceRelayChain::AVERAGE_BLOCK_INTERVAL,
		P::TargetChain::AVERAGE_BLOCK_INTERVAL,
	);

	// free parachain header = header, available (proved) at free relay chain block. Let's
	// read interval of free source relay chain blocks from target client
	let free_source_relay_headers_interval = if only_free_headers {
		let free_source_relay_headers_interval =
			target_client.free_source_relay_headers_interval().await.map_err(|e| {
				log::warn!(
					target: "bridge",
					"Failed to read free {} headers interval at {}: {:?}",
					P::SourceRelayChain::NAME,
					P::TargetChain::NAME,
					e,
				);
				FailedClient::Target
			})?;
		match free_source_relay_headers_interval {
			Some(free_source_relay_headers_interval) if free_source_relay_headers_interval != 0 => {
				log::trace!(
					target: "bridge",
					"Free {} headers interval at {}: {:?}",
					P::SourceRelayChain::NAME,
					P::TargetChain::NAME,
					free_source_relay_headers_interval,
				);
				free_source_relay_headers_interval
			},
			_ => {
				log::warn!(
					target: "bridge",
					"Invalid free {} headers interval at {}: {:?}",
					P::SourceRelayChain::NAME,
					P::TargetChain::NAME,
					free_source_relay_headers_interval,
				);
				return Err(FailedClient::Target)
			},
		}
	} else {
		// ignore - we don't need it
		0
	};

	let mut submitted_heads_tracker: Option<SubmittedHeadsTracker<P>> = None;

	futures::pin_mut!(exit_signal);

	// Note that the internal loop breaks with `FailedClient` error even if error is non-connection.
	// It is Ok for now, but it may need to be fixed in the future to use exponential backoff for
	// regular errors.

	loop {
		// Either wait for new block, or exit signal.
		// Please note that we are prioritizing the exit signal since if both events happen at once
		// it doesn't make sense to perform one more loop iteration.
		select_biased! {
			_ = exit_signal => return Ok(()),
			_ = async_std::task::sleep(min_block_interval).fuse() => {},
		}

		// if source client is not yet synced, we'll need to sleep. Otherwise we risk submitting too
		// much redundant transactions
		match source_client.ensure_synced().await {
			Ok(true) => (),
			Ok(false) => {
				log::warn!(
					target: "bridge",
					"{} client is syncing. Won't do anything until it is synced",
				);
				continue
			},
			Err(e) => {
				log::warn!(
					target: "bridge",
					"{} client has failed to return its sync status: {:?}",
				return Err(FailedClient::Source)
			},
		}

		// if we have active transaction, we'll need to wait until it is mined or dropped
		let best_target_block = target_client.best_block().await.map_err(|e| {
			log::warn!(target: "bridge", "Failed to read best {} block: {:?}", P::SourceRelayChain::NAME, e);
			FailedClient::Target
		})?;
		let (relay_of_head_at_target, head_at_target) =
			read_head_at_target(&target_client, metrics.as_ref(), &best_target_block).await?;

		// check if our transaction has been mined
		if let Some(tracker) = submitted_heads_tracker.take() {
			match tracker.update(&best_target_block, &head_at_target).await {
				SubmittedHeadStatus::Waiting(tracker) => {
					// no news about our transaction and we shall keep waiting
					submitted_heads_tracker = Some(tracker);
					continue
				},
				SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(_)) => {
					// all heads have been updated, we don't need this tracker anymore
				},
				SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost) => {
					log::warn!(
						target: "bridge",
						"Parachains synchronization from {} to {} has stalled. Going to restart",
						P::TargetChain::NAME,
					);

					return Err(FailedClient::Both)
				},
			}
		// in all-headers strategy we'll be submitting para head, available at
		// `best_finalized_relay_block_at_target`
		let best_finalized_relay_block_at_target = target_client
			.best_finalized_source_relay_chain_block(&best_target_block)
			.await
			.map_err(|e| {
				log::warn!(
					target: "bridge",
					"Failed to read best finalized {} block from {}: {:?}",
					P::TargetChain::NAME,
					e,
				);
				FailedClient::Target
			})?;

		// ..but if we only need to submit free headers, we need to submit para
		// head, available at best free source relay chain header, known to the
		// target chain
		let prove_at_relay_block = if only_free_headers {
			match relay_of_head_at_target {
				Some(relay_of_head_at_target) => {
					// find last free relay chain header in the range that we are interested in
					let scan_range_begin = relay_of_head_at_target.number();
					let scan_range_end = best_finalized_relay_block_at_target.number();
					if scan_range_end.saturating_sub(scan_range_begin) <
						free_source_relay_headers_interval
					{
						// there are no new **free** relay chain headers in the range
						log::trace!(
							target: "bridge",
							"Waiting for new free {} headers at {}: scanned {:?}..={:?}",
							P::SourceRelayChain::NAME,
							P::TargetChain::NAME,
							scan_range_begin,
							scan_range_end,
						);
						continue;
					}

					// we may submit new parachain head for free
					best_finalized_relay_block_at_target
				},
				None => {
					// no parachain head at target => let's submit first one
					best_finalized_relay_block_at_target
				},
			}
		} else {
			best_finalized_relay_block_at_target
		};

		// now let's check if we need to update parachain head at all
			read_head_at_source(&source_client, metrics.as_ref(), &prove_at_relay_block).await?;
		let is_update_required = is_update_required::<P>(
			head_at_source,
			head_at_target,

		if is_update_required {
			let (head_proof, head_hash) =
				source_client.prove_parachain_head(prove_at_relay_block).await.map_err(|e| {
					log::warn!(
						target: "bridge",
						"Failed to prove {} parachain ParaId({}) heads: {:?}",
						P::SourceRelayChain::NAME,
						P::SourceParachain::PARACHAIN_ID,
						e,
					);
					FailedClient::Source
				})?;
			log::info!(
				target: "bridge",
				"Submitting {} parachain ParaId({}) head update transaction to {}. Para hash at source relay {:?}: {:?}",
				P::SourceRelayChain::NAME,
				P::SourceParachain::PARACHAIN_ID,
				P::TargetChain::NAME,
			let transaction_tracker = target_client
				.submit_parachain_head_proof(
					prove_at_relay_block,
					head_hash,
					head_proof,
					only_free_headers,
				)
				.await
				.map_err(|e| {
					log::warn!(
						target: "bridge",
						"Failed to submit {} parachain ParaId({}) heads proof to {}: {:?}",
						P::SourceRelayChain::NAME,
						P::SourceParachain::PARACHAIN_ID,
						P::TargetChain::NAME,
						e,
					);
					FailedClient::Target
				})?;
			submitted_heads_tracker =
				Some(SubmittedHeadsTracker::<P>::new(head_at_source, transaction_tracker));
/// Returns `true` if we need to submit parachain-head-update transaction.
fn is_update_required<P: ParachainsPipeline>(
	head_at_source: AvailableHeader<HeaderIdOf<P::SourceParachain>>,
	head_at_target: Option<HeaderIdOf<P::SourceParachain>>,
	prove_at_relay_block: HeaderIdOf<P::SourceRelayChain>,
	best_target_block: HeaderIdOf<P::TargetChain>,
	P::SourceRelayChain: Chain<BlockNumber = RelayBlockNumber>,
		"Checking if {} parachain ParaId({}) needs update at {}:\n\t\
			At {} ({:?}): {:?}\n\t\
			At {} ({:?}): {:?}",
		P::SourceRelayChain::NAME,
		P::SourceParachain::PARACHAIN_ID,
	let needs_update = match (head_at_source, head_at_target) {
		(AvailableHeader::Unavailable, _) => {
			// source client has politely asked us not to update current parachain head
			// at the target chain
			false
		},
		(AvailableHeader::Available(head_at_source), Some(head_at_target))
			if head_at_source.number() > head_at_target.number() =>
		{
			// source client knows head that is better than the head known to the target
			// client
			true
		},
		(AvailableHeader::Available(_), Some(_)) => {
			// this is normal case when relay has recently updated heads, when parachain is
			// not progressing, or when our source client is still syncing
			false
		},
		(AvailableHeader::Available(_), None) => {
			// parachain is not yet known to the target client. This is true when parachain
			// or bridge has been just onboarded/started
			true
		},
		(AvailableHeader::Missing, Some(_)) => {
			// parachain/parathread has been offboarded removed from the system. It needs to
			// be propageted to the target client
			true
		},
		(AvailableHeader::Missing, None) => {
			// all's good - parachain is unknown to both clients
			false
		},
	};
	if needs_update {
		log::trace!(
			target: "bridge",
			"{} parachain ParaId({}) needs update at {}: {:?} vs {:?}",
			P::SourceRelayChain::NAME,
			P::SourceParachain::PARACHAIN_ID,
			P::TargetChain::NAME,
			head_at_source,
			head_at_target,
		);
/// Reads parachain head from the source client.
async fn read_head_at_source<P: ParachainsPipeline>(
	source_client: &impl SourceClient<P>,
	metrics: Option<&ParachainsLoopMetrics>,
	at_relay_block: &HeaderIdOf<P::SourceRelayChain>,
) -> Result<AvailableHeader<HeaderIdOf<P::SourceParachain>>, FailedClient> {
	let para_head = source_client.parachain_head(*at_relay_block).await;
	match para_head {
		Ok(AvailableHeader::Available(para_head)) => {
			if let Some(metrics) = metrics {
				metrics.update_best_parachain_block_at_source(
					ParaId(P::SourceParachain::PARACHAIN_ID),
					para_head.number(),
			}
			Ok(AvailableHeader::Available(para_head))
		},
		Ok(r) => Ok(r),
		Err(e) => {
			log::warn!(
				target: "bridge",
				"Failed to read head of {} parachain ParaId({:?}): {:?}",
				P::SourceRelayChain::NAME,
				P::SourceParachain::PARACHAIN_ID,
				e,
			);
			Err(FailedClient::Source)
		},
/// Reads parachain head from the target client. Also returns source relay chain header
/// that has been used to prove that head.
async fn read_head_at_target<P: ParachainsPipeline>(
	target_client: &impl TargetClient<P>,
	metrics: Option<&ParachainsLoopMetrics>,
	at_block: &HeaderIdOf<P::TargetChain>,
) -> Result<
	(Option<HeaderIdOf<P::SourceRelayChain>>, Option<HeaderIdOf<P::SourceParachain>>),
	FailedClient,
> {
	let para_head_id = target_client.parachain_head(*at_block).await;
	match para_head_id {
		Ok(Some((relay_header_id, para_head_id))) => {
			if let Some(metrics) = metrics {
				metrics.update_best_parachain_block_at_target(
					ParaId(P::SourceParachain::PARACHAIN_ID),
					para_head_id.number(),
			Ok((Some(relay_header_id), Some(para_head_id)))
		Err(e) => {
			log::warn!(
				target: "bridge",
				"Failed to read head of {} parachain ParaId({}) at {}: {:?}",
				P::SourceRelayChain::NAME,
				P::SourceParachain::PARACHAIN_ID,
				P::TargetChain::NAME,
				e,
			);
			Err(FailedClient::Target)
		},
/// Submitted heads status.
enum SubmittedHeadStatus<P: ParachainsPipeline> {
	/// Heads are not yet updated.
	Waiting(SubmittedHeadsTracker<P>),
	/// Heads transaction has either been finalized or lost (i.e. received its "final" status).
	Final(TrackedTransactionStatus<HeaderIdOf<P::TargetChain>>),
/// Type of the transaction tracker that the `SubmittedHeadsTracker` is using.
///
/// It is a boxed `Send` future that resolves to the `TrackedTransactionStatus` of the
/// submitted transaction, wrapped into `Shared`.
///
/// It needs to be shared because of `poll` macro and our consuming `update` method: the
/// tracker polls a clone of this future and keeps the original for later checks.
type SharedTransactionTracker<P> = Shared<
	Pin<
		Box<
			dyn Future<
					Output = TrackedTransactionStatus<
						HeaderIdOf<<P as ParachainsPipeline>::TargetChain>,
					>,
				> + Send,
		>,
	>,
>;

/// Submitted parachain heads transaction.
struct SubmittedHeadsTracker<P: ParachainsPipeline> {
	/// Parachain header id that we have submitted.
	submitted_head: AvailableHeader<HeaderIdOf<P::SourceParachain>>,
	/// Future that waits for submitted transaction finality or loss.
	///
	/// It needs to be shared because of `poll` macro and our consuming `update` method.
	transaction_tracker: SharedTransactionTracker<P>,
impl<P: ParachainsPipeline> SubmittedHeadsTracker<P> {
	/// Creates new parachain heads transaction tracker.
	pub fn new(
		submitted_head: AvailableHeader<HeaderIdOf<P::SourceParachain>>,
		transaction_tracker: impl TransactionTracker<HeaderId = HeaderIdOf<P::TargetChain>> + 'static,
			transaction_tracker: transaction_tracker.wait().fuse().boxed().shared(),
	/// Returns `None` if all submitted parachain heads have been updated.
	pub async fn update(
		at_target_block: &HeaderIdOf<P::TargetChain>,
		head_at_target: &Option<HeaderIdOf<P::SourceParachain>>,
	) -> SubmittedHeadStatus<P> {
		// check if our head has been updated
		let is_head_updated = match (self.submitted_head, head_at_target) {
			(AvailableHeader::Available(submitted_head), Some(head_at_target))
				if head_at_target.number() >= submitted_head.number() =>
				true,
			(AvailableHeader::Missing, None) => true,
			_ => false,
		};
		if is_head_updated {
			log::trace!(
				target: "bridge",
				"Head of parachain ParaId({}) has been updated at {}: {:?}",
				P::SourceParachain::PARACHAIN_ID,
				P::TargetChain::NAME,
				head_at_target,
			);
			return SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(*at_target_block))
		// if underlying transaction tracker has reported that the transaction is lost, we may
		// then restart our sync
		let transaction_tracker = self.transaction_tracker.clone();
		match poll!(transaction_tracker) {
			Poll::Ready(TrackedTransactionStatus::Lost) =>
				return SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost),
			Poll::Ready(TrackedTransactionStatus::Finalized(_)) => {
				// so we are here and our transaction is mined+finalized, but some of heads were not
				// updated => we're considering our loop as stalled
				return SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost)
	}
}

#[cfg(test)]
mod tests {
	use super::*;
	use async_std::sync::{Arc, Mutex};
	use codec::Encode;
	use futures::{SinkExt, StreamExt};
	use relay_substrate_client::test_chain::{TestChain, TestParachain};
	use relay_utils::{HeaderId, MaybeConnectionError};
	use sp_core::H256;
	const PARA_10_HASH: ParaHash = H256([10u8; 32]);
	const PARA_20_HASH: ParaHash = H256([20u8; 32]);

	#[derive(Clone, Debug)]
	enum TestError {
		Error,
	}

	impl MaybeConnectionError for TestError {
		fn is_connection_error(&self) -> bool {
			false
		}
	}

	struct TestParachainsPipeline;

	impl ParachainsPipeline for TestParachainsPipeline {
		type SourceRelayChain = TestChain;
		type SourceParachain = TestParachain;
		type TargetChain = TestChain;
	}

	#[derive(Clone, Debug)]
	struct TestClient {
		data: Arc<Mutex<TestClientData>>,
	}

	struct TestTransactionTracker(Option<TrackedTransactionStatus<HeaderIdOf<TestChain>>>);

	#[async_trait]
	impl TransactionTracker for TestTransactionTracker {
		type HeaderId = HeaderIdOf<TestChain>;

		async fn wait(self) -> TrackedTransactionStatus<HeaderIdOf<TestChain>> {
			match self.0 {
				Some(status) => status,
				None => futures::future::pending().await,
			}
	#[derive(Clone, Debug)]
	struct TestClientData {
		source_sync_status: Result<bool, TestError>,
		source_head: HashMap<
			BlockNumberOf<TestChain>,
			Result<AvailableHeader<HeaderIdOf<TestParachain>>, TestError>,
		>,
		target_free_source_relay_headers_interval:
			Result<Option<BlockNumberOf<TestChain>>, TestError>,
		target_best_block: Result<HeaderIdOf<TestChain>, TestError>,
		target_best_finalized_source_block: Result<HeaderIdOf<TestChain>, TestError>,
		#[allow(clippy::type_complexity)]
		target_head: Result<Option<(HeaderIdOf<TestChain>, HeaderIdOf<TestParachain>)>, TestError>,
		target_submit_result: Result<(), TestError>,

		submitted_proof_at_source_relay_block: Option<HeaderIdOf<TestChain>>,
		exit_signal_sender: Option<Box<futures::channel::mpsc::UnboundedSender<()>>>,
	}

	impl TestClientData {
		pub fn minimal() -> Self {
			TestClientData {
				source_sync_status: Ok(true),
				source_head: vec![(0, Ok(AvailableHeader::Available(HeaderId(0, PARA_20_HASH))))]
					.into_iter()
					.collect(),
				target_free_source_relay_headers_interval: Ok(None),
				target_best_block: Ok(HeaderId(0, Default::default())),
				target_best_finalized_source_block: Ok(HeaderId(0, Default::default())),
				target_submit_result: Ok(()),

				submitted_proof_at_source_relay_block: None,
				exit_signal_sender: None,
			}
		}

		pub fn with_exit_signal_sender(
			sender: futures::channel::mpsc::UnboundedSender<()>,
		) -> Self {
			let mut client = Self::minimal();
			client.exit_signal_sender = Some(Box::new(sender));
			client
		}
	}

	impl From<TestClientData> for TestClient {
		fn from(data: TestClientData) -> TestClient {
			TestClient { data: Arc::new(Mutex::new(data)) }
		}
	}

	#[async_trait]
	impl RelayClient for TestClient {
		type Error = TestError;

		async fn reconnect(&mut self) -> Result<(), TestError> {
			unimplemented!()
		}
	}

	#[async_trait]
	impl SourceClient<TestParachainsPipeline> for TestClient {
		async fn ensure_synced(&self) -> Result<bool, TestError> {
			self.data.lock().await.source_sync_status.clone()
		}

		async fn parachain_head(
			&self,
		) -> Result<AvailableHeader<HeaderIdOf<TestParachain>>, TestError> {
			self.data
				.lock()
				.await
				.source_head
				.get(&at_block.0)
				.expect(&format!("SourceClient::parachain_head({})", at_block.0))
				.clone()
		) -> Result<(ParaHeadsProof, ParaHash), TestError> {
			let head_result =
				SourceClient::<TestParachainsPipeline>::parachain_head(self, at_block).await?;
			let head = head_result.as_available().unwrap();
			let storage_proof = vec![head.hash().encode()];
			let proof = (ParaHeadsProof { storage_proof }, head.hash());
			self.data.lock().await.source_proof.clone().map(|_| proof)
		}
	}

	#[async_trait]
	impl TargetClient<TestParachainsPipeline> for TestClient {
		type TransactionTracker = TestTransactionTracker;

		async fn best_block(&self) -> Result<HeaderIdOf<TestChain>, TestError> {
			self.data.lock().await.target_best_block.clone()
		}

		async fn best_finalized_source_relay_chain_block(
			&self,
			_at_block: &HeaderIdOf<TestChain>,
		) -> Result<HeaderIdOf<TestChain>, TestError> {
			self.data.lock().await.target_best_finalized_source_block.clone()
		}

		async fn free_source_relay_headers_interval(
			&self,
		) -> Result<Option<BlockNumberOf<TestParachain>>, TestError> {
			self.data.lock().await.target_free_source_relay_headers_interval.clone()
		}

		async fn parachain_head(
			&self,
			_at_block: HeaderIdOf<TestChain>,
		) -> Result<Option<(HeaderIdOf<TestChain>, HeaderIdOf<TestParachain>)>, TestError> {
			self.data.lock().await.target_head.clone()
			_proof: ParaHeadsProof,
		) -> Result<TestTransactionTracker, Self::Error> {
			let mut data = self.data.lock().await;
			data.target_submit_result.clone()?;
			data.submitted_proof_at_source_relay_block = Some(at_source_block);
			if let Some(mut exit_signal_sender) = data.exit_signal_sender.take() {
				exit_signal_sender.send(()).await.unwrap();
			}
			Ok(TestTransactionTracker(Some(
				TrackedTransactionStatus::Finalized(Default::default()),
			)))
		}
	}

	#[test]
	fn when_source_client_fails_to_return_sync_state() {
		let mut test_source_client = TestClientData::minimal();
		test_source_client.source_sync_status = Err(TestError::Error);

		assert_eq!(
			async_std::task::block_on(run_until_connection_lost(
				TestClient::from(test_source_client),
				TestClient::from(TestClientData::minimal()),
				None,
				futures::future::pending(),
			)),
			Err(FailedClient::Source),
		);
	}

	#[test]
	fn when_target_client_fails_to_return_best_block() {
		let mut test_target_client = TestClientData::minimal();
		test_target_client.target_best_block = Err(TestError::Error);

		assert_eq!(
			async_std::task::block_on(run_until_connection_lost(
				TestClient::from(TestClientData::minimal()),
				TestClient::from(test_target_client),
				None,
				futures::future::pending(),
			)),
			Err(FailedClient::Target),
		);
	}

	#[test]
	fn when_target_client_fails_to_read_heads() {
		let mut test_target_client = TestClientData::minimal();
		test_target_client.target_head = Err(TestError::Error);

		assert_eq!(
			async_std::task::block_on(run_until_connection_lost(
				TestClient::from(TestClientData::minimal()),
				TestClient::from(test_target_client),
				None,
				futures::future::pending(),
			)),
			Err(FailedClient::Target),
		);
	}

	#[test]
	fn when_target_client_fails_to_read_best_finalized_source_block() {
		let mut test_target_client = TestClientData::minimal();
		test_target_client.target_best_finalized_source_block = Err(TestError::Error);

		assert_eq!(
			async_std::task::block_on(run_until_connection_lost(
				TestClient::from(TestClientData::minimal()),
				TestClient::from(test_target_client),
				None,
				futures::future::pending(),
			)),
			Err(FailedClient::Target),
		);
	}

	#[test]
	fn when_source_client_fails_to_read_heads() {
		let mut test_source_client = TestClientData::minimal();
		test_source_client.source_head.insert(0, Err(TestError::Error));

		assert_eq!(
			async_std::task::block_on(run_until_connection_lost(
				TestClient::from(test_source_client),
				TestClient::from(TestClientData::minimal()),
				None,
				futures::future::pending(),
			)),
			Err(FailedClient::Source),
		);
	}

	#[test]
	fn when_source_client_fails_to_prove_heads() {
		let mut test_source_client = TestClientData::minimal();
		test_source_client.source_proof = Err(TestError::Error);

		assert_eq!(
			async_std::task::block_on(run_until_connection_lost(
				TestClient::from(test_source_client),
				TestClient::from(TestClientData::minimal()),
				None,
				futures::future::pending(),
			)),
			Err(FailedClient::Source),
		);
	}

	#[test]
	fn when_target_client_rejects_update_transaction() {
		let mut test_target_client = TestClientData::minimal();
		test_target_client.target_submit_result = Err(TestError::Error);

		assert_eq!(
			async_std::task::block_on(run_until_connection_lost(
				TestClient::from(TestClientData::minimal()),
				TestClient::from(test_target_client),
				None,
				futures::future::pending(),
			)),
			Err(FailedClient::Target),
		);
	}

	#[test]
	fn minimal_working_case() {
		let (exit_signal_sender, exit_signal) = futures::channel::mpsc::unbounded();
		assert_eq!(
			async_std::task::block_on(run_until_connection_lost(
				TestClient::from(TestClientData::minimal()),
				TestClient::from(TestClientData::with_exit_signal_sender(exit_signal_sender)),
				None,
				exit_signal.into_future().map(|(_, _)| ()),
			)),
			Ok(()),
		);
	}

	#[async_std::test]
	async fn free_headers_are_relayed() {
		// prepare following case:
		// 1) best source relay at target: 95
		// 2) best source parachain at target: 5 at relay 50
		// 3) free headers interval: 10
		// 4) at source relay chain block 90 source parachain block is 9
		// +
		// 5) best finalized source relay chain block is 95
		// 6) at source relay chain block 95 source parachain block is 42
		// =>
		// parachain block 42 would have been relayed, because 95 - 50 > 10
		let (exit_signal_sender, exit_signal) = futures::channel::mpsc::unbounded();
		let clients_data = TestClientData {
			source_sync_status: Ok(true),
			source_head: vec![
				(90, Ok(AvailableHeader::Available(HeaderId(9, [9u8; 32].into())))),
				(95, Ok(AvailableHeader::Available(HeaderId(42, [42u8; 32].into())))),
			]
			.into_iter()
			.collect(),
			source_proof: Ok(()),

			target_free_source_relay_headers_interval: Ok(Some(10)),
			target_best_block: Ok(HeaderId(200, [200u8; 32].into())),
			target_best_finalized_source_block: Ok(HeaderId(95, [95u8; 32].into())),
			target_head: Ok(Some((HeaderId(50, [50u8; 32].into()), HeaderId(5, [5u8; 32].into())))),