diff --git a/bridges/chains/chain-kusama/src/lib.rs b/bridges/chains/chain-kusama/src/lib.rs
index a81004afe8127b556211d0207d2bc1f9ecc02955..fd7172c5869d468ff534e54f9ef6278cf86a88ed 100644
--- a/bridges/chains/chain-kusama/src/lib.rs
+++ b/bridges/chains/chain-kusama/src/lib.rs
@@ -67,6 +67,8 @@ pub const PARAS_PALLET_NAME: &str = "Paras";
 
 /// Name of the With-Kusama GRANDPA pallet instance that is deployed at bridged chains.
 pub const WITH_KUSAMA_GRANDPA_PALLET_NAME: &str = "BridgeKusamaGrandpa";
+/// Name of the With-Kusama parachains pallet instance that is deployed at bridged chains.
+pub const WITH_KUSAMA_BRIDGE_PARACHAINS_PALLET_NAME: &str = "BridgeKusamaParachains";
 
 /// Maximal size of encoded `bp_parachains::ParaStoredHeaderData` structure among all Polkadot
 /// parachains.
diff --git a/bridges/chains/chain-polkadot/src/lib.rs b/bridges/chains/chain-polkadot/src/lib.rs
index 00d35783a9b61844bab7701fdb60711125447ca3..a8cac0467d574e9355a8fe9ba2e7c2378019349d 100644
--- a/bridges/chains/chain-polkadot/src/lib.rs
+++ b/bridges/chains/chain-polkadot/src/lib.rs
@@ -69,6 +69,8 @@ pub const PARAS_PALLET_NAME: &str = "Paras";
 
 /// Name of the With-Polkadot GRANDPA pallet instance that is deployed at bridged chains.
 pub const WITH_POLKADOT_GRANDPA_PALLET_NAME: &str = "BridgePolkadotGrandpa";
+/// Name of the With-Polkadot parachains pallet instance that is deployed at bridged chains.
+pub const WITH_POLKADOT_BRIDGE_PARACHAINS_PALLET_NAME: &str = "BridgePolkadotParachains";
 
 /// Maximal size of encoded `bp_parachains::ParaStoredHeaderData` structure among all Polkadot
 /// parachains.
diff --git a/bridges/chains/chain-rococo/src/lib.rs b/bridges/chains/chain-rococo/src/lib.rs
index 2385dd2cbb250181ce5f46aef9f1e76f8fd010d2..b290fe71c829d08130556a2b061c0d63f0787d4c 100644
--- a/bridges/chains/chain-rococo/src/lib.rs
+++ b/bridges/chains/chain-rococo/src/lib.rs
@@ -67,6 +67,8 @@ pub const PARAS_PALLET_NAME: &str = "Paras";
 
 /// Name of the With-Rococo GRANDPA pallet instance that is deployed at bridged chains.
 pub const WITH_ROCOCO_GRANDPA_PALLET_NAME: &str = "BridgeRococoGrandpa";
+/// Name of the With-Rococo parachains pallet instance that is deployed at bridged chains.
+pub const WITH_ROCOCO_BRIDGE_PARACHAINS_PALLET_NAME: &str = "BridgeRococoParachains";
 
 /// Maximal size of encoded `bp_parachains::ParaStoredHeaderData` structure among all Rococo
 /// parachains.
diff --git a/bridges/chains/chain-westend/src/lib.rs b/bridges/chains/chain-westend/src/lib.rs
index b344b7f4bf93392c08502446513a9ae39296b512..ef451f7de0a9640bc1a278e1c712bbb099193ceb 100644
--- a/bridges/chains/chain-westend/src/lib.rs
+++ b/bridges/chains/chain-westend/src/lib.rs
@@ -67,6 +67,8 @@ pub const PARAS_PALLET_NAME: &str = "Paras";
 
 /// Name of the With-Westend GRANDPA pallet instance that is deployed at bridged chains.
 pub const WITH_WESTEND_GRANDPA_PALLET_NAME: &str = "BridgeWestendGrandpa";
+/// Name of the With-Westend parachains pallet instance that is deployed at bridged chains.
+pub const WITH_WESTEND_BRIDGE_PARACHAINS_PALLET_NAME: &str = "BridgeWestendParachains";
 
 /// Maximal size of encoded `bp_parachains::ParaStoredHeaderData` structure among all Westend
 /// parachains.
