From 8b262ea60b8f2cc704f401e0fc9928d5196a2098 Mon Sep 17 00:00:00 2001
From: Svyatoslav Nikolsky <svyatonik@gmail.com>
Date: Fri, 21 Apr 2023 09:56:22 +0300
Subject: [PATCH] submit lane unblock transactions from relay (#2030)

* submit lane unblock transactions from relay

* moved body of select_nonces_to_deliver to the separate select_race_action

* extracted latest_confirmed_nonce_at_source method

* return Option<RaceAction> from select_race_action

* make required_source_header_at_target async

* remove extra argument from required_source_header_at_target

* small fixes in tests

* Revert "return Option<RaceAction> from select_race_action"

This reverts commit 9f13dbfae39a5a45564550e8c89b10a524a68729.

* implement required_source_header_at_target using what-if approach

* fix compilation

* fmt

* clippy

* moved some code to the can_submit_transaction_with
---
 bridges/relays/client-substrate/src/chain.rs  |   2 +-
 bridges/relays/lib-substrate-relay/src/lib.rs |   2 +-
 bridges/relays/messages/Cargo.toml            |   1 +
 .../relays/messages/src/message_lane_loop.rs  |  30 +-
 .../messages/src/message_race_delivery.rs     | 547 ++++++++++++------
 .../relays/messages/src/message_race_loop.rs  |  34 +-
 .../messages/src/message_race_strategy.rs     |   6 +-
 7 files changed, 428 insertions(+), 194 deletions(-)

diff --git a/bridges/relays/client-substrate/src/chain.rs b/bridges/relays/client-substrate/src/chain.rs
index 8c7dc00aa67..54c9ad4f3b6 100644
--- a/bridges/relays/client-substrate/src/chain.rs
+++ b/bridges/relays/client-substrate/src/chain.rs
@@ -55,7 +55,7 @@ pub trait Chain: ChainBase + Clone {
 	/// Block type.
 	type SignedBlock: Member + Serialize + DeserializeOwned + BlockWithJustification<Self::Header>;
 	/// The aggregated `Call` type.
-	type Call: Clone + Codec + Debug + Send;
+	type Call: Clone + Codec + Debug + Send + Sync;
 }
 
 /// Substrate-based relay chain that supports parachains.
diff --git a/bridges/relays/lib-substrate-relay/src/lib.rs b/bridges/relays/lib-substrate-relay/src/lib.rs
index 37a4d602e59..f9bd80d5079 100644
--- a/bridges/relays/lib-substrate-relay/src/lib.rs
+++ b/bridges/relays/lib-substrate-relay/src/lib.rs
@@ -91,7 +91,7 @@ impl<AccountId> TaggedAccount<AccountId> {
 }
 
 /// Batch call builder.
-pub trait BatchCallBuilder<Call>: Clone + Send {
+pub trait BatchCallBuilder<Call>: Clone + Send + Sync {
 	/// Create batch call from given calls vector.
 	fn build_batch_call(&self, _calls: Vec<Call>) -> Call;
 }
diff --git a/bridges/relays/messages/Cargo.toml b/bridges/relays/messages/Cargo.toml
index 8c4b8257d5a..a45b2728105 100644
--- a/bridges/relays/messages/Cargo.toml
+++ b/bridges/relays/messages/Cargo.toml
@@ -8,6 +8,7 @@ license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
 [dependencies]
 async-std = { version = "1.6.5", features = ["attributes"] }
 async-trait = "0.1"
+env_logger = "0.10"
 futures = "0.3.28"
 hex = "0.4"
 log = "0.4.17"
diff --git a/bridges/relays/messages/src/message_lane_loop.rs b/bridges/relays/messages/src/message_lane_loop.rs
index ba86f05ffd3..b681d86d2ae 100644
--- a/bridges/relays/messages/src/message_lane_loop.rs
+++ b/bridges/relays/messages/src/message_lane_loop.rs
@@ -111,7 +111,7 @@ pub struct NoncesSubmitArtifacts<T> {
 
 /// Batch transaction that already submit some headers and needs to be extended with
 /// messages/delivery proof before sending.
-pub trait BatchTransaction<HeaderId>: Debug + Send {
+pub trait BatchTransaction<HeaderId>: Debug + Send + Sync {
 	/// Header that was required in the original call and which is bundled within this
 	/// batch transaction.
 	fn required_header_id(&self) -> HeaderId;
@@ -622,11 +622,19 @@ pub(crate) mod tests {
 	}
 
 	impl TestClientData {
-		fn receive_messages(&mut self, proof: TestMessagesProof) {
+		fn receive_messages(
+			&mut self,
+			maybe_batch_tx: Option<TestMessagesBatchTransaction>,
+			proof: TestMessagesProof,
+		) {
 			self.target_state.best_self =
 				HeaderId(self.target_state.best_self.0 + 1, self.target_state.best_self.1 + 1);
 			self.target_state.best_finalized_self = self.target_state.best_self;
 			self.target_latest_received_nonce = *proof.0.end();
+			if let Some(maybe_batch_tx) = maybe_batch_tx {
+				self.target_state.best_finalized_peer_at_best_self =
+					Some(maybe_batch_tx.required_header_id());
+			}
 			if let Some(target_latest_confirmed_received_nonce) = proof.1 {
 				self.target_latest_confirmed_received_nonce =
 					target_latest_confirmed_received_nonce;
@@ -634,10 +642,18 @@ pub(crate) mod tests {
 			self.submitted_messages_proofs.push(proof);
 		}
 
-		fn receive_messages_delivery_proof(&mut self, proof: TestMessagesReceivingProof) {
+		fn receive_messages_delivery_proof(
+			&mut self,
+			maybe_batch_tx: Option<TestConfirmationBatchTransaction>,
+			proof: TestMessagesReceivingProof,
+		) {
 			self.source_state.best_self =
 				HeaderId(self.source_state.best_self.0 + 1, self.source_state.best_self.1 + 1);
 			self.source_state.best_finalized_self = self.source_state.best_self;
+			if let Some(maybe_batch_tx) = maybe_batch_tx {
+				self.source_state.best_finalized_peer_at_best_self =
+					Some(maybe_batch_tx.required_header_id());
+			}
 			self.submitted_messages_receiving_proofs.push(proof);
 			self.source_latest_confirmed_received_nonce = proof;
 		}
@@ -760,13 +776,13 @@ pub(crate) mod tests {
 
 		async fn submit_messages_receiving_proof(
 			&self,
-			_maybe_batch_tx: Option<Self::BatchTransaction>,
+			maybe_batch_tx: Option<Self::BatchTransaction>,
 			_generated_at_block: TargetHeaderIdOf<TestMessageLane>,
 			proof: TestMessagesReceivingProof,
 		) -> Result<Self::TransactionTracker, TestError> {
 			let mut data = self.data.lock();
 			(self.tick)(&mut data);
-			data.receive_messages_delivery_proof(proof);
+			data.receive_messages_delivery_proof(maybe_batch_tx, proof);
 			(self.post_tick)(&mut data);
 			Ok(TestTransactionTracker(data.source_tracked_transaction_status))
 		}
@@ -885,7 +901,7 @@ pub(crate) mod tests {
 
 		async fn submit_messages_proof(
 			&self,
-			_maybe_batch_tx: Option<Self::BatchTransaction>,
+			maybe_batch_tx: Option<Self::BatchTransaction>,
 			_generated_at_header: SourceHeaderIdOf<TestMessageLane>,
 			nonces: RangeInclusive<MessageNonce>,
 			proof: TestMessagesProof,
@@ -895,7 +911,7 @@ pub(crate) mod tests {
 			if data.is_target_fails {
 				return Err(TestError)
 			}
-			data.receive_messages(proof);
+			data.receive_messages(maybe_batch_tx, proof);
 			(self.post_tick)(&mut data);
 			Ok(NoncesSubmitArtifacts {
 				nonces,
diff --git a/bridges/relays/messages/src/message_race_delivery.rs b/bridges/relays/messages/src/message_race_delivery.rs
index 7a245858b32..4af02ba2b56 100644
--- a/bridges/relays/messages/src/message_race_delivery.rs
+++ b/bridges/relays/messages/src/message_race_delivery.rs
@@ -290,7 +290,185 @@ impl<P: MessageLane, SC, TC> std::fmt::Debug for MessageDeliveryStrategy<P, SC,
 	}
 }
 
-impl<P: MessageLane, SC, TC> MessageDeliveryStrategy<P, SC, TC> {
+impl<P: MessageLane, SC, TC> MessageDeliveryStrategy<P, SC, TC>
+where
+	P: MessageLane,
+	SC: MessageLaneSourceClient<P>,
+	TC: MessageLaneTargetClient<P>,
+{
+	/// Returns true if some race action can be selected (with `select_race_action`) at given
+	/// `best_finalized_source_header_id_at_best_target` source header at target.
+	async fn can_submit_transaction_with<
+		RS: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>,
+	>(
+		&self,
+		mut race_state: RS,
+		maybe_best_finalized_source_header_id_at_best_target: Option<SourceHeaderIdOf<P>>,
+	) -> bool {
+		if let Some(best_finalized_source_header_id_at_best_target) =
+			maybe_best_finalized_source_header_id_at_best_target
+		{
+			race_state.set_best_finalized_source_header_id_at_best_target(
+				best_finalized_source_header_id_at_best_target,
+			);
+
+			return self.select_race_action(race_state).await.is_some()
+		}
+
+		false
+	}
+
+	async fn select_race_action<RS: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>>(
+		&self,
+		race_state: RS,
+	) -> Option<(RangeInclusive<MessageNonce>, MessageProofParameters)> {
+		let best_target_nonce = self.strategy.best_at_target()?;
+		let best_finalized_source_header_id_at_best_target =
+			race_state.best_finalized_source_header_id_at_best_target()?;
+		let latest_confirmed_nonce_at_source = self
+			.latest_confirmed_nonce_at_source(&best_finalized_source_header_id_at_best_target)
+			.unwrap_or(best_target_nonce);
+		let target_nonces = self.target_nonces.as_ref()?;
+
+		// There's additional condition in the message delivery race: target would reject messages
+		// if there are too much unconfirmed messages at the inbound lane.
+
+		// The receiving race is responsible to deliver confirmations back to the source chain. So
+		// if there's a lot of unconfirmed messages, let's wait until it'll be able to do its job.
+		let latest_received_nonce_at_target = target_nonces.latest_nonce;
+		let confirmations_missing =
+			latest_received_nonce_at_target.checked_sub(latest_confirmed_nonce_at_source);
+		match confirmations_missing {
+			Some(confirmations_missing)
+				if confirmations_missing >= self.max_unconfirmed_nonces_at_target =>
+			{
+				log::debug!(
+					target: "bridge",
+					"Cannot deliver any more messages from {} to {}. Too many unconfirmed nonces \
+					at target: target.latest_received={:?}, source.latest_confirmed={:?}, max={:?}",
+					MessageDeliveryRace::<P>::source_name(),
+					MessageDeliveryRace::<P>::target_name(),
+					latest_received_nonce_at_target,
+					latest_confirmed_nonce_at_source,
+					self.max_unconfirmed_nonces_at_target,
+				);
+
+				return None
+			},
+			_ => (),
+		}
+
+		// Ok - we may have new nonces to deliver. But target may still reject new messages, because
+		// we haven't notified it that (some) messages have been confirmed. So we may want to
+		// include updated `source.latest_confirmed` in the proof.
+		//
+		// Important note: we're including outbound state lane proof whenever there are unconfirmed
+		// nonces on the target chain. Other strategy is to include it only if it's absolutely
+		// necessary.
+		let latest_confirmed_nonce_at_target = target_nonces.nonces_data.confirmed_nonce;
+		let outbound_state_proof_required =
+			latest_confirmed_nonce_at_target < latest_confirmed_nonce_at_source;
+
+		// The target node would also reject messages if there are too many entries in the
+		// "unrewarded relayers" set. If we are unable to prove new rewards to the target node, then
+		// we should wait for confirmations race.
+		let unrewarded_limit_reached =
+			target_nonces.nonces_data.unrewarded_relayers.unrewarded_relayer_entries >=
+				self.max_unrewarded_relayer_entries_at_target ||
+				target_nonces.nonces_data.unrewarded_relayers.total_messages >=
+					self.max_unconfirmed_nonces_at_target;
+		if unrewarded_limit_reached {
+			// so there are already too many unrewarded relayer entries in the set
+			//
+			// => check if we can prove enough rewards. If not, we should wait for more rewards to
+			// be paid
+			let number_of_rewards_being_proved =
+				latest_confirmed_nonce_at_source.saturating_sub(latest_confirmed_nonce_at_target);
+			let enough_rewards_being_proved = number_of_rewards_being_proved >=
+				target_nonces.nonces_data.unrewarded_relayers.messages_in_oldest_entry;
+			if !enough_rewards_being_proved {
+				return None
+			}
+		}
+
+		// If we're here, then the confirmations race did its job && sending side now knows that
+		// messages have been delivered. Now let's select nonces that we want to deliver.
+		//
+		// We may deliver at most:
+		//
+		// max_unconfirmed_nonces_at_target - (latest_received_nonce_at_target -
+		// latest_confirmed_nonce_at_target)
+		//
+		// messages in the batch. But since we're including outbound state proof in the batch, then
+		// it may be increased to:
+		//
+		// max_unconfirmed_nonces_at_target - (latest_received_nonce_at_target -
+		// latest_confirmed_nonce_at_source)
+		let future_confirmed_nonce_at_target = if outbound_state_proof_required {
+			latest_confirmed_nonce_at_source
+		} else {
+			latest_confirmed_nonce_at_target
+		};
+		let max_nonces = latest_received_nonce_at_target
+			.checked_sub(future_confirmed_nonce_at_target)
+			.and_then(|diff| self.max_unconfirmed_nonces_at_target.checked_sub(diff))
+			.unwrap_or_default();
+		let max_nonces = std::cmp::min(max_nonces, self.max_messages_in_single_batch);
+		let max_messages_weight_in_single_batch = self.max_messages_weight_in_single_batch;
+		let max_messages_size_in_single_batch = self.max_messages_size_in_single_batch;
+		let lane_source_client = self.lane_source_client.clone();
+		let lane_target_client = self.lane_target_client.clone();
+
+		// select nonces from nonces, available for delivery
+		let selected_nonces = match self.strategy.available_source_queue_indices(race_state) {
+			Some(available_source_queue_indices) => {
+				let source_queue = self.strategy.source_queue();
+				let reference = RelayMessagesBatchReference {
+					max_messages_in_this_batch: max_nonces,
+					max_messages_weight_in_single_batch,
+					max_messages_size_in_single_batch,
+					lane_source_client: lane_source_client.clone(),
+					lane_target_client: lane_target_client.clone(),
+					best_target_nonce,
+					nonces_queue: source_queue.clone(),
+					nonces_queue_range: available_source_queue_indices,
+					metrics: self.metrics_msg.clone(),
+				};
+
+				MessageRaceLimits::decide(reference).await
+			},
+			None => {
+				// we still may need to submit delivery transaction with zero messages to
+				// unblock the lane. But it'll only be accepted if the lane is blocked
+				// (i.e. when `unrewarded_limit_reached` is `true`)
+				None
+			},
+		};
+
+		// check if we need unblocking transaction and we may submit it
+		#[allow(clippy::reversed_empty_ranges)]
+		let selected_nonces = match selected_nonces {
+			Some(selected_nonces) => selected_nonces,
+			None if unrewarded_limit_reached && outbound_state_proof_required => 1..=0,
+			_ => return None,
+		};
+
+		let dispatch_weight = self.dispatch_weight_for_range(&selected_nonces);
+		Some((
+			selected_nonces,
+			MessageProofParameters { outbound_state_proof_required, dispatch_weight },
+		))
+	}
+
+	/// Returns lastest confirmed message at source chain, given source block.
+	fn latest_confirmed_nonce_at_source(&self, at: &SourceHeaderIdOf<P>) -> Option<MessageNonce> {
+		self.latest_confirmed_nonces_at_source
+			.iter()
+			.take_while(|(id, _)| id.0 <= at.0)
+			.last()
+			.map(|(_, nonce)| *nonce)
+	}
+
 	/// Returns total weight of all undelivered messages.
 	fn dispatch_weight_for_range(&self, range: &RangeInclusive<MessageNonce>) -> Weight {
 		self.strategy
@@ -322,9 +500,10 @@ where
 		self.strategy.is_empty()
 	}
 
-	fn required_source_header_at_target<RS: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>>(
+	async fn required_source_header_at_target<
+		RS: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>,
+	>(
 		&self,
-		current_best: &SourceHeaderIdOf<P>,
 		race_state: RS,
 	) -> Option<SourceHeaderIdOf<P>> {
 		// we have already submitted something - let's wait until it is mined
@@ -332,32 +511,41 @@ where
 			return None
 		}
 
-		let has_nonces_to_deliver = !self.strategy.is_empty();
-		let header_required_for_messages_delivery =
-			self.strategy.required_source_header_at_target(current_best, race_state);
-		let header_required_for_reward_confirmations_delivery = self
-			.latest_confirmed_nonces_at_source
-			.back()
-			.filter(|(id, nonce)| *nonce != 0 && id.0 > current_best.0)
-			.map(|(id, _)| id.clone());
-		match (
-			has_nonces_to_deliver,
-			header_required_for_messages_delivery,
-			header_required_for_reward_confirmations_delivery,
-		) {
-			// if we need to delver messages and proof-of-delivery-confirmations, then we need to
-			// select the most recent header to avoid extra roundtrips
-			(true, Some(id1), Some(id2)) => Some(if id1.0 > id2.0 { id1 } else { id2 }),
-			// if we only need to deliver messages - fine, let's require some source header
-			//
-			// if we need new header for proof-of-delivery-confirmations - let's also ask for that.
-			// Even though it may require additional header, we'll be sure that we won't block the
-			// lane (sometimes we can't deliver messages without proof-of-delivery-confirmations)
-			(true, a, b) => a.or(b),
-			// we never submit delivery transaction without messages, so if `has_nonces_to_deliver`
-			// if `false`, we don't need any source headers at target
-			(false, _, _) => None,
+		// if we can deliver something using current race state, go on
+		let selected_nonces = self.select_race_action(race_state.clone()).await;
+		if selected_nonces.is_some() {
+			return None
+		}
+
+		// check if we may deliver some messages if we'll relay require source header
+		// to target first
+		let maybe_source_header_for_delivery =
+			self.strategy.source_queue().back().map(|(id, _)| id.clone());
+		if self
+			.can_submit_transaction_with(
+				race_state.clone(),
+				maybe_source_header_for_delivery.clone(),
+			)
+			.await
+		{
+			return maybe_source_header_for_delivery
+		}
+
+		// ok, we can't delivery anything even if we relay some source blocks first. But maybe
+		// the lane is blocked and we need to submit unblock transaction?
+		let maybe_source_header_for_reward_confirmation =
+			self.latest_confirmed_nonces_at_source.back().map(|(id, _)| id.clone());
+		if self
+			.can_submit_transaction_with(
+				race_state.clone(),
+				maybe_source_header_for_reward_confirmation.clone(),
+			)
+			.await
+		{
+			return maybe_source_header_for_reward_confirmation
 		}
+
+		None
 	}
 
 	fn best_at_source(&self) -> Option<MessageNonce> {
@@ -436,128 +624,7 @@ where
 		&self,
 		race_state: RS,
 	) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)> {
-		let best_target_nonce = self.strategy.best_at_target()?;
-		let best_finalized_source_header_id_at_best_target =
-			race_state.best_finalized_source_header_id_at_best_target()?;
-		let latest_confirmed_nonce_at_source = self
-			.latest_confirmed_nonces_at_source
-			.iter()
-			.take_while(|(id, _)| id.0 <= best_finalized_source_header_id_at_best_target.0)
-			.last()
-			.map(|(_, nonce)| *nonce)
-			.unwrap_or(best_target_nonce);
-		let target_nonces = self.target_nonces.as_ref()?;
-
-		// There's additional condition in the message delivery race: target would reject messages
-		// if there are too much unconfirmed messages at the inbound lane.
-
-		// The receiving race is responsible to deliver confirmations back to the source chain. So
-		// if there's a lot of unconfirmed messages, let's wait until it'll be able to do its job.
-		let latest_received_nonce_at_target = target_nonces.latest_nonce;
-		let confirmations_missing =
-			latest_received_nonce_at_target.checked_sub(latest_confirmed_nonce_at_source);
-		match confirmations_missing {
-			Some(confirmations_missing)
-				if confirmations_missing >= self.max_unconfirmed_nonces_at_target =>
-			{
-				log::debug!(
-					target: "bridge",
-					"Cannot deliver any more messages from {} to {}. Too many unconfirmed nonces \
-					at target: target.latest_received={:?}, source.latest_confirmed={:?}, max={:?}",
-					MessageDeliveryRace::<P>::source_name(),
-					MessageDeliveryRace::<P>::target_name(),
-					latest_received_nonce_at_target,
-					latest_confirmed_nonce_at_source,
-					self.max_unconfirmed_nonces_at_target,
-				);
-
-				return None
-			},
-			_ => (),
-		}
-
-		// Ok - we may have new nonces to deliver. But target may still reject new messages, because
-		// we haven't notified it that (some) messages have been confirmed. So we may want to
-		// include updated `source.latest_confirmed` in the proof.
-		//
-		// Important note: we're including outbound state lane proof whenever there are unconfirmed
-		// nonces on the target chain. Other strategy is to include it only if it's absolutely
-		// necessary.
-		let latest_confirmed_nonce_at_target = target_nonces.nonces_data.confirmed_nonce;
-		let outbound_state_proof_required =
-			latest_confirmed_nonce_at_target < latest_confirmed_nonce_at_source;
-
-		// The target node would also reject messages if there are too many entries in the
-		// "unrewarded relayers" set. If we are unable to prove new rewards to the target node, then
-		// we should wait for confirmations race.
-		let unrewarded_relayer_entries_limit_reached =
-			target_nonces.nonces_data.unrewarded_relayers.unrewarded_relayer_entries >=
-				self.max_unrewarded_relayer_entries_at_target;
-		if unrewarded_relayer_entries_limit_reached {
-			// so there are already too many unrewarded relayer entries in the set
-			//
-			// => check if we can prove enough rewards. If not, we should wait for more rewards to
-			// be paid
-			let number_of_rewards_being_proved =
-				latest_confirmed_nonce_at_source.saturating_sub(latest_confirmed_nonce_at_target);
-			let enough_rewards_being_proved = number_of_rewards_being_proved >=
-				target_nonces.nonces_data.unrewarded_relayers.messages_in_oldest_entry;
-			if !enough_rewards_being_proved {
-				return None
-			}
-		}
-
-		// If we're here, then the confirmations race did its job && sending side now knows that
-		// messages have been delivered. Now let's select nonces that we want to deliver.
-		//
-		// We may deliver at most:
-		//
-		// max_unconfirmed_nonces_at_target - (latest_received_nonce_at_target -
-		// latest_confirmed_nonce_at_target)
-		//
-		// messages in the batch. But since we're including outbound state proof in the batch, then
-		// it may be increased to:
-		//
-		// max_unconfirmed_nonces_at_target - (latest_received_nonce_at_target -
-		// latest_confirmed_nonce_at_source)
-		let future_confirmed_nonce_at_target = if outbound_state_proof_required {
-			latest_confirmed_nonce_at_source
-		} else {
-			latest_confirmed_nonce_at_target
-		};
-		let max_nonces = latest_received_nonce_at_target
-			.checked_sub(future_confirmed_nonce_at_target)
-			.and_then(|diff| self.max_unconfirmed_nonces_at_target.checked_sub(diff))
-			.unwrap_or_default();
-		let max_nonces = std::cmp::min(max_nonces, self.max_messages_in_single_batch);
-		let max_messages_weight_in_single_batch = self.max_messages_weight_in_single_batch;
-		let max_messages_size_in_single_batch = self.max_messages_size_in_single_batch;
-		let lane_source_client = self.lane_source_client.clone();
-		let lane_target_client = self.lane_target_client.clone();
-
-		let available_source_queue_indices =
-			self.strategy.available_source_queue_indices(race_state)?;
-		let source_queue = self.strategy.source_queue();
-
-		let reference = RelayMessagesBatchReference {
-			max_messages_in_this_batch: max_nonces,
-			max_messages_weight_in_single_batch,
-			max_messages_size_in_single_batch,
-			lane_source_client: lane_source_client.clone(),
-			lane_target_client: lane_target_client.clone(),
-			best_target_nonce,
-			nonces_queue: source_queue.clone(),
-			nonces_queue_range: available_source_queue_indices,
-			metrics: self.metrics_msg.clone(),
-		};
-
-		let selected_nonces = MessageRaceLimits::decide(reference).await?;
-		let dispatch_weight = self.dispatch_weight_for_range(&selected_nonces);
-
-		Some((
-			selected_nonces,
-			MessageProofParameters { outbound_state_proof_required, dispatch_weight },
-		))
+		self.select_race_action(race_state).await
 	}
 }
 
@@ -980,31 +1047,41 @@ mod tests {
 		);
 		// nothing needs to be delivered now and we don't need any new headers
 		assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None);
-		assert_eq!(strategy.required_source_header_at_target(&header_id(1), state.clone()), None);
-
-		// now let's generate two more nonces [24; 25] at the soruce;
-		strategy.source_nonces_updated(header_id(2), source_nonces(24..=25, 19, 0));
-		//
-		// - so now we'll need to relay source block#2 to be able to accept messages [24; 25].
-		assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None);
-		assert_eq!(
-			strategy.required_source_header_at_target(&header_id(1), state.clone()),
-			Some(header_id(2))
-		);
+		assert_eq!(strategy.required_source_header_at_target(state.clone()).await, None);
 
-		// let's relay source block#2
+		// block#2 is generated
 		state.best_finalized_source_header_id_at_source = Some(header_id(2));
 		state.best_finalized_source_header_id_at_best_target = Some(header_id(2));
 		state.best_target_header_id = Some(header_id(2));
 		state.best_finalized_target_header_id = Some(header_id(2));
 
+		// now let's generate two more nonces [24; 25] at the source;
+		strategy.source_nonces_updated(header_id(2), source_nonces(24..=25, 19, 0));
+		//
+		// we don't need to relay more headers to target, because messages [20; 23] have
+		// not confirmed to source yet
+		assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None);
+		assert_eq!(strategy.required_source_header_at_target(state.clone()).await, None);
+
+		// let's relay source block#3
+		state.best_finalized_source_header_id_at_source = Some(header_id(3));
+		state.best_finalized_source_header_id_at_best_target = Some(header_id(3));
+		state.best_target_header_id = Some(header_id(3));
+		state.best_finalized_target_header_id = Some(header_id(3));
+
 		// and ask strategy again => still nothing to deliver, because parallel confirmations
 		// race need to be pushed further
 		assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None);
-		assert_eq!(strategy.required_source_header_at_target(&header_id(2), state.clone()), None);
+		assert_eq!(strategy.required_source_header_at_target(state.clone()).await, None);
+
+		// let's relay source block#3
+		state.best_finalized_source_header_id_at_source = Some(header_id(4));
+		state.best_finalized_source_header_id_at_best_target = Some(header_id(4));
+		state.best_target_header_id = Some(header_id(4));
+		state.best_finalized_target_header_id = Some(header_id(4));
 
 		// let's confirm messages [20; 23]
-		strategy.source_nonces_updated(header_id(2), source_nonces(24..=25, 23, 0));
+		strategy.source_nonces_updated(header_id(4), source_nonces(24..=25, 23, 0));
 
 		// and ask strategy again => now we have everything required to deliver remaining
 		// [24; 25] nonces and proof of [20; 23] confirmation
@@ -1012,7 +1089,7 @@ mod tests {
 			strategy.select_nonces_to_deliver(state.clone()).await,
 			Some(((24..=25), proof_parameters(true, 2))),
 		);
-		assert_eq!(strategy.required_source_header_at_target(&header_id(2), state), None);
+		assert_eq!(strategy.required_source_header_at_target(state).await, None);
 	}
 
 	#[async_std::test]
@@ -1041,9 +1118,9 @@ mod tests {
 		);
 	}
 
-	#[test]
+	#[async_std::test]
 	#[allow(clippy::reversed_empty_ranges)]
-	fn no_source_headers_required_at_target_if_lanes_are_empty() {
+	async fn no_source_headers_required_at_target_if_lanes_are_empty() {
 		let (state, _) = prepare_strategy();
 		let mut strategy = TestStrategy {
 			max_unrewarded_relayer_entries_at_target: 4,
@@ -1073,7 +1150,7 @@ mod tests {
 			strategy.latest_confirmed_nonces_at_source,
 			VecDeque::from([(source_header_id, 0)])
 		);
-		assert_eq!(strategy.required_source_header_at_target(&source_header_id, state), None);
+		assert_eq!(strategy.required_source_header_at_target(state).await, None);
 	}
 
 	#[async_std::test]
@@ -1159,4 +1236,138 @@ mod tests {
 			)),
 		);
 	}
+
+	#[async_std::test]
+	#[allow(clippy::reversed_empty_ranges)]
+	async fn delivery_race_is_able_to_unblock_lane() {
+		// step 1: messages 20..=23 are delivered from source to target at target block 2
+		fn at_target_block_2_deliver_messages(
+			strategy: &mut TestStrategy,
+			state: &mut TestRaceState,
+			occupied_relayer_slots: MessageNonce,
+			occupied_message_slots: MessageNonce,
+		) {
+			let nonces_at_target = TargetClientNonces {
+				latest_nonce: 23,
+				nonces_data: DeliveryRaceTargetNoncesData {
+					confirmed_nonce: 19,
+					unrewarded_relayers: UnrewardedRelayersState {
+						unrewarded_relayer_entries: occupied_relayer_slots,
+						total_messages: occupied_message_slots,
+						..Default::default()
+					},
+				},
+			};
+
+			state.best_target_header_id = Some(header_id(2));
+			state.best_finalized_target_header_id = Some(header_id(2));
+
+			strategy.best_target_nonces_updated(nonces_at_target.clone(), state);
+			strategy.finalized_target_nonces_updated(nonces_at_target, state);
+		}
+
+		// step 2: delivery of messages 20..=23 is confirmed to the source node at source block 2
+		fn at_source_block_2_deliver_confirmations(
+			strategy: &mut TestStrategy,
+			state: &mut TestRaceState,
+		) {
+			state.best_finalized_source_header_id_at_source = Some(header_id(2));
+
+			strategy.source_nonces_updated(
+				header_id(2),
+				SourceClientNonces { new_nonces: Default::default(), confirmed_nonce: Some(23) },
+			);
+		}
+
+		// step 3: finalize source block 2 at target block 3 and select nonces to deliver
+		async fn at_target_block_3_select_nonces_to_deliver(
+			strategy: &TestStrategy,
+			mut state: TestRaceState,
+		) -> Option<(RangeInclusive<MessageNonce>, MessageProofParameters)> {
+			state.best_finalized_source_header_id_at_best_target = Some(header_id(2));
+			state.best_target_header_id = Some(header_id(3));
+			state.best_finalized_target_header_id = Some(header_id(3));
+
+			strategy.select_nonces_to_deliver(state).await
+		}
+
+		let max_unrewarded_relayer_entries_at_target = 4;
+		let max_unconfirmed_nonces_at_target = 4;
+		let expected_rewards_proof = Some((
+			1..=0,
+			MessageProofParameters {
+				outbound_state_proof_required: true,
+				dispatch_weight: Weight::zero(),
+			},
+		));
+
+		// TODO: also fix + test `required_source_header_at_target`
+
+		// when lane is NOT blocked
+		let (mut state, mut strategy) = prepare_strategy();
+		at_target_block_2_deliver_messages(
+			&mut strategy,
+			&mut state,
+			max_unrewarded_relayer_entries_at_target - 1,
+			max_unconfirmed_nonces_at_target - 1,
+		);
+		at_source_block_2_deliver_confirmations(&mut strategy, &mut state);
+		assert_eq!(strategy.required_source_header_at_target(state.clone()).await, None);
+		assert_eq!(at_target_block_3_select_nonces_to_deliver(&strategy, state).await, None);
+
+		// when lane is blocked by no-relayer-slots in unrewarded relayers vector
+		let (mut state, mut strategy) = prepare_strategy();
+		at_target_block_2_deliver_messages(
+			&mut strategy,
+			&mut state,
+			max_unrewarded_relayer_entries_at_target,
+			max_unconfirmed_nonces_at_target - 1,
+		);
+		at_source_block_2_deliver_confirmations(&mut strategy, &mut state);
+		assert_eq!(
+			strategy.required_source_header_at_target(state.clone()).await,
+			Some(header_id(2))
+		);
+		assert_eq!(
+			at_target_block_3_select_nonces_to_deliver(&strategy, state).await,
+			expected_rewards_proof
+		);
+
+		// when lane is blocked by no-message-slots in unrewarded relayers vector
+		let (mut state, mut strategy) = prepare_strategy();
+		at_target_block_2_deliver_messages(
+			&mut strategy,
+			&mut state,
+			max_unrewarded_relayer_entries_at_target - 1,
+			max_unconfirmed_nonces_at_target,
+		);
+		at_source_block_2_deliver_confirmations(&mut strategy, &mut state);
+		assert_eq!(
+			strategy.required_source_header_at_target(state.clone()).await,
+			Some(header_id(2))
+		);
+		assert_eq!(
+			at_target_block_3_select_nonces_to_deliver(&strategy, state).await,
+			expected_rewards_proof
+		);
+
+		// when lane is blocked by no-message-slots and no-message-slots in unrewarded relayers
+		// vector
+		let (mut state, mut strategy) = prepare_strategy();
+		at_target_block_2_deliver_messages(
+			&mut strategy,
+			&mut state,
+			max_unrewarded_relayer_entries_at_target - 1,
+			max_unconfirmed_nonces_at_target,
+		);
+		at_source_block_2_deliver_confirmations(&mut strategy, &mut state);
+		assert_eq!(
+			strategy.required_source_header_at_target(state.clone()).await,
+			Some(header_id(2))
+		);
+		assert_eq!(
+			at_target_block_3_select_nonces_to_deliver(&strategy, state).await,
+			expected_rewards_proof
+		);
+	}
 }
diff --git a/bridges/relays/messages/src/message_race_loop.rs b/bridges/relays/messages/src/message_race_loop.rs
index 7e3f84dd5d1..be7d5b46756 100644
--- a/bridges/relays/messages/src/message_race_loop.rs
+++ b/bridges/relays/messages/src/message_race_loop.rs
@@ -41,14 +41,14 @@ use std::{
 /// One of races within lane.
 pub trait MessageRace {
 	/// Header id of the race source.
-	type SourceHeaderId: Debug + Clone + PartialEq + Send;
+	type SourceHeaderId: Debug + Clone + PartialEq + Send + Sync;
 	/// Header id of the race source.
-	type TargetHeaderId: Debug + Clone + PartialEq + Send;
+	type TargetHeaderId: Debug + Clone + PartialEq + Send + Sync;
 
 	/// Message nonce used in the race.
 	type MessageNonce: Debug + Clone;
 	/// Proof that is generated and delivered in this race.
-	type Proof: Debug + Clone + Send;
+	type Proof: Debug + Clone + Send + Sync;
 
 	/// Name of the race source.
 	fn source_name() -> String;
@@ -175,9 +175,8 @@ pub trait RaceStrategy<SourceHeaderId, TargetHeaderId, Proof>: Debug {
 	/// Should return true if nothing has to be synced.
 	fn is_empty(&self) -> bool;
 	/// Return id of source header that is required to be on target to continue synchronization.
-	fn required_source_header_at_target<RS: RaceState<SourceHeaderId, TargetHeaderId>>(
+	async fn required_source_header_at_target<RS: RaceState<SourceHeaderId, TargetHeaderId>>(
 		&self,
-		current_best: &SourceHeaderId,
 		race_state: RS,
 	) -> Option<SourceHeaderId>;
 	/// Return the best nonce at source node.
@@ -218,7 +217,11 @@ pub trait RaceStrategy<SourceHeaderId, TargetHeaderId, Proof>: Debug {
 }
 
 /// State of the race.
-pub trait RaceState<SourceHeaderId, TargetHeaderId>: Send {
+pub trait RaceState<SourceHeaderId, TargetHeaderId>: Clone + Send + Sync {
+	/// Set best finalized source header id at the best block on the target
+	/// client (at the `best_finalized_source_header_id_at_best_target`).
+	fn set_best_finalized_source_header_id_at_best_target(&mut self, id: SourceHeaderId);
+
 	/// Best finalized source header id at the source client.
 	fn best_finalized_source_header_id_at_source(&self) -> Option<SourceHeaderId>;
 	/// Best finalized source header id at the best block on the target
@@ -281,11 +284,15 @@ impl<SourceHeaderId, TargetHeaderId, Proof, BatchTx> Default
 impl<SourceHeaderId, TargetHeaderId, Proof, BatchTx> RaceState<SourceHeaderId, TargetHeaderId>
 	for RaceStateImpl<SourceHeaderId, TargetHeaderId, Proof, BatchTx>
 where
-	SourceHeaderId: Clone + Send,
-	TargetHeaderId: Clone + Send,
-	Proof: Clone + Send,
-	BatchTx: Clone + Send,
+	SourceHeaderId: Clone + Send + Sync,
+	TargetHeaderId: Clone + Send + Sync,
+	Proof: Clone + Send + Sync,
+	BatchTx: Clone + Send + Sync,
 {
+	fn set_best_finalized_source_header_id_at_best_target(&mut self, id: SourceHeaderId) {
+		self.best_finalized_source_header_id_at_best_target = Some(id);
+	}
+
 	fn best_finalized_source_header_id_at_source(&self) -> Option<SourceHeaderId> {
 		self.best_finalized_source_header_id_at_source.clone()
 	}
@@ -430,10 +437,9 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
 				).fail_if_connection_error(FailedClient::Source)?;
 
 				// ask for more headers if we have nonces to deliver and required headers are missing
-				source_required_header = race_state
-					.best_finalized_source_header_id_at_best_target
-					.as_ref()
-					.and_then(|best| strategy.required_source_header_at_target(best, race_state.clone()));
+				source_required_header = strategy
+					.required_source_header_at_target(race_state.clone())
+					.await;
 			},
 			nonces = target_best_nonces => {
 				target_best_nonces_required = false;
diff --git a/bridges/relays/messages/src/message_race_strategy.rs b/bridges/relays/messages/src/message_race_strategy.rs
index e6016448c95..718c296391c 100644
--- a/bridges/relays/messages/src/message_race_strategy.rs
+++ b/bridges/relays/messages/src/message_race_strategy.rs
@@ -205,16 +205,16 @@ impl<
 		self.source_queue.is_empty()
 	}
 
-	fn required_source_header_at_target<
+	async fn required_source_header_at_target<
 		RS: RaceState<
 			HeaderId<SourceHeaderHash, SourceHeaderNumber>,
 			HeaderId<TargetHeaderHash, TargetHeaderNumber>,
 		>,
 	>(
 		&self,
-		current_best: &HeaderId<SourceHeaderHash, SourceHeaderNumber>,
-		_race_state: RS,
+		race_state: RS,
 	) -> Option<HeaderId<SourceHeaderHash, SourceHeaderNumber>> {
+		let current_best = race_state.best_finalized_source_header_id_at_best_target()?;
 		self.source_queue
 			.back()
 			.and_then(|(h, _)| if h.0 > current_best.0 { Some(h.clone()) } else { None })
-- 
GitLab