From dc8aa5df7dc1b1393653196bda099997efe829bb Mon Sep 17 00:00:00 2001
From: Serban Iorga <serban@parity.io>
Date: Wed, 16 Aug 2023 12:42:51 +0300
Subject: [PATCH] Finality loop refactoring (#2357)

---
 bridges/relays/finality/Cargo.toml            |    2 +-
 bridges/relays/finality/src/finality_loop.rs  | 1072 ++++++++---------
 .../finality/src/finality_loop_tests.rs       |  604 ----------
 .../relays/finality/src/finality_proofs.rs    |  227 ++++
 bridges/relays/finality/src/headers.rs        |  237 ++++
 bridges/relays/finality/src/lib.rs            |   40 +-
 bridges/relays/finality/src/mock.rs           |  209 ++++
 .../src/finality/source.rs                    |    2 +-
 bridges/relays/utils/src/relay_loop.rs        |   12 +-
 9 files changed, 1226 insertions(+), 1179 deletions(-)
 delete mode 100644 bridges/relays/finality/src/finality_loop_tests.rs
 create mode 100644 bridges/relays/finality/src/finality_proofs.rs
 create mode 100644 bridges/relays/finality/src/headers.rs
 create mode 100644 bridges/relays/finality/src/mock.rs

diff --git a/bridges/relays/finality/Cargo.toml b/bridges/relays/finality/Cargo.toml
index ab75533b023..7fcd08ef6f5 100644
--- a/bridges/relays/finality/Cargo.toml
+++ b/bridges/relays/finality/Cargo.toml
@@ -12,7 +12,7 @@ async-trait = "0.1"
 backoff = "0.4"
 bp-header-chain = { path = "../../primitives/header-chain" }
 futures = "0.3.28"
-log = "0.4.17"
+log = "0.4.20"
 num-traits = "0.2"
 relay-utils = { path = "../utils" }
 
diff --git a/bridges/relays/finality/src/finality_loop.rs b/bridges/relays/finality/src/finality_loop.rs
index 7c8217c209f..b1f1f018c0e 100644
--- a/bridges/relays/finality/src/finality_loop.rs
+++ b/bridges/relays/finality/src/finality_loop.rs
@@ -19,23 +19,23 @@
 //! is the mandatory headers, which we always submit to the target node. For such headers, we
 //! assume that the persistent proof either exists, or will eventually become available.
 
+use crate::{sync_loop_metrics::SyncLoopMetrics, Error, FinalitySyncPipeline, SourceHeader};
+
 use crate::{
-	sync_loop_metrics::SyncLoopMetrics, FinalityPipeline, FinalitySyncPipeline, SourceClientBase,
-	SourceHeader,
+	base::SourceClientBase,
+	finality_proofs::{FinalityProofsBuf, FinalityProofsStream},
+	headers::{JustifiedHeader, JustifiedHeaderSelector},
 };
-
 use async_trait::async_trait;
-use backoff::backoff::Backoff;
-use bp_header_chain::FinalityProof;
-use futures::{select, Future, FutureExt, Stream, StreamExt};
-use num_traits::{One, Saturating};
+use backoff::{backoff::Backoff, ExponentialBackoff};
+use futures::{future::Fuse, select, Future, FutureExt};
+use num_traits::Saturating;
 use relay_utils::{
 	metrics::MetricsParams, relay_loop::Client as RelayClient, retry_backoff, FailedClient,
 	HeaderId, MaybeConnectionError, TrackedTransactionStatus, TransactionTracker,
 };
 use std::{
 	fmt::Debug,
-	pin::Pin,
 	time::{Duration, Instant},
 };
 
@@ -104,653 +104,593 @@ pub fn metrics_prefix<P: FinalitySyncPipeline>() -> String {
 	format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME)
 }
 