diff --git a/bridges/relays/client-substrate/src/chain.rs b/bridges/relays/client-substrate/src/chain.rs
index 2aba5f5674d97b2bcf8b3e2633c36b6bee52d98a..40269fe64c879249e9f0ed5ffe070d9fc606bdb6 100644
--- a/bridges/relays/client-substrate/src/chain.rs
+++ b/bridges/relays/client-substrate/src/chain.rs
@@ -46,6 +46,12 @@ pub trait Chain: ChainBase + Clone {
 	/// Keep in mind that this method is normally provided by the other chain, which is
 	/// bridged with this chain.
 	const BEST_FINALIZED_HEADER_ID_METHOD: &'static str;
+	/// Name of the runtime API method that is returning interval between source chain
+	/// headers that may be submitted for free to the target chain.
+	///
+	/// Keep in mind that this method is normally provided by the other chain, which is
+	/// bridged with this chain.
+	const FREE_HEADERS_INTERVAL_METHOD: &'static str;
 
 	/// Average block interval.
 	///
@@ -75,6 +81,9 @@ pub trait ChainWithRuntimeVersion: Chain {
 pub trait RelayChain: Chain {
 	/// Name of the `runtime_parachains::paras` pallet in the runtime of this chain.
 	const PARAS_PALLET_NAME: &'static str;
+	/// Name of the `pallet-bridge-parachains`, deployed at the **bridged** chain to sync
+	/// parachains of **this** chain.
+	const WITH_CHAIN_BRIDGE_PARACHAINS_PALLET_NAME: &'static str;
 }
 
 /// Substrate-based chain that is using direct GRANDPA finality from minimal relay-client point of
diff --git a/bridges/relays/client-substrate/src/test_chain.rs b/bridges/relays/client-substrate/src/test_chain.rs
index d1203a2c58eaec3dd758913a2ce3cf778290102d..cfd241c022a269da799e8e03c4398566d98a14a0 100644
--- a/bridges/relays/client-substrate/src/test_chain.rs
+++ b/bridges/relays/client-substrate/src/test_chain.rs
@@ -56,6 +56,7 @@ impl bp_runtime::Chain for TestChain {
 impl Chain for TestChain {
 	const NAME: &'static str = "Test";
 	const BEST_FINALIZED_HEADER_ID_METHOD: &'static str = "TestMethod";
+	const FREE_HEADERS_INTERVAL_METHOD: &'static str = "TestMethod";
 	const AVERAGE_BLOCK_INTERVAL: Duration = Duration::from_millis(0);
 
 	type SignedBlock = sp_runtime::generic::SignedBlock<
@@ -124,6 +125,7 @@ impl bp_runtime::UnderlyingChainProvider for TestParachain {
 impl Chain for TestParachain {
 	const NAME: &'static str = "TestParachain";
 	const BEST_FINALIZED_HEADER_ID_METHOD: &'static str = "TestParachainMethod";
+	const FREE_HEADERS_INTERVAL_METHOD: &'static str = "TestParachainMethod";
 	const AVERAGE_BLOCK_INTERVAL: Duration = Duration::from_millis(0);
 
 	type SignedBlock = sp_runtime::generic::SignedBlock<
diff --git a/bridges/relays/finality/README.md b/bridges/relays/finality/README.md
index 92e765cea0e505be7854e17a9b91df520bba32b0..89b9d1399584a8575f2ae47cda7c8ee064314697 100644
--- a/bridges/relays/finality/README.md
+++ b/bridges/relays/finality/README.md
@@ -33,7 +33,9 @@ node. The transaction is then tracked by the relay until it is mined and finaliz
 The main entrypoint for the crate is the [`run` function](./src/finality_loop.rs), which takes source and target
 clients and [`FinalitySyncParams`](./src/finality_loop.rs) parameters. The most important parameter is the
 `only_mandatory_headers` - it is set to `true`, the relay will only submit mandatory headers. Since transactions
-with mandatory headers are fee-free, the cost of running such relay is zero (in terms of fees).
+with mandatory headers are fee-free, the cost of running such relay is zero (in terms of fees). If a similar,
+`only_free_headers` parameter, is set to `true`, then free headers (if configured in the runtime) are also
+relayed.
 
 ## Finality Relay Metrics
 
diff --git a/bridges/relays/finality/src/finality_loop.rs b/bridges/relays/finality/src/finality_loop.rs
index e31d8a708122db84c4c87f257edee7ee4ba616bb..8b3def868a453703600850a463cf2f07988811df 100644
--- a/bridges/relays/finality/src/finality_loop.rs
+++ b/bridges/relays/finality/src/finality_loop.rs
@@ -29,7 +29,7 @@ use crate::{
 use async_trait::async_trait;
 use backoff::{backoff::Backoff, ExponentialBackoff};
 use futures::{future::Fuse, select, Future, FutureExt};
-use num_traits::Saturating;
+use num_traits::{Saturating, Zero};
 use relay_utils::{
 	metrics::MetricsParams, relay_loop::Client as RelayClient, retry_backoff, FailedClient,
 	HeaderId, MaybeConnectionError, TrackedTransactionStatus, TransactionTracker,
@@ -39,6 +39,17 @@ use std::{
 	time::{Duration, Instant},
 };
 
+/// Type of headers that we relay.
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum HeadersToRelay {
+	/// Relay all headers.
+	All,
+	/// Relay only mandatory headers.
+	Mandatory,
+	/// Relay only free (including mandatory) headers.
+	Free,
+}
+
 /// Finality proof synchronization loop parameters.
 #[derive(Debug, Clone)]
 pub struct FinalitySyncParams {
@@ -63,7 +74,7 @@ pub struct FinalitySyncParams {
 	/// Timeout before we treat our transactions as lost and restart the whole sync process.
 	pub stall_timeout: Duration,
 	/// If true, only mandatory headers are relayed.
-	pub only_mandatory_headers: bool,
+	pub headers_to_relay: HeadersToRelay,
 }
 
 /// Source client used in finality synchronization loop.
@@ -90,11 +101,16 @@ pub trait TargetClient<P: FinalitySyncPipeline>: RelayClient {
 		&self,
 	) -> Result<HeaderId<P::Hash, P::Number>, Self::Error>;
 
+	/// Get free source headers submission interval, if it is configured in the
+	/// target runtime.
+	async fn free_source_headers_interval(&self) -> Result<Option<P::Number>, Self::Error>;
+
 	/// Submit header finality proof.
 	async fn submit_finality_proof(
 		&self,
 		header: P::Header,
 		proof: P::FinalityProof,
+		is_free_execution_expected: bool,
 	) -> Result<Self::TransactionTracker, Self::Error>;
 }
 
@@ -104,9 +120,13 @@ pub fn metrics_prefix<P: FinalitySyncPipeline>() -> String {
 	format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME)
 }
 
+/// Finality sync information.
 pub struct SyncInfo<P: FinalitySyncPipeline> {
+	/// Best finalized header at the source client.
 	pub best_number_at_source: P::Number,
+	/// Best source header, known to the target client.
 	pub best_number_at_target: P::Number,
+	/// Whether the target client follows the same fork as the source client do.
 	pub is_using_same_fork: bool,
 }
 
@@ -183,6 +203,7 @@ impl<Tracker: TransactionTracker, Number: Debug + PartialOrd> Transaction<Tracke
 		target_client: &TC,
 		header: P::Header,
 		justification: P::FinalityProof,
+		is_free_execution_expected: bool,
 	) -> Result<Self, TC::Error> {
 		let header_number = header.number();
 		log::debug!(
@@ -193,7 +214,9 @@ impl<Tracker: TransactionTracker, Number: Debug + PartialOrd> Transaction<Tracke
 			P::TARGET_NAME,
 		);
 
-		let tracker = target_client.submit_finality_proof(header, justification).await?;
+		let tracker = target_client
+			.submit_finality_proof(header, justification, is_free_execution_expected)
+			.await?;
 		Ok(Transaction { tracker, header_number })
 	}
 
@@ -292,6 +315,7 @@ impl<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>> Finality
 	pub async fn select_header_to_submit(
 		&mut self,
 		info: &SyncInfo<P>,
+		free_headers_interval: Option<P::Number>,
 	) -> Result<Option<JustifiedHeader<P>>, Error<P, SC::Error, TC::Error>> {
 		// to see that the loop is progressing
 		log::trace!(
@@ -302,9 +326,15 @@ impl<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>> Finality
 		);
 
 		// read missing headers
-		let selector = JustifiedHeaderSelector::new::<SC, TC>(&self.source_client, info).await?;
+		let selector = JustifiedHeaderSelector::new::<SC, TC>(
+			&self.source_client,
+			info,
+			self.sync_params.headers_to_relay,
+			free_headers_interval,
+		)
+		.await?;
 		// if we see that the header schedules GRANDPA change, we need to submit it
-		if self.sync_params.only_mandatory_headers {
+		if self.sync_params.headers_to_relay == HeadersToRelay::Mandatory {
 			return Ok(selector.select_mandatory())
 		}
 
@@ -312,7 +342,12 @@ impl<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>> Finality
 		// => even if we have already selected some header and its persistent finality proof,
 		// we may try to select better header by reading non-persistent proofs from the stream
 		self.finality_proofs_buf.fill(&mut self.finality_proofs_stream);
-		let maybe_justified_header = selector.select(&self.finality_proofs_buf);
+		let maybe_justified_header = selector.select(
+			info,
+			self.sync_params.headers_to_relay,
+			free_headers_interval,
+			&self.finality_proofs_buf,
+		);
 
 		// remove obsolete 'recent' finality proofs + keep its size under certain limit
 		let oldest_finality_proof_to_keep = maybe_justified_header
@@ -329,6 +364,7 @@ impl<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>> Finality
 
 	pub async fn run_iteration(
 		&mut self,
+		free_headers_interval: Option<P::Number>,
 	) -> Result<
 		Option<Transaction<TC::TransactionTracker, P::Number>>,
 		Error<P, SC::Error, TC::Error>,
@@ -345,12 +381,16 @@ impl<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>> Finality
 		}
 
 		// submit new header if we have something new
-		match self.select_header_to_submit(&info).await? {
+		match self.select_header_to_submit(&info, free_headers_interval).await? {
 			Some(header) => {
-				let transaction =
-					Transaction::submit(&self.target_client, header.header, header.proof)
-						.await
-						.map_err(Error::Target)?;
+				let transaction = Transaction::submit(
+					&self.target_client,
+					header.header,
+					header.proof,
+					self.sync_params.headers_to_relay == HeadersToRelay::Free,
+				)
+				.await
+				.map_err(Error::Target)?;
 				self.best_submitted_number = Some(transaction.header_number);
 				Ok(Some(transaction))
 			},
@@ -378,9 +418,11 @@ impl<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>> Finality
 		let exit_signal = exit_signal.fuse();
 		futures::pin_mut!(exit_signal, proof_submission_tx_tracker);
 
+		let free_headers_interval = free_headers_interval(&self.target_client).await?;
+
 		loop {
 			// run loop iteration
-			let next_tick = match self.run_iteration().await {
+			let next_tick = match self.run_iteration(free_headers_interval).await {
 				Ok(Some(tx)) => {
 					proof_submission_tx_tracker
 						.set(tx.track::<P, SC, _>(self.target_client.clone()).fuse());
@@ -433,6 +475,52 @@ impl<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>> Finality
 	}
 }
 
+async fn free_headers_interval<P: FinalitySyncPipeline>(
+	target_client: &impl TargetClient<P>,
+) -> Result<Option<P::Number>, FailedClient> {
+	match target_client.free_source_headers_interval().await {
+		Ok(Some(free_headers_interval)) if !free_headers_interval.is_zero() => {
+			log::trace!(
+				target: "bridge",
+				"Free headers interval for {} headers at {} is: {:?}",
+				P::SOURCE_NAME,
+				P::TARGET_NAME,
+				free_headers_interval,
+			);
+			Ok(Some(free_headers_interval))
+		},
+		Ok(Some(_free_headers_interval)) => {
+			log::trace!(
+				target: "bridge",
+				"Free headers interval for {} headers at {} is zero. Not submitting any free headers",
+				P::SOURCE_NAME,
+				P::TARGET_NAME,
+			);
+			Ok(None)
+		},
+		Ok(None) => {
+			log::trace!(
+				target: "bridge",
+				"Free headers interval for {} headers at {} is None. Not submitting any free headers",
+				P::SOURCE_NAME,
+				P::TARGET_NAME,
+			);
+
+			Ok(None)
+		},
+		Err(e) => {
+			log::error!(
+				target: "bridge",
+				"Failed to read free headers interval for {} headers at {}: {:?}",
+				P::SOURCE_NAME,
+				P::TARGET_NAME,
+				e,
+			);
+			Err(FailedClient::Target)
+		},
+	}
+}
+
 /// Run finality proofs synchronization loop.
 pub async fn run<P: FinalitySyncPipeline>(
 	source_client: impl SourceClient<P>,
@@ -509,7 +597,7 @@ mod tests {
 			tick: Duration::from_secs(0),
 			recent_finality_proofs_limit: 1024,
 			stall_timeout: Duration::from_secs(1),
-			only_mandatory_headers: false,
+			headers_to_relay: HeadersToRelay::All,
 		}
 	}
 
@@ -593,8 +681,8 @@ mod tests {
 		);
 	}
 
-	fn run_only_mandatory_headers_mode_test(
-		only_mandatory_headers: bool,
+	fn run_headers_to_relay_mode_test(
+		headers_to_relay: HeadersToRelay,
 		has_mandatory_headers: bool,
 	) -> Option<JustifiedHeader<TestFinalitySyncPipeline>> {
 		let (exit_sender, _) = futures::channel::mpsc::unbounded();
@@ -619,7 +707,7 @@ mod tests {
 					tick: Duration::from_secs(0),
 					recent_finality_proofs_limit: 0,
 					stall_timeout: Duration::from_secs(0),
-					only_mandatory_headers,
+					headers_to_relay,
 				},
 				None,
 			);
@@ -628,16 +716,22 @@ mod tests {
 				best_number_at_target: 5,
 				is_using_same_fork: true,
 			};
-			finality_loop.select_header_to_submit(&info).await.unwrap()
+			finality_loop.select_header_to_submit(&info, Some(3)).await.unwrap()
 		})
 	}
 
 	#[test]
-	fn select_header_to_submit_skips_non_mandatory_headers_when_only_mandatory_headers_are_required(
-	) {
-		assert_eq!(run_only_mandatory_headers_mode_test(true, false), None);
+	fn select_header_to_submit_may_select_non_mandatory_header() {
+		assert_eq!(run_headers_to_relay_mode_test(HeadersToRelay::Mandatory, false), None);
 		assert_eq!(
-			run_only_mandatory_headers_mode_test(false, false),
+			run_headers_to_relay_mode_test(HeadersToRelay::Free, false),
+			Some(JustifiedHeader {
+				header: TestSourceHeader(false, 10, 10),
+				proof: TestFinalityProof(10)
+			}),
+		);
+		assert_eq!(
+			run_headers_to_relay_mode_test(HeadersToRelay::All, false),
 			Some(JustifiedHeader {
 				header: TestSourceHeader(false, 10, 10),
 				proof: TestFinalityProof(10)
@@ -646,17 +740,23 @@ mod tests {
 	}
 
 	#[test]
-	fn select_header_to_submit_selects_mandatory_headers_when_only_mandatory_headers_are_required()
-	{
+	fn select_header_to_submit_may_select_mandatory_header() {
+		assert_eq!(
+			run_headers_to_relay_mode_test(HeadersToRelay::Mandatory, true),
+			Some(JustifiedHeader {
+				header: TestSourceHeader(true, 8, 8),
+				proof: TestFinalityProof(8)
+			}),
+		);
 		assert_eq!(
-			run_only_mandatory_headers_mode_test(true, true),
+			run_headers_to_relay_mode_test(HeadersToRelay::Free, true),
 			Some(JustifiedHeader {
 				header: TestSourceHeader(true, 8, 8),
 				proof: TestFinalityProof(8)
 			}),
 		);
 		assert_eq!(
-			run_only_mandatory_headers_mode_test(false, true),
+			run_headers_to_relay_mode_test(HeadersToRelay::All, true),
 			Some(JustifiedHeader {
 				header: TestSourceHeader(true, 8, 8),
 				proof: TestFinalityProof(8)
@@ -690,7 +790,7 @@ mod tests {
 				test_sync_params(),
 				Some(metrics_sync.clone()),
 			);
-			finality_loop.run_iteration().await.unwrap()
+			finality_loop.run_iteration(None).await.unwrap()
 		});
 
 		assert!(!metrics_sync.is_using_same_fork());
diff --git a/bridges/relays/finality/src/headers.rs b/bridges/relays/finality/src/headers.rs
index 91f7cd0378ecd9ac8a0ee558266d993cc2253c9e..5bba4a384562d1f97334cd809ba47267698308f9 100644
--- a/bridges/relays/finality/src/headers.rs
+++ b/bridges/relays/finality/src/headers.rs
@@ -16,10 +16,11 @@
 
 use crate::{
 	finality_loop::SyncInfo, finality_proofs::FinalityProofsBuf, Error, FinalitySyncPipeline,
-	SourceClient, SourceHeader, TargetClient,
+	HeadersToRelay, SourceClient, SourceHeader, TargetClient,
 };
 
 use bp_header_chain::FinalityProof;
+use num_traits::Saturating;
 use std::cmp::Ordering;
 
 /// Unjustified headers container. Ordered by header number.
@@ -50,9 +51,13 @@ pub enum JustifiedHeaderSelector<P: FinalitySyncPipeline> {
 }
 
 impl<P: FinalitySyncPipeline> JustifiedHeaderSelector<P> {
+	/// Selects last header with persistent justification, missing from the target and matching
+	/// the `headers_to_relay` criteria.
 	pub(crate) async fn new<SC: SourceClient<P>, TC: TargetClient<P>>(
 		source_client: &SC,
 		info: &SyncInfo<P>,
+		headers_to_relay: HeadersToRelay,
+		free_headers_interval: Option<P::Number>,
 	) -> Result<Self, Error<P, SC::Error, TC::Error>> {
 		let mut unjustified_headers = Vec::new();
 		let mut maybe_justified_header = None;
@@ -70,12 +75,19 @@ impl<P: FinalitySyncPipeline> JustifiedHeaderSelector<P> {
 					return Ok(Self::Mandatory(JustifiedHeader { header, proof }))
 				},
 				(true, None) => return Err(Error::MissingMandatoryFinalityProof(header.number())),
-				(false, Some(proof)) => {
+				(false, Some(proof))
+					if need_to_relay::<P>(
+						info,
+						headers_to_relay,
+						free_headers_interval,
+						&header,
+					) =>
+				{
 					log::trace!(target: "bridge", "Header {:?} has persistent finality proof", header_number);
 					unjustified_headers.clear();
 					maybe_justified_header = Some(JustifiedHeader { header, proof });
 				},
-				(false, None) => {
+				_ => {
 					unjustified_headers.push(header);
 				},
 			}
@@ -97,6 +109,7 @@ impl<P: FinalitySyncPipeline> JustifiedHeaderSelector<P> {
 		})
 	}
 
+	/// Returns selected mandatory header if we have seen one. Otherwise returns `None`.
 	pub fn select_mandatory(self) -> Option<JustifiedHeader<P>> {
 		match self {
 			JustifiedHeaderSelector::Mandatory(header) => Some(header),
@@ -104,7 +117,15 @@ impl<P: FinalitySyncPipeline> JustifiedHeaderSelector<P> {
 		}
 	}
 
-	pub fn select(self, buf: &FinalityProofsBuf<P>) -> Option<JustifiedHeader<P>> {
+	/// Tries to improve previously selected header using ephemeral
+	/// justifications stream.
+	pub fn select(
+		self,
+		info: &SyncInfo<P>,
+		headers_to_relay: HeadersToRelay,
+		free_headers_interval: Option<P::Number>,
+		buf: &FinalityProofsBuf<P>,
+	) -> Option<JustifiedHeader<P>> {
 		let (unjustified_headers, maybe_justified_header) = match self {
 			JustifiedHeaderSelector::Mandatory(justified_header) => return Some(justified_header),
 			JustifiedHeaderSelector::Regular(unjustified_headers, justified_header) =>
@@ -122,7 +143,14 @@ impl<P: FinalitySyncPipeline> JustifiedHeaderSelector<P> {
 			(maybe_finality_proof, maybe_unjustified_header)
 		{
 			match finality_proof.target_header_number().cmp(&unjustified_header.number()) {
-				Ordering::Equal => {
+				Ordering::Equal
+					if need_to_relay::<P>(
+						info,
+						headers_to_relay,
+						free_headers_interval,
+						&unjustified_header,
+					) =>
+				{
 					log::trace!(
 						target: "bridge",
 						"Managed to improve selected {} finality proof {:?} to {:?}.",
@@ -135,6 +163,10 @@ impl<P: FinalitySyncPipeline> JustifiedHeaderSelector<P> {
 						proof: finality_proof.clone(),
 					})
 				},
+				Ordering::Equal => {
+					maybe_finality_proof = finality_proofs_iter.next();
+					maybe_unjustified_header = unjustified_headers_iter.next();
+				},
 				Ordering::Less => maybe_unjustified_header = unjustified_headers_iter.next(),
 				Ordering::Greater => {
 					maybe_finality_proof = finality_proofs_iter.next();
@@ -152,6 +184,27 @@ impl<P: FinalitySyncPipeline> JustifiedHeaderSelector<P> {
 	}
 }
 
+/// Returns true if we want to relay header `header_number`.
+fn need_to_relay<P: FinalitySyncPipeline>(
+	info: &SyncInfo<P>,
+	headers_to_relay: HeadersToRelay,
+	free_headers_interval: Option<P::Number>,
+	header: &P::Header,
+) -> bool {
+	match headers_to_relay {
+		HeadersToRelay::All => true,
+		HeadersToRelay::Mandatory => header.is_mandatory(),
+		HeadersToRelay::Free =>
+			header.is_mandatory() ||
+				free_headers_interval
+					.map(|free_headers_interval| {
+						header.number().saturating_sub(info.best_number_at_target) >=
+							free_headers_interval
+					})
+					.unwrap_or(false),
+	}
+}
+
 #[cfg(test)]
 mod tests {
 	use super::*;
@@ -159,13 +212,22 @@ mod tests {
 
 	#[test]
 	fn select_better_recent_finality_proof_works() {
+		let info = SyncInfo {
+			best_number_at_source: 10,
+			best_number_at_target: 5,
+			is_using_same_fork: true,
+		};
+
 		// if there are no unjustified headers, nothing is changed
 		let finality_proofs_buf =
 			FinalityProofsBuf::<TestFinalitySyncPipeline>::new(vec![TestFinalityProof(5)]);
 		let justified_header =
 			JustifiedHeader { header: TestSourceHeader(false, 2, 2), proof: TestFinalityProof(2) };
 		let selector = JustifiedHeaderSelector::Regular(vec![], justified_header.clone());
-		assert_eq!(selector.select(&finality_proofs_buf), Some(justified_header));
+		assert_eq!(
+			selector.select(&info, HeadersToRelay::All, None, &finality_proofs_buf),
+			Some(justified_header)
+		);
 
 		// if there are no buffered finality proofs, nothing is changed
 		let finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline>::new(vec![]);
@@ -175,7 +237,10 @@ mod tests {
 			vec![TestSourceHeader(false, 5, 5)],
 			justified_header.clone(),
 		);
-		assert_eq!(selector.select(&finality_proofs_buf), Some(justified_header));
+		assert_eq!(
+			selector.select(&info, HeadersToRelay::All, None, &finality_proofs_buf),
+			Some(justified_header)
+		);
 
 		// if there's no intersection between recent finality proofs and unjustified headers,
 		// nothing is changed
@@ -189,7 +254,10 @@ mod tests {
 			vec![TestSourceHeader(false, 9, 9), TestSourceHeader(false, 10, 10)],
 			justified_header.clone(),
 		);
-		assert_eq!(selector.select(&finality_proofs_buf), Some(justified_header));
+		assert_eq!(
+			selector.select(&info, HeadersToRelay::All, None, &finality_proofs_buf),
+			Some(justified_header)
+		);
 
 		// if there's intersection between recent finality proofs and unjustified headers, but there
 		// are no proofs in this intersection, nothing is changed
@@ -207,7 +275,10 @@ mod tests {
 			],
 			justified_header.clone(),
 		);
-		assert_eq!(selector.select(&finality_proofs_buf), Some(justified_header));
+		assert_eq!(
+			selector.select(&info, HeadersToRelay::All, None, &finality_proofs_buf),
+			Some(justified_header)
+		);
 
 		// if there's intersection between recent finality proofs and unjustified headers and
 		// there's a proof in this intersection:
@@ -228,11 +299,63 @@ mod tests {
 			justified_header,
 		);
 		assert_eq!(
-			selector.select(&finality_proofs_buf),
+			selector.select(&info, HeadersToRelay::All, None, &finality_proofs_buf),
 			Some(JustifiedHeader {
 				header: TestSourceHeader(false, 9, 9),
 				proof: TestFinalityProof(9)
 			})
 		);
+
+		// when only free headers needs to be relayed and there are no free headers
+		let finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline>::new(vec![
+			TestFinalityProof(7),
+			TestFinalityProof(9),
+		]);
+		let selector = JustifiedHeaderSelector::None(vec![
+			TestSourceHeader(false, 8, 8),
+			TestSourceHeader(false, 9, 9),
+			TestSourceHeader(false, 10, 10),
+		]);
+		assert_eq!(
+			selector.select(&info, HeadersToRelay::Free, Some(7), &finality_proofs_buf),
+			None,
+		);
+
+		// when only free headers needs to be relayed, mandatory header may be selected
+		let finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline>::new(vec![
+			TestFinalityProof(6),
+			TestFinalityProof(9),
+		]);
+		let selector = JustifiedHeaderSelector::None(vec![
+			TestSourceHeader(false, 8, 8),
+			TestSourceHeader(true, 9, 9),
+			TestSourceHeader(false, 10, 10),
+		]);
+		assert_eq!(
+			selector.select(&info, HeadersToRelay::Free, Some(7), &finality_proofs_buf),
+			Some(JustifiedHeader {
+				header: TestSourceHeader(true, 9, 9),
+				proof: TestFinalityProof(9)
+			})
+		);
+
+		// when only free headers needs to be relayed and there is free header
+		let finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline>::new(vec![
+			TestFinalityProof(7),
+			TestFinalityProof(9),
+			TestFinalityProof(14),
+		]);
+		let selector = JustifiedHeaderSelector::None(vec![
+			TestSourceHeader(false, 7, 7),
+			TestSourceHeader(false, 10, 10),
+			TestSourceHeader(false, 14, 14),
+		]);
+		assert_eq!(
+			selector.select(&info, HeadersToRelay::Free, Some(7), &finality_proofs_buf),
+			Some(JustifiedHeader {
+				header: TestSourceHeader(false, 14, 14),
+				proof: TestFinalityProof(14)
+			})
+		);
 	}
 }
diff --git a/bridges/relays/finality/src/lib.rs b/bridges/relays/finality/src/lib.rs
index 3579e68e1ef9c686575e3ddba239bead7bd9312f..4346f96674b4c43c153ad8bf55cb5ee963871849 100644
--- a/bridges/relays/finality/src/lib.rs
+++ b/bridges/relays/finality/src/lib.rs
@@ -21,7 +21,9 @@
 
 pub use crate::{
 	base::{FinalityPipeline, SourceClientBase},
-	finality_loop::{metrics_prefix, run, FinalitySyncParams, SourceClient, TargetClient},
+	finality_loop::{
+		metrics_prefix, run, FinalitySyncParams, HeadersToRelay, SourceClient, TargetClient,
+	},
 	finality_proofs::{FinalityProofsBuf, FinalityProofsStream},
 	sync_loop_metrics::SyncLoopMetrics,
 };
diff --git a/bridges/relays/finality/src/mock.rs b/bridges/relays/finality/src/mock.rs
index e3ec4e4d0d47a04ce5a22ee75374ebe08064df5e..69357f71ce27d54a2ca4866e3fd6db0a73fb44e2 100644
--- a/bridges/relays/finality/src/mock.rs
+++ b/bridges/relays/finality/src/mock.rs
@@ -198,10 +198,15 @@ impl TargetClient<TestFinalitySyncPipeline> for TestTargetClient {
 		Ok(data.target_best_block_id)
 	}
 
+	async fn free_source_headers_interval(&self) -> Result<Option<TestNumber>, TestError> {
+		Ok(Some(3))
+	}
+
 	async fn submit_finality_proof(
 		&self,
 		header: TestSourceHeader,
 		proof: TestFinalityProof,
+		_is_free_execution_expected: bool,
 	) -> Result<TestTransactionTracker, TestError> {
 		let mut data = self.data.lock();
 		(self.on_method_call)(&mut data);
diff --git a/bridges/relays/lib-substrate-relay/src/cli/relay_headers.rs b/bridges/relays/lib-substrate-relay/src/cli/relay_headers.rs
index 90558ed46138366221c1f9834d21060e7e54e66b..cf1957c7323b473c7ed2e412cce9ceab06f52831 100644
--- a/bridges/relays/lib-substrate-relay/src/cli/relay_headers.rs
+++ b/bridges/relays/lib-substrate-relay/src/cli/relay_headers.rs
@@ -24,6 +24,7 @@ use relay_utils::metrics::{GlobalMetrics, StandaloneMetric};
 use crate::{
 	cli::{bridge::*, chain_schema::*, PrometheusParams},
 	finality::SubstrateFinalitySyncPipeline,
+	HeadersToRelay,
 };
 
 /// Chain headers relaying params.
@@ -33,6 +34,10 @@ pub struct RelayHeadersParams {
 	/// are relayed.
 	#[structopt(long)]
 	only_mandatory_headers: bool,
+	/// If passed, only free headers (mandatory and every Nth header, if configured in runtime)
+	/// are relayed. Overrides `only_mandatory_headers`.
+	#[structopt(long)]
+	only_free_headers: bool,
 	#[structopt(flatten)]
 	source: SourceConnectionParams,
 	#[structopt(flatten)]
@@ -43,11 +48,22 @@ pub struct RelayHeadersParams {
 	prometheus_params: PrometheusParams,
 }
 
+impl RelayHeadersParams {
+	fn headers_to_relay(&self) -> HeadersToRelay {
+		match (self.only_mandatory_headers, self.only_free_headers) {
+			(_, true) => HeadersToRelay::Free,
+			(true, false) => HeadersToRelay::Mandatory,
+			_ => HeadersToRelay::All,
+		}
+	}
+}
+
 /// Trait used for relaying headers between 2 chains.
 #[async_trait]
 pub trait HeadersRelayer: RelayToRelayHeadersCliBridge {
 	/// Relay headers.
 	async fn relay_headers(data: RelayHeadersParams) -> anyhow::Result<()> {
+		let headers_to_relay = data.headers_to_relay();
 		let source_client = data.source.into_client::<Self::Source>().await?;
 		let target_client = data.target.into_client::<Self::Target>().await?;
 		let target_transactions_mortality = data.target_sign.target_transactions_mortality;
@@ -67,7 +83,7 @@ pub trait HeadersRelayer: RelayToRelayHeadersCliBridge {
 		crate::finality::run::<Self::Finality>(
 			source_client,
 			target_client,
-			data.only_mandatory_headers,
+			headers_to_relay,
 			target_transactions_params,
 			metrics_params,
 		)
diff --git a/bridges/relays/lib-substrate-relay/src/cli/relay_headers_and_messages/mod.rs b/bridges/relays/lib-substrate-relay/src/cli/relay_headers_and_messages/mod.rs
index 27e9f1c21ba0dae1480ef8128afdfd635a1d22c2..a796df6721b8c8afd7f401f92e2fca6afcb41b02 100644
--- a/bridges/relays/lib-substrate-relay/src/cli/relay_headers_and_messages/mod.rs
+++ b/bridges/relays/lib-substrate-relay/src/cli/relay_headers_and_messages/mod.rs
@@ -40,7 +40,7 @@ use crate::{
 	cli::{bridge::MessagesCliBridge, HexLaneId, PrometheusParams},
 	messages_lane::{MessagesRelayLimits, MessagesRelayParams},
 	on_demand::OnDemandRelay,
-	TaggedAccount, TransactionParams,
+	HeadersToRelay, TaggedAccount, TransactionParams,
 };
 use bp_messages::LaneId;
 use bp_runtime::BalanceOf;
@@ -61,11 +61,25 @@ pub struct HeadersAndMessagesSharedParams {
 	/// are relayed.
 	#[structopt(long)]
 	pub only_mandatory_headers: bool,
+	/// If passed, only free headers (mandatory and every Nth header, if configured in runtime)
+	/// are relayed. Overrides `only_mandatory_headers`.
+	#[structopt(long)]
+	pub only_free_headers: bool,
 	#[structopt(flatten)]
 	/// Prometheus metrics params.
 	pub prometheus_params: PrometheusParams,
 }
 
+impl HeadersAndMessagesSharedParams {
+	fn headers_to_relay(&self) -> HeadersToRelay {
+		match (self.only_mandatory_headers, self.only_free_headers) {
+			(_, true) => HeadersToRelay::Free,
+			(true, false) => HeadersToRelay::Mandatory,
+			_ => HeadersToRelay::All,
+		}
+	}
+}
+
 /// Bridge parameters, shared by all bridge types.
 pub struct Full2WayBridgeCommonParams<
 	Left: ChainWithTransactions + ChainWithRuntimeVersion,
@@ -418,6 +432,7 @@ mod tests {
 				shared: HeadersAndMessagesSharedParams {
 					lane: vec![HexLaneId([0x00, 0x00, 0x00, 0x00])],
 					only_mandatory_headers: false,
+					only_free_headers: false,
 					prometheus_params: PrometheusParams {
 						no_prometheus: false,
 						prometheus_host: "0.0.0.0".into(),
diff --git a/bridges/relays/lib-substrate-relay/src/cli/relay_headers_and_messages/parachain_to_parachain.rs b/bridges/relays/lib-substrate-relay/src/cli/relay_headers_and_messages/parachain_to_parachain.rs
index 76accfa29050613070c6579103d4e41f6084eea6..7f6f40777823679c97577f1244eb9a860948d267 100644
--- a/bridges/relays/lib-substrate-relay/src/cli/relay_headers_and_messages/parachain_to_parachain.rs
+++ b/bridges/relays/lib-substrate-relay/src/cli/relay_headers_and_messages/parachain_to_parachain.rs
@@ -180,7 +180,7 @@ where
 				self.left_relay.clone(),
 				self.common.right.client.clone(),
 				self.common.right.tx_params.clone(),
-				self.common.shared.only_mandatory_headers,
+				self.common.shared.headers_to_relay(),
 				Some(self.common.metrics_params.clone()),
 			);
 		let right_relay_to_left_on_demand_headers =
@@ -188,7 +188,7 @@ where
 				self.right_relay.clone(),
 				self.common.left.client.clone(),
 				self.common.left.tx_params.clone(),
-				self.common.shared.only_mandatory_headers,
+				self.common.shared.headers_to_relay(),
 				Some(self.common.metrics_params.clone()),
 			);
 
diff --git a/bridges/relays/lib-substrate-relay/src/cli/relay_headers_and_messages/relay_to_parachain.rs b/bridges/relays/lib-substrate-relay/src/cli/relay_headers_and_messages/relay_to_parachain.rs
index b75ac3e60c26934c7dda603a2c7a91649d17eb52..5911fe49df4adfc955cbab4d142998fbc7ed4d22 100644
--- a/bridges/relays/lib-substrate-relay/src/cli/relay_headers_and_messages/relay_to_parachain.rs
+++ b/bridges/relays/lib-substrate-relay/src/cli/relay_headers_and_messages/relay_to_parachain.rs
@@ -171,7 +171,7 @@ where
 				self.common.left.client.clone(),
 				self.common.right.client.clone(),
 				self.common.right.tx_params.clone(),
-				self.common.shared.only_mandatory_headers,
+				self.common.shared.headers_to_relay(),
 				None,
 			);
 		let right_relay_to_left_on_demand_headers =
@@ -179,7 +179,7 @@ where
 				self.right_relay.clone(),
 				self.common.left.client.clone(),
 				self.common.left.tx_params.clone(),
-				self.common.shared.only_mandatory_headers,
+				self.common.shared.headers_to_relay(),
 				Some(self.common.metrics_params.clone()),
 			);
 		let right_to_left_on_demand_parachains = OnDemandParachainsRelay::<
diff --git a/bridges/relays/lib-substrate-relay/src/cli/relay_headers_and_messages/relay_to_relay.rs b/bridges/relays/lib-substrate-relay/src/cli/relay_headers_and_messages/relay_to_relay.rs
index b397ff50a20a62833d96a3687c8d3c3494efc5c2..832df4ae4003ced1715d7b9d495989d9163417d5 100644
--- a/bridges/relays/lib-substrate-relay/src/cli/relay_headers_and_messages/relay_to_relay.rs
+++ b/bridges/relays/lib-substrate-relay/src/cli/relay_headers_and_messages/relay_to_relay.rs
@@ -152,7 +152,7 @@ where
 				self.common.left.client.clone(),
 				self.common.right.client.clone(),
 				self.common.right.tx_params.clone(),
-				self.common.shared.only_mandatory_headers,
+				self.common.shared.headers_to_relay(),
 				None,
 			);
 		let right_to_left_on_demand_headers =
@@ -160,7 +160,7 @@ where
 				self.common.right.client.clone(),
 				self.common.left.client.clone(),
 				self.common.left.tx_params.clone(),
-				self.common.shared.only_mandatory_headers,
+				self.common.shared.headers_to_relay(),
 				None,
 			);
 
diff --git a/bridges/relays/lib-substrate-relay/src/cli/relay_parachains.rs b/bridges/relays/lib-substrate-relay/src/cli/relay_parachains.rs
index e5a52349469bbdf36c7de228078d8d10b0e882f0..1425233add1ee3a58b1729a10ed2bb2d2fbc8667 100644
--- a/bridges/relays/lib-substrate-relay/src/cli/relay_parachains.rs
+++ b/bridges/relays/lib-substrate-relay/src/cli/relay_parachains.rs
@@ -43,6 +43,10 @@ pub struct RelayParachainsParams {
 	target: TargetConnectionParams,
 	#[structopt(flatten)]
 	target_sign: TargetSigningParams,
+	/// If passed, only free headers (those, available at "free" relay chain headers)
+	/// are relayed.
+	#[structopt(long)]
+	only_free_headers: bool,
 	#[structopt(flatten)]
 	prometheus_params: PrometheusParams,
 }
@@ -59,9 +63,9 @@ where
 {
 	/// Start relaying parachains finality.
 	async fn relay_parachains(data: RelayParachainsParams) -> anyhow::Result<()> {
-		let source_client = data.source.into_client::<Self::SourceRelay>().await?;
+		let source_chain_client = data.source.into_client::<Self::SourceRelay>().await?;
 		let source_client = ParachainsSource::<Self::ParachainFinality>::new(
-			source_client,
+			source_chain_client.clone(),
 			Arc::new(Mutex::new(AvailableHeader::Missing)),
 		);
 
@@ -69,9 +73,10 @@ where
 			signer: data.target_sign.to_keypair::<Self::Target>()?,
 			mortality: data.target_sign.target_transactions_mortality,
 		};
-		let target_client = data.target.into_client::<Self::Target>().await?;
+		let target_chain_client = data.target.into_client::<Self::Target>().await?;
 		let target_client = ParachainsTarget::<Self::ParachainFinality>::new(
-			target_client.clone(),
+			source_chain_client,
+			target_chain_client,
 			target_transaction_params,
 		);
 
@@ -83,6 +88,7 @@ where
 			source_client,
 			target_client,
 			metrics_params,
+			data.only_free_headers,
 			futures::future::pending(),
 		)
 		.await
diff --git a/bridges/relays/lib-substrate-relay/src/finality/mod.rs b/bridges/relays/lib-substrate-relay/src/finality/mod.rs
index 206f628b143b8a085dec9a0ef5de929c178c25f6..a06857ae1d9b2f2340214c2bfa9df06c9683eead 100644
--- a/bridges/relays/lib-substrate-relay/src/finality/mod.rs
+++ b/bridges/relays/lib-substrate-relay/src/finality/mod.rs
@@ -25,7 +25,7 @@ use crate::{
 
 use async_trait::async_trait;
 use bp_header_chain::justification::{GrandpaJustification, JustificationVerificationContext};
-use finality_relay::{FinalityPipeline, FinalitySyncPipeline};
+use finality_relay::{FinalityPipeline, FinalitySyncPipeline, HeadersToRelay};
 use pallet_bridge_grandpa::{Call as BridgeGrandpaCall, Config as BridgeGrandpaConfig};
 use relay_substrate_client::{
 	transaction_stall_timeout, AccountIdOf, AccountKeyPairOf, BlockNumberOf, CallOf, Chain,
@@ -115,6 +115,7 @@ pub trait SubmitFinalityProofCallBuilder<P: SubstrateFinalitySyncPipeline> {
 	fn build_submit_finality_proof_call(
 		header: SyncHeader<HeaderOf<P::SourceChain>>,
 		proof: SubstrateFinalityProof<P>,
+		is_free_execution_expected: bool,
 		context: <<P as SubstrateFinalityPipeline>::FinalityEngine as Engine<P::SourceChain>>::FinalityVerificationContext,
 	) -> CallOf<P::TargetChain>;
 }
@@ -142,6 +143,7 @@ where
 	fn build_submit_finality_proof_call(
 		header: SyncHeader<HeaderOf<P::SourceChain>>,
 		proof: GrandpaJustification<HeaderOf<P::SourceChain>>,
+		_is_free_execution_expected: bool,
 		_context: JustificationVerificationContext,
 	) -> CallOf<P::TargetChain> {
 		BridgeGrandpaCall::<R, I>::submit_finality_proof {
@@ -176,6 +178,7 @@ macro_rules! generate_submit_finality_proof_call_builder {
 						<$pipeline as $crate::finality_base::SubstrateFinalityPipeline>::SourceChain
 					>
 				>,
+				_is_free_execution_expected: bool,
 				_context: bp_header_chain::justification::JustificationVerificationContext,
 			) -> relay_substrate_client::CallOf<
 				<$pipeline as $crate::finality_base::SubstrateFinalityPipeline>::TargetChain
@@ -215,6 +218,7 @@ macro_rules! generate_submit_finality_proof_ex_call_builder {
 						<$pipeline as $crate::finality_base::SubstrateFinalityPipeline>::SourceChain
 					>
 				>,
+				is_free_execution_expected: bool,
 				context: bp_header_chain::justification::JustificationVerificationContext,
 			) -> relay_substrate_client::CallOf<
 				<$pipeline as $crate::finality_base::SubstrateFinalityPipeline>::TargetChain
@@ -223,7 +227,8 @@ macro_rules! generate_submit_finality_proof_ex_call_builder {
 					$bridge_grandpa($submit_finality_proof {
 						finality_target: Box::new(header.into_inner()),
 						justification: proof,
-						current_set_id: context.authority_set_id
+						current_set_id: context.authority_set_id,
+						is_free_execution_expected,
 					})
 				}
 			}
@@ -235,15 +240,16 @@ macro_rules! generate_submit_finality_proof_ex_call_builder {
 pub async fn run<P: SubstrateFinalitySyncPipeline>(
 	source_client: Client<P::SourceChain>,
 	target_client: Client<P::TargetChain>,
-	only_mandatory_headers: bool,
+	headers_to_relay: HeadersToRelay,
 	transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
 	metrics_params: MetricsParams,
 ) -> anyhow::Result<()> {
 	log::info!(
 		target: "bridge",
-		"Starting {} -> {} finality proof relay",
+		"Starting {} -> {} finality proof relay: relaying {:?} headers",
 		P::SourceChain::NAME,
 		P::TargetChain::NAME,
+		headers_to_relay,
 	);
 
 	finality_relay::run(
@@ -260,7 +266,7 @@ pub async fn run<P: SubstrateFinalitySyncPipeline>(
 				P::TargetChain::AVERAGE_BLOCK_INTERVAL,
 				relay_utils::STALL_TIMEOUT,
 			),
-			only_mandatory_headers,
+			headers_to_relay,
 		},
 		metrics_params,
 		futures::future::pending(),
diff --git a/bridges/relays/lib-substrate-relay/src/finality/target.rs b/bridges/relays/lib-substrate-relay/src/finality/target.rs
index 18464d523f4f6cf4c9c165af82f6b7c2c2504070..adbcfe0096d5f46ddacdf80e335f74967febca0e 100644
--- a/bridges/relays/lib-substrate-relay/src/finality/target.rs
+++ b/bridges/relays/lib-substrate-relay/src/finality/target.rs
@@ -25,9 +25,10 @@ use crate::{
 };
 
 use async_trait::async_trait;
+use bp_runtime::BlockNumberOf;
 use finality_relay::TargetClient;
 use relay_substrate_client::{
-	AccountKeyPairOf, Client, Error, HeaderIdOf, HeaderOf, SyncHeader, TransactionEra,
+	AccountKeyPairOf, Chain, Client, Error, HeaderIdOf, HeaderOf, SyncHeader, TransactionEra,
 	TransactionTracker, UnsignedTransaction,
 };
 use relay_utils::relay_loop::Client as RelayClient;
@@ -103,10 +104,23 @@ impl<P: SubstrateFinalitySyncPipeline> TargetClient<FinalitySyncPipelineAdapter<
 		.ok_or(Error::BridgePalletIsNotInitialized)?)
 	}
 
+	async fn free_source_headers_interval(
+		&self,
+	) -> Result<Option<BlockNumberOf<P::SourceChain>>, Self::Error> {
+		self.client
+			.typed_state_call(
+				P::SourceChain::FREE_HEADERS_INTERVAL_METHOD.into(),
+				(),
+				Some(self.client.best_header().await?.hash()),
+			)
+			.await
+	}
+
 	async fn submit_finality_proof(
 		&self,
 		header: SyncHeader<HeaderOf<P::SourceChain>>,
 		mut proof: SubstrateFinalityProof<P>,
+		is_free_execution_expected: bool,
 	) -> Result<Self::TransactionTracker, Error> {
 		// verify and runtime module at target chain may require optimized finality proof
 		let context =
@@ -115,7 +129,10 @@ impl<P: SubstrateFinalitySyncPipeline> TargetClient<FinalitySyncPipelineAdapter<
 		// now we may submit optimized finality proof
 		let mortality = self.transaction_params.mortality;
 		let call = P::SubmitFinalityProofCallBuilder::build_submit_finality_proof_call(
-			header, proof, context,
+			header,
+			proof,
+			is_free_execution_expected,
+			context,
 		);
 		self.client
 			.submit_and_watch_signed_extrinsic(
diff --git a/bridges/relays/lib-substrate-relay/src/lib.rs b/bridges/relays/lib-substrate-relay/src/lib.rs
index b90453ae0db2d60364a18f70d5be2f278544cd37..b3e8e7ed9a2059bb07134640aa4e0bc98494a6a1 100644
--- a/bridges/relays/lib-substrate-relay/src/lib.rs
+++ b/bridges/relays/lib-substrate-relay/src/lib.rs
@@ -22,6 +22,9 @@ use relay_substrate_client::{Chain, ChainWithUtilityPallet, UtilityPallet};
 
 use std::marker::PhantomData;
 
+// to avoid `finality_relay` dependency in other crates
+pub use finality_relay::HeadersToRelay;
+
 pub mod cli;
 pub mod equivocation;
 pub mod error;
diff --git a/bridges/relays/lib-substrate-relay/src/on_demand/headers.rs b/bridges/relays/lib-substrate-relay/src/on_demand/headers.rs
index e8a2a3c6c58aaedeb55da67871d0ddb51830338d..74f3a70c5e81bbc1d27162a74fb8dadab46a6d09 100644
--- a/bridges/relays/lib-substrate-relay/src/on_demand/headers.rs
+++ b/bridges/relays/lib-substrate-relay/src/on_demand/headers.rs
@@ -28,7 +28,7 @@ use futures::{select, FutureExt};
 use num_traits::{One, Saturating, Zero};
 use sp_runtime::traits::Header;
 
-use finality_relay::{FinalitySyncParams, TargetClient as FinalityTargetClient};
+use finality_relay::{FinalitySyncParams, HeadersToRelay, TargetClient as FinalityTargetClient};
 use relay_substrate_client::{
 	AccountIdOf, AccountKeyPairOf, BlockNumberOf, CallOf, Chain, Client, Error as SubstrateError,
 	HeaderIdOf,
@@ -75,7 +75,7 @@ impl<P: SubstrateFinalitySyncPipeline> OnDemandHeadersRelay<P> {
 		source_client: Client<P::SourceChain>,
 		target_client: Client<P::TargetChain>,
 		target_transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
-		only_mandatory_headers: bool,
+		headers_to_relay: HeadersToRelay,
 		metrics_params: Option<MetricsParams>,
 	) -> Self
 	where
@@ -94,7 +94,7 @@ impl<P: SubstrateFinalitySyncPipeline> OnDemandHeadersRelay<P> {
 				source_client,
 				target_client,
 				target_transaction_params,
-				only_mandatory_headers,
+				headers_to_relay,
 				required_header_number,
 				metrics_params,
 			)
@@ -191,7 +191,7 @@ impl<P: SubstrateFinalitySyncPipeline> OnDemandRelay<P::SourceChain, P::TargetCh
 
 			// and then craft the submit-proof call
 			let call = P::SubmitFinalityProofCallBuilder::build_submit_finality_proof_call(
-				header, proof, context,
+				header, proof, false, context,
 			);
 
 			return Ok((header_id, vec![call]));
@@ -204,7 +204,7 @@ async fn background_task<P: SubstrateFinalitySyncPipeline>(
 	source_client: Client<P::SourceChain>,
 	target_client: Client<P::TargetChain>,
 	target_transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
-	only_mandatory_headers: bool,
+	headers_to_relay: HeadersToRelay,
 	required_header_number: RequiredHeaderNumberRef<P::SourceChain>,
 	metrics_params: Option<MetricsParams>,
 ) where
@@ -346,11 +346,11 @@ async fn background_task<P: SubstrateFinalitySyncPipeline>(
 			log::info!(
 				target: "bridge",
 				"[{}] Starting on-demand headers relay task\n\t\
-					Only mandatory headers: {}\n\t\
+					Headers to relay: {:?}\n\t\
 					Tx mortality: {:?} (~{}m)\n\t\
 					Stall timeout: {:?}",
 				relay_task_name,
-				only_mandatory_headers,
+				headers_to_relay,
 				target_transactions_mortality,
 				stall_timeout.as_secs_f64() / 60.0f64,
 				stall_timeout,
@@ -367,7 +367,7 @@ async fn background_task<P: SubstrateFinalitySyncPipeline>(
 						),
 						recent_finality_proofs_limit: RECENT_FINALITY_PROOFS_LIMIT,
 						stall_timeout,
-						only_mandatory_headers,
+						headers_to_relay,
 					},
 					metrics_params.clone().unwrap_or_else(MetricsParams::disabled),
 					futures::future::pending(),
diff --git a/bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs b/bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs
index f67c002bba7f9c61925cb83e96433dbd40db7d4b..966bdc3107203a61cf405adba2cf09124330954e 100644
--- a/bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs
+++ b/bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs
@@ -222,6 +222,7 @@ where
 			proved_relay_block,
 			vec![(para_id, para_hash)],
 			para_proof,
+			false,
 		));
 
 		Ok((proved_parachain_block, calls))
@@ -256,8 +257,11 @@ async fn background_task<P: SubstrateParachainsPipeline>(
 
 	let mut parachains_source =
 		ParachainsSource::<P>::new(source_relay_client.clone(), required_para_header_ref.clone());
-	let mut parachains_target =
-		ParachainsTarget::<P>::new(target_client.clone(), target_transaction_params.clone());
+	let mut parachains_target = ParachainsTarget::<P>::new(
+		source_relay_client.clone(),
+		target_client.clone(),
+		target_transaction_params.clone(),
+	);
 
 	loop {
 		select! {
@@ -392,6 +396,8 @@ async fn background_task<P: SubstrateParachainsPipeline>(
 					parachains_source.clone(),
 					parachains_target.clone(),
 					MetricsParams::disabled(),
+					// we do not support free parachain headers relay in on-demand relays
+					false,
 					futures::future::pending(),
 				)
 				.fuse(),
@@ -481,7 +487,7 @@ where
 	let para_header_at_target = best_finalized_peer_header_at_self::<
 		P::TargetChain,
 		P::SourceParachain,
-	>(target.client(), best_target_block_hash)
+	>(target.target_client(), best_target_block_hash)
 	.await;
 	// if there are no parachain heads at the target (`NoParachainHeadAtTarget`), we'll need to
 	// submit at least one. Otherwise the pallet will be treated as uninitialized and messages
@@ -504,7 +510,7 @@ where
 	let relay_header_at_target = best_finalized_peer_header_at_self::<
 		P::TargetChain,
 		P::SourceRelayChain,
-	>(target.client(), best_target_block_hash)
+	>(target.target_client(), best_target_block_hash)
 	.await
 	.map_err(map_target_err)?;
 
diff --git a/bridges/relays/lib-substrate-relay/src/parachains/mod.rs b/bridges/relays/lib-substrate-relay/src/parachains/mod.rs
index 722f9b61f9f08d87dac3bd95a0780d8422097a38..8b128bb770dd7a05d28ad46d4561f4d859b1deb6 100644
--- a/bridges/relays/lib-substrate-relay/src/parachains/mod.rs
+++ b/bridges/relays/lib-substrate-relay/src/parachains/mod.rs
@@ -71,6 +71,7 @@ pub trait SubmitParachainHeadsCallBuilder<P: SubstrateParachainsPipeline>:
 		at_relay_block: HeaderIdOf<P::SourceRelayChain>,
 		parachains: Vec<(ParaId, ParaHash)>,
 		parachain_heads_proof: ParaHeadsProof,
+		is_free_execution_expected: bool,
 	) -> CallOf<P::TargetChain>;
 }
 
@@ -97,6 +98,7 @@ where
 		at_relay_block: HeaderIdOf<P::SourceRelayChain>,
 		parachains: Vec<(ParaId, ParaHash)>,
 		parachain_heads_proof: ParaHeadsProof,
+		_is_free_execution_expected: bool,
 	) -> CallOf<P::TargetChain> {
 		BridgeParachainsCall::<R, I>::submit_parachain_heads {
 			at_relay_block: (at_relay_block.0, at_relay_block.1),
diff --git a/bridges/relays/lib-substrate-relay/src/parachains/target.rs b/bridges/relays/lib-substrate-relay/src/parachains/target.rs
index 6df7bc0a742a9f6693a422b994c7a9203f3c8b74..e10d15b6edf6c75773e2e10bc9caf2e406632284 100644
--- a/bridges/relays/lib-substrate-relay/src/parachains/target.rs
+++ b/bridges/relays/lib-substrate-relay/src/parachains/target.rs
@@ -24,42 +24,53 @@ use crate::{
 };
 
 use async_trait::async_trait;
-use bp_polkadot_core::parachains::{ParaHash, ParaHeadsProof, ParaId};
-use bp_runtime::HeaderIdProvider;
-use codec::Decode;
+use bp_parachains::{
+	ImportedParaHeadsKeyProvider, ParaInfo, ParaStoredHeaderData, ParasInfoKeyProvider,
+};
+use bp_polkadot_core::{
+	parachains::{ParaHash, ParaHeadsProof, ParaId},
+	BlockNumber as RelayBlockNumber,
+};
+use bp_runtime::{
+	Chain as ChainBase, HeaderId, HeaderIdProvider, StorageDoubleMapKeyProvider,
+	StorageMapKeyProvider,
+};
 use parachains_relay::parachains_loop::TargetClient;
 use relay_substrate_client::{
-	AccountIdOf, AccountKeyPairOf, Chain, Client, Error as SubstrateError, HeaderIdOf,
-	ParachainBase, TransactionEra, TransactionTracker, UnsignedTransaction,
+	AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client, Error as SubstrateError,
+	HeaderIdOf, ParachainBase, RelayChain, TransactionEra, TransactionTracker, UnsignedTransaction,
 };
 use relay_utils::relay_loop::Client as RelayClient;
-use sp_core::{Bytes, Pair};
+use sp_core::Pair;
 
 /// Substrate client as parachain heads source.
 pub struct ParachainsTarget<P: SubstrateParachainsPipeline> {
-	client: Client<P::TargetChain>,
+	source_client: Client<P::SourceRelayChain>,
+	target_client: Client<P::TargetChain>,
 	transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
 }
 
 impl<P: SubstrateParachainsPipeline> ParachainsTarget<P> {
 	/// Creates new parachains target client.
 	pub fn new(
-		client: Client<P::TargetChain>,
+		source_client: Client<P::SourceRelayChain>,
+		target_client: Client<P::TargetChain>,
 		transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
 	) -> Self {
-		ParachainsTarget { client, transaction_params }
+		ParachainsTarget { source_client, target_client, transaction_params }
 	}
 
 	/// Returns reference to the underlying RPC client.
-	pub fn client(&self) -> &Client<P::TargetChain> {
-		&self.client
+	pub fn target_client(&self) -> &Client<P::TargetChain> {
+		&self.target_client
 	}
 }
 
 impl<P: SubstrateParachainsPipeline> Clone for ParachainsTarget<P> {
 	fn clone(&self) -> Self {
 		ParachainsTarget {
-			client: self.client.clone(),
+			source_client: self.source_client.clone(),
+			target_client: self.target_client.clone(),
 			transaction_params: self.transaction_params.clone(),
 		}
 	}
@@ -70,7 +81,9 @@ impl<P: SubstrateParachainsPipeline> RelayClient for ParachainsTarget<P> {
 	type Error = SubstrateError;
 
 	async fn reconnect(&mut self) -> Result<(), SubstrateError> {
-		self.client.reconnect().await
+		self.target_client.reconnect().await?;
+		self.source_client.reconnect().await?;
+		Ok(())
 	}
 }
 
@@ -79,11 +92,13 @@ impl<P> TargetClient<ParachainsPipelineAdapter<P>> for ParachainsTarget<P>
 where
 	P: SubstrateParachainsPipeline,
 	AccountIdOf<P::TargetChain>: From<<AccountKeyPairOf<P::TargetChain> as Pair>::Public>,
+	P::SourceParachain: ChainBase<Hash = ParaHash>,
+	P::SourceRelayChain: ChainBase<BlockNumber = RelayBlockNumber>,
 {
 	type TransactionTracker = TransactionTracker<P::TargetChain, Client<P::TargetChain>>;
 
 	async fn best_block(&self) -> Result<HeaderIdOf<P::TargetChain>, Self::Error> {
-		let best_header = self.client.best_header().await?;
+		let best_header = self.target_client.best_header().await?;
 		let best_id = best_header.id();
 
 		Ok(best_id)
@@ -93,7 +108,7 @@ where
 		&self,
 		at_block: &HeaderIdOf<P::TargetChain>,
 	) -> Result<HeaderIdOf<P::SourceRelayChain>, Self::Error> {
-		self.client
+		self.target_client
 			.typed_state_call::<_, Option<HeaderIdOf<P::SourceRelayChain>>>(
 				P::SourceRelayChain::BEST_FINALIZED_HEADER_ID_METHOD.into(),
 				(),
@@ -104,23 +119,57 @@ where
 			.unwrap_or(Err(SubstrateError::BridgePalletIsNotInitialized))
 	}
 
+	async fn free_source_relay_headers_interval(
+		&self,
+	) -> Result<Option<BlockNumberOf<P::SourceRelayChain>>, Self::Error> {
+		self.target_client
+			.typed_state_call(P::SourceRelayChain::FREE_HEADERS_INTERVAL_METHOD.into(), (), None)
+			.await
+	}
+
 	async fn parachain_head(
 		&self,
 		at_block: HeaderIdOf<P::TargetChain>,
-	) -> Result<Option<HeaderIdOf<P::SourceParachain>>, Self::Error> {
-		let encoded_best_finalized_source_para_block = self
-			.client
-			.state_call(
-				P::SourceParachain::BEST_FINALIZED_HEADER_ID_METHOD.into(),
-				Bytes(Vec::new()),
-				Some(at_block.1),
-			)
-			.await?;
+	) -> Result<
+		Option<(HeaderIdOf<P::SourceRelayChain>, HeaderIdOf<P::SourceParachain>)>,
+		Self::Error,
+	> {
+		// read best parachain head from the target bridge-parachains pallet
+		let storage_key = ParasInfoKeyProvider::final_key(
+			P::SourceRelayChain::WITH_CHAIN_BRIDGE_PARACHAINS_PALLET_NAME,
+			&P::SourceParachain::PARACHAIN_ID.into(),
+		);
+		let storage_value: Option<ParaInfo> =
+			self.target_client.storage_value(storage_key, Some(at_block.hash())).await?;
+		let para_info = match storage_value {
+			Some(para_info) => para_info,
+			None => return Ok(None),
+		};
+
+		// now we need to get full header ids. For source relay chain it is simple, because we
+		// are connected
+		let relay_header_id = self
+			.source_client
+			.header_by_number(para_info.best_head_hash.at_relay_block_number)
+			.await?
+			.id();
 
-		Ok(Option::<HeaderIdOf<P::SourceParachain>>::decode(
-			&mut &encoded_best_finalized_source_para_block.0[..],
-		)
-		.map_err(SubstrateError::ResponseParseFailed)?)
+		// for parachain, we need to read from the target chain runtime storage
+		let storage_key = ImportedParaHeadsKeyProvider::final_key(
+			P::SourceRelayChain::WITH_CHAIN_BRIDGE_PARACHAINS_PALLET_NAME,
+			&P::SourceParachain::PARACHAIN_ID.into(),
+			&para_info.best_head_hash.head_hash,
+		);
+		let storage_value: Option<ParaStoredHeaderData> =
+			self.target_client.storage_value(storage_key, Some(at_block.hash())).await?;
+		let para_head_number = match storage_value {
+			Some(para_head_data) =>
+				para_head_data.decode_parachain_head_data::<P::SourceParachain>()?.number,
+			None => return Ok(None),
+		};
+
+		let para_head_id = HeaderId(para_head_number, para_info.best_head_hash.head_hash);
+		Ok(Some((relay_header_id, para_head_id)))
 	}
 
 	async fn submit_parachain_head_proof(
@@ -128,14 +177,16 @@ where
 		at_relay_block: HeaderIdOf<P::SourceRelayChain>,
 		updated_head_hash: ParaHash,
 		proof: ParaHeadsProof,
+		is_free_execution_expected: bool,
 	) -> Result<Self::TransactionTracker, Self::Error> {
 		let transaction_params = self.transaction_params.clone();
 		let call = P::SubmitParachainHeadsCallBuilder::build_submit_parachain_heads_call(
 			at_relay_block,
 			vec![(ParaId(P::SourceParachain::PARACHAIN_ID), updated_head_hash)],
 			proof,
+			is_free_execution_expected,
 		);
-		self.client
+		self.target_client
 			.submit_and_watch_signed_extrinsic(
 				&transaction_params.signer,
 				move |best_block_id, transaction_nonce| {
diff --git a/bridges/relays/parachains/src/parachains_loop.rs b/bridges/relays/parachains/src/parachains_loop.rs
index 41ebbf5aadede2b4a1f0c9dcde73ee83bd5b0766..55f236eeac1d842ff9ed0f337cf1ea2c725dc763 100644
--- a/bridges/relays/parachains/src/parachains_loop.rs
+++ b/bridges/relays/parachains/src/parachains_loop.rs
@@ -25,7 +25,7 @@ use futures::{
 	future::{FutureExt, Shared},
 	poll, select_biased,
 };
-use relay_substrate_client::{Chain, HeaderIdOf, ParachainBase};
+use relay_substrate_client::{BlockNumberOf, Chain, HeaderIdOf, ParachainBase};
 use relay_utils::{
 	metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient,
 	TrackedTransactionStatus, TransactionTracker,
@@ -96,17 +96,27 @@ pub trait TargetClient<P: ParachainsPipeline>: RelayClient {
 	/// Get best block id.
 	async fn best_block(&self) -> Result<HeaderIdOf<P::TargetChain>, Self::Error>;
 
-	/// Get best finalized source relay chain block id.
+	/// Get best finalized source relay chain block id. If `free_source_relay_headers_interval`
+	/// is `Some(_)`, the returned
 	async fn best_finalized_source_relay_chain_block(
 		&self,
 		at_block: &HeaderIdOf<P::TargetChain>,
 	) -> Result<HeaderIdOf<P::SourceRelayChain>, Self::Error>;
+	/// Get free source **relay** headers submission interval, if it is configured in the
+	/// target runtime. We assume that the target chain will accept parachain header, proved
+	/// at such relay header for free.
+	async fn free_source_relay_headers_interval(
+		&self,
+	) -> Result<Option<BlockNumberOf<P::SourceRelayChain>>, Self::Error>;
 
 	/// Get parachain head id at given block.
 	async fn parachain_head(
 		&self,
 		at_block: HeaderIdOf<P::TargetChain>,
-	) -> Result<Option<HeaderIdOf<P::SourceParachain>>, Self::Error>;
+	) -> Result<
+		Option<(HeaderIdOf<P::SourceRelayChain>, HeaderIdOf<P::SourceParachain>)>,
+		Self::Error,
+	>;
 
 	/// Submit parachain heads proof.
 	async fn submit_parachain_head_proof(
@@ -114,6 +124,7 @@ pub trait TargetClient<P: ParachainsPipeline>: RelayClient {
 		at_source_block: HeaderIdOf<P::SourceRelayChain>,
 		para_head_hash: ParaHash,
 		proof: ParaHeadsProof,
+		is_free_execution_expected: bool,
 	) -> Result<Self::TransactionTracker, Self::Error>;
 }
 
@@ -133,6 +144,7 @@ pub async fn run<P: ParachainsPipeline>(
 	source_client: impl SourceClient<P>,
 	target_client: impl TargetClient<P>,
 	metrics_params: MetricsParams,
+	only_free_headers: bool,
 	exit_signal: impl Future<Output = ()> + 'static + Send,
 ) -> Result<(), relay_utils::Error>
 where
@@ -145,7 +157,13 @@ where
 		.expose()
 		.await?
 		.run(metrics_prefix::<P>(), move |source_client, target_client, metrics| {
-			run_until_connection_lost(source_client, target_client, metrics, exit_signal.clone())
+			run_until_connection_lost(
+				source_client,
+				target_client,
+				metrics,
+				only_free_headers,
+				exit_signal.clone(),
+			)
 		})
 		.await
 }
@@ -155,6 +173,7 @@ async fn run_until_connection_lost<P: ParachainsPipeline>(
 	source_client: impl SourceClient<P>,
 	target_client: impl TargetClient<P>,
 	metrics: Option<ParachainsLoopMetrics>,
+	only_free_headers: bool,
 	exit_signal: impl Future<Output = ()> + Send,
 ) -> Result<(), FailedClient>
 where
@@ -166,6 +185,47 @@ where
 		P::TargetChain::AVERAGE_BLOCK_INTERVAL,
 	);
 
+	// free parachain header = header, available (proved) at free relay chain block. Let's
+	// read interval of free source relay chain blocks from target client
+	let free_source_relay_headers_interval = if only_free_headers {
+		let free_source_relay_headers_interval =
+			target_client.free_source_relay_headers_interval().await.map_err(|e| {
+				log::warn!(
+					target: "bridge",
+					"Failed to read free {} headers interval at {}: {:?}",
+					P::SourceRelayChain::NAME,
+					P::TargetChain::NAME,
+					e,
+				);
+				FailedClient::Target
+			})?;
+		match free_source_relay_headers_interval {
+			Some(free_source_relay_headers_interval) if free_source_relay_headers_interval != 0 => {
+				log::trace!(
+					target: "bridge",
+					"Free {} headers interval at {}: {:?}",
+					P::SourceRelayChain::NAME,
+					P::TargetChain::NAME,
+					free_source_relay_headers_interval,
+				);
+				free_source_relay_headers_interval
+			},
+			_ => {
+				log::warn!(
+					target: "bridge",
+					"Invalid free {} headers interval at {}: {:?}",
+					P::SourceRelayChain::NAME,
+					P::TargetChain::NAME,
+					free_source_relay_headers_interval,
+				);
+				return Err(FailedClient::Target)
+			},
+		}
+	} else {
+		// ignore - we don't need it
+		0
+	};
+
 	let mut submitted_heads_tracker: Option<SubmittedHeadsTracker<P>> = None;
 
 	futures::pin_mut!(exit_signal);
@@ -211,7 +271,7 @@ where
 			log::warn!(target: "bridge", "Failed to read best {} block: {:?}", P::SourceRelayChain::NAME, e);
 			FailedClient::Target
 		})?;
-		let head_at_target =
+		let (relay_of_head_at_target, head_at_target) =
 			read_head_at_target(&target_client, metrics.as_ref(), &best_target_block).await?;
 
 		// check if our transaction has been mined
@@ -238,9 +298,9 @@ where
 			}
 		}
 
-		// we have no active transaction and may need to update heads, but do we have something for
-		// update?
-		let best_finalized_relay_block = target_client
+		// in all-headers strategy we'll be submitting para head, available at
+		// `best_finalized_relay_block_at_target`
+		let best_finalized_relay_block_at_target = target_client
 			.best_finalized_source_relay_chain_block(&best_target_block)
 			.await
 			.map_err(|e| {
@@ -253,21 +313,56 @@ where
 				);
 				FailedClient::Target
 			})?;
+
+		// ..but if we only need to submit free headers, we need to submit para
+		// head, available at best free source relay chain header, known to the
+		// target chain
+		let prove_at_relay_block = if only_free_headers {
+			match relay_of_head_at_target {
+				Some(relay_of_head_at_target) => {
+					// find last free relay chain header in the range that we are interested in
+					let scan_range_begin = relay_of_head_at_target.number();
+					let scan_range_end = best_finalized_relay_block_at_target.number();
+					if scan_range_end.saturating_sub(scan_range_begin) <
+						free_source_relay_headers_interval
+					{
+						// there are no new **free** relay chain headers in the range
+						log::trace!(
+							target: "bridge",
+							"Waiting for new free {} headers at {}: scanned {:?}..={:?}",
+							P::SourceRelayChain::NAME,
+							P::TargetChain::NAME,
+							scan_range_begin,
+							scan_range_end,
+						);
+						continue;
+					}
+
+					// we may submit new parachain head for free
+					best_finalized_relay_block_at_target
+				},
+				None => {
+					// no parachain head at target => let's submit first one
+					best_finalized_relay_block_at_target
+				},
+			}
+		} else {
+			best_finalized_relay_block_at_target
+		};
+
+		// now let's check if we need to update parachain head at all
 		let head_at_source =
-			read_head_at_source(&source_client, metrics.as_ref(), &best_finalized_relay_block)
-				.await?;
+			read_head_at_source(&source_client, metrics.as_ref(), &prove_at_relay_block).await?;
 		let is_update_required = is_update_required::<P>(
 			head_at_source,
 			head_at_target,
-			best_finalized_relay_block,
+			prove_at_relay_block,
 			best_target_block,
 		);
 
 		if is_update_required {
-			let (head_proof, head_hash) = source_client
-				.prove_parachain_head(best_finalized_relay_block)
-				.await
-				.map_err(|e| {
+			let (head_proof, head_hash) =
+				source_client.prove_parachain_head(prove_at_relay_block).await.map_err(|e| {
 					log::warn!(
 						target: "bridge",
 						"Failed to prove {} parachain ParaId({}) heads: {:?}",
@@ -283,12 +378,17 @@ where
 				P::SourceRelayChain::NAME,
 				P::SourceParachain::PARACHAIN_ID,
 				P::TargetChain::NAME,
-				best_finalized_relay_block,
+				prove_at_relay_block,
 				head_hash,
 			);
 
 			let transaction_tracker = target_client
-				.submit_parachain_head_proof(best_finalized_relay_block, head_hash, head_proof)
+				.submit_parachain_head_proof(
+					prove_at_relay_block,
+					head_hash,
+					head_proof,
+					only_free_headers,
+				)
 				.await
 				.map_err(|e| {
 					log::warn!(
@@ -311,7 +411,7 @@ where
 fn is_update_required<P: ParachainsPipeline>(
 	head_at_source: AvailableHeader<HeaderIdOf<P::SourceParachain>>,
 	head_at_target: Option<HeaderIdOf<P::SourceParachain>>,
-	best_finalized_relay_block_at_source: HeaderIdOf<P::SourceRelayChain>,
+	prove_at_relay_block: HeaderIdOf<P::SourceRelayChain>,
 	best_target_block: HeaderIdOf<P::TargetChain>,
 ) -> bool
 where
@@ -326,7 +426,7 @@ where
 		P::SourceParachain::PARACHAIN_ID,
 		P::TargetChain::NAME,
 		P::SourceRelayChain::NAME,
-		best_finalized_relay_block_at_source,
+		prove_at_relay_block,
 		head_at_source,
 		P::TargetChain::NAME,
 		best_target_block,
@@ -413,24 +513,28 @@ async fn read_head_at_source<P: ParachainsPipeline>(
 	}
 }
 
-/// Reads parachain head from the target client.
+/// Reads parachain head from the target client. Also returns source relay chain header
+/// that has been used to prove that head.
 async fn read_head_at_target<P: ParachainsPipeline>(
 	target_client: &impl TargetClient<P>,
 	metrics: Option<&ParachainsLoopMetrics>,
 	at_block: &HeaderIdOf<P::TargetChain>,
-) -> Result<Option<HeaderIdOf<P::SourceParachain>>, FailedClient> {
+) -> Result<
+	(Option<HeaderIdOf<P::SourceRelayChain>>, Option<HeaderIdOf<P::SourceParachain>>),
+	FailedClient,
+> {
 	let para_head_id = target_client.parachain_head(*at_block).await;
 	match para_head_id {
-		Ok(Some(para_head_id)) => {
+		Ok(Some((relay_header_id, para_head_id))) => {
 			if let Some(metrics) = metrics {
 				metrics.update_best_parachain_block_at_target(
 					ParaId(P::SourceParachain::PARACHAIN_ID),
 					para_head_id.number(),
 				);
 			}
-			Ok(Some(para_head_id))
+			Ok((Some(relay_header_id), Some(para_head_id)))
 		},
-		Ok(None) => Ok(None),
+		Ok(None) => Ok((None, None)),
 		Err(e) => {
 			log::warn!(
 				target: "bridge",
@@ -543,6 +647,7 @@ mod tests {
 	use relay_substrate_client::test_chain::{TestChain, TestParachain};
 	use relay_utils::{HeaderId, MaybeConnectionError};
 	use sp_core::H256;
+	use std::collections::HashMap;
 
 	const PARA_10_HASH: ParaHash = H256([10u8; 32]);
 	const PARA_20_HASH: ParaHash = H256([20u8; 32]);
@@ -590,14 +695,21 @@ mod tests {
 	#[derive(Clone, Debug)]
 	struct TestClientData {
 		source_sync_status: Result<bool, TestError>,
-		source_head: Result<AvailableHeader<HeaderIdOf<TestParachain>>, TestError>,
+		source_head: HashMap<
+			BlockNumberOf<TestChain>,
+			Result<AvailableHeader<HeaderIdOf<TestParachain>>, TestError>,
+		>,
 		source_proof: Result<(), TestError>,
 
+		target_free_source_relay_headers_interval:
+			Result<Option<BlockNumberOf<TestChain>>, TestError>,
 		target_best_block: Result<HeaderIdOf<TestChain>, TestError>,
 		target_best_finalized_source_block: Result<HeaderIdOf<TestChain>, TestError>,
-		target_head: Result<Option<HeaderIdOf<TestParachain>>, TestError>,
+		#[allow(clippy::type_complexity)]
+		target_head: Result<Option<(HeaderIdOf<TestChain>, HeaderIdOf<TestParachain>)>, TestError>,
 		target_submit_result: Result<(), TestError>,
 
+		submitted_proof_at_source_relay_block: Option<HeaderIdOf<TestChain>>,
 		exit_signal_sender: Option<Box<futures::channel::mpsc::UnboundedSender<()>>>,
 	}
 
@@ -605,14 +717,18 @@ mod tests {
 		pub fn minimal() -> Self {
 			TestClientData {
 				source_sync_status: Ok(true),
-				source_head: Ok(AvailableHeader::Available(HeaderId(0, PARA_20_HASH))),
+				source_head: vec![(0, Ok(AvailableHeader::Available(HeaderId(0, PARA_20_HASH))))]
+					.into_iter()
+					.collect(),
 				source_proof: Ok(()),
 
+				target_free_source_relay_headers_interval: Ok(None),
 				target_best_block: Ok(HeaderId(0, Default::default())),
 				target_best_finalized_source_block: Ok(HeaderId(0, Default::default())),
 				target_head: Ok(None),
 				target_submit_result: Ok(()),
 
+				submitted_proof_at_source_relay_block: None,
 				exit_signal_sender: None,
 			}
 		}
@@ -649,16 +765,24 @@ mod tests {
 
 		async fn parachain_head(
 			&self,
-			_at_block: HeaderIdOf<TestChain>,
+			at_block: HeaderIdOf<TestChain>,
 		) -> Result<AvailableHeader<HeaderIdOf<TestParachain>>, TestError> {
-			self.data.lock().await.source_head.clone()
+			self.data
+				.lock()
+				.await
+				.source_head
+				.get(&at_block.0)
+				.expect(&format!("SourceClient::parachain_head({})", at_block.0))
+				.clone()
 		}
 
 		async fn prove_parachain_head(
 			&self,
-			_at_block: HeaderIdOf<TestChain>,
+			at_block: HeaderIdOf<TestChain>,
 		) -> Result<(ParaHeadsProof, ParaHash), TestError> {
-			let head = *self.data.lock().await.source_head.clone()?.as_available().unwrap();
+			let head_result =
+				SourceClient::<TestParachainsPipeline>::parachain_head(self, at_block).await?;
+			let head = head_result.as_available().unwrap();
 			let storage_proof = vec![head.hash().encode()];
 			let proof = (ParaHeadsProof { storage_proof }, head.hash());
 			self.data.lock().await.source_proof.clone().map(|_| proof)
@@ -680,21 +804,29 @@ mod tests {
 			self.data.lock().await.target_best_finalized_source_block.clone()
 		}
 
+		async fn free_source_relay_headers_interval(
+			&self,
+		) -> Result<Option<BlockNumberOf<TestParachain>>, TestError> {
+			self.data.lock().await.target_free_source_relay_headers_interval.clone()
+		}
+
 		async fn parachain_head(
 			&self,
 			_at_block: HeaderIdOf<TestChain>,
-		) -> Result<Option<HeaderIdOf<TestParachain>>, TestError> {
+		) -> Result<Option<(HeaderIdOf<TestChain>, HeaderIdOf<TestParachain>)>, TestError> {
 			self.data.lock().await.target_head.clone()
 		}
 
 		async fn submit_parachain_head_proof(
 			&self,
-			_at_source_block: HeaderIdOf<TestChain>,
+			at_source_block: HeaderIdOf<TestChain>,
 			_updated_parachain_head: ParaHash,
 			_proof: ParaHeadsProof,
+			_is_free_execution_expected: bool,
 		) -> Result<TestTransactionTracker, Self::Error> {
 			let mut data = self.data.lock().await;
 			data.target_submit_result.clone()?;
+			data.submitted_proof_at_source_relay_block = Some(at_source_block);
 
 			if let Some(mut exit_signal_sender) = data.exit_signal_sender.take() {
 				exit_signal_sender.send(()).await.unwrap();
@@ -715,6 +847,7 @@ mod tests {
 				TestClient::from(test_source_client),
 				TestClient::from(TestClientData::minimal()),
 				None,
+				false,
 				futures::future::pending(),
 			)),
 			Err(FailedClient::Source),
@@ -731,6 +864,7 @@ mod tests {
 				TestClient::from(TestClientData::minimal()),
 				TestClient::from(test_target_client),
 				None,
+				false,
 				futures::future::pending(),
 			)),
 			Err(FailedClient::Target),
@@ -747,6 +881,7 @@ mod tests {
 				TestClient::from(TestClientData::minimal()),
 				TestClient::from(test_target_client),
 				None,
+				false,
 				futures::future::pending(),
 			)),
 			Err(FailedClient::Target),
@@ -763,6 +898,7 @@ mod tests {
 				TestClient::from(TestClientData::minimal()),
 				TestClient::from(test_target_client),
 				None,
+				false,
 				futures::future::pending(),
 			)),
 			Err(FailedClient::Target),
@@ -772,13 +908,14 @@ mod tests {
 	#[test]
 	fn when_source_client_fails_to_read_heads() {
 		let mut test_source_client = TestClientData::minimal();
-		test_source_client.source_head = Err(TestError::Error);
+		test_source_client.source_head.insert(0, Err(TestError::Error));
 
 		assert_eq!(
 			async_std::task::block_on(run_until_connection_lost(
 				TestClient::from(test_source_client),
 				TestClient::from(TestClientData::minimal()),
 				None,
+				false,
 				futures::future::pending(),
 			)),
 			Err(FailedClient::Source),
@@ -795,6 +932,7 @@ mod tests {
 				TestClient::from(test_source_client),
 				TestClient::from(TestClientData::minimal()),
 				None,
+				false,
 				futures::future::pending(),
 			)),
 			Err(FailedClient::Source),
@@ -811,6 +949,7 @@ mod tests {
 				TestClient::from(TestClientData::minimal()),
 				TestClient::from(test_target_client),
 				None,
+				false,
 				futures::future::pending(),
 			)),
 			Err(FailedClient::Target),
@@ -825,12 +964,108 @@ mod tests {
 				TestClient::from(TestClientData::minimal()),
 				TestClient::from(TestClientData::with_exit_signal_sender(exit_signal_sender)),
 				None,
+				false,
 				exit_signal.into_future().map(|(_, _)| ()),
 			)),
 			Ok(()),
 		);
 	}
 
+	#[async_std::test]
+	async fn free_headers_are_relayed() {
+		// prepare following case:
+		// 1) best source relay at target: 95
+		// 2) best source parachain at target: 5 at relay 50
+		// 3) free headers interval: 10
+		// 4) at source relay chain block 90 source parachain block is 9
+		// +
+		// 5) best finalized source relay chain block is 95
+		// 6) at source relay chain block 95 source parachain block is 42
+		// =>
+		// parachain block 42 would have been relayed, because 95 - 50 > 10
+		let (exit_signal_sender, exit_signal) = futures::channel::mpsc::unbounded();
+		let clients_data = TestClientData {
+			source_sync_status: Ok(true),
+			source_head: vec![
+				(90, Ok(AvailableHeader::Available(HeaderId(9, [9u8; 32].into())))),
+				(95, Ok(AvailableHeader::Available(HeaderId(42, [42u8; 32].into())))),
+			]
+			.into_iter()
+			.collect(),
+			source_proof: Ok(()),
+
+			target_free_source_relay_headers_interval: Ok(Some(10)),
+			target_best_block: Ok(HeaderId(200, [200u8; 32].into())),
+			target_best_finalized_source_block: Ok(HeaderId(95, [95u8; 32].into())),
+			target_head: Ok(Some((HeaderId(50, [50u8; 32].into()), HeaderId(5, [5u8; 32].into())))),
+			target_submit_result: Ok(()),
+
+			submitted_proof_at_source_relay_block: None,
+			exit_signal_sender: Some(Box::new(exit_signal_sender)),
+		};
+
+		let source_client = TestClient::from(clients_data.clone());
+		let target_client = TestClient::from(clients_data);
+		assert_eq!(
+			run_until_connection_lost(
+				source_client,
+				target_client.clone(),
+				None,
+				true,
+				exit_signal.into_future().map(|(_, _)| ()),
+			)
+			.await,
+			Ok(()),
+		);
+
+		assert_eq!(
+			target_client
+				.data
+				.lock()
+				.await
+				.submitted_proof_at_source_relay_block
+				.map(|id| id.0),
+			Some(95)
+		);
+
+		// now source relay block chain 104 is mined with parachain head #84
+		// => since 104 - 95 < 10, there are no free headers
+		// => nothing is submitted
+		let mut clients_data: TestClientData = target_client.data.lock().await.clone();
+		clients_data
+			.source_head
+			.insert(104, Ok(AvailableHeader::Available(HeaderId(84, [84u8; 32].into()))));
+		clients_data.target_best_finalized_source_block = Ok(HeaderId(104, [104u8; 32].into()));
+		clients_data.target_head =
+			Ok(Some((HeaderId(95, [95u8; 32].into()), HeaderId(42, [42u8; 32].into()))));
+		clients_data.target_best_block = Ok(HeaderId(255, [255u8; 32].into()));
+		clients_data.exit_signal_sender = None;
+
+		let source_client = TestClient::from(clients_data.clone());
+		let target_client = TestClient::from(clients_data);
+		assert_eq!(
+			run_until_connection_lost(
+				source_client,
+				target_client.clone(),
+				None,
+				true,
+				async_std::task::sleep(std::time::Duration::from_millis(100)),
+			)
+			.await,
+			Ok(()),
+		);
+
+		assert_eq!(
+			target_client
+				.data
+				.lock()
+				.await
+				.submitted_proof_at_source_relay_block
+				.map(|id| id.0),
+			Some(95)
+		);
+	}
+
 	fn test_tx_tracker() -> SubmittedHeadsTracker<TestParachainsPipeline> {
 		SubmittedHeadsTracker::new(
 			AvailableHeader::Available(HeaderId(20, PARA_20_HASH)),
diff --git a/prdoc/pr_4157.prdoc b/prdoc/pr_4157.prdoc
new file mode 100644
index 0000000000000000000000000000000000000000..783eaa2dd4276fbf35dd996c06865f734433a3af
--- /dev/null
+++ b/prdoc/pr_4157.prdoc
@@ -0,0 +1,29 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: "Bridge: added free headers submission support to the substrate-relay"
+
+doc:
+  - audience: Node Dev
+    description: |
+      Bridge finality and parachains relayer now supports mode, where it only submits some headers
+      for free. There's a setting in a runtime configuration, which introduces this "free header"
+      concept. Submitting such header is considered a common good deed, so it is free for relayers.
+
+crates:
+  - name: bp-bridge-hub-kusama
+    bump: major
+  - name: bp-bridge-hub-polkadot
+    bump: major
+  - name: bp-bridge-hub-rococo
+    bump: major
+  - name: bp-bridge-hub-westend
+    bump: major
+  - name: relay-substrate-client
+    bump: major
+  - name: finality-relay
+    bump: major
+  - name: substrate-relay-helper
+    bump: major
+  - name: parachains-relay
+    bump: major