From 0a3f8ace26eeaade9a83e498c2e5b284a7deec02 Mon Sep 17 00:00:00 2001
From: Svyatoslav Nikolsky <svyatonik@gmail.com>
Date: Fri, 27 May 2022 16:49:50 +0300
Subject: [PATCH] fixed on-demand parachains relay case: if better relay header
 is delivered, then we must select para header that may be proved using this
 relay header (#1419)

---
 .../src/on_demand/parachains.rs               | 413 ++++++------------
 .../src/parachains/source.rs                  |  57 +--
 .../relays/parachains/src/parachains_loop.rs  |  88 +++-
 3 files changed, 245 insertions(+), 313 deletions(-)

diff --git a/bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs b/bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs
index 8f1bee35200..d8cc60ab297 100644
--- a/bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs
+++ b/bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs
@@ -37,14 +37,14 @@ use num_traits::Zero;
 use pallet_bridge_parachains::{RelayBlockHash, RelayBlockHasher, RelayBlockNumber};
 use parachains_relay::parachains_loop::{ParachainSyncParams, TargetClient};
 use relay_substrate_client::{
-	AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client, Error as SubstrateError,
+	AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client, Error as SubstrateError, HashOf,
 	TransactionSignScheme,
 };
 use relay_utils::{
 	metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient, HeaderId,
 };
 use sp_runtime::traits::Header as HeaderT;
-use std::{cmp::Ordering, collections::BTreeMap};
+use std::fmt::Debug;
 
 /// On-demand Substrate <-> Substrate parachain finality relay.
 ///
@@ -142,9 +142,8 @@ async fn background_task<P: SubstrateParachainsPipeline>(
 	let target_transactions_mortality = target_transaction_params.mortality;
 
 	let mut relay_state = RelayState::Idle;
-	let mut headers_map_cache = BTreeMap::new();
 	let mut required_parachain_header_number = Zero::zero();
-	let required_para_header_number_ref = Arc::new(Mutex::new(required_parachain_header_number));
+	let required_para_header_number_ref = Arc::new(Mutex::new(None));
 
 	let mut restart_relay = true;
 	let parachains_relay_task = futures::future::Fuse::terminated();
@@ -191,7 +190,10 @@ async fn background_task<P: SubstrateParachainsPipeline>(
 		// the workflow of the on-demand parachains relay is:
 		//
 		// 1) message relay (or any other dependent relay) sees new message at parachain header
-		// `PH`; 2) it sees that the target chain does not know `PH`;
+		// `PH`;
+		//
+		// 2) it sees that the target chain does not know `PH`;
+		//
 		// 3) it asks on-demand parachains relay to relay `PH` to the target chain;
 		//
 		// Phase#1: relaying relay chain header
@@ -204,21 +206,21 @@ async fn background_task<P: SubstrateParachainsPipeline>(
 		// Phase#2: relaying parachain header
 		//
 		// 7) on-demand parachains relay sets `ParachainsSource::maximal_header_number` to the
-		// `PH'.number()`. 8) parachains finality relay sees that the parachain head has been
-		// updated and relays `PH'` to    the target chain.
+		//    `PH'.number()`.
+		// 8) parachains finality relay sees that the parachain head has been
+		//    updated and relays `PH'` to    the target chain.
 
 		// select headers to relay
 		let relay_data = read_relay_data(
 			&parachains_source,
 			&parachains_target,
 			required_parachain_header_number,
-			&mut headers_map_cache,
 		)
 		.await;
 		match relay_data {
-			Ok(mut relay_data) => {
+			Ok(relay_data) => {
 				let prev_relay_state = relay_state;
-				relay_state = select_headers_to_relay(&mut relay_data, relay_state);
+				relay_state = select_headers_to_relay(&relay_data, relay_state);
 				log::trace!(
 					target: "bridge",
 					"Selected new relay state in {}: {:?} using old state {:?} and data {:?}",
@@ -244,13 +246,13 @@ async fn background_task<P: SubstrateParachainsPipeline>(
 		// requirements
 		match relay_state {
 			RelayState::Idle => (),
-			RelayState::RelayingRelayHeader(required_relay_header, _) => {
+			RelayState::RelayingRelayHeader(required_relay_header) => {
 				on_demand_source_relay_to_target_headers
 					.require_more_headers(required_relay_header)
 					.await;
 			},
 			RelayState::RelayingParaHeader(required_para_header) => {
-				*required_para_header_number_ref.lock().await = required_para_header;
+				*required_para_header_number_ref.lock().await = Some(required_para_header);
 			},
 		}
 
@@ -300,55 +302,44 @@ fn on_demand_parachains_relay_name<SourceChain: Chain, TargetChain: Chain>() ->
 
 /// On-demand relay state.
 #[derive(Clone, Copy, Debug, PartialEq)]
-enum RelayState<SourceParaBlock, SourceRelayBlock> {
+enum RelayState<ParaHash, ParaNumber, RelayNumber> {
 	/// On-demand relay is not doing anything.
 	Idle,
 	/// Relaying given relay header to relay given parachain header later.
-	RelayingRelayHeader(SourceRelayBlock, SourceParaBlock),
+	RelayingRelayHeader(RelayNumber),
 	/// Relaying given parachain header.
-	RelayingParaHeader(SourceParaBlock),
+	RelayingParaHeader(HeaderId<ParaHash, ParaNumber>),
 }
 
 /// Data gathered from source and target clients, used by on-demand relay.
 #[derive(Debug)]
-struct RelayData<'a, SourceParaBlock, SourceRelayBlock> {
+struct RelayData<ParaHash, ParaNumber, RelayNumber> {
 	/// Parachain header number that is required at the target chain.
-	pub required_para_header: SourceParaBlock,
+	pub required_para_header: ParaNumber,
 	/// Parachain header number, known to the target chain.
-	pub para_header_at_target: SourceParaBlock,
-	/// Parachain header number, known to the source (relay) chain.
-	pub para_header_at_source: Option<SourceParaBlock>,
+	pub para_header_at_target: ParaNumber,
+	/// Parachain header id, known to the source (relay) chain.
+	pub para_header_at_source: Option<HeaderId<ParaHash, ParaNumber>>,
+	/// Parachain header, that is available at the source relay chain at `relay_header_at_target`
+	/// block.
+	pub para_header_at_relay_header_at_target: Option<HeaderId<ParaHash, ParaNumber>>,
 	/// Relay header number at the source chain.
-	pub relay_header_at_source: SourceRelayBlock,
+	pub relay_header_at_source: RelayNumber,
 	/// Relay header number at the target chain.
-	pub relay_header_at_target: SourceRelayBlock,
-	/// Map of relay to para header block numbers for recent relay headers.
-	///
-	/// Even if we have been trying to relay relay header #100 to relay parachain header #50
-	/// afterwards, it may happen that the relay header #200 may be relayed instead - either
-	/// by us (e.g. if GRANDPA justification is generated for #200, or if we are only syncing
-	/// mandatory headers), or by other relayer. Then, instead of parachain header #50 we may
-	/// relay parachain header #70.
-	///
-	/// This cache is especially important, given that we assume that the nodes we're connected
-	/// to are not necessarily archive nodes. Then, if current relay chain block is #210 and #200
-	/// has been delivered to the target chain, we have more chances to generate storage proof
-	/// at relay block #200 than on relay block #100, which is most likely has pruned state
-	/// already.
-	pub headers_map_cache: &'a mut BTreeMap<SourceRelayBlock, SourceParaBlock>,
+	pub relay_header_at_target: RelayNumber,
 }
 
 /// Read required data from source and target clients.
-async fn read_relay_data<'a, P: SubstrateParachainsPipeline>(
+async fn read_relay_data<P: SubstrateParachainsPipeline>(
 	source: &ParachainsSource<P>,
 	target: &ParachainsTarget<P>,
 	required_header_number: BlockNumberOf<P::SourceParachain>,
-	headers_map_cache: &'a mut BTreeMap<
-		BlockNumberOf<P::SourceRelayChain>,
+) -> Result<
+	RelayData<
+		HashOf<P::SourceParachain>,
 		BlockNumberOf<P::SourceParachain>,
+		BlockNumberOf<P::SourceRelayChain>,
 	>,
-) -> Result<
-	RelayData<'a, BlockNumberOf<P::SourceParachain>, BlockNumberOf<P::SourceRelayChain>>,
 	FailedClient,
 >
 where
@@ -398,7 +389,7 @@ where
 		)
 		.await
 		.map_err(map_source_err)?
-		.map(|h| *h.number());
+		.map(|h| HeaderId(*h.number(), h.hash()));
 
 	let relay_header_at_source = best_finalized_relay_block_id.0;
 	let relay_header_at_target =
@@ -408,68 +399,52 @@ where
 			P::SourceRelayChain::BEST_FINALIZED_HEADER_ID_METHOD,
 		)
 		.await
-		.map_err(map_target_err)?
-		.0;
+		.map_err(map_target_err)?;
+
+	let para_header_at_relay_header_at_target = source
+		.on_chain_parachain_header(relay_header_at_target, P::SOURCE_PARACHAIN_PARA_ID.into())
+		.await
+		.map_err(map_source_err)?
+		.map(|h| HeaderId(*h.number(), h.hash()));
 
 	Ok(RelayData {
 		required_para_header: required_header_number,
 		para_header_at_target,
 		para_header_at_source,
 		relay_header_at_source,
-		relay_header_at_target,
-		headers_map_cache,
+		relay_header_at_target: relay_header_at_target.0,
+		para_header_at_relay_header_at_target,
 	})
 }
 
-// This number is bigger than the session length of any well-known Substrate-based relay
-// chain. We expect that the underlying on-demand relay will submit at least 1 header per
-// session.
-const MAX_HEADERS_MAP_CACHE_ENTRIES: usize = 4096;
-
 /// Select relay and parachain headers that need to be relayed.
-fn select_headers_to_relay<'a, SourceParaBlock, SourceRelayBlock>(
-	data: &mut RelayData<'a, SourceParaBlock, SourceRelayBlock>,
-	mut state: RelayState<SourceParaBlock, SourceRelayBlock>,
-) -> RelayState<SourceParaBlock, SourceRelayBlock>
+fn select_headers_to_relay<ParaHash, ParaNumber, RelayNumber>(
+	data: &RelayData<ParaHash, ParaNumber, RelayNumber>,
+	mut state: RelayState<ParaHash, ParaNumber, RelayNumber>,
+) -> RelayState<ParaHash, ParaNumber, RelayNumber>
 where
-	RelayData<'a, SourceParaBlock, SourceRelayBlock>: std::fmt::Debug, // TODO: remove
-	SourceParaBlock: Copy + PartialOrd,
-	SourceRelayBlock: Copy + Ord,
+	ParaHash: Clone,
+	ParaNumber: Copy + PartialOrd,
+	RelayNumber: Copy + Debug + Ord,
 {
-	// despite of our current state, we want to update the headers map cache
-	if let Some(para_header_at_source) = data.para_header_at_source {
-		data.headers_map_cache
-			.insert(data.relay_header_at_source, para_header_at_source);
-		if data.headers_map_cache.len() > MAX_HEADERS_MAP_CACHE_ENTRIES {
-			let first_key = *data.headers_map_cache.keys().next().expect("map is not empty; qed");
-			data.headers_map_cache.remove(&first_key);
-		}
-	}
-
 	// this switch is responsible for processing `RelayingRelayHeader` state
 	match state {
 		RelayState::Idle | RelayState::RelayingParaHeader(_) => (),
-		RelayState::RelayingRelayHeader(relay_header_number, para_header_number) => {
-			match data.relay_header_at_target.cmp(&relay_header_number) {
-				Ordering::Less => {
-					// relay header hasn't yet been relayed
-					return RelayState::RelayingRelayHeader(relay_header_number, para_header_number)
-				},
-				Ordering::Equal => {
-					// relay header has been realyed and we may continue with parachain header
-					state = RelayState::RelayingParaHeader(para_header_number);
-				},
-				Ordering::Greater => {
-					// relay header descendant has been relayed and we may need to change parachain
-					// header that we want to relay
-					let next_para_header_number = data
-						.headers_map_cache
-						.range(..=data.relay_header_at_target)
-						.next_back()
-						.map(|(_, next_para_header_number)| *next_para_header_number)
-						.unwrap_or_else(|| para_header_number);
-					state = RelayState::RelayingParaHeader(next_para_header_number);
-				},
+		RelayState::RelayingRelayHeader(relay_header_number) => {
+			if data.relay_header_at_target < relay_header_number {
+				// required relay header hasn't yet been relayed
+				return RelayState::RelayingRelayHeader(relay_header_number)
+			}
+
+			// we may switch to `RelayingParaHeader` if parachain head is available
+			if let Some(para_header_at_relay_header_at_target) =
+				data.para_header_at_relay_header_at_target.clone()
+			{
+				state = RelayState::RelayingParaHeader(para_header_at_relay_header_at_target);
+			} else {
+				// otherwise, we'd need to restart (this may happen only if parachain has been
+				// deregistered)
+				state = RelayState::Idle;
 			}
 		},
 	}
@@ -477,11 +452,11 @@ where
 	// this switch is responsible for processing `RelayingParaHeader` state
 	match state {
 		RelayState::Idle => (),
-		RelayState::RelayingRelayHeader(_, _) => unreachable!("processed by previous match; qed"),
-		RelayState::RelayingParaHeader(para_header_number) => {
-			if data.para_header_at_target < para_header_number {
+		RelayState::RelayingRelayHeader(_) => unreachable!("processed by previous match; qed"),
+		RelayState::RelayingParaHeader(para_header_id) => {
+			if data.para_header_at_target < para_header_id.0 {
 				// parachain header hasn't yet been relayed
-				return RelayState::RelayingParaHeader(para_header_number)
+				return RelayState::RelayingParaHeader(para_header_id)
 			}
 		},
 	}
@@ -491,8 +466,14 @@ where
 		return RelayState::Idle
 	}
 
+	// if we haven't read para head from the source, we can't yet do anyhting
+	let para_header_at_source = match data.para_header_at_source {
+		Some(ref para_header_at_source) => para_header_at_source.clone(),
+		None => return RelayState::Idle,
+	};
+
 	// if required header is not available even at the source chain, let's wait
-	if Some(data.required_para_header) > data.para_header_at_source {
+	if data.required_para_header > para_header_at_source.0 {
 		return RelayState::Idle
 	}
 
@@ -501,14 +482,11 @@ where
 
 	// we need relay chain header first
 	if data.relay_header_at_target < data.relay_header_at_source {
-		return RelayState::RelayingRelayHeader(
-			data.relay_header_at_source,
-			data.required_para_header,
-		)
+		return RelayState::RelayingRelayHeader(data.relay_header_at_source)
 	}
 
 	// if all relay headers synced, we may start directly with parachain header
-	RelayState::RelayingParaHeader(data.required_para_header)
+	RelayState::RelayingParaHeader(para_header_at_source)
 }
 
 #[cfg(test)]
@@ -519,17 +497,17 @@ mod tests {
 	fn relay_waits_for_relay_header_to_be_delivered() {
 		assert_eq!(
 			select_headers_to_relay(
-				&mut RelayData {
-					required_para_header: 100,
+				&RelayData {
+					required_para_header: 90,
 					para_header_at_target: 50,
-					para_header_at_source: Some(110),
+					para_header_at_source: Some(HeaderId(110, 110)),
 					relay_header_at_source: 800,
 					relay_header_at_target: 700,
-					headers_map_cache: &mut BTreeMap::new(),
+					para_header_at_relay_header_at_target: Some(HeaderId(100, 100)),
 				},
-				RelayState::RelayingRelayHeader(750, 100),
+				RelayState::RelayingRelayHeader(750),
 			),
-			RelayState::RelayingRelayHeader(750, 100),
+			RelayState::RelayingRelayHeader(750),
 		);
 	}
 
@@ -537,53 +515,17 @@ mod tests {
 	fn relay_starts_relaying_requested_para_header_after_relay_header_is_delivered() {
 		assert_eq!(
 			select_headers_to_relay(
-				&mut RelayData {
-					required_para_header: 100,
+				&RelayData {
+					required_para_header: 90,
 					para_header_at_target: 50,
-					para_header_at_source: Some(110),
+					para_header_at_source: Some(HeaderId(110, 110)),
 					relay_header_at_source: 800,
 					relay_header_at_target: 750,
-					headers_map_cache: &mut BTreeMap::new(),
-				},
-				RelayState::RelayingRelayHeader(750, 100),
-			),
-			RelayState::RelayingParaHeader(100),
-		);
-	}
-
-	#[test]
-	fn relay_selects_same_para_header_after_better_relay_header_is_delivered_1() {
-		assert_eq!(
-			select_headers_to_relay(
-				&mut RelayData {
-					required_para_header: 100,
-					para_header_at_target: 50,
-					para_header_at_source: Some(110),
-					relay_header_at_source: 800,
-					relay_header_at_target: 780,
-					headers_map_cache: &mut vec![(700, 90), (750, 100)].into_iter().collect(),
-				},
-				RelayState::RelayingRelayHeader(750, 100),
-			),
-			RelayState::RelayingParaHeader(100),
-		);
-	}
-
-	#[test]
-	fn relay_selects_same_para_header_after_better_relay_header_is_delivered_2() {
-		assert_eq!(
-			select_headers_to_relay(
-				&mut RelayData {
-					required_para_header: 100,
-					para_header_at_target: 50,
-					para_header_at_source: Some(110),
-					relay_header_at_source: 800,
-					relay_header_at_target: 780,
-					headers_map_cache: &mut BTreeMap::new(),
+					para_header_at_relay_header_at_target: Some(HeaderId(100, 100)),
 				},
-				RelayState::RelayingRelayHeader(750, 100),
+				RelayState::RelayingRelayHeader(750),
 			),
-			RelayState::RelayingParaHeader(100),
+			RelayState::RelayingParaHeader(HeaderId(100, 100)),
 		);
 	}
 
@@ -591,37 +533,34 @@ mod tests {
 	fn relay_selects_better_para_header_after_better_relay_header_is_delivered() {
 		assert_eq!(
 			select_headers_to_relay(
-				&mut RelayData {
-					required_para_header: 100,
+				&RelayData {
+					required_para_header: 90,
 					para_header_at_target: 50,
-					para_header_at_source: Some(120),
+					para_header_at_source: Some(HeaderId(110, 110)),
 					relay_header_at_source: 800,
 					relay_header_at_target: 780,
-					headers_map_cache: &mut vec![(700, 90), (750, 100), (780, 110), (790, 120)]
-						.into_iter()
-						.collect(),
+					para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
 				},
-				RelayState::RelayingRelayHeader(750, 100),
+				RelayState::RelayingRelayHeader(750),
 			),
-			RelayState::RelayingParaHeader(110),
+			RelayState::RelayingParaHeader(HeaderId(105, 105)),
 		);
 	}
-
 	#[test]
 	fn relay_waits_for_para_header_to_be_delivered() {
 		assert_eq!(
 			select_headers_to_relay(
-				&mut RelayData {
-					required_para_header: 100,
+				&RelayData {
+					required_para_header: 90,
 					para_header_at_target: 50,
-					para_header_at_source: Some(110),
+					para_header_at_source: Some(HeaderId(110, 110)),
 					relay_header_at_source: 800,
-					relay_header_at_target: 700,
-					headers_map_cache: &mut BTreeMap::new(),
+					relay_header_at_target: 780,
+					para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
 				},
-				RelayState::RelayingParaHeader(100),
+				RelayState::RelayingParaHeader(HeaderId(105, 105)),
 			),
-			RelayState::RelayingParaHeader(100),
+			RelayState::RelayingParaHeader(HeaderId(105, 105)),
 		);
 	}
 
@@ -629,13 +568,13 @@ mod tests {
 	fn relay_stays_idle_if_required_para_header_is_already_delivered() {
 		assert_eq!(
 			select_headers_to_relay(
-				&mut RelayData {
-					required_para_header: 100,
-					para_header_at_target: 100,
-					para_header_at_source: Some(110),
+				&RelayData {
+					required_para_header: 90,
+					para_header_at_target: 105,
+					para_header_at_source: Some(HeaderId(110, 110)),
 					relay_header_at_source: 800,
-					relay_header_at_target: 700,
-					headers_map_cache: &mut BTreeMap::new(),
+					relay_header_at_target: 780,
+					para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
 				},
 				RelayState::Idle,
 			),
@@ -647,13 +586,13 @@ mod tests {
 	fn relay_waits_for_required_para_header_to_appear_at_source_1() {
 		assert_eq!(
 			select_headers_to_relay(
-				&mut RelayData {
-					required_para_header: 110,
-					para_header_at_target: 100,
+				&RelayData {
+					required_para_header: 120,
+					para_header_at_target: 105,
 					para_header_at_source: None,
 					relay_header_at_source: 800,
-					relay_header_at_target: 700,
-					headers_map_cache: &mut BTreeMap::new(),
+					relay_header_at_target: 780,
+					para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
 				},
 				RelayState::Idle,
 			),
@@ -665,13 +604,13 @@ mod tests {
 	fn relay_waits_for_required_para_header_to_appear_at_source_2() {
 		assert_eq!(
 			select_headers_to_relay(
-				&mut RelayData {
-					required_para_header: 110,
-					para_header_at_target: 100,
-					para_header_at_source: Some(100),
+				&RelayData {
+					required_para_header: 120,
+					para_header_at_target: 105,
+					para_header_at_source: Some(HeaderId(110, 110)),
 					relay_header_at_source: 800,
-					relay_header_at_target: 700,
-					headers_map_cache: &mut BTreeMap::new(),
+					relay_header_at_target: 780,
+					para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
 				},
 				RelayState::Idle,
 			),
@@ -683,17 +622,17 @@ mod tests {
 	fn relay_starts_relaying_relay_header_when_new_para_header_is_requested() {
 		assert_eq!(
 			select_headers_to_relay(
-				&mut RelayData {
-					required_para_header: 110,
-					para_header_at_target: 100,
-					para_header_at_source: Some(110),
+				&RelayData {
+					required_para_header: 120,
+					para_header_at_target: 105,
+					para_header_at_source: Some(HeaderId(125, 125)),
 					relay_header_at_source: 800,
-					relay_header_at_target: 700,
-					headers_map_cache: &mut BTreeMap::new(),
+					relay_header_at_target: 780,
+					para_header_at_relay_header_at_target: Some(HeaderId(105, 105)),
 				},
 				RelayState::Idle,
 			),
-			RelayState::RelayingRelayHeader(800, 110),
+			RelayState::RelayingRelayHeader(800),
 		);
 	}
 
@@ -701,97 +640,35 @@ mod tests {
 	fn relay_starts_relaying_para_header_when_new_para_header_is_requested() {
 		assert_eq!(
 			select_headers_to_relay(
-				&mut RelayData {
-					required_para_header: 110,
-					para_header_at_target: 100,
-					para_header_at_source: Some(110),
+				&RelayData {
+					required_para_header: 120,
+					para_header_at_target: 105,
+					para_header_at_source: Some(HeaderId(125, 125)),
 					relay_header_at_source: 800,
 					relay_header_at_target: 800,
-					headers_map_cache: &mut BTreeMap::new(),
+					para_header_at_relay_header_at_target: Some(HeaderId(125, 125)),
 				},
 				RelayState::Idle,
 			),
-			RelayState::RelayingParaHeader(110),
+			RelayState::RelayingParaHeader(HeaderId(125, 125)),
 		);
 	}
 
 	#[test]
-	fn headers_map_cache_is_updated() {
-		let mut headers_map_cache = BTreeMap::new();
-
-		// when parachain header is known, map is updated
-		select_headers_to_relay(
-			&mut RelayData {
-				required_para_header: 0,
-				para_header_at_target: 50,
-				para_header_at_source: Some(110),
-				relay_header_at_source: 800,
-				relay_header_at_target: 700,
-				headers_map_cache: &mut headers_map_cache,
-			},
-			RelayState::RelayingRelayHeader(750, 100),
-		);
-		assert_eq!(headers_map_cache.clone().into_iter().collect::<Vec<_>>(), vec![(800, 110)],);
-
-		// when parachain header is not known, map is NOT updated
-		select_headers_to_relay(
-			&mut RelayData {
-				required_para_header: 0,
-				para_header_at_target: 50,
-				para_header_at_source: None,
-				relay_header_at_source: 800,
-				relay_header_at_target: 700,
-				headers_map_cache: &mut headers_map_cache,
-			},
-			RelayState::RelayingRelayHeader(750, 100),
-		);
-		assert_eq!(headers_map_cache.clone().into_iter().collect::<Vec<_>>(), vec![(800, 110)],);
-
-		// map auto-deduplicates equal entries
-		select_headers_to_relay(
-			&mut RelayData {
-				required_para_header: 0,
-				para_header_at_target: 50,
-				para_header_at_source: Some(110),
-				relay_header_at_source: 800,
-				relay_header_at_target: 700,
-				headers_map_cache: &mut headers_map_cache,
-			},
-			RelayState::RelayingRelayHeader(750, 100),
-		);
-		assert_eq!(headers_map_cache.clone().into_iter().collect::<Vec<_>>(), vec![(800, 110)],);
-
-		// nothing is pruned if number of map entries is < MAX_HEADERS_MAP_CACHE_ENTRIES
-		for i in 1..MAX_HEADERS_MAP_CACHE_ENTRIES {
-			select_headers_to_relay(
-				&mut RelayData {
-					required_para_header: 0,
-					para_header_at_target: 50,
-					para_header_at_source: Some(110 + i),
-					relay_header_at_source: 800 + i,
-					relay_header_at_target: 700,
-					headers_map_cache: &mut headers_map_cache,
+	fn relay_goes_idle_when_parachain_is_deregistered() {
+		assert_eq!(
+			select_headers_to_relay::<i32, _, _>(
+				&RelayData {
+					required_para_header: 120,
+					para_header_at_target: 105,
+					para_header_at_source: None,
+					relay_header_at_source: 800,
+					relay_header_at_target: 800,
+					para_header_at_relay_header_at_target: None,
 				},
-				RelayState::RelayingRelayHeader(750, 100),
-			);
-			assert_eq!(headers_map_cache.len(), i + 1);
-		}
-
-		// when we add next entry, the oldest one is pruned
-		assert!(headers_map_cache.contains_key(&800));
-		assert_eq!(headers_map_cache.len(), MAX_HEADERS_MAP_CACHE_ENTRIES);
-		select_headers_to_relay(
-			&mut RelayData {
-				required_para_header: 0,
-				para_header_at_target: 50,
-				para_header_at_source: Some(110 + MAX_HEADERS_MAP_CACHE_ENTRIES),
-				relay_header_at_source: 800 + MAX_HEADERS_MAP_CACHE_ENTRIES,
-				relay_header_at_target: 700,
-				headers_map_cache: &mut headers_map_cache,
-			},
-			RelayState::RelayingRelayHeader(750, 100),
+				RelayState::RelayingRelayHeader(800),
+			),
+			RelayState::Idle,
 		);
-		assert!(!headers_map_cache.contains_key(&800));
-		assert_eq!(headers_map_cache.len(), MAX_HEADERS_MAP_CACHE_ENTRIES);
 	}
 }
diff --git a/bridges/relays/lib-substrate-relay/src/parachains/source.rs b/bridges/relays/lib-substrate-relay/src/parachains/source.rs
index 3ae735ab893..ea30143e4b6 100644
--- a/bridges/relays/lib-substrate-relay/src/parachains/source.rs
+++ b/bridges/relays/lib-substrate-relay/src/parachains/source.rs
@@ -16,39 +16,38 @@
 
 //! Parachain heads source.
 
-use crate::{
-	finality::source::RequiredHeaderNumberRef,
-	parachains::{ParachainsPipelineAdapter, SubstrateParachainsPipeline},
-};
+use crate::parachains::{ParachainsPipelineAdapter, SubstrateParachainsPipeline};
 
 use async_std::sync::{Arc, Mutex};
 use async_trait::async_trait;
 use bp_parachains::parachain_head_storage_key_at_source;
 use bp_polkadot_core::parachains::{ParaHash, ParaHead, ParaHeadsProof, ParaId};
 use codec::Decode;
-use parachains_relay::parachains_loop::SourceClient;
+use parachains_relay::parachains_loop::{ParaHashAtSource, SourceClient};
 use relay_substrate_client::{
 	Chain, Client, Error as SubstrateError, HeaderIdOf, HeaderOf, RelayChain,
 };
 use relay_utils::relay_loop::Client as RelayClient;
 use sp_runtime::traits::Header as HeaderT;
 
+/// Shared updatable reference to the maximal parachain header id that we want to sync from the
+/// source.
+pub type RequiredHeaderIdRef<C> = Arc<Mutex<Option<HeaderIdOf<C>>>>;
+
 /// Substrate client as parachain heads source.
 #[derive(Clone)]
 pub struct ParachainsSource<P: SubstrateParachainsPipeline> {
 	client: Client<P::SourceRelayChain>,
-	maximal_header_number: Option<RequiredHeaderNumberRef<P::SourceParachain>>,
-	previous_parachain_head: Arc<Mutex<Option<ParaHash>>>,
+	maximal_header_id: Option<RequiredHeaderIdRef<P::SourceParachain>>,
 }
 
 impl<P: SubstrateParachainsPipeline> ParachainsSource<P> {
 	/// Creates new parachains source client.
 	pub fn new(
 		client: Client<P::SourceRelayChain>,
-		maximal_header_number: Option<RequiredHeaderNumberRef<P::SourceParachain>>,
+		maximal_header_id: Option<RequiredHeaderIdRef<P::SourceParachain>>,
 	) -> Self {
-		let previous_parachain_head = Arc::new(Mutex::new(None));
-		ParachainsSource { client, maximal_header_number, previous_parachain_head }
+		ParachainsSource { client, maximal_header_id }
 	}
 
 	/// Returns reference to the underlying RPC client.
@@ -102,7 +101,7 @@ where
 		&self,
 		at_block: HeaderIdOf<P::SourceRelayChain>,
 		para_id: ParaId,
-	) -> Result<Option<ParaHash>, Self::Error> {
+	) -> Result<ParaHashAtSource, Self::Error> {
 		// we don't need to support many parachains now
 		if para_id.0 != P::SOURCE_PARACHAIN_PARA_ID {
 			return Err(SubstrateError::Custom(format!(
@@ -112,29 +111,33 @@ where
 			)))
 		}
 
-		let parachain_head = match self.on_chain_parachain_header(at_block, para_id).await? {
+		Ok(match self.on_chain_parachain_header(at_block, para_id).await? {
 			Some(parachain_header) => {
-				let mut parachain_head = Some(parachain_header.hash());
+				let mut parachain_head = ParaHashAtSource::Some(parachain_header.hash());
 				// never return head that is larger than requested. This way we'll never sync
-				// headers past `maximal_header_number`
-				if let Some(ref maximal_header_number) = self.maximal_header_number {
-					let maximal_header_number = *maximal_header_number.lock().await;
-					if *parachain_header.number() > maximal_header_number {
-						let previous_parachain_head = *self.previous_parachain_head.lock().await;
-						if let Some(previous_parachain_head) = previous_parachain_head {
-							parachain_head = Some(previous_parachain_head);
-						}
+				// headers past `maximal_header_id`
+				if let Some(ref maximal_header_id) = self.maximal_header_id {
+					let maximal_header_id = *maximal_header_id.lock().await;
+					match maximal_header_id {
+						Some(maximal_header_id)
+							if *parachain_header.number() > maximal_header_id.0 =>
+						{
+							// we don't want this header yet => let's report previously requested
+							// header
+							parachain_head = ParaHashAtSource::Some(maximal_header_id.1);
+						},
+						Some(_) => (),
+						None => {
+							// on-demand relay has not yet asked us to sync anything let's do that
+							parachain_head = ParaHashAtSource::Unavailable;
+						},
 					}
 				}
 
 				parachain_head
 			},
-			None => None,
-		};
-
-		*self.previous_parachain_head.lock().await = parachain_head;
-
-		Ok(parachain_head)
+			None => ParaHashAtSource::None,
+		})
 	}
 
 	async fn prove_parachain_heads(
diff --git a/bridges/relays/parachains/src/parachains_loop.rs b/bridges/relays/parachains/src/parachains_loop.rs
index 827a5d4430b..fd173f2d25b 100644
--- a/bridges/relays/parachains/src/parachains_loop.rs
+++ b/bridges/relays/parachains/src/parachains_loop.rs
@@ -52,6 +52,23 @@ pub enum ParachainSyncStrategy {
 	All,
 }
 
+/// Parachain head hash, available at the source (relay) chain.
+#[derive(Clone, Copy, Debug)]
+pub enum ParaHashAtSource {
+	/// There's no parachain head at the source chain.
+	///
+	/// Normally it means that the parachain is not registered there.
+	None,
+	/// Parachain head with given hash is available at the source chain.
+	Some(ParaHash),
+	/// The source client refuses to report parachain head hash at this moment.
+	///
+	/// It is a "mild" error, which may appear when e.g. on-demand parachains relay is used.
+	/// This variant must be treated as "we don't want to update parachain head value at the
+	/// target chain at this moment".
+	Unavailable,
+}
+
 /// Source client used in parachain heads synchronization loop.
 #[async_trait]
 pub trait SourceClient<P: ParachainsPipeline>: RelayClient {
@@ -63,7 +80,7 @@ pub trait SourceClient<P: ParachainsPipeline>: RelayClient {
 		&self,
 		at_block: HeaderIdOf<P::SourceChain>,
 		para_id: ParaId,
-	) -> Result<Option<ParaHash>, Self::Error>;
+	) -> Result<ParaHashAtSource, Self::Error>;
 
 	/// Get parachain heads proof.
 	async fn prove_parachain_heads(
@@ -291,7 +308,7 @@ where
 
 /// Given heads at source and target clients, returns set of heads that are out of sync.
 fn select_parachains_to_update<P: ParachainsPipeline>(
-	heads_at_source: BTreeMap<ParaId, Option<ParaHash>>,
+	heads_at_source: BTreeMap<ParaId, ParaHashAtSource>,
 	heads_at_target: BTreeMap<ParaId, Option<BestParaHeadHash>>,
 	best_finalized_relay_block: HeaderIdOf<P::SourceChain>,
 ) -> Vec<ParaId>
@@ -317,7 +334,12 @@ where
 		.zip(heads_at_target.into_iter())
 		.filter(|((para, head_at_source), (_, head_at_target))| {
 			let needs_update = match (head_at_source, head_at_target) {
-				(Some(head_at_source), Some(head_at_target))
+				(ParaHashAtSource::Unavailable, _) => {
+					// source client has politely asked us not to update current parachain head
+					// at the target chain
+					false
+				},
+				(ParaHashAtSource::Some(head_at_source), Some(head_at_target))
 					if head_at_target.at_relay_block_number < best_finalized_relay_block.0 &&
 						head_at_target.head_hash != *head_at_source =>
 				{
@@ -325,22 +347,22 @@ where
 					// client
 					true
 				},
-				(Some(_), Some(_)) => {
+				(ParaHashAtSource::Some(_), Some(_)) => {
 					// this is normal case when relay has recently updated heads, when parachain is
 					// not progressing or when our source client is
 					false
 				},
-				(Some(_), None) => {
+				(ParaHashAtSource::Some(_), None) => {
 					// parachain is not yet known to the target client. This is true when parachain
 					// or bridge has been just onboarded/started
 					true
 				},
-				(None, Some(_)) => {
+				(ParaHashAtSource::None, Some(_)) => {
 					// parachain/parathread has been offboarded removed from the system. It needs to
 					// be propageted to the target client
 					true
 				},
-				(None, None) => {
+				(ParaHashAtSource::None, None) => {
 					// all's good - parachain is unknown to both clients
 					false
 				},
@@ -378,7 +400,7 @@ async fn read_heads_at_source<P: ParachainsPipeline>(
 	source_client: &impl SourceClient<P>,
 	at_relay_block: &HeaderIdOf<P::SourceChain>,
 	parachains: &[ParaId],
-) -> Result<BTreeMap<ParaId, Option<ParaHash>>, FailedClient> {
+) -> Result<BTreeMap<ParaId, ParaHashAtSource>, FailedClient> {
 	let mut para_head_hashes = BTreeMap::new();
 	for para in parachains {
 		let para_head = source_client.parachain_head(*at_relay_block, *para).await;
@@ -554,7 +576,7 @@ mod tests {
 	#[derive(Clone, Debug)]
 	struct TestClientData {
 		source_sync_status: Result<bool, TestError>,
-		source_heads: BTreeMap<u32, Result<ParaHash, TestError>>,
+		source_heads: BTreeMap<u32, Result<ParaHashAtSource, TestError>>,
 		source_proofs: BTreeMap<u32, Result<Vec<u8>, TestError>>,
 
 		target_best_block: Result<HeaderIdOf<TestChain>, TestError>,
@@ -569,7 +591,9 @@ mod tests {
 		pub fn minimal() -> Self {
 			TestClientData {
 				source_sync_status: Ok(true),
-				source_heads: vec![(PARA_ID, Ok(PARA_0_HASH))].into_iter().collect(),
+				source_heads: vec![(PARA_ID, Ok(ParaHashAtSource::Some(PARA_0_HASH)))]
+					.into_iter()
+					.collect(),
 				source_proofs: vec![(PARA_ID, Ok(PARA_0_HASH.encode()))].into_iter().collect(),
 
 				target_best_block: Ok(HeaderId(0, Default::default())),
@@ -615,8 +639,11 @@ mod tests {
 			&self,
 			_at_block: HeaderIdOf<TestChain>,
 			para_id: ParaId,
-		) -> Result<Option<ParaHash>, TestError> {
-			self.data.lock().await.source_heads.get(&para_id.0).cloned().transpose()
+		) -> Result<ParaHashAtSource, TestError> {
+			match self.data.lock().await.source_heads.get(&para_id.0).cloned() {
+				Some(result) => result,
+				None => Ok(ParaHashAtSource::None),
+			}
 		}
 
 		async fn prove_parachain_heads(
@@ -923,7 +950,7 @@ mod tests {
 	fn parachain_is_not_updated_if_it_is_unknown_to_both_clients() {
 		assert_eq!(
 			select_parachains_to_update::<TestParachainsPipeline>(
-				vec![(ParaId(PARA_ID), None)].into_iter().collect(),
+				vec![(ParaId(PARA_ID), ParaHashAtSource::None)].into_iter().collect(),
 				vec![(ParaId(PARA_ID), None)].into_iter().collect(),
 				HeaderId(10, Default::default()),
 			),
@@ -935,7 +962,9 @@ mod tests {
 	fn parachain_is_not_updated_if_it_has_been_updated_at_better_relay_block() {
 		assert_eq!(
 			select_parachains_to_update::<TestParachainsPipeline>(
-				vec![(ParaId(PARA_ID), Some(PARA_0_HASH))].into_iter().collect(),
+				vec![(ParaId(PARA_ID), ParaHashAtSource::Some(PARA_0_HASH))]
+					.into_iter()
+					.collect(),
 				vec![(
 					ParaId(PARA_ID),
 					Some(BestParaHeadHash { at_relay_block_number: 20, head_hash: PARA_1_HASH })
@@ -952,7 +981,9 @@ mod tests {
 	fn parachain_is_not_updated_if_hash_is_the_same_at_next_relay_block() {
 		assert_eq!(
 			select_parachains_to_update::<TestParachainsPipeline>(
-				vec![(ParaId(PARA_ID), Some(PARA_0_HASH))].into_iter().collect(),
+				vec![(ParaId(PARA_ID), ParaHashAtSource::Some(PARA_0_HASH))]
+					.into_iter()
+					.collect(),
 				vec![(
 					ParaId(PARA_ID),
 					Some(BestParaHeadHash { at_relay_block_number: 0, head_hash: PARA_0_HASH })
@@ -969,7 +1000,7 @@ mod tests {
 	fn parachain_is_updated_after_offboarding() {
 		assert_eq!(
 			select_parachains_to_update::<TestParachainsPipeline>(
-				vec![(ParaId(PARA_ID), None)].into_iter().collect(),
+				vec![(ParaId(PARA_ID), ParaHashAtSource::None)].into_iter().collect(),
 				vec![(
 					ParaId(PARA_ID),
 					Some(BestParaHeadHash {
@@ -989,7 +1020,9 @@ mod tests {
 	fn parachain_is_updated_after_onboarding() {
 		assert_eq!(
 			select_parachains_to_update::<TestParachainsPipeline>(
-				vec![(ParaId(PARA_ID), Some(PARA_0_HASH))].into_iter().collect(),
+				vec![(ParaId(PARA_ID), ParaHashAtSource::Some(PARA_0_HASH))]
+					.into_iter()
+					.collect(),
 				vec![(ParaId(PARA_ID), None)].into_iter().collect(),
 				HeaderId(10, Default::default()),
 			),
@@ -1001,7 +1034,9 @@ mod tests {
 	fn parachain_is_updated_if_newer_head_is_known() {
 		assert_eq!(
 			select_parachains_to_update::<TestParachainsPipeline>(
-				vec![(ParaId(PARA_ID), Some(PARA_1_HASH))].into_iter().collect(),
+				vec![(ParaId(PARA_ID), ParaHashAtSource::Some(PARA_1_HASH))]
+					.into_iter()
+					.collect(),
 				vec![(
 					ParaId(PARA_ID),
 					Some(BestParaHeadHash { at_relay_block_number: 0, head_hash: PARA_0_HASH })
@@ -1014,6 +1049,23 @@ mod tests {
 		);
 	}
 
+	#[test]
+	fn parachain_is_not_updated_if_source_head_is_unavailable() {
+		assert_eq!(
+			select_parachains_to_update::<TestParachainsPipeline>(
+				vec![(ParaId(PARA_ID), ParaHashAtSource::Unavailable)].into_iter().collect(),
+				vec![(
+					ParaId(PARA_ID),
+					Some(BestParaHeadHash { at_relay_block_number: 0, head_hash: PARA_0_HASH })
+				)]
+				.into_iter()
+				.collect(),
+				HeaderId(10, Default::default()),
+			),
+			vec![],
+		);
+	}
+
 	#[test]
 	fn is_update_required_works() {
 		let mut sync_params = ParachainSyncParams {
-- 
GitLab