From 53cdf66071b82d6b8ef51d7225b91a6d43366a63 Mon Sep 17 00:00:00 2001
From: Svyatoslav Nikolsky <svyatonik@gmail.com>
Date: Thu, 11 Mar 2021 16:33:01 +0300
Subject: [PATCH] More tests for finality relay (#816)

* more tests for finality relay

* clippy

* remove env_logger dep

* fmt

* more clippy

* removed prune_unjustified_headers

* review
---
 .../finality-relay/src/finality_loop.rs       | 265 ++++++++++--------
 .../finality-relay/src/finality_loop_tests.rs | 177 ++++++++----
 bridges/relays/finality-relay/src/lib.rs      |   6 +-
 .../substrate-client/src/finality_source.rs   |  59 ++--
 .../relays/substrate/src/finality_pipeline.rs |   4 +-
 .../substrate/src/millau_headers_to_rialto.rs |   2 +-
 .../substrate/src/rialto_headers_to_millau.rs |   2 +-
 7 files changed, 315 insertions(+), 200 deletions(-)

diff --git a/bridges/relays/finality-relay/src/finality_loop.rs b/bridges/relays/finality-relay/src/finality_loop.rs
index 50c23b37572..af5da42cee7 100644
--- a/bridges/relays/finality-relay/src/finality_loop.rs
+++ b/bridges/relays/finality-relay/src/finality_loop.rs
@@ -136,16 +136,21 @@ pub fn run<P: FinalitySyncPipeline>(
 }
 
 /// Unjustified headers container. Ordered by header number.
-pub(crate) type UnjustifiedHeaders<P> = Vec<<P as FinalitySyncPipeline>::Header>;
+pub(crate) type UnjustifiedHeaders<H> = Vec<H>;
 /// Finality proofs container. Ordered by target header number.
 pub(crate) type FinalityProofs<P> = Vec<(
 	<P as FinalitySyncPipeline>::Number,
 	<P as FinalitySyncPipeline>::FinalityProof,
 )>;
+/// Reference to finality proofs container.
+pub(crate) type FinalityProofsRef<'a, P> = &'a [(
+	<P as FinalitySyncPipeline>::Number,
+	<P as FinalitySyncPipeline>::FinalityProof,
+)];
 
 /// Error that may happen inside finality synchronization loop.
 #[derive(Debug)]
-enum Error<P: FinalitySyncPipeline, SourceError, TargetError> {
+pub(crate) enum Error<P: FinalitySyncPipeline, SourceError, TargetError> {
 	/// Source client request has failed with given error.
 	Source(SourceError),
 	/// Target client request has failed with given error.
@@ -182,13 +187,23 @@ struct Transaction<Number> {
 }
 
 /// Finality proofs stream that may be restarted.
-struct RestartableFinalityProofsStream<S> {
+pub(crate) struct RestartableFinalityProofsStream<S> {
 	/// Flag that the stream needs to be restarted.
-	needs_restart: bool,
+	pub(crate) needs_restart: bool,
 	/// The stream itself.
 	stream: Pin<Box<S>>,
 }
 
+#[cfg(test)]
+impl<S> From<S> for RestartableFinalityProofsStream<S> {
+	fn from(stream: S) -> Self {
+		RestartableFinalityProofsStream {
+			needs_restart: false,
+			stream: Box::pin(stream),
+		}
+	}
+}
+
 /// Finality synchronization loop state.
 struct FinalityLoopState<'a, P: FinalitySyncPipeline, FinalityProofsStream> {
 	/// Synchronization loop progress.
@@ -272,6 +287,8 @@ async fn run_until_connection_lost<P: FinalitySyncPipeline>(
 			}
 		};
 		if finality_proofs_stream.needs_restart {
+			log::warn!(target: "bridge", "{} finality proofs stream is being restarted", P::SOURCE_NAME);
+
 			finality_proofs_stream.needs_restart = false;
 			finality_proofs_stream.stream = Box::pin(restart_finality_proofs_stream().await?);
 		}
@@ -368,7 +385,7 @@ where
 
 async fn select_header_to_submit<P, SC, TC>(
 	source_client: &SC,
-	_target_client: &TC,
+	target_client: &TC,
 	finality_proofs_stream: &mut RestartableFinalityProofsStream<SC::FinalityProofsStream>,
 	recent_finality_proofs: &mut FinalityProofs<P>,
 	best_number_at_source: P::Number,
@@ -380,9 +397,6 @@ where
 	SC: SourceClient<P>,
 	TC: TargetClient<P>,
 {
-	let mut selected_finality_proof = None;
-	let mut unjustified_headers = Vec::new();
-
 	// to see that the loop is progressing
 	log::trace!(
 		target: "bridge",
@@ -393,6 +407,70 @@ where
 
 	// read missing headers. if we see that the header schedules GRANDPA change, we need to
 	// submit this header
+	let selected_finality_proof = read_missing_headers::<P, SC, TC>(
+		source_client,
+		target_client,
+		best_number_at_source,
+		best_number_at_target,
+	)
+	.await?;
+	let (mut unjustified_headers, mut selected_finality_proof) = match selected_finality_proof {
+		SelectedFinalityProof::Mandatory(header, finality_proof) => return Ok(Some((header, finality_proof))),
+		SelectedFinalityProof::Regular(unjustified_headers, header, finality_proof) => {
+			(unjustified_headers, Some((header, finality_proof)))
+		}
+		SelectedFinalityProof::None(unjustified_headers) => (unjustified_headers, None),
+	};
+
+	// all headers that are missing from the target client are non-mandatory
+	// => even if we have already selected some header and its persistent finality proof,
+	// we may try to select better header by reading non-persistent proofs from the stream
+	read_finality_proofs_from_stream::<P, _>(finality_proofs_stream, recent_finality_proofs);
+	selected_finality_proof = select_better_recent_finality_proof::<P>(
+		recent_finality_proofs,
+		&mut unjustified_headers,
+		selected_finality_proof,
+	);
+
+	// remove obsolete 'recent' finality proofs + keep its size under certain limit
+	let oldest_finality_proof_to_keep = selected_finality_proof
+		.as_ref()
+		.map(|(header, _)| header.number())
+		.unwrap_or(best_number_at_target);
+	prune_recent_finality_proofs::<P>(
+		oldest_finality_proof_to_keep,
+		recent_finality_proofs,
+		sync_params.recent_finality_proofs_limit,
+	);
+
+	Ok(selected_finality_proof)
+}
+
+/// Finality proof that has been selected by the `read_missing_headers` function.
+pub(crate) enum SelectedFinalityProof<Header, FinalityProof> {
+	/// Mandatory header and its proof has been selected. We shall submit proof for this header.
+	Mandatory(Header, FinalityProof),
+	/// Regular header and its proof has been selected. We may submit this proof, or proof for
+	/// some better header.
+	Regular(UnjustifiedHeaders<Header>, Header, FinalityProof),
+	/// We haven't found any missing header with persistent proof at the target client.
+	None(UnjustifiedHeaders<Header>),
+}
+
+/// Read missing headers and their persistent finality proofs from the target client.
+///
+/// If we have found some header with known proof, it is returned.
+/// Otherwise, `SelectedFinalityProof::None` is returned.
+///
+/// Unless we have found mandatory header, all missing headers are collected and returned.
+pub(crate) async fn read_missing_headers<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>>(
+	source_client: &SC,
+	_target_client: &TC,
+	best_number_at_source: P::Number,
+	best_number_at_target: P::Number,
+) -> Result<SelectedFinalityProof<P::Header, P::FinalityProof>, Error<P, SC::Error, TC::Error>> {
+	let mut unjustified_headers = Vec::new();
+	let mut selected_finality_proof = None;
 	let mut header_number = best_number_at_target + One::one();
 	while header_number <= best_number_at_source {
 		let (header, finality_proof) = source_client
@@ -404,13 +482,13 @@ where
 		match (is_mandatory, finality_proof) {
 			(true, Some(finality_proof)) => {
 				log::trace!(target: "bridge", "Header {:?} is mandatory", header_number);
-				return Ok(Some((header, finality_proof)));
+				return Ok(SelectedFinalityProof::Mandatory(header, finality_proof));
 			}
 			(true, None) => return Err(Error::MissingMandatoryFinalityProof(header.number())),
 			(false, Some(finality_proof)) => {
 				log::trace!(target: "bridge", "Header {:?} has persistent finality proof", header_number);
+				unjustified_headers.clear();
 				selected_finality_proof = Some((header, finality_proof));
-				prune_unjustified_headers::<P>(header_number, &mut unjustified_headers);
 			}
 			(false, None) => {
 				unjustified_headers.push(header);
@@ -420,37 +498,17 @@ where
 		header_number = header_number + One::one();
 	}
 
-	// see if we can improve finality by using recent finality proofs
-	if !unjustified_headers.is_empty() && !recent_finality_proofs.is_empty() {
-		const NOT_EMPTY_PROOF: &str = "we have checked that the vec is not empty; qed";
-
-		// we need proofs for headers in range unjustified_range_begin..=unjustified_range_end
-		let unjustified_range_begin = unjustified_headers.first().expect(NOT_EMPTY_PROOF).number();
-		let unjustified_range_end = unjustified_headers.last().expect(NOT_EMPTY_PROOF).number();
-
-		// we have proofs for headers in range buffered_range_begin..=buffered_range_end
-		let buffered_range_begin = recent_finality_proofs.first().expect(NOT_EMPTY_PROOF).0;
-		let buffered_range_end = recent_finality_proofs.last().expect(NOT_EMPTY_PROOF).0;
-
-		// we have two ranges => find intersection
-		let intersection_begin = std::cmp::max(unjustified_range_begin, buffered_range_begin);
-		let intersection_end = std::cmp::min(unjustified_range_end, buffered_range_end);
-		let intersection = intersection_begin..=intersection_end;
-
-		// find last proof from intersection
-		let selected_finality_proof_index = recent_finality_proofs
-			.binary_search_by_key(intersection.end(), |(number, _)| *number)
-			.unwrap_or_else(|index| index.saturating_sub(1));
-		let (selected_header_number, finality_proof) = &recent_finality_proofs[selected_finality_proof_index];
-		if intersection.contains(selected_header_number) {
-			// now remove all obsolete headers and extract selected header
-			let selected_header = prune_unjustified_headers::<P>(*selected_header_number, &mut unjustified_headers)
-				.expect("unjustified_headers contain all headers from intersection; qed");
-			selected_finality_proof = Some((selected_header, finality_proof.clone()));
-		}
-	}
+	Ok(match selected_finality_proof {
+		Some((header, proof)) => SelectedFinalityProof::Regular(unjustified_headers, header, proof),
+		None => SelectedFinalityProof::None(unjustified_headers),
+	})
+}
 
-	// read all proofs from the stream, probably selecting updated proof that we're going to submit
+/// Read finality proofs from the stream.
+pub(crate) fn read_finality_proofs_from_stream<P: FinalitySyncPipeline, FPS: Stream<Item = P::FinalityProof>>(
+	finality_proofs_stream: &mut RestartableFinalityProofsStream<FPS>,
+	recent_finality_proofs: &mut FinalityProofs<P>,
+) {
 	loop {
 		let next_proof = finality_proofs_stream.stream.next();
 		let finality_proof = match next_proof.now_or_never() {
@@ -461,49 +519,52 @@ where
 			}
 			None => break,
 		};
-		let finality_proof_target_header_number = match finality_proof.target_header_number() {
-			Some(target_header_number) => target_header_number,
-			None => {
-				continue;
-			}
-		};
 
-		let justified_header =
-			prune_unjustified_headers::<P>(finality_proof_target_header_number, &mut unjustified_headers);
-		if let Some(justified_header) = justified_header {
-			recent_finality_proofs.clear();
-			selected_finality_proof = Some((justified_header, finality_proof));
-		} else {
-			// the number of proofs read during single wakeup is expected to be low, so we aren't pruning
-			// `recent_finality_proofs` collection too often
-			recent_finality_proofs.push((finality_proof_target_header_number, finality_proof));
-		}
+		recent_finality_proofs.push((finality_proof.target_header_number(), finality_proof));
 	}
+}
 
-	// remove obsolete 'recent' finality proofs + keep its size under certain limit
-	let oldest_finality_proof_to_keep = selected_finality_proof
-		.as_ref()
-		.map(|(header, _)| header.number())
-		.unwrap_or(best_number_at_target);
-	prune_recent_finality_proofs::<P>(
-		oldest_finality_proof_to_keep,
-		recent_finality_proofs,
-		sync_params.recent_finality_proofs_limit,
-	);
+/// Try to select better header and its proof, given finality proofs that we
+/// have recently read from the stream.
+pub(crate) fn select_better_recent_finality_proof<P: FinalitySyncPipeline>(
+	recent_finality_proofs: FinalityProofsRef<P>,
+	unjustified_headers: &mut UnjustifiedHeaders<P::Header>,
+	selected_finality_proof: Option<(P::Header, P::FinalityProof)>,
+) -> Option<(P::Header, P::FinalityProof)> {
+	if unjustified_headers.is_empty() || recent_finality_proofs.is_empty() {
+		return selected_finality_proof;
+	}
 
-	Ok(selected_finality_proof)
-}
+	const NOT_EMPTY_PROOF: &str = "we have checked that the vec is not empty; qed";
 
-/// Remove headers from `unjustified_headers` collection with number lower or equal than `justified_header_number`.
-///
-/// Returns the header that matches `justified_header_number` (if any).
-pub(crate) fn prune_unjustified_headers<P: FinalitySyncPipeline>(
-	justified_header_number: P::Number,
-	unjustified_headers: &mut UnjustifiedHeaders<P>,
-) -> Option<P::Header> {
-	prune_ordered_vec(justified_header_number, unjustified_headers, usize::MAX, |header| {
-		header.number()
-	})
+	// we need proofs for headers in range unjustified_range_begin..=unjustified_range_end
+	let unjustified_range_begin = unjustified_headers.first().expect(NOT_EMPTY_PROOF).number();
+	let unjustified_range_end = unjustified_headers.last().expect(NOT_EMPTY_PROOF).number();
+
+	// we have proofs for headers in range buffered_range_begin..=buffered_range_end
+	let buffered_range_begin = recent_finality_proofs.first().expect(NOT_EMPTY_PROOF).0;
+	let buffered_range_end = recent_finality_proofs.last().expect(NOT_EMPTY_PROOF).0;
+
+	// we have two ranges => find intersection
+	let intersection_begin = std::cmp::max(unjustified_range_begin, buffered_range_begin);
+	let intersection_end = std::cmp::min(unjustified_range_end, buffered_range_end);
+	let intersection = intersection_begin..=intersection_end;
+
+	// find last proof from intersection
+	let selected_finality_proof_index = recent_finality_proofs
+		.binary_search_by_key(intersection.end(), |(number, _)| *number)
+		.unwrap_or_else(|index| index.saturating_sub(1));
+	let (selected_header_number, finality_proof) = &recent_finality_proofs[selected_finality_proof_index];
+	if !intersection.contains(selected_header_number) {
+		return selected_finality_proof;
+	}
+
+	// now remove all obsolete headers and extract selected header
+	let selected_header_position = unjustified_headers
+		.binary_search_by_key(selected_header_number, |header| header.number())
+		.expect("unjustified_headers contain all headers from intersection; qed");
+	let selected_header = unjustified_headers.swap_remove(selected_header_position);
+	Some((selected_header, finality_proof.clone()))
 }
 
 pub(crate) fn prune_recent_finality_proofs<P: FinalitySyncPipeline>(
@@ -511,45 +572,21 @@ pub(crate) fn prune_recent_finality_proofs<P: FinalitySyncPipeline>(
 	recent_finality_proofs: &mut FinalityProofs<P>,
 	recent_finality_proofs_limit: usize,
 ) {
-	prune_ordered_vec(
-		justified_header_number,
-		recent_finality_proofs,
-		recent_finality_proofs_limit,
-		|(header_number, _)| *header_number,
+	let position =
+		recent_finality_proofs.binary_search_by_key(&justified_header_number, |(header_number, _)| *header_number);
+
+	// remove all obsolete elements
+	*recent_finality_proofs = recent_finality_proofs.split_off(
+		position
+			.map(|position| position + 1)
+			.unwrap_or_else(|position| position),
 	);
-}
-
-fn prune_ordered_vec<T, Number: relay_utils::BlockNumberBase>(
-	header_number: Number,
-	ordered_vec: &mut Vec<T>,
-	maximal_vec_size: usize,
-	extract_header_number: impl Fn(&T) -> Number,
-) -> Option<T> {
-	let position = ordered_vec.binary_search_by_key(&header_number, extract_header_number);
-
-	// first extract element we're interested in
-	let extracted_element = match position {
-		Ok(position) => {
-			let updated_vec = ordered_vec.split_off(position + 1);
-			let extracted_element = ordered_vec.pop().expect(
-				"binary_search_by_key has returned Ok(); so there's element at `position`;\
-					we're splitting vec at `position+1`; so we have pruned at least 1 element;\
-					qed",
-			);
-			*ordered_vec = updated_vec;
-			Some(extracted_element)
-		}
-		Err(position) => {
-			*ordered_vec = ordered_vec.split_off(position);
-			None
-		}
-	};
 
 	// now - limit vec by size
-	let split_index = ordered_vec.len().saturating_sub(maximal_vec_size);
-	*ordered_vec = ordered_vec.split_off(split_index);
-
-	extracted_element
+	let split_index = recent_finality_proofs
+		.len()
+		.saturating_sub(recent_finality_proofs_limit);
+	*recent_finality_proofs = recent_finality_proofs.split_off(split_index);
 }
 
 fn print_sync_progress<P: FinalitySyncPipeline>(
diff --git a/bridges/relays/finality-relay/src/finality_loop_tests.rs b/bridges/relays/finality-relay/src/finality_loop_tests.rs
index 5dfe8edd212..53f5225ab7e 100644
--- a/bridges/relays/finality-relay/src/finality_loop_tests.rs
+++ b/bridges/relays/finality-relay/src/finality_loop_tests.rs
@@ -19,8 +19,8 @@
 #![cfg(test)]
 
 use crate::finality_loop::{
-	prune_recent_finality_proofs, prune_unjustified_headers, run, FinalityProofs, FinalitySyncParams, SourceClient,
-	TargetClient, UnjustifiedHeaders,
+	prune_recent_finality_proofs, read_finality_proofs_from_stream, run, select_better_recent_finality_proof,
+	FinalityProofs, FinalitySyncParams, SourceClient, TargetClient,
 };
 use crate::{FinalityProof, FinalitySyncPipeline, SourceHeader};
 
@@ -71,10 +71,10 @@ impl SourceHeader<TestNumber> for TestSourceHeader {
 }
 
 #[derive(Debug, Clone, PartialEq)]
-struct TestFinalityProof(Option<TestNumber>);
+struct TestFinalityProof(TestNumber);
 
 impl FinalityProof<TestNumber> for TestFinalityProof {
-	fn target_header_number(&self) -> Option<TestNumber> {
+	fn target_header_number(&self) -> TestNumber {
 		self.0
 	}
 }
@@ -176,14 +176,14 @@ fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync
 		source_best_block_number: 10,
 		source_headers: vec![
 			(6, (TestSourceHeader(false, 6), None)),
-			(7, (TestSourceHeader(false, 7), Some(TestFinalityProof(Some(7))))),
-			(8, (TestSourceHeader(true, 8), Some(TestFinalityProof(Some(8))))),
-			(9, (TestSourceHeader(false, 9), Some(TestFinalityProof(Some(9))))),
+			(7, (TestSourceHeader(false, 7), Some(TestFinalityProof(7)))),
+			(8, (TestSourceHeader(true, 8), Some(TestFinalityProof(8)))),
+			(9, (TestSourceHeader(false, 9), Some(TestFinalityProof(9)))),
 			(10, (TestSourceHeader(false, 10), None)),
 		]
 		.into_iter()
 		.collect(),
-		source_proofs: vec![TestFinalityProof(Some(12)), TestFinalityProof(Some(14))],
+		source_proofs: vec![TestFinalityProof(12), TestFinalityProof(14)],
 
 		target_best_block_number: 5,
 		target_headers: vec![],
@@ -222,22 +222,22 @@ fn finality_sync_loop_works() {
 		// header#9 has persistent finality proof, but it isn't mandatory => it is submitted, because
 		//   there are no more persistent finality proofs
 		//
-		// once this ^^^ is done, we generate more blocks && read proof for blocks 12, 14 and 16 from the stream
-		// but we only submit proof for 16
-		//
-		// proof for block 15 is ignored - we haven't managed to decode it
+		// once this ^^^ is done, we generate more blocks && read proof for blocks 12 and 14 from the stream
 		if data.target_best_block_number == 9 {
-			data.source_best_block_number = 17;
+			data.source_best_block_number = 14;
 			data.source_headers.insert(11, (TestSourceHeader(false, 11), None));
 			data.source_headers
-				.insert(12, (TestSourceHeader(false, 12), Some(TestFinalityProof(Some(12)))));
+				.insert(12, (TestSourceHeader(false, 12), Some(TestFinalityProof(12))));
 			data.source_headers.insert(13, (TestSourceHeader(false, 13), None));
 			data.source_headers
-				.insert(14, (TestSourceHeader(false, 14), Some(TestFinalityProof(Some(14)))));
-			data.source_headers
-				.insert(15, (TestSourceHeader(false, 15), Some(TestFinalityProof(None))));
+				.insert(14, (TestSourceHeader(false, 14), Some(TestFinalityProof(14))));
+		}
+		// once this ^^^ is done, we generate more blocks && read persistent proof for block 16
+		if data.target_best_block_number == 14 {
+			data.source_best_block_number = 17;
+			data.source_headers.insert(15, (TestSourceHeader(false, 15), None));
 			data.source_headers
-				.insert(16, (TestSourceHeader(false, 16), Some(TestFinalityProof(Some(16)))));
+				.insert(16, (TestSourceHeader(false, 16), Some(TestFinalityProof(16))));
 			data.source_headers.insert(17, (TestSourceHeader(false, 17), None));
 		}
 
@@ -247,67 +247,132 @@ fn finality_sync_loop_works() {
 	assert_eq!(
 		client_data.target_headers,
 		vec![
-			(TestSourceHeader(true, 8), TestFinalityProof(Some(8))),
-			(TestSourceHeader(false, 9), TestFinalityProof(Some(9))),
-			(TestSourceHeader(false, 16), TestFinalityProof(Some(16))),
+			// before adding 11..14: finality proof for mandatory header#8
+			(TestSourceHeader(true, 8), TestFinalityProof(8)),
+			// before adding 11..14: persistent finality proof for non-mandatory header#9
+			(TestSourceHeader(false, 9), TestFinalityProof(9)),
+			// after adding 11..14: ephemeral finality proof for non-mandatory header#14
+			(TestSourceHeader(false, 14), TestFinalityProof(14)),
+			// after adding 15..17: persistent finality proof for non-mandatory header#16
+			(TestSourceHeader(false, 16), TestFinalityProof(16)),
 		],
 	);
 }
 
 #[test]
-fn prune_unjustified_headers_works() {
-	let original_unjustified_headers: UnjustifiedHeaders<TestFinalitySyncPipeline> = vec![
-		TestSourceHeader(false, 10),
-		TestSourceHeader(false, 13),
-		TestSourceHeader(false, 15),
-		TestSourceHeader(false, 17),
-		TestSourceHeader(false, 19),
-	]
-	.into_iter()
-	.collect();
+fn select_better_recent_finality_proof_works() {
+	// if there are no unjustified headers, nothing is changed
+	assert_eq!(
+		select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
+			&[(5, TestFinalityProof(5))],
+			&mut vec![],
+			Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
+		),
+		Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
+	);
 
-	// when header is in the collection
-	let mut unjustified_headers = original_unjustified_headers.clone();
+	// if there are no recent finality proofs, nothing is changed
 	assert_eq!(
-		prune_unjustified_headers::<TestFinalitySyncPipeline>(10, &mut unjustified_headers),
-		Some(TestSourceHeader(false, 10)),
+		select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
+			&[],
+			&mut vec![TestSourceHeader(false, 5)],
+			Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
+		),
+		Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
 	);
-	assert_eq!(&original_unjustified_headers[1..], unjustified_headers,);
 
-	// when the header doesn't exist in the collection
-	let mut unjustified_headers = original_unjustified_headers.clone();
+	// if there's no intersection between recent finality proofs and unjustified headers, nothing is changed
+	let mut unjustified_headers = vec![TestSourceHeader(false, 9), TestSourceHeader(false, 10)];
 	assert_eq!(
-		prune_unjustified_headers::<TestFinalitySyncPipeline>(11, &mut unjustified_headers),
-		None,
+		select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
+			&[(1, TestFinalityProof(1)), (4, TestFinalityProof(4))],
+			&mut unjustified_headers,
+			Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
+		),
+		Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
 	);
-	assert_eq!(&original_unjustified_headers[1..], unjustified_headers,);
 
-	// when last entry is pruned
-	let mut unjustified_headers = original_unjustified_headers.clone();
+	// if there's intersection between recent finality proofs and unjustified headers, but there are no
+	// proofs in this intersection, nothing is changed
+	let mut unjustified_headers = vec![
+		TestSourceHeader(false, 8),
+		TestSourceHeader(false, 9),
+		TestSourceHeader(false, 10),
+	];
 	assert_eq!(
-		prune_unjustified_headers::<TestFinalitySyncPipeline>(19, &mut unjustified_headers),
-		Some(TestSourceHeader(false, 19)),
+		select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
+			&[(7, TestFinalityProof(7)), (11, TestFinalityProof(11))],
+			&mut unjustified_headers,
+			Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
+		),
+		Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
+	);
+	assert_eq!(
+		unjustified_headers,
+		vec![
+			TestSourceHeader(false, 8),
+			TestSourceHeader(false, 9),
+			TestSourceHeader(false, 10)
+		]
 	);
 
-	assert_eq!(&original_unjustified_headers[5..], unjustified_headers,);
+	// if there's intersection between recent finality proofs and unjustified headers and there's
+	// a proof in this intersection:
+	// - this better (last from intersection) proof is selected;
+	// - 'obsolete' unjustified headers are pruned.
+	let mut unjustified_headers = vec![
+		TestSourceHeader(false, 8),
+		TestSourceHeader(false, 9),
+		TestSourceHeader(false, 10),
+	];
+	assert_eq!(
+		select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
+			&[(7, TestFinalityProof(7)), (9, TestFinalityProof(9))],
+			&mut unjustified_headers,
+			Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
+		),
+		Some((TestSourceHeader(false, 9), TestFinalityProof(9))),
+	);
+}
 
-	// when we try and prune past last entry
-	let mut unjustified_headers = original_unjustified_headers.clone();
+#[test]
+fn read_finality_proofs_from_stream_works() {
+	// when stream is currently empty, nothing is changed
+	let mut recent_finality_proofs = vec![(1, TestFinalityProof(1))];
+	let mut stream = futures::stream::pending().into();
+	read_finality_proofs_from_stream::<TestFinalitySyncPipeline, _>(&mut stream, &mut recent_finality_proofs);
+	assert_eq!(recent_finality_proofs, vec![(1, TestFinalityProof(1))]);
+	assert_eq!(stream.needs_restart, false);
+
+	// when stream has entry with target, it is added to the recent proofs container
+	let mut stream = futures::stream::iter(vec![TestFinalityProof(4)])
+		.chain(futures::stream::pending())
+		.into();
+	read_finality_proofs_from_stream::<TestFinalitySyncPipeline, _>(&mut stream, &mut recent_finality_proofs);
 	assert_eq!(
-		prune_unjustified_headers::<TestFinalitySyncPipeline>(20, &mut unjustified_headers),
-		None,
+		recent_finality_proofs,
+		vec![(1, TestFinalityProof(1)), (4, TestFinalityProof(4))]
+	);
+	assert_eq!(stream.needs_restart, false);
+
+	// when stream has ended, we'll need to restart it
+	let mut stream = futures::stream::empty().into();
+	read_finality_proofs_from_stream::<TestFinalitySyncPipeline, _>(&mut stream, &mut recent_finality_proofs);
+	assert_eq!(
+		recent_finality_proofs,
+		vec![(1, TestFinalityProof(1)), (4, TestFinalityProof(4))]
 	);
-	assert_eq!(&original_unjustified_headers[5..], unjustified_headers,);
+	assert_eq!(stream.needs_restart, true);
 }
 
 #[test]
 fn prune_recent_finality_proofs_works() {
 	let original_recent_finality_proofs: FinalityProofs<TestFinalitySyncPipeline> = vec![
-		(10, TestFinalityProof(Some(10))),
-		(13, TestFinalityProof(Some(13))),
-		(15, TestFinalityProof(Some(15))),
-		(17, TestFinalityProof(Some(17))),
-		(19, TestFinalityProof(Some(19))),
+		(10, TestFinalityProof(10)),
+		(13, TestFinalityProof(13)),
+		(15, TestFinalityProof(15)),
+		(17, TestFinalityProof(17)),
+		(19, TestFinalityProof(19)),
 	]
 	.into_iter()
 	.collect();
diff --git a/bridges/relays/finality-relay/src/lib.rs b/bridges/relays/finality-relay/src/lib.rs
index e9d946e27f1..a246e4bd954 100644
--- a/bridges/relays/finality-relay/src/lib.rs
+++ b/bridges/relays/finality-relay/src/lib.rs
@@ -53,8 +53,6 @@ pub trait SourceHeader<Number>: Clone + Debug + PartialEq + Send + Sync {
 
 /// Abstract finality proof that is justifying block finality.
 pub trait FinalityProof<Number>: Clone + Send + Sync + Debug {
-	/// Return header id that this proof is generated for.
-	///
-	/// None is returned if proof is invalid from relayer PoV.
-	fn target_header_number(&self) -> Option<Number>;
+	/// Return number of header that this proof is generated for.
+	fn target_header_number(&self) -> Number;
 }
diff --git a/bridges/relays/substrate-client/src/finality_source.rs b/bridges/relays/substrate-client/src/finality_source.rs
index 2c76619e867..18293efa128 100644
--- a/bridges/relays/substrate-client/src/finality_source.rs
+++ b/bridges/relays/substrate-client/src/finality_source.rs
@@ -22,6 +22,7 @@ use crate::error::Error;
 use crate::sync_header::SyncHeader;
 
 use async_trait::async_trait;
+use bp_header_chain::justification::decode_justification_target;
 use finality_relay::{FinalityProof, FinalitySyncPipeline, SourceClient, SourceHeader};
 use futures::stream::{unfold, Stream, StreamExt};
 use relay_utils::relay_loop::Client as RelayClient;
@@ -30,26 +31,23 @@ use std::{marker::PhantomData, pin::Pin};
 
 /// Wrapped raw Justification.
 #[derive(Debug, Clone)]
-pub struct Justification<Header> {
+pub struct Justification<Number> {
+	/// Header number decoded from the [`raw_justification`].
+	target_header_number: Number,
+	/// Raw, encoded justification bytes.
 	raw_justification: sp_runtime::Justification,
-	_phantom: PhantomData<Header>,
 }
 
-impl<Header> Justification<Header> {
+impl<Number> Justification<Number> {
 	/// Extract raw justification.
 	pub fn into_inner(self) -> sp_runtime::Justification {
 		self.raw_justification
 	}
 }
 
-impl<Header> FinalityProof<Header::Number> for Justification<Header>
-where
-	Header: HeaderT,
-{
-	fn target_header_number(&self) -> Option<Header::Number> {
-		bp_header_chain::justification::decode_justification_target::<Header>(&self.raw_justification)
-			.ok()
-			.map(|(_, number)| number)
+impl<Number: relay_utils::BlockNumberBase> FinalityProof<Number> for Justification<Number> {
+	fn target_header_number(&self) -> Number {
+		self.target_header_number
 	}
 }
 
@@ -96,11 +94,11 @@ where
 		Hash = C::Hash,
 		Number = C::BlockNumber,
 		Header = SyncHeader<C::Header>,
-		FinalityProof = Justification<C::Header>,
+		FinalityProof = Justification<C::BlockNumber>,
 	>,
 	P::Header: SourceHeader<C::BlockNumber>,
 {
-	type FinalityProofsStream = Pin<Box<dyn Stream<Item = Justification<C::Header>>>>;
+	type FinalityProofsStream = Pin<Box<dyn Stream<Item = Justification<C::BlockNumber>>>>;
 
 	async fn best_finalized_block_number(&self) -> Result<P::Number, Error> {
 		// we **CAN** continue to relay finality proofs if source node is out of sync, because
@@ -122,8 +120,8 @@ where
 				.justification()
 				.cloned()
 				.map(|raw_justification| Justification {
+					target_header_number: number,
 					raw_justification,
-					_phantom: Default::default(),
 				}),
 		))
 	}
@@ -132,14 +130,31 @@ where
 		Ok(unfold(
 			self.client.clone().subscribe_justifications().await?,
 			move |mut subscription| async move {
-				let next_justification = subscription.next().await?;
-				Some((
-					Justification {
-						raw_justification: next_justification.0,
-						_phantom: Default::default(),
-					},
-					subscription,
-				))
+				loop {
+					let next_justification = subscription.next().await?;
+					let decoded_target = decode_justification_target::<C::Header>(&next_justification.0);
+					let target_header_number = match decoded_target {
+						Ok((_, number)) => number,
+						Err(err) => {
+							log::error!(
+								target: "bridge",
+								"Failed to decode justification target from the {} justifications stream: {:?}",
+								P::SOURCE_NAME,
+								err,
+							);
+
+							continue;
+						}
+					};
+
+					return Some((
+						Justification {
+							target_header_number,
+							raw_justification: next_justification.0,
+						},
+						subscription,
+					));
+				}
 			},
 		)
 		.boxed())
diff --git a/bridges/relays/substrate/src/finality_pipeline.rs b/bridges/relays/substrate/src/finality_pipeline.rs
index 7ec592d5dee..21865b6c448 100644
--- a/bridges/relays/substrate/src/finality_pipeline.rs
+++ b/bridges/relays/substrate/src/finality_pipeline.rs
@@ -89,7 +89,7 @@ where
 	type Hash = HashOf<SourceChain>;
 	type Number = BlockNumberOf<SourceChain>;
 	type Header = SyncHeader<SourceChain::Header>;
-	type FinalityProof = Justification<SourceChain::Header>;
+	type FinalityProof = Justification<SourceChain::BlockNumber>;
 }
 
 /// Run Substrate-to-Substrate finality sync.
@@ -103,7 +103,7 @@ pub async fn run<SourceChain, TargetChain, P>(
 		Hash = HashOf<SourceChain>,
 		Number = BlockNumberOf<SourceChain>,
 		Header = SyncHeader<SourceChain::Header>,
-		FinalityProof = Justification<SourceChain::Header>,
+		FinalityProof = Justification<SourceChain::BlockNumber>,
 	>,
 	SourceChain: Clone + Chain,
 	BlockNumberOf<SourceChain>: BlockNumberBase,
diff --git a/bridges/relays/substrate/src/millau_headers_to_rialto.rs b/bridges/relays/substrate/src/millau_headers_to_rialto.rs
index 889676d673b..f84eee03a9f 100644
--- a/bridges/relays/substrate/src/millau_headers_to_rialto.rs
+++ b/bridges/relays/substrate/src/millau_headers_to_rialto.rs
@@ -39,7 +39,7 @@ impl SubstrateFinalitySyncPipeline for MillauFinalityToRialto {
 	async fn make_submit_finality_proof_transaction(
 		&self,
 		header: MillauSyncHeader,
-		proof: Justification<bp_millau::Header>,
+		proof: Justification<bp_millau::BlockNumber>,
 	) -> Result<Self::SignedTransaction, SubstrateError> {
 		let account_id = self.target_sign.signer.public().as_array_ref().clone().into();
 		let nonce = self.target_client.next_account_index(account_id).await?;
diff --git a/bridges/relays/substrate/src/rialto_headers_to_millau.rs b/bridges/relays/substrate/src/rialto_headers_to_millau.rs
index 12b2086728a..5a9bbb12133 100644
--- a/bridges/relays/substrate/src/rialto_headers_to_millau.rs
+++ b/bridges/relays/substrate/src/rialto_headers_to_millau.rs
@@ -39,7 +39,7 @@ impl SubstrateFinalitySyncPipeline for RialtoFinalityToMillau {
 	async fn make_submit_finality_proof_transaction(
 		&self,
 		header: RialtoSyncHeader,
-		proof: Justification<bp_rialto::Header>,
+		proof: Justification<bp_rialto::BlockNumber>,
 	) -> Result<Self::SignedTransaction, SubstrateError> {
 		let account_id = self.target_sign.signer.public().as_array_ref().clone().into();
 		let nonce = self.target_client.next_account_index(account_id).await?;
-- 
GitLab