-/// Run finality proofs synchronization loop.
-pub async fn run<P: FinalitySyncPipeline>(
-	source_client: impl SourceClient<P>,
-	target_client: impl TargetClient<P>,
-	sync_params: FinalitySyncParams,
-	metrics_params: MetricsParams,
-	exit_signal: impl Future<Output = ()> + 'static + Send,
-) -> Result<(), relay_utils::Error> {
-	let exit_signal = exit_signal.shared();
-	relay_utils::relay_loop(source_client, target_client)
-		.with_metrics(metrics_params)
-		.loop_metric(SyncLoopMetrics::new(
-			Some(&metrics_prefix::<P>()),
-			"source",
-			"source_at_target",
-		)?)?
-		.expose()
-		.await?
-		.run(metrics_prefix::<P>(), move |source_client, target_client, metrics| {
-			run_until_connection_lost(
-				source_client,
-				target_client,
-				sync_params.clone(),
-				metrics,
-				exit_signal.clone(),
-			)
-		})
-		.await
+pub struct SyncInfo<P: FinalitySyncPipeline> {
+	pub best_number_at_source: P::Number,
+	pub best_number_at_target: P::Number,
+	pub is_using_same_fork: bool,
 }
 
-/// Unjustified headers container. Ordered by header number.
-pub(crate) type UnjustifiedHeaders<H> = Vec<H>;
-/// Finality proofs container. Ordered by target header number.
-pub(crate) type FinalityProofs<P> =
-	Vec<(<P as FinalityPipeline>::Number, <P as FinalityPipeline>::FinalityProof)>;
-/// Reference to finality proofs container.
-pub(crate) type FinalityProofsRef<'a, P> =
-	&'a [(<P as FinalityPipeline>::Number, <P as FinalityPipeline>::FinalityProof)];
-
-/// Error that may happen inside finality synchronization loop.
-#[derive(Debug)]
-pub(crate) enum Error<P: FinalitySyncPipeline, SourceError, TargetError> {
-	/// Source client request has failed with given error.
-	Source(SourceError),
-	/// Target client request has failed with given error.
-	Target(TargetError),
-	/// Finality proof for mandatory header is missing from the source node.
-	MissingMandatoryFinalityProof(P::Number),
-}
+impl<P: FinalitySyncPipeline> SyncInfo<P> {
+	/// Checks if both clients are on the same fork.
+	async fn is_on_same_fork<SC: SourceClient<P>>(
+		source_client: &SC,
+		id_at_target: &HeaderId<P::Hash, P::Number>,
+	) -> Result<bool, SC::Error> {
+		let header_at_source = source_client.header_and_finality_proof(id_at_target.0).await?.0;
+		let header_hash_at_source = header_at_source.hash();
+		Ok(if id_at_target.1 == header_hash_at_source {
+			true
+		} else {
+			log::error!(
+				target: "bridge",
+				"Source node ({}) and pallet at target node ({}) have different headers at the same height {:?}: \
+				at-source {:?} vs at-target {:?}",
+				P::SOURCE_NAME,
+				P::TARGET_NAME,
+				id_at_target.0,
+				header_hash_at_source,
+				id_at_target.1,
+			);
+
+			false
+		})
+	}
 
-impl<P, SourceError, TargetError> Error<P, SourceError, TargetError>
-where
-	P: FinalitySyncPipeline,
-	SourceError: MaybeConnectionError,
-	TargetError: MaybeConnectionError,
-{
-	fn fail_if_connection_error(&self) -> Result<(), FailedClient> {
-		match *self {
-			Error::Source(ref error) if error.is_connection_error() => Err(FailedClient::Source),
-			Error::Target(ref error) if error.is_connection_error() => Err(FailedClient::Target),
-			_ => Ok(()),
+	async fn new<SC: SourceClient<P>, TC: TargetClient<P>>(
+		source_client: &SC,
+		target_client: &TC,
+	) -> Result<Self, Error<P, SC::Error, TC::Error>> {
+		let best_number_at_source =
+			source_client.best_finalized_block_number().await.map_err(Error::Source)?;
+		let best_id_at_target =
+			target_client.best_finalized_source_block_id().await.map_err(Error::Target)?;
+		let best_number_at_target = best_id_at_target.0;
+
+		let is_using_same_fork = Self::is_on_same_fork(source_client, &best_id_at_target)
+			.await
+			.map_err(Error::Source)?;
+
+		Ok(Self { best_number_at_source, best_number_at_target, is_using_same_fork })
+	}
+
+	fn update_metrics(&self, metrics_sync: &Option<SyncLoopMetrics>) {
+		if let Some(metrics_sync) = metrics_sync {
+			metrics_sync.update_best_block_at_source(self.best_number_at_source);
+			metrics_sync.update_best_block_at_target(self.best_number_at_target);
+			metrics_sync.update_using_same_fork(self.is_using_same_fork);
 		}
 	}
+
+	pub fn num_headers(&self) -> P::Number {
+		self.best_number_at_source.saturating_sub(self.best_number_at_target)
+	}
 }
 
 /// Information about transaction that we have submitted.
 #[derive(Debug, Clone)]
-pub(crate) struct Transaction<Tracker, Number> {
+pub struct Transaction<Tracker, Number> {
 	/// Submitted transaction tracker.
-	pub tracker: Tracker,
+	tracker: Tracker,
 	/// The number of the header we have submitted.
-	pub submitted_header_number: Number,
+	header_number: Number,
 }
 
 impl<Tracker: TransactionTracker, Number: Debug + PartialOrd> Transaction<Tracker, Number> {
 	pub async fn submit<
-		C: TargetClient<P, TransactionTracker = Tracker>,
 		P: FinalitySyncPipeline<Number = Number>,
+		TC: TargetClient<P, TransactionTracker = Tracker>,
 	>(
-		target_client: &C,
+		target_client: &TC,
 		header: P::Header,
 		justification: P::FinalityProof,
-	) -> Result<Self, C::Error> {
-		let submitted_header_number = header.number();
+	) -> Result<Self, TC::Error> {
+		let header_number = header.number();
 		log::debug!(
 			target: "bridge",
 			"Going to submit finality proof of {} header #{:?} to {}",
 			P::SOURCE_NAME,
-			submitted_header_number,
+			header_number,
 			P::TARGET_NAME,
 		);
 
 		let tracker = target_client.submit_finality_proof(header, justification).await?;
-		Ok(Transaction { tracker, submitted_header_number })
+		Ok(Transaction { tracker, header_number })
 	}
 
-	pub async fn track<C: TargetClient<P>, P: FinalitySyncPipeline<Number = Number>>(
+	async fn track<
+		P: FinalitySyncPipeline<Number = Number>,
+		SC: SourceClient<P>,
+		TC: TargetClient<P>,
+	>(
 		self,
-		target_client: &C,
-	) -> Result<(), String> {
+		target_client: TC,
+	) -> Result<(), Error<P, SC::Error, TC::Error>> {
 		match self.tracker.wait().await {
 			TrackedTransactionStatus::Finalized(_) => {
 				// The transaction has been finalized, but it may have been finalized in the
 				// "failed" state. So let's check if the block number was actually updated.
-				// If it wasn't then we are stalled.
-				//
-				// Please also note that we're returning an error if we fail to read required data
-				// from the target client - that's the best we can do here to avoid actual stall.
 				target_client
 					.best_finalized_source_block_id()
 					.await
-					.map_err(|e| format!("failed to read best block from target node: {e:?}"))
+					.map_err(Error::Target)
 					.and_then(|best_id_at_target| {
-						if self.submitted_header_number > best_id_at_target.0 {
-							return Err(format!(
-								"best block at target after tx is {:?} and we've submitted {:?}",
-								best_id_at_target.0, self.submitted_header_number,
-							))
+						if self.header_number > best_id_at_target.0 {
+							return Err(Error::ProofSubmissionTxFailed {
+								submitted_number: self.header_number,
+								best_number_at_target: best_id_at_target.0,
+							})
 						}
 						Ok(())
 					})
 			},
-			TrackedTransactionStatus::Lost => Err("transaction failed".to_string()),
+			TrackedTransactionStatus::Lost => Err(Error::ProofSubmissionTxLost),
 		}
 	}
 }
 
-/// Finality proofs stream that may be restarted.
-pub(crate) struct RestartableFinalityProofsStream<S> {
-	/// Flag that the stream needs to be restarted.
-	pub(crate) needs_restart: bool,
-	/// The stream itself.
-	stream: Pin<Box<S>>,
-}
+/// Finality synchronization loop state.
+struct FinalityLoop<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>> {
+	source_client: SC,
+	target_client: TC,
 
-impl<S: Stream> RestartableFinalityProofsStream<S> {
-	pub async fn create_raw_stream<
-		C: SourceClient<P, FinalityProofsStream = S>,
-		P: FinalitySyncPipeline,
-	>(
-		source_client: &C,
-	) -> Result<S, FailedClient> {
-		source_client.finality_proofs().await.map_err(|error| {
-			log::error!(
-				target: "bridge",
-				"Failed to subscribe to {} justifications: {:?}. Going to reconnect",
-				P::SOURCE_NAME,
-				error,
-			);
+	sync_params: FinalitySyncParams,
+	metrics_sync: Option<SyncLoopMetrics>,
 
-			FailedClient::Source
-		})
+	progress: (Instant, Option<P::Number>),
+	retry_backoff: ExponentialBackoff,
+	finality_proofs_stream: FinalityProofsStream<P, SC>,
+	finality_proofs_buf: FinalityProofsBuf<P>,
+	best_submitted_number: Option<P::Number>,
+}
+
+impl<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>> FinalityLoop<P, SC, TC> {
+	pub fn new(
+		source_client: SC,
+		target_client: TC,
+		sync_params: FinalitySyncParams,
+		metrics_sync: Option<SyncLoopMetrics>,
+	) -> Self {
+		Self {
+			source_client,
+			target_client,
+			sync_params,
+			metrics_sync,
+			progress: (Instant::now(), None),
+			retry_backoff: retry_backoff(),
+			finality_proofs_stream: FinalityProofsStream::new(),
+			finality_proofs_buf: FinalityProofsBuf::new(vec![]),
+			best_submitted_number: None,
+		}
 	}
 
-	pub async fn restart_if_scheduled<
-		C: SourceClient<P, FinalityProofsStream = S>,
-		P: FinalitySyncPipeline,
-	>(
-		&mut self,
-		source_client: &C,
-	) -> Result<(), FailedClient> {
-		if self.needs_restart {
-			log::warn!(target: "bridge", "{} finality proofs stream is being restarted", P::SOURCE_NAME);
+	fn update_progress(&mut self, info: &SyncInfo<P>) {
+		let (prev_time, prev_best_number_at_target) = self.progress;
+		let now = Instant::now();
+
+		let needs_update = now - prev_time > Duration::from_secs(10) ||
+			prev_best_number_at_target
+				.map(|prev_best_number_at_target| {
+					info.best_number_at_target.saturating_sub(prev_best_number_at_target) >
+						10.into()
+				})
+				.unwrap_or(true);
 
-			self.needs_restart = false;
-			self.stream = Box::pin(Self::create_raw_stream(source_client).await?);
+		if !needs_update {
+			return
 		}
-		Ok(())
+
+		log::info!(
+			target: "bridge",
+			"Synced {:?} of {:?} headers",
+			info.best_number_at_target,
+			info.best_number_at_source,
+		);
+
+		self.progress = (now, Some(info.best_number_at_target))
 	}
 
-	pub fn next(&mut self) -> Option<S::Item> {
-		match self.stream.next().now_or_never() {
-			Some(Some(finality_proof)) => Some(finality_proof),
-			Some(None) => {
-				self.needs_restart = true;
-				None
-			},
-			None => None,
+	pub async fn select_header_to_submit(
+		&mut self,
+		info: &SyncInfo<P>,
+	) -> Result<Option<JustifiedHeader<P>>, Error<P, SC::Error, TC::Error>> {
+		// to see that the loop is progressing
+		log::trace!(
+			target: "bridge",
+			"Considering range of headers ({}; {}]",
+			info.best_number_at_target,
+			info.best_number_at_source
+		);
+
+		// read missing headers
+		let selector = JustifiedHeaderSelector::new::<SC, TC>(&self.source_client, info).await?;
+		// if we see that the header schedules GRANDPA change, we need to submit it
+		if self.sync_params.only_mandatory_headers {
+			return Ok(selector.select_mandatory())
 		}
-	}
-}
 
-impl<S> From<S> for RestartableFinalityProofsStream<S> {
-	fn from(stream: S) -> Self {
-		RestartableFinalityProofsStream { needs_restart: false, stream: Box::pin(stream) }
+		// all headers that are missing from the target client are non-mandatory
+		// => even if we have already selected some header and its persistent finality proof,
+		// we may try to select better header by reading non-persistent proofs from the stream
+		self.finality_proofs_buf.fill(&mut self.finality_proofs_stream);
+		let maybe_justified_header = selector.select(&self.finality_proofs_buf);
+
+		// remove obsolete 'recent' finality proofs + keep its size under certain limit
+		let oldest_finality_proof_to_keep = maybe_justified_header
+			.as_ref()
+			.map(|justified_header| justified_header.number())
+			.unwrap_or(info.best_number_at_target);
+		self.finality_proofs_buf
+			.prune(oldest_finality_proof_to_keep, self.sync_params.recent_finality_proofs_limit);
+
+		Ok(maybe_justified_header)
 	}
-}
 
-/// Finality synchronization loop state.
-pub(crate) struct FinalityLoopState<'a, P: FinalitySyncPipeline, FinalityProofsStream> {
-	/// Synchronization loop progress.
-	pub(crate) progress: &'a mut (Instant, Option<P::Number>),
-	/// Finality proofs stream.
-	pub(crate) finality_proofs_stream:
-		&'a mut RestartableFinalityProofsStream<FinalityProofsStream>,
-	/// Recent finality proofs that we have read from the stream.
-	pub(crate) recent_finality_proofs: &'a mut FinalityProofs<P>,
-	/// Number of the last header, submitted to the target node.
-	pub(crate) submitted_header_number: Option<P::Number>,
-}
+	pub async fn run_iteration(
+		&mut self,
+	) -> Result<
+		Option<Transaction<TC::TransactionTracker, P::Number>>,
+		Error<P, SC::Error, TC::Error>,
+	> {
+		// read best source headers ids from source and target nodes
+		let info = SyncInfo::new(&self.source_client, &self.target_client).await?;
+		info.update_metrics(&self.metrics_sync);
+		self.update_progress(&info);
+
+		// if we have already submitted header, then we just need to wait for it
+		// if we're waiting too much, then we believe our transaction has been lost and restart sync
+		if Some(info.best_number_at_target) < self.best_submitted_number {
+			return Ok(None)
+		}
 
-/// Run finality relay loop until connection to one of nodes is lost.
-pub(crate) async fn run_until_connection_lost<P: FinalitySyncPipeline>(
-	source_client: impl SourceClient<P>,
-	target_client: impl TargetClient<P>,
-	sync_params: FinalitySyncParams,
-	metrics_sync: Option<SyncLoopMetrics>,
-	exit_signal: impl Future<Output = ()>,
-) -> Result<(), FailedClient> {
-	let last_transaction_tracker = futures::future::Fuse::terminated();
-	let exit_signal = exit_signal.fuse();
-	futures::pin_mut!(last_transaction_tracker, exit_signal);
-
-	let mut finality_proofs_stream =
-		RestartableFinalityProofsStream::create_raw_stream(&source_client).await?.into();
-	let mut recent_finality_proofs = Vec::new();
-
-	let mut progress = (Instant::now(), None);
-	let mut retry_backoff = retry_backoff();
-	let mut last_submitted_header_number = None;
-
-	loop {
-		// run loop iteration
-		let iteration_result = run_loop_iteration(
-			&source_client,
-			&target_client,
-			FinalityLoopState {
-				progress: &mut progress,
-				finality_proofs_stream: &mut finality_proofs_stream,
-				recent_finality_proofs: &mut recent_finality_proofs,
-				submitted_header_number: last_submitted_header_number,
-			},
-			&sync_params,
-			&metrics_sync,
-		)
-		.await;
-
-		// deal with errors
-		let next_tick = match iteration_result {
-			Ok(Some(updated_transaction)) => {
-				last_submitted_header_number = Some(updated_transaction.submitted_header_number);
-				last_transaction_tracker.set(updated_transaction.track(&target_client).fuse());
-				retry_backoff.reset();
-				sync_params.tick
-			},
-			Ok(None) => {
-				retry_backoff.reset();
-				sync_params.tick
-			},
-			Err(error) => {
-				log::error!(target: "bridge", "Finality sync loop iteration has failed with error: {:?}", error);
-				error.fail_if_connection_error()?;
-				retry_backoff.next_backoff().unwrap_or(relay_utils::relay_loop::RECONNECT_DELAY)
+		// submit new header if we have something new
+		match self.select_header_to_submit(&info).await? {
+			Some(header) => {
+				let transaction =
+					Transaction::submit(&self.target_client, header.header, header.proof)
+						.await
+						.map_err(Error::Target)?;
+				self.best_submitted_number = Some(transaction.header_number);
+				Ok(Some(transaction))
 			},
-		};
-		finality_proofs_stream.restart_if_scheduled(&source_client).await?;
-
-		// wait till exit signal, or new source block
-		select! {
-			transaction_result = last_transaction_tracker => {
-				transaction_result.map_err(|e| {
-					log::error!(
-						target: "bridge",
-						"Finality synchronization from {} to {} has stalled with error: {}. Going to restart",
-						P::SOURCE_NAME,
-						P::TARGET_NAME,
-						e,
-					);
-
-					// Restart the loop if we're stalled.
-					FailedClient::Both
-				})?
-			},
-			_ = async_std::task::sleep(next_tick).fuse() => {},
-			_ = exit_signal => return Ok(()),
+			None => Ok(None),
 		}
 	}
-}
 
-pub(crate) async fn run_loop_iteration<P, SC, TC>(
-	source_client: &SC,
-	target_client: &TC,
-	state: FinalityLoopState<'_, P, SC::FinalityProofsStream>,
-	sync_params: &FinalitySyncParams,
-	metrics_sync: &Option<SyncLoopMetrics>,
-) -> Result<Option<Transaction<TC::TransactionTracker, P::Number>>, Error<P, SC::Error, TC::Error>>
-where
-	P: FinalitySyncPipeline,
-	SC: SourceClient<P>,
-	TC: TargetClient<P>,
-{
-	// read best source headers ids from source and target nodes
-	let best_number_at_source =
-		source_client.best_finalized_block_number().await.map_err(Error::Source)?;
-	let best_id_at_target =
-		target_client.best_finalized_source_block_id().await.map_err(Error::Target)?;
-	let best_number_at_target = best_id_at_target.0;
-
-	let different_hash_at_source = ensure_same_fork::<P, _>(&best_id_at_target, source_client)
-		.await
-		.map_err(Error::Source)?;
-	let using_same_fork = different_hash_at_source.is_none();
-	if let Some(ref different_hash_at_source) = different_hash_at_source {
-		log::error!(
-			target: "bridge",
-			"Source node ({}) and pallet at target node ({}) have different headers at the same height {:?}: \
-			at-source {:?} vs at-target {:?}",
-			P::SOURCE_NAME,
-			P::TARGET_NAME,
-			best_number_at_target,
-			different_hash_at_source,
-			best_id_at_target.1,
-		);
-	}
+	async fn ensure_finality_proofs_stream(&mut self) -> Result<(), FailedClient> {
+		if let Err(e) = self.finality_proofs_stream.ensure_stream(&self.source_client).await {
+			if e.is_connection_error() {
+				return Err(FailedClient::Source)
+			}
+		}
 
-	if let Some(ref metrics_sync) = *metrics_sync {
-		metrics_sync.update_best_block_at_source(best_number_at_source);
-		metrics_sync.update_best_block_at_target(best_number_at_target);
-		metrics_sync.update_using_same_fork(using_same_fork);
+		Ok(())
 	}
-	*state.progress =
-		print_sync_progress::<P>(*state.progress, best_number_at_source, best_number_at_target);
-
-	// if we have already submitted header, then we just need to wait for it
-	// if we're waiting too much, then we believe our transaction has been lost and restart sync
-	if let Some(submitted_header_number) = state.submitted_header_number {
-		if best_number_at_target >= submitted_header_number {
-			// transaction has been mined && we can continue
-		} else {
-			return Ok(None)
+
+	/// Run finality relay loop until connection to one of nodes is lost.
+	async fn run_until_connection_lost(
+		&mut self,
+		exit_signal: impl Future<Output = ()>,
+	) -> Result<(), FailedClient> {
+		self.ensure_finality_proofs_stream().await?;
+		let proof_submission_tx_tracker = Fuse::terminated();
+		let exit_signal = exit_signal.fuse();
+		futures::pin_mut!(exit_signal, proof_submission_tx_tracker);
+
+		loop {
+			// run loop iteration
+			let next_tick = match self.run_iteration().await {
+				Ok(Some(tx)) => {
+					proof_submission_tx_tracker
+						.set(tx.track::<P, SC, _>(self.target_client.clone()).fuse());
+					self.retry_backoff.reset();
+					self.sync_params.tick
+				},
+				Ok(None) => {
+					self.retry_backoff.reset();
+					self.sync_params.tick
+				},
+				Err(error) => {
+					log::error!(target: "bridge", "Finality sync loop iteration has failed with error: {:?}", error);
+					error.fail_if_connection_error()?;
+					self.retry_backoff
+						.next_backoff()
+						.unwrap_or(relay_utils::relay_loop::RECONNECT_DELAY)
+				},
+			};
+			self.ensure_finality_proofs_stream().await?;
+
+			// wait till exit signal, or new source block
+			select! {
+				proof_submission_result = proof_submission_tx_tracker => {
+					if let Err(e) = proof_submission_result {
+						log::error!(
+							target: "bridge",
+							"Finality sync proof submission tx to {} has failed with error: {:?}.",
+							P::TARGET_NAME,
+							e,
+						);
+						self.best_submitted_number = None;
+						e.fail_if_connection_error()?;
+					}
+				},
+				_ = async_std::task::sleep(next_tick).fuse() => {},
+				_ = exit_signal => return Ok(()),
+			}
 		}
 	}
 
-	// submit new header if we have something new
-	match select_header_to_submit(
-		source_client,
-		target_client,
-		state.finality_proofs_stream,
-		state.recent_finality_proofs,
-		best_number_at_source,
-		best_number_at_target,
-		sync_params,
-	)
-	.await?
-	{
-		Some((header, justification)) => {
-			let transaction = Transaction::submit(target_client, header, justification)
-				.await
-				.map_err(Error::Target)?;
-			Ok(Some(transaction))
-		},
-		None => Ok(None),
+	pub async fn run(
+		source_client: SC,
+		target_client: TC,
+		sync_params: FinalitySyncParams,
+		metrics_sync: Option<SyncLoopMetrics>,
+		exit_signal: impl Future<Output = ()>,
+	) -> Result<(), FailedClient> {
+		let mut finality_loop = Self::new(source_client, target_client, sync_params, metrics_sync);
+		finality_loop.run_until_connection_lost(exit_signal).await
 	}
 }
 
-pub(crate) async fn select_header_to_submit<P, SC, TC>(
-	source_client: &SC,
-	target_client: &TC,
-	finality_proofs_stream: &mut RestartableFinalityProofsStream<SC::FinalityProofsStream>,
-	recent_finality_proofs: &mut FinalityProofs<P>,
-	best_number_at_source: P::Number,
-	best_number_at_target: P::Number,
-	sync_params: &FinalitySyncParams,
-) -> Result<Option<(P::Header, P::FinalityProof)>, Error<P, SC::Error, TC::Error>>
-where
-	P: FinalitySyncPipeline,
-	SC: SourceClient<P>,
-	TC: TargetClient<P>,
-{
-	// to see that the loop is progressing
-	log::trace!(
-		target: "bridge",
-		"Considering range of headers ({:?}; {:?}]",
-		best_number_at_target,
-		best_number_at_source,
-	);
-
-	// read missing headers. if we see that the header schedules GRANDPA change, we need to
-	// submit this header
-	let selected_finality_proof = read_missing_headers::<P, SC, TC>(
-		source_client,
-		target_client,
-		best_number_at_source,
-		best_number_at_target,
-	)
-	.await?;
-	let (mut unjustified_headers, mut selected_finality_proof) = match selected_finality_proof {
-		SelectedFinalityProof::Mandatory(header, finality_proof) =>
-			return Ok(Some((header, finality_proof))),
-		_ if sync_params.only_mandatory_headers => {
-			// we are not reading finality proofs from the stream, so eventually it'll break
-			// but we don't care about transient proofs at all, so it is acceptable
-			return Ok(None)
-		},
-		SelectedFinalityProof::Regular(unjustified_headers, header, finality_proof) =>
-			(unjustified_headers, Some((header, finality_proof))),
-		SelectedFinalityProof::None(unjustified_headers) => (unjustified_headers, None),
-	};
-
-	// all headers that are missing from the target client are non-mandatory
-	// => even if we have already selected some header and its persistent finality proof,
-	// we may try to select better header by reading non-persistent proofs from the stream
-	read_finality_proofs_from_stream::<P, _>(finality_proofs_stream, recent_finality_proofs);
-	selected_finality_proof = select_better_recent_finality_proof::<P>(
-		recent_finality_proofs,
-		&mut unjustified_headers,
-		selected_finality_proof,
-	);
-
-	// remove obsolete 'recent' finality proofs + keep its size under certain limit
-	let oldest_finality_proof_to_keep = selected_finality_proof
-		.as_ref()
-		.map(|(header, _)| header.number())
-		.unwrap_or(best_number_at_target);
-	prune_recent_finality_proofs::<P>(
-		oldest_finality_proof_to_keep,
-		recent_finality_proofs,
-		sync_params.recent_finality_proofs_limit,
-	);
-
-	Ok(selected_finality_proof)
-}
-
-/// Ensures that both clients are on the same fork.
-///
-/// Returns `Some(_)` with header has at the source client if headers are different.
-async fn ensure_same_fork<P: FinalitySyncPipeline, SC: SourceClient<P>>(
-	best_id_at_target: &HeaderId<P::Hash, P::Number>,
-	source_client: &SC,
-) -> Result<Option<P::Hash>, SC::Error> {
-	let header_at_source = source_client.header_and_finality_proof(best_id_at_target.0).await?.0;
-	let header_hash_at_source = header_at_source.hash();
-	Ok(if best_id_at_target.1 == header_hash_at_source {
-		None
-	} else {
-		Some(header_hash_at_source)
-	})
-}
-
-/// Finality proof that has been selected by the `read_missing_headers` function.
-pub(crate) enum SelectedFinalityProof<Header, FinalityProof> {
-	/// Mandatory header and its proof has been selected. We shall submit proof for this header.
-	Mandatory(Header, FinalityProof),
-	/// Regular header and its proof has been selected. We may submit this proof, or proof for
-	/// some better header.
-	Regular(UnjustifiedHeaders<Header>, Header, FinalityProof),
-	/// We haven't found any missing header with persistent proof at the target client.
-	None(UnjustifiedHeaders<Header>),
+/// Run finality proofs synchronization loop.
+pub async fn run<P: FinalitySyncPipeline>(
+	source_client: impl SourceClient<P>,
+	target_client: impl TargetClient<P>,
+	sync_params: FinalitySyncParams,
+	metrics_params: MetricsParams,
+	exit_signal: impl Future<Output = ()> + 'static + Send,
+) -> Result<(), relay_utils::Error> {
+	let exit_signal = exit_signal.shared();
+	relay_utils::relay_loop(source_client, target_client)
+		.with_metrics(metrics_params)
+		.loop_metric(SyncLoopMetrics::new(
+			Some(&metrics_prefix::<P>()),
+			"source",
+			"source_at_target",
+		)?)?
+		.expose()
+		.await?
+		.run(metrics_prefix::<P>(), move |source_client, target_client, metrics| {
+			FinalityLoop::run(
+				source_client,
+				target_client,
+				sync_params.clone(),
+				metrics,
+				exit_signal.clone(),
+			)
+		})
+		.await
 }
 
-/// Read missing headers and their persistent finality proofs from the target client.
-///
-/// If we have found some header with known proof, it is returned.
-/// Otherwise, `SelectedFinalityProof::None` is returned.
-///
-/// Unless we have found mandatory header, all missing headers are collected and returned.
-pub(crate) async fn read_missing_headers<
-	P: FinalitySyncPipeline,
-	SC: SourceClient<P>,
-	TC: TargetClient<P>,
->(
-	source_client: &SC,
-	_target_client: &TC,
-	best_number_at_source: P::Number,
-	best_number_at_target: P::Number,
-) -> Result<SelectedFinalityProof<P::Header, P::FinalityProof>, Error<P, SC::Error, TC::Error>> {
-	let mut unjustified_headers = Vec::new();
-	let mut selected_finality_proof = None;
-	let mut header_number = best_number_at_target + One::one();
-	while header_number <= best_number_at_source {
-		let (header, finality_proof) = source_client
-			.header_and_finality_proof(header_number)
-			.await
-			.map_err(Error::Source)?;
-		let is_mandatory = header.is_mandatory();
-
-		match (is_mandatory, finality_proof) {
-			(true, Some(finality_proof)) => {
-				log::trace!(target: "bridge", "Header {:?} is mandatory", header_number);
-				return Ok(SelectedFinalityProof::Mandatory(header, finality_proof))
+#[cfg(test)]
+mod tests {
+	use super::*;
+
+	use crate::mock::*;
+	use futures::{FutureExt, StreamExt};
+	use parking_lot::Mutex;
+	use relay_utils::{FailedClient, HeaderId, TrackedTransactionStatus};
+	use std::{collections::HashMap, sync::Arc};
+
+	fn prepare_test_clients(
+		exit_sender: futures::channel::mpsc::UnboundedSender<()>,
+		state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static,
+		source_headers: HashMap<TestNumber, (TestSourceHeader, Option<TestFinalityProof>)>,
+	) -> (TestSourceClient, TestTargetClient) {
+		let internal_state_function: Arc<dyn Fn(&mut ClientsData) + Send + Sync> =
+			Arc::new(move |data| {
+				if state_function(data) {
+					exit_sender.unbounded_send(()).unwrap();
+				}
+			});
+		let clients_data = Arc::new(Mutex::new(ClientsData {
+			source_best_block_number: 10,
+			source_headers,
+			source_proofs: vec![TestFinalityProof(12), TestFinalityProof(14)],
+
+			target_best_block_id: HeaderId(5, 5),
+			target_headers: vec![],
+			target_transaction_tracker: TestTransactionTracker(
+				TrackedTransactionStatus::Finalized(Default::default()),
+			),
+		}));
+		(
+			TestSourceClient {
+				on_method_call: internal_state_function.clone(),
+				data: clients_data.clone(),
 			},
-			(true, None) => return Err(Error::MissingMandatoryFinalityProof(header.number())),
-			(false, Some(finality_proof)) => {
-				log::trace!(target: "bridge", "Header {:?} has persistent finality proof", header_number);
-				unjustified_headers.clear();
-				selected_finality_proof = Some((header, finality_proof));
-			},
-			(false, None) => {
-				unjustified_headers.push(header);
-			},
-		}
-
-		header_number = header_number + One::one();
+			TestTargetClient { on_method_call: internal_state_function, data: clients_data },
+		)
 	}
 
-	log::trace!(
-		target: "bridge",
-		"Read {} {} headers. Selected finality proof for header: {:?}",
-		best_number_at_source.saturating_sub(best_number_at_target),
-		P::SOURCE_NAME,
-		selected_finality_proof.as_ref().map(|(header, _)| header),
-	);
-
-	Ok(match selected_finality_proof {
-		Some((header, proof)) => SelectedFinalityProof::Regular(unjustified_headers, header, proof),
-		None => SelectedFinalityProof::None(unjustified_headers),
-	})
-}
-
-/// Read finality proofs from the stream.
-pub(crate) fn read_finality_proofs_from_stream<
-	P: FinalitySyncPipeline,
-	FPS: Stream<Item = P::FinalityProof>,
->(
-	finality_proofs_stream: &mut RestartableFinalityProofsStream<FPS>,
-	recent_finality_proofs: &mut FinalityProofs<P>,
-) {
-	let mut proofs_count = 0;
-	let mut first_header_number = None;
-	let mut last_header_number = None;
-	while let Some(finality_proof) = finality_proofs_stream.next() {
-		let target_header_number = finality_proof.target_header_number();
-		if first_header_number.is_none() {
-			first_header_number = Some(target_header_number);
+	fn test_sync_params() -> FinalitySyncParams {
+		FinalitySyncParams {
+			tick: Duration::from_secs(0),
+			recent_finality_proofs_limit: 1024,
+			stall_timeout: Duration::from_secs(1),
+			only_mandatory_headers: false,
 		}
-		last_header_number = Some(target_header_number);
-		proofs_count += 1;
+	}
 
-		recent_finality_proofs.push((target_header_number, finality_proof));
+	fn run_sync_loop(
+		state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static,
+	) -> (ClientsData, Result<(), FailedClient>) {
+		let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded();
+		let (source_client, target_client) = prepare_test_clients(
+			exit_sender,
+			state_function,
+			vec![
+				(5, (TestSourceHeader(false, 5, 5), None)),
+				(6, (TestSourceHeader(false, 6, 6), None)),
+				(7, (TestSourceHeader(false, 7, 7), Some(TestFinalityProof(7)))),
+				(8, (TestSourceHeader(true, 8, 8), Some(TestFinalityProof(8)))),
+				(9, (TestSourceHeader(false, 9, 9), Some(TestFinalityProof(9)))),
+				(10, (TestSourceHeader(false, 10, 10), None)),
+			]
+			.into_iter()
+			.collect(),
+		);
+		let sync_params = test_sync_params();
+
+		let clients_data = source_client.data.clone();
+		let result = async_std::task::block_on(FinalityLoop::run(
+			source_client,
+			target_client,
+			sync_params,
+			None,
+			exit_receiver.into_future().map(|(_, _)| ()),
+		));
+
+		let clients_data = clients_data.lock().clone();
+		(clients_data, result)
 	}
 
-	if proofs_count != 0 {
-		log::trace!(
-			target: "bridge",
-			"Read {} finality proofs from {} finality stream for headers in range [{:?}; {:?}]",
-			proofs_count,
-			P::SOURCE_NAME,
-			first_header_number,
-			last_header_number,
+	#[test]
+	fn finality_sync_loop_works() {
+		let (client_data, result) = run_sync_loop(|data| {
+			// header#7 has persistent finality proof, but it isn't mandatory => it isn't submitted,
+			// because header#8 has persistent finality proof && it is mandatory => it is submitted
+			// header#9 has persistent finality proof, but it isn't mandatory => it is submitted,
+			// because   there are no more persistent finality proofs
+			//
+			// once this ^^^ is done, we generate more blocks && read proof for blocks 12 and 14
+			// from the stream
+			if data.target_best_block_id.0 == 9 {
+				data.source_best_block_number = 14;
+				data.source_headers.insert(11, (TestSourceHeader(false, 11, 11), None));
+				data.source_headers
+					.insert(12, (TestSourceHeader(false, 12, 12), Some(TestFinalityProof(12))));
+				data.source_headers.insert(13, (TestSourceHeader(false, 13, 13), None));
+				data.source_headers
+					.insert(14, (TestSourceHeader(false, 14, 14), Some(TestFinalityProof(14))));
+			}
+			// once this ^^^ is done, we generate more blocks && read persistent proof for block 16
+			if data.target_best_block_id.0 == 14 {
+				data.source_best_block_number = 17;
+				data.source_headers.insert(15, (TestSourceHeader(false, 15, 15), None));
+				data.source_headers
+					.insert(16, (TestSourceHeader(false, 16, 16), Some(TestFinalityProof(16))));
+				data.source_headers.insert(17, (TestSourceHeader(false, 17, 17), None));
+			}
+
+			data.target_best_block_id.0 == 16
+		});
+
+		assert_eq!(result, Ok(()));
+		assert_eq!(
+			client_data.target_headers,
+			vec![
+				// before adding 11..14: finality proof for mandatory header#8
+				(TestSourceHeader(true, 8, 8), TestFinalityProof(8)),
+				// before adding 11..14: persistent finality proof for non-mandatory header#9
+				(TestSourceHeader(false, 9, 9), TestFinalityProof(9)),
+				// after adding 11..14: ephemeral finality proof for non-mandatory header#14
+				(TestSourceHeader(false, 14, 14), TestFinalityProof(14)),
+				// after adding 15..17: persistent finality proof for non-mandatory header#16
+				(TestSourceHeader(false, 16, 16), TestFinalityProof(16)),
+			],
 		);
 	}
-}
 
-/// Try to select better header and its proof, given finality proofs that we
-/// have recently read from the stream.
-pub(crate) fn select_better_recent_finality_proof<P: FinalitySyncPipeline>(
-	recent_finality_proofs: FinalityProofsRef<P>,
-	unjustified_headers: &mut UnjustifiedHeaders<P::Header>,
-	selected_finality_proof: Option<(P::Header, P::FinalityProof)>,
-) -> Option<(P::Header, P::FinalityProof)> {
-	if unjustified_headers.is_empty() || recent_finality_proofs.is_empty() {
-		log::trace!(
-			target: "bridge",
-			"Can not improve selected {} finality proof {:?}. No unjustified headers and recent proofs",
-			P::SOURCE_NAME,
-			selected_finality_proof.as_ref().map(|(h, _)| h.number()),
+	fn run_only_mandatory_headers_mode_test(
+		only_mandatory_headers: bool,
+		has_mandatory_headers: bool,
+	) -> Option<JustifiedHeader<TestFinalitySyncPipeline>> {
+		let (exit_sender, _) = futures::channel::mpsc::unbounded();
+		let (source_client, target_client) = prepare_test_clients(
+			exit_sender,
+			|_| false,
+			vec![
+				(6, (TestSourceHeader(false, 6, 6), Some(TestFinalityProof(6)))),
+				(7, (TestSourceHeader(false, 7, 7), Some(TestFinalityProof(7)))),
+				(8, (TestSourceHeader(has_mandatory_headers, 8, 8), Some(TestFinalityProof(8)))),
+				(9, (TestSourceHeader(false, 9, 9), Some(TestFinalityProof(9)))),
+				(10, (TestSourceHeader(false, 10, 10), Some(TestFinalityProof(10)))),
+			]
+			.into_iter()
+			.collect(),
 		);
-		return selected_finality_proof
+		async_std::task::block_on(async {
+			let mut finality_loop = FinalityLoop::new(
+				source_client,
+				target_client,
+				FinalitySyncParams {
+					tick: Duration::from_secs(0),
+					recent_finality_proofs_limit: 0,
+					stall_timeout: Duration::from_secs(0),
+					only_mandatory_headers,
+				},
+				None,
+			);
+			let info = SyncInfo {
+				best_number_at_source: 10,
+				best_number_at_target: 5,
+				is_using_same_fork: true,
+			};
+			finality_loop.select_header_to_submit(&info).await.unwrap()
+		})
 	}
 
-	const NOT_EMPTY_PROOF: &str = "we have checked that the vec is not empty; qed";
-
-	// we need proofs for headers in range unjustified_range_begin..=unjustified_range_end
-	let unjustified_range_begin = unjustified_headers.first().expect(NOT_EMPTY_PROOF).number();
-	let unjustified_range_end = unjustified_headers.last().expect(NOT_EMPTY_PROOF).number();
-
-	// we have proofs for headers in range buffered_range_begin..=buffered_range_end
-	let buffered_range_begin = recent_finality_proofs.first().expect(NOT_EMPTY_PROOF).0;
-	let buffered_range_end = recent_finality_proofs.last().expect(NOT_EMPTY_PROOF).0;
-
-	// we have two ranges => find intersection
-	let intersection_begin = std::cmp::max(unjustified_range_begin, buffered_range_begin);
-	let intersection_end = std::cmp::min(unjustified_range_end, buffered_range_end);
-	let intersection = intersection_begin..=intersection_end;
-
-	// find last proof from intersection
-	let selected_finality_proof_index = recent_finality_proofs
-		.binary_search_by_key(intersection.end(), |(number, _)| *number)
-		.unwrap_or_else(|index| index.saturating_sub(1));
-	let (selected_header_number, finality_proof) =
-		&recent_finality_proofs[selected_finality_proof_index];
-	let has_selected_finality_proof = intersection.contains(selected_header_number);
-	log::trace!(
-		target: "bridge",
-		"Trying to improve selected {} finality proof {:?}. Headers range: [{:?}; {:?}]. Proofs range: [{:?}; {:?}].\
-		Trying to improve to: {:?}. Result: {}",
-		P::SOURCE_NAME,
-		selected_finality_proof.as_ref().map(|(h, _)| h.number()),
-		unjustified_range_begin,
-		unjustified_range_end,
-		buffered_range_begin,
-		buffered_range_end,
-		selected_header_number,
-		if has_selected_finality_proof { "improved" } else { "not improved" },
-	);
-	if !has_selected_finality_proof {
-		return selected_finality_proof
+	#[test]
+	fn select_header_to_submit_skips_non_mandatory_headers_when_only_mandatory_headers_are_required(
+	) {
+		assert_eq!(run_only_mandatory_headers_mode_test(true, false), None);
+		assert_eq!(
+			run_only_mandatory_headers_mode_test(false, false),
+			Some(JustifiedHeader {
+				header: TestSourceHeader(false, 10, 10),
+				proof: TestFinalityProof(10)
+			}),
+		);
 	}
 
-	// now remove all obsolete headers and extract selected header
-	let selected_header_position = unjustified_headers
-		.binary_search_by_key(selected_header_number, |header| header.number())
-		.expect("unjustified_headers contain all headers from intersection; qed");
-	let selected_header = unjustified_headers.swap_remove(selected_header_position);
-	Some((selected_header, finality_proof.clone()))
-}
+	#[test]
+	fn select_header_to_submit_selects_mandatory_headers_when_only_mandatory_headers_are_required()
+	{
+		assert_eq!(
+			run_only_mandatory_headers_mode_test(true, true),
+			Some(JustifiedHeader {
+				header: TestSourceHeader(true, 8, 8),
+				proof: TestFinalityProof(8)
+			}),
+		);
+		assert_eq!(
+			run_only_mandatory_headers_mode_test(false, true),
+			Some(JustifiedHeader {
+				header: TestSourceHeader(true, 8, 8),
+				proof: TestFinalityProof(8)
+			}),
+		);
+	}
 
-pub(crate) fn prune_recent_finality_proofs<P: FinalitySyncPipeline>(
-	justified_header_number: P::Number,
-	recent_finality_proofs: &mut FinalityProofs<P>,
-	recent_finality_proofs_limit: usize,
-) {
-	let justified_header_idx = recent_finality_proofs
-		.binary_search_by_key(&justified_header_number, |(header_number, _)| *header_number)
-		.map(|idx| idx + 1)
-		.unwrap_or_else(|idx| idx);
-	let proofs_limit_idx =
-		recent_finality_proofs.len().saturating_sub(recent_finality_proofs_limit);
-
-	*recent_finality_proofs =
-		recent_finality_proofs.split_off(std::cmp::max(justified_header_idx, proofs_limit_idx));
-}
+	#[test]
+	fn different_forks_at_source_and_at_target_are_detected() {
+		let (exit_sender, _exit_receiver) = futures::channel::mpsc::unbounded();
+		let (source_client, target_client) = prepare_test_clients(
+			exit_sender,
+			|_| false,
+			vec![
+				(5, (TestSourceHeader(false, 5, 42), None)),
+				(6, (TestSourceHeader(false, 6, 6), None)),
+				(7, (TestSourceHeader(false, 7, 7), None)),
+				(8, (TestSourceHeader(false, 8, 8), None)),
+				(9, (TestSourceHeader(false, 9, 9), None)),
+				(10, (TestSourceHeader(false, 10, 10), None)),
+			]
+			.into_iter()
+			.collect(),
+		);
 
-fn print_sync_progress<P: FinalitySyncPipeline>(
-	progress_context: (Instant, Option<P::Number>),
-	best_number_at_source: P::Number,
-	best_number_at_target: P::Number,
-) -> (Instant, Option<P::Number>) {
-	let (prev_time, prev_best_number_at_target) = progress_context;
-	let now = Instant::now();
-
-	let need_update = now - prev_time > Duration::from_secs(10) ||
-		prev_best_number_at_target
-			.map(|prev_best_number_at_target| {
-				best_number_at_target.saturating_sub(prev_best_number_at_target) > 10.into()
-			})
-			.unwrap_or(true);
-
-	if !need_update {
-		return (prev_time, prev_best_number_at_target)
-	}
+		let metrics_sync = SyncLoopMetrics::new(None, "source", "target").unwrap();
+		async_std::task::block_on(async {
+			let mut finality_loop = FinalityLoop::new(
+				source_client,
+				target_client,
+				test_sync_params(),
+				Some(metrics_sync.clone()),
+			);
+			finality_loop.run_iteration().await.unwrap()
+		});
 
-	log::info!(
-		target: "bridge",
-		"Synced {:?} of {:?} headers",
-		best_number_at_target,
-		best_number_at_source,
-	);
-	(now, Some(best_number_at_target))
+		assert!(!metrics_sync.is_using_same_fork());
+	}
 }
diff --git a/bridges/relays/finality/src/finality_loop_tests.rs b/bridges/relays/finality/src/finality_loop_tests.rs
deleted file mode 100644
index 774a5c0c673..00000000000
--- a/bridges/relays/finality/src/finality_loop_tests.rs
+++ /dev/null
@@ -1,604 +0,0 @@
-// Copyright 2019-2021 Parity Technologies (UK) Ltd.
-// This file is part of Parity Bridges Common.
-
-// Parity Bridges Common is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// Parity Bridges Common is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.
-
-//! Tests for finality synchronization loop.
-
-#![cfg(test)]
-
-use crate::{
-	finality_loop::{
-		prune_recent_finality_proofs, read_finality_proofs_from_stream, run_loop_iteration,
-		run_until_connection_lost, select_better_recent_finality_proof, select_header_to_submit,
-		FinalityLoopState, FinalityProofs, FinalitySyncParams, RestartableFinalityProofsStream,
-		SourceClient, TargetClient,
-	},
-	sync_loop_metrics::SyncLoopMetrics,
-	FinalityPipeline, FinalitySyncPipeline, SourceClientBase, SourceHeader,
-};
-
-use async_trait::async_trait;
-use bp_header_chain::{FinalityProof, GrandpaConsensusLogReader};
-use futures::{FutureExt, Stream, StreamExt};
-use parking_lot::Mutex;
-use relay_utils::{
-	relay_loop::Client as RelayClient, FailedClient, HeaderId, MaybeConnectionError,
-	TrackedTransactionStatus, TransactionTracker,
-};
-use std::{
-	collections::HashMap,
-	pin::Pin,
-	sync::Arc,
-	time::{Duration, Instant},
-};
-
-type IsMandatory = bool;
-type TestNumber = u64;
-type TestHash = u64;
-
-#[derive(Clone, Debug)]
-struct TestTransactionTracker(TrackedTransactionStatus<HeaderId<TestHash, TestNumber>>);
-
-impl Default for TestTransactionTracker {
-	fn default() -> TestTransactionTracker {
-		TestTransactionTracker(TrackedTransactionStatus::Finalized(Default::default()))
-	}
-}
-
-#[async_trait]
-impl TransactionTracker for TestTransactionTracker {
-	type HeaderId = HeaderId<TestHash, TestNumber>;
-
-	async fn wait(self) -> TrackedTransactionStatus<HeaderId<TestHash, TestNumber>> {
-		self.0
-	}
-}
-
-#[derive(Debug, Clone)]
-enum TestError {
-	NonConnection,
-}
-
-impl MaybeConnectionError for TestError {
-	fn is_connection_error(&self) -> bool {
-		false
-	}
-}
-
-#[derive(Debug, Clone)]
-struct TestFinalitySyncPipeline;
-
-impl FinalityPipeline for TestFinalitySyncPipeline {
-	const SOURCE_NAME: &'static str = "TestSource";
-	const TARGET_NAME: &'static str = "TestTarget";
-
-	type Hash = TestHash;
-	type Number = TestNumber;
-	type FinalityProof = TestFinalityProof;
-}
-
-impl FinalitySyncPipeline for TestFinalitySyncPipeline {
-	type ConsensusLogReader = GrandpaConsensusLogReader<TestNumber>;
-	type Header = TestSourceHeader;
-}
-
-#[derive(Debug, Clone, PartialEq, Eq)]
-struct TestSourceHeader(IsMandatory, TestNumber, TestHash);
-
-impl SourceHeader<TestHash, TestNumber, GrandpaConsensusLogReader<TestNumber>>
-	for TestSourceHeader
-{
-	fn hash(&self) -> TestHash {
-		self.2
-	}
-
-	fn number(&self) -> TestNumber {
-		self.1
-	}
-
-	fn is_mandatory(&self) -> bool {
-		self.0
-	}
-}
-
-#[derive(Debug, Clone, PartialEq, Eq)]
-struct TestFinalityProof(TestNumber);
-
-impl FinalityProof<TestNumber> for TestFinalityProof {
-	fn target_header_number(&self) -> TestNumber {
-		self.0
-	}
-}
-
-#[derive(Debug, Clone, Default)]
-struct ClientsData {
-	source_best_block_number: TestNumber,
-	source_headers: HashMap<TestNumber, (TestSourceHeader, Option<TestFinalityProof>)>,
-	source_proofs: Vec<TestFinalityProof>,
-
-	target_best_block_id: HeaderId<TestHash, TestNumber>,
-	target_headers: Vec<(TestSourceHeader, TestFinalityProof)>,
-	target_transaction_tracker: TestTransactionTracker,
-}
-
-#[derive(Clone)]
-struct TestSourceClient {
-	on_method_call: Arc<dyn Fn(&mut ClientsData) + Send + Sync>,
-	data: Arc<Mutex<ClientsData>>,
-}
-
-#[async_trait]
-impl RelayClient for TestSourceClient {
-	type Error = TestError;
-
-	async fn reconnect(&mut self) -> Result<(), TestError> {
-		unreachable!()
-	}
-}
-
-#[async_trait]
-impl SourceClientBase<TestFinalitySyncPipeline> for TestSourceClient {
-	type FinalityProofsStream = Pin<Box<dyn Stream<Item = TestFinalityProof> + 'static + Send>>;
-
-	async fn finality_proofs(&self) -> Result<Self::FinalityProofsStream, TestError> {
-		let mut data = self.data.lock();
-		(self.on_method_call)(&mut data);
-		Ok(futures::stream::iter(data.source_proofs.clone()).boxed())
-	}
-}
-
-#[async_trait]
-impl SourceClient<TestFinalitySyncPipeline> for TestSourceClient {
-	async fn best_finalized_block_number(&self) -> Result<TestNumber, TestError> {
-		let mut data = self.data.lock();
-		(self.on_method_call)(&mut data);
-		Ok(data.source_best_block_number)
-	}
-
-	async fn header_and_finality_proof(
-		&self,
-		number: TestNumber,
-	) -> Result<(TestSourceHeader, Option<TestFinalityProof>), TestError> {
-		let mut data = self.data.lock();
-		(self.on_method_call)(&mut data);
-		data.source_headers.get(&number).cloned().ok_or(TestError::NonConnection)
-	}
-}
-
-#[derive(Clone)]
-struct TestTargetClient {
-	on_method_call: Arc<dyn Fn(&mut ClientsData) + Send + Sync>,
-	data: Arc<Mutex<ClientsData>>,
-}
-
-#[async_trait]
-impl RelayClient for TestTargetClient {
-	type Error = TestError;
-
-	async fn reconnect(&mut self) -> Result<(), TestError> {
-		unreachable!()
-	}
-}
-
-#[async_trait]
-impl TargetClient<TestFinalitySyncPipeline> for TestTargetClient {
-	type TransactionTracker = TestTransactionTracker;
-
-	async fn best_finalized_source_block_id(
-		&self,
-	) -> Result<HeaderId<TestHash, TestNumber>, TestError> {
-		let mut data = self.data.lock();
-		(self.on_method_call)(&mut data);
-		Ok(data.target_best_block_id)
-	}
-
-	async fn submit_finality_proof(
-		&self,
-		header: TestSourceHeader,
-		proof: TestFinalityProof,
-	) -> Result<TestTransactionTracker, TestError> {
-		let mut data = self.data.lock();
-		(self.on_method_call)(&mut data);
-		data.target_best_block_id = HeaderId(header.number(), header.hash());
-		data.target_headers.push((header, proof));
-		(self.on_method_call)(&mut data);
-		Ok(data.target_transaction_tracker.clone())
-	}
-}
-
-fn prepare_test_clients(
-	exit_sender: futures::channel::mpsc::UnboundedSender<()>,
-	state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static,
-	source_headers: HashMap<TestNumber, (TestSourceHeader, Option<TestFinalityProof>)>,
-) -> (TestSourceClient, TestTargetClient) {
-	let internal_state_function: Arc<dyn Fn(&mut ClientsData) + Send + Sync> =
-		Arc::new(move |data| {
-			if state_function(data) {
-				exit_sender.unbounded_send(()).unwrap();
-			}
-		});
-	let clients_data = Arc::new(Mutex::new(ClientsData {
-		source_best_block_number: 10,
-		source_headers,
-		source_proofs: vec![TestFinalityProof(12), TestFinalityProof(14)],
-
-		target_best_block_id: HeaderId(5, 5),
-		target_headers: vec![],
-		target_transaction_tracker: TestTransactionTracker(TrackedTransactionStatus::Finalized(
-			Default::default(),
-		)),
-	}));
-	(
-		TestSourceClient {
-			on_method_call: internal_state_function.clone(),
-			data: clients_data.clone(),
-		},
-		TestTargetClient { on_method_call: internal_state_function, data: clients_data },
-	)
-}
-
-fn test_sync_params() -> FinalitySyncParams {
-	FinalitySyncParams {
-		tick: Duration::from_secs(0),
-		recent_finality_proofs_limit: 1024,
-		stall_timeout: Duration::from_secs(1),
-		only_mandatory_headers: false,
-	}
-}
-
-fn run_sync_loop(
-	state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static,
-) -> (ClientsData, Result<(), FailedClient>) {
-	let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded();
-	let (source_client, target_client) = prepare_test_clients(
-		exit_sender,
-		state_function,
-		vec![
-			(5, (TestSourceHeader(false, 5, 5), None)),
-			(6, (TestSourceHeader(false, 6, 6), None)),
-			(7, (TestSourceHeader(false, 7, 7), Some(TestFinalityProof(7)))),
-			(8, (TestSourceHeader(true, 8, 8), Some(TestFinalityProof(8)))),
-			(9, (TestSourceHeader(false, 9, 9), Some(TestFinalityProof(9)))),
-			(10, (TestSourceHeader(false, 10, 10), None)),
-		]
-		.into_iter()
-		.collect(),
-	);
-	let sync_params = test_sync_params();
-
-	let clients_data = source_client.data.clone();
-	let result = async_std::task::block_on(run_until_connection_lost(
-		source_client,
-		target_client,
-		sync_params,
-		None,
-		exit_receiver.into_future().map(|(_, _)| ()),
-	));
-
-	let clients_data = clients_data.lock().clone();
-	(clients_data, result)
-}
-
-#[test]
-fn finality_sync_loop_works() {
-	let (client_data, result) = run_sync_loop(|data| {
-		// header#7 has persistent finality proof, but it isn't mandatory => it isn't submitted,
-		// because header#8 has persistent finality proof && it is mandatory => it is submitted
-		// header#9 has persistent finality proof, but it isn't mandatory => it is submitted,
-		// because   there are no more persistent finality proofs
-		//
-		// once this ^^^ is done, we generate more blocks && read proof for blocks 12 and 14 from
-		// the stream
-		if data.target_best_block_id.0 == 9 {
-			data.source_best_block_number = 14;
-			data.source_headers.insert(11, (TestSourceHeader(false, 11, 11), None));
-			data.source_headers
-				.insert(12, (TestSourceHeader(false, 12, 12), Some(TestFinalityProof(12))));
-			data.source_headers.insert(13, (TestSourceHeader(false, 13, 13), None));
-			data.source_headers
-				.insert(14, (TestSourceHeader(false, 14, 14), Some(TestFinalityProof(14))));
-		}
-		// once this ^^^ is done, we generate more blocks && read persistent proof for block 16
-		if data.target_best_block_id.0 == 14 {
-			data.source_best_block_number = 17;
-			data.source_headers.insert(15, (TestSourceHeader(false, 15, 15), None));
-			data.source_headers
-				.insert(16, (TestSourceHeader(false, 16, 16), Some(TestFinalityProof(16))));
-			data.source_headers.insert(17, (TestSourceHeader(false, 17, 17), None));
-		}
-
-		data.target_best_block_id.0 == 16
-	});
-
-	assert_eq!(result, Ok(()));
-	assert_eq!(
-		client_data.target_headers,
-		vec![
-			// before adding 11..14: finality proof for mandatory header#8
-			(TestSourceHeader(true, 8, 8), TestFinalityProof(8)),
-			// before adding 11..14: persistent finality proof for non-mandatory header#9
-			(TestSourceHeader(false, 9, 9), TestFinalityProof(9)),
-			// after adding 11..14: ephemeral finality proof for non-mandatory header#14
-			(TestSourceHeader(false, 14, 14), TestFinalityProof(14)),
-			// after adding 15..17: persistent finality proof for non-mandatory header#16
-			(TestSourceHeader(false, 16, 16), TestFinalityProof(16)),
-		],
-	);
-}
-
-fn run_only_mandatory_headers_mode_test(
-	only_mandatory_headers: bool,
-	has_mandatory_headers: bool,
-) -> Option<(TestSourceHeader, TestFinalityProof)> {
-	let (exit_sender, _) = futures::channel::mpsc::unbounded();
-	let (source_client, target_client) = prepare_test_clients(
-		exit_sender,
-		|_| false,
-		vec![
-			(6, (TestSourceHeader(false, 6, 6), Some(TestFinalityProof(6)))),
-			(7, (TestSourceHeader(false, 7, 7), Some(TestFinalityProof(7)))),
-			(8, (TestSourceHeader(has_mandatory_headers, 8, 8), Some(TestFinalityProof(8)))),
-			(9, (TestSourceHeader(false, 9, 9), Some(TestFinalityProof(9)))),
-			(10, (TestSourceHeader(false, 10, 10), Some(TestFinalityProof(10)))),
-		]
-		.into_iter()
-		.collect(),
-	);
-	async_std::task::block_on(select_header_to_submit(
-		&source_client,
-		&target_client,
-		&mut RestartableFinalityProofsStream::from(futures::stream::empty().boxed()),
-		&mut vec![],
-		10,
-		5,
-		&FinalitySyncParams {
-			tick: Duration::from_secs(0),
-			recent_finality_proofs_limit: 0,
-			stall_timeout: Duration::from_secs(0),
-			only_mandatory_headers,
-		},
-	))
-	.unwrap()
-}
-
-#[test]
-fn select_header_to_submit_skips_non_mandatory_headers_when_only_mandatory_headers_are_required() {
-	assert_eq!(run_only_mandatory_headers_mode_test(true, false), None);
-	assert_eq!(
-		run_only_mandatory_headers_mode_test(false, false),
-		Some((TestSourceHeader(false, 10, 10), TestFinalityProof(10))),
-	);
-}
-
-#[test]
-fn select_header_to_submit_selects_mandatory_headers_when_only_mandatory_headers_are_required() {
-	assert_eq!(
-		run_only_mandatory_headers_mode_test(true, true),
-		Some((TestSourceHeader(true, 8, 8), TestFinalityProof(8))),
-	);
-	assert_eq!(
-		run_only_mandatory_headers_mode_test(false, true),
-		Some((TestSourceHeader(true, 8, 8), TestFinalityProof(8))),
-	);
-}
-
-#[test]
-fn select_better_recent_finality_proof_works() {
-	// if there are no unjustified headers, nothing is changed
-	assert_eq!(
-		select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
-			&[(5, TestFinalityProof(5))],
-			&mut vec![],
-			Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
-		),
-		Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
-	);
-
-	// if there are no recent finality proofs, nothing is changed
-	assert_eq!(
-		select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
-			&[],
-			&mut vec![TestSourceHeader(false, 5, 5)],
-			Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
-		),
-		Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
-	);
-
-	// if there's no intersection between recent finality proofs and unjustified headers, nothing is
-	// changed
-	let mut unjustified_headers =
-		vec![TestSourceHeader(false, 9, 9), TestSourceHeader(false, 10, 10)];
-	assert_eq!(
-		select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
-			&[(1, TestFinalityProof(1)), (4, TestFinalityProof(4))],
-			&mut unjustified_headers,
-			Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
-		),
-		Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
-	);
-
-	// if there's intersection between recent finality proofs and unjustified headers, but there are
-	// no proofs in this intersection, nothing is changed
-	let mut unjustified_headers = vec![
-		TestSourceHeader(false, 8, 8),
-		TestSourceHeader(false, 9, 9),
-		TestSourceHeader(false, 10, 10),
-	];
-	assert_eq!(
-		select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
-			&[(7, TestFinalityProof(7)), (11, TestFinalityProof(11))],
-			&mut unjustified_headers,
-			Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
-		),
-		Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
-	);
-	assert_eq!(
-		unjustified_headers,
-		vec![
-			TestSourceHeader(false, 8, 8),
-			TestSourceHeader(false, 9, 9),
-			TestSourceHeader(false, 10, 10)
-		]
-	);
-
-	// if there's intersection between recent finality proofs and unjustified headers and there's
-	// a proof in this intersection:
-	// - this better (last from intersection) proof is selected;
-	// - 'obsolete' unjustified headers are pruned.
-	let mut unjustified_headers = vec![
-		TestSourceHeader(false, 8, 8),
-		TestSourceHeader(false, 9, 9),
-		TestSourceHeader(false, 10, 10),
-	];
-	assert_eq!(
-		select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
-			&[(7, TestFinalityProof(7)), (9, TestFinalityProof(9))],
-			&mut unjustified_headers,
-			Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
-		),
-		Some((TestSourceHeader(false, 9, 9), TestFinalityProof(9))),
-	);
-}
-
-#[test]
-fn read_finality_proofs_from_stream_works() {
-	// when stream is currently empty, nothing is changed
-	let mut recent_finality_proofs = vec![(1, TestFinalityProof(1))];
-	let mut stream = futures::stream::pending().into();
-	read_finality_proofs_from_stream::<TestFinalitySyncPipeline, _>(
-		&mut stream,
-		&mut recent_finality_proofs,
-	);
-	assert_eq!(recent_finality_proofs, vec![(1, TestFinalityProof(1))]);
-	assert!(!stream.needs_restart);
-
-	// when stream has entry with target, it is added to the recent proofs container
-	let mut stream = futures::stream::iter(vec![TestFinalityProof(4)])
-		.chain(futures::stream::pending())
-		.into();
-	read_finality_proofs_from_stream::<TestFinalitySyncPipeline, _>(
-		&mut stream,
-		&mut recent_finality_proofs,
-	);
-	assert_eq!(recent_finality_proofs, vec![(1, TestFinalityProof(1)), (4, TestFinalityProof(4))]);
-	assert!(!stream.needs_restart);
-
-	// when stream has ended, we'll need to restart it
-	let mut stream = futures::stream::empty().into();
-	read_finality_proofs_from_stream::<TestFinalitySyncPipeline, _>(
-		&mut stream,
-		&mut recent_finality_proofs,
-	);
-	assert_eq!(recent_finality_proofs, vec![(1, TestFinalityProof(1)), (4, TestFinalityProof(4))]);
-	assert!(stream.needs_restart);
-}
-
-#[test]
-fn prune_recent_finality_proofs_works() {
-	let original_recent_finality_proofs: FinalityProofs<TestFinalitySyncPipeline> = vec![
-		(10, TestFinalityProof(10)),
-		(13, TestFinalityProof(13)),
-		(15, TestFinalityProof(15)),
-		(17, TestFinalityProof(17)),
-		(19, TestFinalityProof(19)),
-	]
-	.into_iter()
-	.collect();
-
-	// when there's proof for justified header in the vec
-	let mut recent_finality_proofs = original_recent_finality_proofs.clone();
-	prune_recent_finality_proofs::<TestFinalitySyncPipeline>(10, &mut recent_finality_proofs, 1024);
-	assert_eq!(&original_recent_finality_proofs[1..], recent_finality_proofs,);
-
-	// when there are no proof for justified header in the vec
-	let mut recent_finality_proofs = original_recent_finality_proofs.clone();
-	prune_recent_finality_proofs::<TestFinalitySyncPipeline>(11, &mut recent_finality_proofs, 1024);
-	assert_eq!(&original_recent_finality_proofs[1..], recent_finality_proofs,);
-
-	// when there are too many entries after initial prune && they also need to be pruned
-	let mut recent_finality_proofs = original_recent_finality_proofs.clone();
-	prune_recent_finality_proofs::<TestFinalitySyncPipeline>(10, &mut recent_finality_proofs, 2);
-	assert_eq!(&original_recent_finality_proofs[3..], recent_finality_proofs,);
-
-	// when last entry is pruned
-	let mut recent_finality_proofs = original_recent_finality_proofs.clone();
-	prune_recent_finality_proofs::<TestFinalitySyncPipeline>(19, &mut recent_finality_proofs, 2);
-	assert_eq!(&original_recent_finality_proofs[5..], recent_finality_proofs,);
-
-	// when post-last entry is pruned
-	let mut recent_finality_proofs = original_recent_finality_proofs.clone();
-	prune_recent_finality_proofs::<TestFinalitySyncPipeline>(20, &mut recent_finality_proofs, 2);
-	assert_eq!(&original_recent_finality_proofs[5..], recent_finality_proofs,);
-}
-
-#[test]
-fn different_forks_at_source_and_at_target_are_detected() {
-	let (exit_sender, _exit_receiver) = futures::channel::mpsc::unbounded();
-	let (source_client, target_client) = prepare_test_clients(
-		exit_sender,
-		|_| false,
-		vec![
-			(5, (TestSourceHeader(false, 5, 42), None)),
-			(6, (TestSourceHeader(false, 6, 6), None)),
-			(7, (TestSourceHeader(false, 7, 7), None)),
-			(8, (TestSourceHeader(false, 8, 8), None)),
-			(9, (TestSourceHeader(false, 9, 9), None)),
-			(10, (TestSourceHeader(false, 10, 10), None)),
-		]
-		.into_iter()
-		.collect(),
-	);
-
-	let mut progress = (Instant::now(), None);
-	let mut finality_proofs_stream = futures::stream::iter(vec![]).boxed().into();
-	let mut recent_finality_proofs = Vec::new();
-	let metrics_sync = SyncLoopMetrics::new(None, "source", "target").unwrap();
-	async_std::task::block_on(run_loop_iteration::<TestFinalitySyncPipeline, _, _>(
-		&source_client,
-		&target_client,
-		FinalityLoopState {
-			progress: &mut progress,
-			finality_proofs_stream: &mut finality_proofs_stream,
-			recent_finality_proofs: &mut recent_finality_proofs,
-			submitted_header_number: None,
-		},
-		&test_sync_params(),
-		&Some(metrics_sync.clone()),
-	))
-	.unwrap();
-
-	assert!(!metrics_sync.is_using_same_fork());
-}
-
-#[test]
-fn stalls_when_transaction_tracker_returns_error() {
-	let (_, result) = run_sync_loop(|data| {
-		data.target_transaction_tracker = TestTransactionTracker(TrackedTransactionStatus::Lost);
-		data.target_best_block_id = HeaderId(5, 5);
-		data.target_best_block_id.0 == 16
-	});
-
-	assert_eq!(result, Err(FailedClient::Both));
-}
-
-#[test]
-fn stalls_when_transaction_tracker_returns_finalized_but_transaction_fails() {
-	let (_, result) = run_sync_loop(|data| {
-		data.target_best_block_id = HeaderId(5, 5);
-		data.target_best_block_id.0 == 16
-	});
-
-	assert_eq!(result, Err(FailedClient::Both));
-}
diff --git a/bridges/relays/finality/src/finality_proofs.rs b/bridges/relays/finality/src/finality_proofs.rs
new file mode 100644
index 00000000000..d457c0693bf
--- /dev/null
+++ b/bridges/relays/finality/src/finality_proofs.rs
@@ -0,0 +1,227 @@
+// Copyright 2019-2023 Parity Technologies (UK) Ltd.
+// This file is part of Parity Bridges Common.
+
+// Parity Bridges Common is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Parity Bridges Common is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.
+
+use crate::{base::SourceClientBase, FinalityPipeline};
+
+use bp_header_chain::FinalityProof;
+use futures::{FutureExt, Stream, StreamExt};
+use std::pin::Pin;
+
+/// Finality proofs container. Ordered by target header number.
+pub type FinalityProofs<P> =
+	Vec<(<P as FinalityPipeline>::Number, <P as FinalityPipeline>::FinalityProof)>;
+
+/// Source finality proofs stream that may be restarted.
+pub struct FinalityProofsStream<P: FinalityPipeline, SC: SourceClientBase<P>> {
+	/// The underlying stream.
+	stream: Option<Pin<Box<SC::FinalityProofsStream>>>,
+}
+
+impl<P: FinalityPipeline, SC: SourceClientBase<P>> FinalityProofsStream<P, SC> {
+	pub fn new() -> Self {
+		Self { stream: None }
+	}
+
+	fn next(&mut self) -> Option<<SC::FinalityProofsStream as Stream>::Item> {
+		let stream = match &mut self.stream {
+			Some(stream) => stream,
+			None => return None,
+		};
+
+		match stream.next().now_or_never() {
+			Some(Some(finality_proof)) => Some(finality_proof),
+			Some(None) => {
+				self.stream = None;
+				None
+			},
+			None => None,
+		}
+	}
+
+	pub async fn ensure_stream(&mut self, source_client: &SC) -> Result<(), SC::Error> {
+		if self.stream.is_none() {
+			log::warn!(target: "bridge", "{} finality proofs stream is being started / restarted",
+				P::SOURCE_NAME);
+
+			let stream = source_client.finality_proofs().await.map_err(|error| {
+				log::error!(
+					target: "bridge",
+					"Failed to subscribe to {} justifications: {:?}",
+					P::SOURCE_NAME,
+					error,
+				);
+
+				error
+			})?;
+			self.stream = Some(Box::pin(stream));
+		}
+
+		Ok(())
+	}
+}
+
+/// Source finality proofs buffer.
+pub struct FinalityProofsBuf<P: FinalityPipeline> {
+	/// Proofs buffer.
+	buf: FinalityProofs<P>,
+}
+
+impl<P: FinalityPipeline> FinalityProofsBuf<P> {
+	pub fn new(buf: FinalityProofs<P>) -> Self {
+		Self { buf }
+	}
+
+	pub fn buf(&self) -> &FinalityProofs<P> {
+		&self.buf
+	}
+
+	pub fn fill<SC: SourceClientBase<P>>(&mut self, stream: &mut FinalityProofsStream<P, SC>) {
+		let mut proofs_count = 0;
+		let mut first_header_number = None;
+		let mut last_header_number = None;
+		while let Some(finality_proof) = stream.next() {
+			let target_header_number = finality_proof.target_header_number();
+			first_header_number.get_or_insert(target_header_number);
+			last_header_number = Some(target_header_number);
+			proofs_count += 1;
+
+			self.buf.push((target_header_number, finality_proof));
+		}
+
+		if proofs_count != 0 {
+			log::trace!(
+				target: "bridge",
+				"Read {} finality proofs from {} finality stream for headers in range [{:?}; {:?}]",
+				proofs_count,
+				P::SOURCE_NAME,
+				first_header_number,
+				last_header_number,
+			);
+		}
+	}
+
+	pub fn prune(&mut self, until_hdr_num: P::Number, buf_limit: usize) {
+		let kept_hdr_idx = self
+			.buf
+			.binary_search_by_key(&until_hdr_num, |(hdr_num, _)| *hdr_num)
+			.map(|idx| idx + 1)
+			.unwrap_or_else(|idx| idx);
+		let buf_limit_idx = self.buf.len().saturating_sub(buf_limit);
+
+		self.buf = self.buf.split_off(std::cmp::max(kept_hdr_idx, buf_limit_idx));
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use crate::mock::*;
+
+	impl<P: FinalityPipeline, SC: SourceClientBase<P>> FinalityProofsStream<P, SC> {
+		fn from_stream(stream: SC::FinalityProofsStream) -> Self {
+			Self { stream: Some(Box::pin(stream)) }
+		}
+	}
+
+	#[test]
+	fn finality_proofs_buf_fill_works() {
+		// when stream is currently empty, nothing is changed
+		let mut finality_proofs_buf =
+			FinalityProofsBuf::<TestFinalitySyncPipeline> { buf: vec![(1, TestFinalityProof(1))] };
+		let mut stream =
+			FinalityProofsStream::<TestFinalitySyncPipeline, TestSourceClient>::from_stream(
+				Box::pin(futures::stream::pending()),
+			);
+		finality_proofs_buf.fill(&mut stream);
+		assert_eq!(finality_proofs_buf.buf, vec![(1, TestFinalityProof(1))]);
+		assert!(stream.stream.is_some());
+
+		// when stream has entry with target, it is added to the recent proofs container
+		let mut stream =
+			FinalityProofsStream::<TestFinalitySyncPipeline, TestSourceClient>::from_stream(
+				Box::pin(
+					futures::stream::iter(vec![TestFinalityProof(4)])
+						.chain(futures::stream::pending()),
+				),
+			);
+		finality_proofs_buf.fill(&mut stream);
+		assert_eq!(
+			finality_proofs_buf.buf,
+			vec![(1, TestFinalityProof(1)), (4, TestFinalityProof(4))]
+		);
+		assert!(stream.stream.is_some());
+
+		// when stream has ended, we'll need to restart it
+		let mut stream =
+			FinalityProofsStream::<TestFinalitySyncPipeline, TestSourceClient>::from_stream(
+				Box::pin(futures::stream::empty()),
+			);
+		finality_proofs_buf.fill(&mut stream);
+		assert_eq!(
+			finality_proofs_buf.buf,
+			vec![(1, TestFinalityProof(1)), (4, TestFinalityProof(4))]
+		);
+		assert!(stream.stream.is_none());
+	}
+
+	#[test]
+	fn finality_proofs_buf_prune_works() {
+		let original_finality_proofs_buf: FinalityProofs<TestFinalitySyncPipeline> = vec![
+			(10, TestFinalityProof(10)),
+			(13, TestFinalityProof(13)),
+			(15, TestFinalityProof(15)),
+			(17, TestFinalityProof(17)),
+			(19, TestFinalityProof(19)),
+		]
+		.into_iter()
+		.collect();
+
+		// when there's proof for justified header in the vec
+		let mut finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline> {
+			buf: original_finality_proofs_buf.clone(),
+		};
+		finality_proofs_buf.prune(10, 1024);
+		assert_eq!(&original_finality_proofs_buf[1..], finality_proofs_buf.buf,);
+
+		// when there are no proof for justified header in the vec
+		let mut finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline> {
+			buf: original_finality_proofs_buf.clone(),
+		};
+		finality_proofs_buf.prune(11, 1024);
+		assert_eq!(&original_finality_proofs_buf[1..], finality_proofs_buf.buf,);
+
+		// when there are too many entries after initial prune && they also need to be pruned
+		let mut finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline> {
+			buf: original_finality_proofs_buf.clone(),
+		};
+		finality_proofs_buf.prune(10, 2);
+		assert_eq!(&original_finality_proofs_buf[3..], finality_proofs_buf.buf,);
+
+		// when last entry is pruned
+		let mut finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline> {
+			buf: original_finality_proofs_buf.clone(),
+		};
+		finality_proofs_buf.prune(19, 2);
+		assert_eq!(&original_finality_proofs_buf[5..], finality_proofs_buf.buf,);
+
+		// when post-last entry is pruned
+		let mut finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline> {
+			buf: original_finality_proofs_buf.clone(),
+		};
+		finality_proofs_buf.prune(20, 2);
+		assert_eq!(&original_finality_proofs_buf[5..], finality_proofs_buf.buf,);
+	}
+}
diff --git a/bridges/relays/finality/src/headers.rs b/bridges/relays/finality/src/headers.rs
new file mode 100644
index 00000000000..bdb05c9d9b7
--- /dev/null
+++ b/bridges/relays/finality/src/headers.rs
@@ -0,0 +1,237 @@
+// Copyright 2019-2021 Parity Technologies (UK) Ltd.
+// This file is part of Parity Bridges Common.
+
+// Parity Bridges Common is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Parity Bridges Common is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.
+
+use crate::{
+	finality_loop::SyncInfo, finality_proofs::FinalityProofsBuf, Error, FinalitySyncPipeline,
+	SourceClient, SourceHeader, TargetClient,
+};
+
+use std::cmp::Ordering;
+
+/// Unjustified headers container. Ordered by header number.
+pub type UnjustifiedHeaders<H> = Vec<H>;
+
+#[derive(Debug)]
+#[cfg_attr(test, derive(Clone, PartialEq))]
+pub struct JustifiedHeader<P: FinalitySyncPipeline> {
+	pub header: P::Header,
+	pub proof: P::FinalityProof,
+}
+
+impl<P: FinalitySyncPipeline> JustifiedHeader<P> {
+	pub fn number(&self) -> P::Number {
+		self.header.number()
+	}
+}
+
+/// Finality proof that has been selected by the `read_missing_headers` function.
+pub enum JustifiedHeaderSelector<P: FinalitySyncPipeline> {
+	/// Mandatory header and its proof has been selected. We shall submit proof for this header.
+	Mandatory(JustifiedHeader<P>),
+	/// Regular header and its proof has been selected. We may submit this proof, or proof for
+	/// some better header.
+	Regular(UnjustifiedHeaders<P::Header>, JustifiedHeader<P>),
+	/// We haven't found any missing header with persistent proof at the target client.
+	None(UnjustifiedHeaders<P::Header>),
+}
+
+impl<P: FinalitySyncPipeline> JustifiedHeaderSelector<P> {
+	pub(crate) async fn new<SC: SourceClient<P>, TC: TargetClient<P>>(
+		source_client: &SC,
+		info: &SyncInfo<P>,
+	) -> Result<Self, Error<P, SC::Error, TC::Error>> {
+		let mut unjustified_headers = Vec::new();
+		let mut maybe_justified_header = None;
+
+		let mut header_number = info.best_number_at_target + 1.into();
+		while header_number <= info.best_number_at_source {
+			let (header, maybe_proof) = source_client
+				.header_and_finality_proof(header_number)
+				.await
+				.map_err(Error::Source)?;
+
+			match (header.is_mandatory(), maybe_proof) {
+				(true, Some(proof)) => {
+					log::trace!(target: "bridge", "Header {:?} is mandatory", header_number);
+					return Ok(Self::Mandatory(JustifiedHeader { header, proof }))
+				},
+				(true, None) => return Err(Error::MissingMandatoryFinalityProof(header.number())),
+				(false, Some(proof)) => {
+					log::trace!(target: "bridge", "Header {:?} has persistent finality proof", header_number);
+					unjustified_headers.clear();
+					maybe_justified_header = Some(JustifiedHeader { header, proof });
+				},
+				(false, None) => {
+					unjustified_headers.push(header);
+				},
+			}
+
+			header_number = header_number + 1.into();
+		}
+
+		log::trace!(
+			target: "bridge",
+			"Read {} {} headers. Selected finality proof for header: {:?}",
+			info.num_headers(),
+			P::SOURCE_NAME,
+			maybe_justified_header.as_ref().map(|justified_header| &justified_header.header),
+		);
+
+		Ok(match maybe_justified_header {
+			Some(justified_header) => Self::Regular(unjustified_headers, justified_header),
+			None => Self::None(unjustified_headers),
+		})
+	}
+
+	pub fn select_mandatory(self) -> Option<JustifiedHeader<P>> {
+		match self {
+			JustifiedHeaderSelector::Mandatory(header) => Some(header),
+			_ => None,
+		}
+	}
+
+	pub fn select(self, buf: &FinalityProofsBuf<P>) -> Option<JustifiedHeader<P>> {
+		let (unjustified_headers, maybe_justified_header) = match self {
+			JustifiedHeaderSelector::Mandatory(justified_header) => return Some(justified_header),
+			JustifiedHeaderSelector::Regular(unjustified_headers, justified_header) =>
+				(unjustified_headers, Some(justified_header)),
+			JustifiedHeaderSelector::None(unjustified_headers) => (unjustified_headers, None),
+		};
+
+		let mut finality_proofs_iter = buf.buf().iter().rev();
+		let mut maybe_finality_proof = finality_proofs_iter.next();
+
+		let mut unjustified_headers_iter = unjustified_headers.iter().rev();
+		let mut maybe_unjustified_header = unjustified_headers_iter.next();
+
+		while let (Some(finality_proof), Some(unjustified_header)) =
+			(maybe_finality_proof, maybe_unjustified_header)
+		{
+			match finality_proof.0.cmp(&unjustified_header.number()) {
+				Ordering::Equal => {
+					log::trace!(
+						target: "bridge",
+						"Managed to improve selected {} finality proof {:?} to {:?}.",
+						P::SOURCE_NAME,
+						maybe_justified_header.as_ref().map(|justified_header| justified_header.number()),
+						finality_proof.0
+					);
+					return Some(JustifiedHeader {
+						header: unjustified_header.clone(),
+						proof: finality_proof.1.clone(),
+					})
+				},
+				Ordering::Less => maybe_unjustified_header = unjustified_headers_iter.next(),
+				Ordering::Greater => {
+					maybe_finality_proof = finality_proofs_iter.next();
+				},
+			}
+		}
+
+		log::trace!(
+			target: "bridge",
+			"Could not improve selected {} finality proof {:?}.",
+			P::SOURCE_NAME,
+			maybe_justified_header.as_ref().map(|justified_header| justified_header.number())
+		);
+		maybe_justified_header
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use crate::mock::*;
+
+	#[test]
+	fn select_better_recent_finality_proof_works() {
+		// if there are no unjustified headers, nothing is changed
+		let finality_proofs_buf =
+			FinalityProofsBuf::<TestFinalitySyncPipeline>::new(vec![(5, TestFinalityProof(5))]);
+		let justified_header =
+			JustifiedHeader { header: TestSourceHeader(false, 2, 2), proof: TestFinalityProof(2) };
+		let selector = JustifiedHeaderSelector::Regular(vec![], justified_header.clone());
+		assert_eq!(selector.select(&finality_proofs_buf), Some(justified_header));
+
+		// if there are no buffered finality proofs, nothing is changed
+		let finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline>::new(vec![]);
+		let justified_header =
+			JustifiedHeader { header: TestSourceHeader(false, 2, 2), proof: TestFinalityProof(2) };
+		let selector = JustifiedHeaderSelector::Regular(
+			vec![TestSourceHeader(false, 5, 5)],
+			justified_header.clone(),
+		);
+		assert_eq!(selector.select(&finality_proofs_buf), Some(justified_header));
+
+		// if there's no intersection between recent finality proofs and unjustified headers,
+		// nothing is changed
+		let finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline>::new(vec![
+			(1, TestFinalityProof(1)),
+			(4, TestFinalityProof(4)),
+		]);
+		let justified_header =
+			JustifiedHeader { header: TestSourceHeader(false, 2, 2), proof: TestFinalityProof(2) };
+		let selector = JustifiedHeaderSelector::Regular(
+			vec![TestSourceHeader(false, 9, 9), TestSourceHeader(false, 10, 10)],
+			justified_header.clone(),
+		);
+		assert_eq!(selector.select(&finality_proofs_buf), Some(justified_header));
+
+		// if there's intersection between recent finality proofs and unjustified headers, but there
+		// are no proofs in this intersection, nothing is changed
+		let finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline>::new(vec![
+			(7, TestFinalityProof(7)),
+			(11, TestFinalityProof(11)),
+		]);
+		let justified_header =
+			JustifiedHeader { header: TestSourceHeader(false, 2, 2), proof: TestFinalityProof(2) };
+		let selector = JustifiedHeaderSelector::Regular(
+			vec![
+				TestSourceHeader(false, 8, 8),
+				TestSourceHeader(false, 9, 9),
+				TestSourceHeader(false, 10, 10),
+			],
+			justified_header.clone(),
+		);
+		assert_eq!(selector.select(&finality_proofs_buf), Some(justified_header));
+
+		// if there's intersection between recent finality proofs and unjustified headers and
+		// there's a proof in this intersection:
+		// - this better (last from intersection) proof is selected;
+		// - 'obsolete' unjustified headers are pruned.
+		let finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline>::new(vec![
+			(7, TestFinalityProof(7)),
+			(9, TestFinalityProof(9)),
+		]);
+		let justified_header =
+			JustifiedHeader { header: TestSourceHeader(false, 2, 2), proof: TestFinalityProof(2) };
+		let selector = JustifiedHeaderSelector::Regular(
+			vec![
+				TestSourceHeader(false, 8, 8),
+				TestSourceHeader(false, 9, 9),
+				TestSourceHeader(false, 10, 10),
+			],
+			justified_header,
+		);
+		assert_eq!(
+			selector.select(&finality_proofs_buf),
+			Some(JustifiedHeader {
+				header: TestSourceHeader(false, 9, 9),
+				proof: TestFinalityProof(9)
+			})
+		);
+	}
+}
diff --git a/bridges/relays/finality/src/lib.rs b/bridges/relays/finality/src/lib.rs
index 599cf2f9f9d..51cd9a09355 100644
--- a/bridges/relays/finality/src/lib.rs
+++ b/bridges/relays/finality/src/lib.rs
@@ -26,11 +26,14 @@ pub use crate::{
 };
 
 use bp_header_chain::ConsensusLogReader;
+use relay_utils::{FailedClient, MaybeConnectionError};
 use std::fmt::Debug;
 
 mod base;
 mod finality_loop;
-mod finality_loop_tests;
+mod finality_proofs;
+mod headers;
+mod mock;
 mod sync_loop_metrics;
 
 /// Finality proofs synchronization pipeline.
@@ -50,3 +53,38 @@ pub trait SourceHeader<Hash, Number, Reader>: Clone + Debug + PartialEq + Send +
 	/// Returns true if this header needs to be submitted to target node.
 	fn is_mandatory(&self) -> bool;
 }
+
+/// Error that may happen inside finality synchronization loop.
+#[derive(Debug)]
+enum Error<P: FinalitySyncPipeline, SourceError, TargetError> {
+	/// Source client request has failed with given error.
+	Source(SourceError),
+	/// Target client request has failed with given error.
+	Target(TargetError),
+	/// Finality proof for mandatory header is missing from the source node.
+	MissingMandatoryFinalityProof(P::Number),
+	/// `submit_finality_proof` transaction failed
+	ProofSubmissionTxFailed {
+		#[allow(dead_code)]
+		submitted_number: P::Number,
+		#[allow(dead_code)]
+		best_number_at_target: P::Number,
+	},
+	/// `submit_finality_proof` transaction lost
+	ProofSubmissionTxLost,
+}
+
+impl<P, SourceError, TargetError> Error<P, SourceError, TargetError>
+where
+	P: FinalitySyncPipeline,
+	SourceError: MaybeConnectionError,
+	TargetError: MaybeConnectionError,
+{
+	fn fail_if_connection_error(&self) -> Result<(), FailedClient> {
+		match *self {
+			Error::Source(ref error) if error.is_connection_error() => Err(FailedClient::Source),
+			Error::Target(ref error) if error.is_connection_error() => Err(FailedClient::Target),
+			_ => Ok(()),
+		}
+	}
+}
diff --git a/bridges/relays/finality/src/mock.rs b/bridges/relays/finality/src/mock.rs
new file mode 100644
index 00000000000..181504ce260
--- /dev/null
+++ b/bridges/relays/finality/src/mock.rs
@@ -0,0 +1,209 @@
+// Copyright 2019-2021 Parity Technologies (UK) Ltd.
+// This file is part of Parity Bridges Common.
+
+// Parity Bridges Common is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Parity Bridges Common is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.
+
+//! Tests for finality synchronization loop.
+
+#![cfg(test)]
+
+use crate::{
+	base::SourceClientBase,
+	finality_loop::{SourceClient, TargetClient},
+	FinalityPipeline, FinalitySyncPipeline, SourceHeader,
+};
+
+use async_trait::async_trait;
+use bp_header_chain::{FinalityProof, GrandpaConsensusLogReader};
+use futures::{Stream, StreamExt};
+use parking_lot::Mutex;
+use relay_utils::{
+	relay_loop::Client as RelayClient, HeaderId, MaybeConnectionError, TrackedTransactionStatus,
+	TransactionTracker,
+};
+use std::{collections::HashMap, pin::Pin, sync::Arc};
+
+type IsMandatory = bool;
+pub type TestNumber = u64;
+type TestHash = u64;
+
+#[derive(Clone, Debug)]
+pub struct TestTransactionTracker(pub TrackedTransactionStatus<HeaderId<TestHash, TestNumber>>);
+
+impl Default for TestTransactionTracker {
+	fn default() -> TestTransactionTracker {
+		TestTransactionTracker(TrackedTransactionStatus::Finalized(Default::default()))
+	}
+}
+
+#[async_trait]
+impl TransactionTracker for TestTransactionTracker {
+	type HeaderId = HeaderId<TestHash, TestNumber>;
+
+	async fn wait(self) -> TrackedTransactionStatus<HeaderId<TestHash, TestNumber>> {
+		self.0
+	}
+}
+
+#[derive(Debug, Clone)]
+pub enum TestError {
+	NonConnection,
+}
+
+impl MaybeConnectionError for TestError {
+	fn is_connection_error(&self) -> bool {
+		false
+	}
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct TestFinalitySyncPipeline;
+
+impl FinalityPipeline for TestFinalitySyncPipeline {
+	const SOURCE_NAME: &'static str = "TestSource";
+	const TARGET_NAME: &'static str = "TestTarget";
+
+	type Hash = TestHash;
+	type Number = TestNumber;
+	type FinalityProof = TestFinalityProof;
+}
+
+impl FinalitySyncPipeline for TestFinalitySyncPipeline {
+	type ConsensusLogReader = GrandpaConsensusLogReader<TestNumber>;
+	type Header = TestSourceHeader;
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct TestSourceHeader(pub IsMandatory, pub TestNumber, pub TestHash);
+
+impl SourceHeader<TestHash, TestNumber, GrandpaConsensusLogReader<TestNumber>>
+	for TestSourceHeader
+{
+	fn hash(&self) -> TestHash {
+		self.2
+	}
+
+	fn number(&self) -> TestNumber {
+		self.1
+	}
+
+	fn is_mandatory(&self) -> bool {
+		self.0
+	}
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct TestFinalityProof(pub TestNumber);
+
+impl FinalityProof<TestNumber> for TestFinalityProof {
+	fn target_header_number(&self) -> TestNumber {
+		self.0
+	}
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct ClientsData {
+	pub source_best_block_number: TestNumber,
+	pub source_headers: HashMap<TestNumber, (TestSourceHeader, Option<TestFinalityProof>)>,
+	pub source_proofs: Vec<TestFinalityProof>,
+
+	pub target_best_block_id: HeaderId<TestHash, TestNumber>,
+	pub target_headers: Vec<(TestSourceHeader, TestFinalityProof)>,
+	pub target_transaction_tracker: TestTransactionTracker,
+}
+
+#[derive(Clone)]
+pub struct TestSourceClient {
+	pub on_method_call: Arc<dyn Fn(&mut ClientsData) + Send + Sync>,
+	pub data: Arc<Mutex<ClientsData>>,
+}
+
+#[async_trait]
+impl RelayClient for TestSourceClient {
+	type Error = TestError;
+
+	async fn reconnect(&mut self) -> Result<(), TestError> {
+		unreachable!()
+	}
+}
+
+#[async_trait]
+impl SourceClientBase<TestFinalitySyncPipeline> for TestSourceClient {
+	type FinalityProofsStream = Pin<Box<dyn Stream<Item = TestFinalityProof> + 'static + Send>>;
+
+	async fn finality_proofs(&self) -> Result<Self::FinalityProofsStream, TestError> {
+		let mut data = self.data.lock();
+		(self.on_method_call)(&mut data);
+		Ok(futures::stream::iter(data.source_proofs.clone()).boxed())
+	}
+}
+
+#[async_trait]
+impl SourceClient<TestFinalitySyncPipeline> for TestSourceClient {
+	async fn best_finalized_block_number(&self) -> Result<TestNumber, TestError> {
+		let mut data = self.data.lock();
+		(self.on_method_call)(&mut data);
+		Ok(data.source_best_block_number)
+	}
+
+	async fn header_and_finality_proof(
+		&self,
+		number: TestNumber,
+	) -> Result<(TestSourceHeader, Option<TestFinalityProof>), TestError> {
+		let mut data = self.data.lock();
+		(self.on_method_call)(&mut data);
+		data.source_headers.get(&number).cloned().ok_or(TestError::NonConnection)
+	}
+}
+
+#[derive(Clone)]
+pub struct TestTargetClient {
+	pub on_method_call: Arc<dyn Fn(&mut ClientsData) + Send + Sync>,
+	pub data: Arc<Mutex<ClientsData>>,
+}
+
+#[async_trait]
+impl RelayClient for TestTargetClient {
+	type Error = TestError;
+
+	async fn reconnect(&mut self) -> Result<(), TestError> {
+		unreachable!()
+	}
+}
+
+#[async_trait]
+impl TargetClient<TestFinalitySyncPipeline> for TestTargetClient {
+	type TransactionTracker = TestTransactionTracker;
+
+	async fn best_finalized_source_block_id(
+		&self,
+	) -> Result<HeaderId<TestHash, TestNumber>, TestError> {
+		let mut data = self.data.lock();
+		(self.on_method_call)(&mut data);
+		Ok(data.target_best_block_id)
+	}
+
+	async fn submit_finality_proof(
+		&self,
+		header: TestSourceHeader,
+		proof: TestFinalityProof,
+	) -> Result<TestTransactionTracker, TestError> {
+		let mut data = self.data.lock();
+		(self.on_method_call)(&mut data);
+		data.target_best_block_id = HeaderId(header.number(), header.hash());
+		data.target_headers.push((header, proof));
+		(self.on_method_call)(&mut data);
+		Ok(data.target_transaction_tracker.clone())
+	}
+}
diff --git a/bridges/relays/lib-substrate-relay/src/finality/source.rs b/bridges/relays/lib-substrate-relay/src/finality/source.rs
index 41c6c53daf4..c94af610895 100644
--- a/bridges/relays/lib-substrate-relay/src/finality/source.rs
+++ b/bridges/relays/lib-substrate-relay/src/finality/source.rs
@@ -125,7 +125,7 @@ impl<P: SubstrateFinalitySyncPipeline> SubstrateFinalitySource<P> {
 		Error,
 	> {
 		let client = self.client.clone();
-		let best_finalized_block_number = self.client.best_finalized_header_number().await?;
+		let best_finalized_block_number = client.best_finalized_header_number().await?;
 		Ok(try_unfold((client, block_number), move |(client, current_block_number)| async move {
 			// if we've passed the `best_finalized_block_number`, we no longer need persistent
 			// justifications
diff --git a/bridges/relays/utils/src/relay_loop.rs b/bridges/relays/utils/src/relay_loop.rs
index 11e14744a07..dad7293de6d 100644
--- a/bridges/relays/utils/src/relay_loop.rs
+++ b/bridges/relays/utils/src/relay_loop.rs
@@ -130,19 +130,19 @@ impl<SC, TC, LM> Loop<SC, TC, LM> {
 
 				match result {
 					Ok(()) => break,
-					Err(failed_client) =>
+					Err(failed_client) => {
+						log::debug!(target: "bridge", "Restarting relay loop");
+
 						reconnect_failed_client(
 							failed_client,
 							self.reconnect_delay,
 							&mut self.source_client,
 							&mut self.target_client,
 						)
-						.await,
+						.await
+					},
 				}
-
-				log::debug!(target: "bridge", "Restarting relay loop");
 			}
-
 			Ok(())
 		};
 
@@ -194,7 +194,7 @@ impl<SC, TC, LM> LoopMetrics<SC, TC, LM> {
 						Err(err) => {
 							log::trace!(
 								target: "bridge-metrics",
-								"Failed to create tokio runtime. Prometheus meterics are not available: {:?}",
+								"Failed to create tokio runtime. Prometheus metrics are not available: {:?}",
 								err,
 							);
 							return
-- 
GitLab