diff --git a/Cargo.lock b/Cargo.lock
index 9393f8d606de669eda452fd931ea65ad62b528fc..55ef63fa2bf51feed8c90797a295987e832c7bcf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2624,9 +2624,9 @@ dependencies = [
 
 [[package]]
 name = "clap-num"
-version = "1.1.1"
+version = "1.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0e063d263364859dc54fb064cedb7c122740cd4733644b14b176c097f51e8ab7"
+checksum = "488557e97528174edaa2ee268b23a809e0c598213a4bbcb4f34575a46fda147e"
 dependencies = [
  "num-traits",
 ]
@@ -2844,10 +2844,11 @@ checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
 
 [[package]]
 name = "colored"
-version = "2.1.0"
+version = "2.0.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cbf2150cce219b664a8a70df7a1f933836724b503f8a413af9365b4dcc4d90b8"
+checksum = "2674ec482fbc38012cf31e6c42ba0177b431a0cb6f15fe40efa5aab1bda516f6"
 dependencies = [
+ "is-terminal",
  "lazy_static",
  "windows-sys 0.48.0",
 ]
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
index 975abbca4b68f7794fa4414f7cfee12a4163ee46..86d9a726d7bec371c157014adac0dfb1a926fb7e 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
@@ -62,6 +62,9 @@ pub struct ChainHeadConfig {
 	pub subscription_max_pinned_duration: Duration,
 	/// The maximum number of ongoing operations per subscription.
 	pub subscription_max_ongoing_operations: usize,
+	/// Stop all subscriptions if the distance between the leaves and the current finalized
+	/// block is larger than this value.
+	pub max_lagging_distance: usize,
 	/// The maximum number of items reported by the `chainHead_storage` before
 	/// pagination is required.
 	pub operation_max_storage_items: usize,
@@ -88,6 +91,10 @@ const MAX_ONGOING_OPERATIONS: usize = 16;
 /// before paginations is required.
 const MAX_STORAGE_ITER_ITEMS: usize = 5;
 
+/// Stop all subscriptions if the distance between the leaves and the current finalized
+/// block is larger than this value.
+const MAX_LAGGING_DISTANCE: usize = 128;
+
 /// The maximum number of `chainHead_follow` subscriptions per connection.
 const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4;
 
@@ -97,6 +104,7 @@ impl Default for ChainHeadConfig {
 			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
 			subscription_max_pinned_duration: MAX_PINNED_DURATION,
 			subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
+			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			operation_max_storage_items: MAX_STORAGE_ITER_ITEMS,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		}
@@ -116,6 +124,9 @@ pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
 	/// The maximum number of items reported by the `chainHead_storage` before
 	/// pagination is required.
 	operation_max_storage_items: usize,
+	/// Stop all subscriptions if the distance between the leaves and the current finalized
+	/// block is larger than this value.
+	max_lagging_distance: usize,
 	/// Phantom member to pin the block type.
 	_phantom: PhantomData<Block>,
 }
@@ -140,6 +151,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
 				backend,
 			),
 			operation_max_storage_items: config.operation_max_storage_items,
+			max_lagging_distance: config.max_lagging_distance,
 			_phantom: PhantomData,
 		}
 	}
@@ -187,6 +199,7 @@ where
 		let subscriptions = self.subscriptions.clone();
 		let backend = self.backend.clone();
 		let client = self.client.clone();
+		let max_lagging_distance = self.max_lagging_distance;
 
 		let fut = async move {
 			// Ensure the current connection ID has enough space to accept a new subscription.
@@ -207,8 +220,8 @@ where
 			let Some(sub_data) =
 				reserved_subscription.insert_subscription(sub_id.clone(), with_runtime)
 			else {
-				// Inserting the subscription can only fail if the JsonRPSee
-				// generated a duplicate subscription ID.
+				// Inserting the subscription can only fail if the JsonRPSee generated a duplicate
+				// subscription ID.
 				debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id);
 				let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
 				let _ = sink.send(msg).await;
@@ -222,9 +235,13 @@ where
 				subscriptions,
 				with_runtime,
 				sub_id.clone(),
+				max_lagging_distance,
 			);
-
-			chain_head_follow.generate_events(sink, sub_data).await;
+			let result = chain_head_follow.generate_events(sink, sub_data).await;
+			if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result {
+				debug!(target: LOG_TARGET, "[follow][id={:?}] All subscriptions are stopped", sub_id);
+				reserved_subscription.stop_all_subscriptions();
+			}
 
 			debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id);
 		};
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
index 90cc62a36fa91957e499b5d32cb95e92f728da06..0d87a45c07e295784bef4c946ef05d3641234ce4 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
@@ -41,12 +41,14 @@ use sp_api::CallApiAt;
 use sp_blockchain::{
 	Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info,
 };
-use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
+use sp_runtime::{
+	traits::{Block as BlockT, Header as HeaderT, NumberFor},
+	SaturatedConversion, Saturating,
+};
 use std::{
 	collections::{HashSet, VecDeque},
 	sync::Arc,
 };
-
 /// The maximum number of finalized blocks provided by the
 /// `Initialized` event.
 const MAX_FINALIZED_BLOCKS: usize = 16;
@@ -67,6 +69,9 @@ pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
 	sub_id: String,
 	/// The best reported block by this subscription.
 	best_block_cache: Option<Block::Hash>,
+	/// Stop all subscriptions if the distance between the leaves and the current finalized
+	/// block is larger than this value.
+	max_lagging_distance: usize,
 }
 
 impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Client> {
@@ -77,8 +82,17 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Cli
 		sub_handle: SubscriptionManagement<Block, BE>,
 		with_runtime: bool,
 		sub_id: String,
+		max_lagging_distance: usize,
 	) -> Self {
-		Self { client, backend, sub_handle, with_runtime, sub_id, best_block_cache: None }
+		Self {
+			client,
+			backend,
+			sub_handle,
+			with_runtime,
+			sub_id,
+			best_block_cache: None,
+			max_lagging_distance,
+		}
 	}
 }
 
@@ -186,6 +200,35 @@ where
 		}
 	}
 
+	/// Check the distance between the provided blocks does not exceed a
+	/// a reasonable range.
+	///
+	/// When the blocks are too far apart (potentially millions of blocks):
+	///  - Tree route is expensive to calculate.
+	///  - The RPC layer will not be able to generate the `NewBlock` events for all blocks.
+	///
+	/// This edge-case can happen for parachains where the relay chain syncs slower to
+	/// the head of the chain than the parachain node that is synced already.
+	fn distace_within_reason(
+		&self,
+		block: Block::Hash,
+		finalized: Block::Hash,
+	) -> Result<(), SubscriptionManagementError> {
+		let Some(block_num) = self.client.number(block)? else {
+			return Err(SubscriptionManagementError::BlockHashAbsent)
+		};
+		let Some(finalized_num) = self.client.number(finalized)? else {
+			return Err(SubscriptionManagementError::BlockHashAbsent)
+		};
+
+		let distance: usize = block_num.saturating_sub(finalized_num).saturated_into();
+		if distance > self.max_lagging_distance {
+			return Err(SubscriptionManagementError::BlockDistanceTooLarge);
+		}
+
+		Ok(())
+	}
+
 	/// Get the in-memory blocks of the client, starting from the provided finalized hash.
 	///
 	/// The reported blocks are pinned by this function.
@@ -198,6 +241,13 @@ where
 		let mut pruned_forks = HashSet::new();
 		let mut finalized_block_descendants = Vec::new();
 		let mut unique_descendants = HashSet::new();
+
+		// Ensure all leaves are within a reasonable distance from the finalized block,
+		// before traversing the tree.
+		for leaf in &leaves {
+			self.distace_within_reason(*leaf, finalized)?;
+		}
+
 		for leaf in leaves {
 			let tree_route = sp_blockchain::tree_route(blockchain, finalized, leaf)?;
 
@@ -542,7 +592,8 @@ where
 		mut to_ignore: HashSet<Block::Hash>,
 		sink: SubscriptionSink,
 		rx_stop: oneshot::Receiver<()>,
-	) where
+	) -> Result<(), SubscriptionManagementError>
+	where
 		EventStream: Stream<Item = NotificationType<Block>> + Unpin,
 	{
 		let mut stream_item = stream.next();
@@ -576,7 +627,7 @@ where
 					);
 					let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
 					let _ = sink.send(msg).await;
-					return
+					return Err(err)
 				},
 			};
 
@@ -591,7 +642,8 @@ where
 
 					let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
 					let _ = sink.send(msg).await;
-					return
+					// No need to propagate this error further, the client disconnected.
+					return Ok(())
 				}
 			}
 
@@ -605,6 +657,7 @@ where
 		// - the client disconnected.
 		let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
 		let _ = sink.send(msg).await;
+		Ok(())
 	}
 
 	/// Generate the block events for the `chainHead_follow` method.
@@ -612,7 +665,7 @@ where
 		&mut self,
 		sink: SubscriptionSink,
 		sub_data: InsertedSubscriptionData<Block>,
-	) {
+	) -> Result<(), SubscriptionManagementError> {
 		// Register for the new block and finalized notifications.
 		let stream_import = self
 			.client
@@ -640,7 +693,7 @@ where
 				);
 				let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
 				let _ = sink.send(msg).await;
-				return
+				return Err(err)
 			},
 		};
 
@@ -650,6 +703,6 @@ where
 		let stream = stream::once(futures::future::ready(initial)).chain(merged);
 
 		self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, sub_data.rx_stop)
-			.await;
+			.await
 	}
 }
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/error.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/error.rs
index 2c22e51ca4dc70102fca98f4eea7f66d8e044a02..91ce26db22a5f1291e86a6159c33761af4dec82f 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/error.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/error.rs
@@ -41,6 +41,9 @@ pub enum SubscriptionManagementError {
 	/// The unpin method was called with duplicate hashes.
 	#[error("Duplicate hashes")]
 	DuplicateHashes,
+	/// The distance between the leaves and the current finalized block is too large.
+	#[error("Distance too large")]
+	BlockDistanceTooLarge,
 	/// Custom error.
 	#[error("Subscription error {0}")]
 	Custom(String),
@@ -57,6 +60,7 @@ impl PartialEq for SubscriptionManagementError {
 			(Self::BlockHeaderAbsent, Self::BlockHeaderAbsent) |
 			(Self::SubscriptionAbsent, Self::SubscriptionAbsent) |
 			(Self::DuplicateHashes, Self::DuplicateHashes) => true,
+			(Self::BlockDistanceTooLarge, Self::BlockDistanceTooLarge) => true,
 			(Self::Custom(lhs), Self::Custom(rhs)) => lhs == rhs,
 			_ => false,
 		}
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
index 1ebee3c80fc8ea9b11637560d067a376b6a6a1d3..0e5ccb91d39a61a76a850119590699d563d88df8 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
@@ -560,6 +560,7 @@ pub struct SubscriptionsInner<Block: BlockT, BE: Backend<Block>> {
 	max_ongoing_operations: usize,
 	/// Map the subscription ID to internal details of the subscription.
 	subs: HashMap<String, SubscriptionState<Block>>,
+
 	/// Backend pinning / unpinning blocks.
 	///
 	/// The `Arc` is handled one level-above, but substrate exposes the backend as Arc<T>.
@@ -623,6 +624,15 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
 		}
 	}
 
+	/// All active subscriptions are removed.
+	pub fn stop_all_subscriptions(&mut self) {
+		let to_remove: Vec<_> = self.subs.keys().map(|sub_id| sub_id.clone()).collect();
+
+		for sub_id in to_remove {
+			self.remove_subscription(&sub_id);
+		}
+	}
+
 	/// Ensure that a new block could be pinned.
 	///
 	/// If the global number of blocks has been reached this method
@@ -878,6 +888,30 @@ mod tests {
 		(backend, client)
 	}
 
+	fn produce_blocks(
+		mut client: Arc<Client<sc_client_api::in_mem::Backend<Block>>>,
+		num_blocks: usize,
+	) -> Vec<<Block as BlockT>::Hash> {
+		let mut blocks = Vec::with_capacity(num_blocks);
+		let mut parent_hash = client.chain_info().genesis_hash;
+
+		for i in 0..num_blocks {
+			let block = BlockBuilderBuilder::new(&*client)
+				.on_parent_block(parent_hash)
+				.with_parent_block_number(i as u64)
+				.build()
+				.unwrap()
+				.build()
+				.unwrap()
+				.block;
+			parent_hash = block.header.hash();
+			futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
+			blocks.push(block.header.hash());
+		}
+
+		blocks
+	}
+
 	#[test]
 	fn block_state_machine_register_unpin() {
 		let mut state = BlockStateMachine::new();
@@ -1003,37 +1037,10 @@ mod tests {
 
 	#[test]
 	fn unpin_duplicate_hashes() {
-		let (backend, mut client) = init_backend();
-		let block = BlockBuilderBuilder::new(&*client)
-			.on_parent_block(client.chain_info().genesis_hash)
-			.with_parent_block_number(0)
-			.build()
-			.unwrap()
-			.build()
-			.unwrap()
-			.block;
-		let hash_1 = block.header.hash();
-		futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
-		let block = BlockBuilderBuilder::new(&*client)
-			.on_parent_block(hash_1)
-			.with_parent_block_number(1)
-			.build()
-			.unwrap()
-			.build()
-			.unwrap()
-			.block;
-		let hash_2 = block.header.hash();
-		futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
-		let block = BlockBuilderBuilder::new(&*client)
-			.on_parent_block(hash_2)
-			.with_parent_block_number(2)
-			.build()
-			.unwrap()
-			.build()
-			.unwrap()
-			.block;
-		let hash_3 = block.header.hash();
-		futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
+		let (backend, client) = init_backend();
+
+		let hashes = produce_blocks(client, 3);
+		let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
 
 		let mut subs =
 			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
@@ -1102,18 +1109,10 @@ mod tests {
 
 	#[test]
 	fn subscription_check_block() {
-		let (backend, mut client) = init_backend();
-
-		let block = BlockBuilderBuilder::new(&*client)
-			.on_parent_block(client.chain_info().genesis_hash)
-			.with_parent_block_number(0)
-			.build()
-			.unwrap()
-			.build()
-			.unwrap()
-			.block;
-		let hash = block.hash();
-		futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
+		let (backend, client) = init_backend();
+
+		let hashes = produce_blocks(client, 1);
+		let hash = hashes[0];
 
 		let mut subs =
 			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
@@ -1140,17 +1139,10 @@ mod tests {
 
 	#[test]
 	fn subscription_ref_count() {
-		let (backend, mut client) = init_backend();
-		let block = BlockBuilderBuilder::new(&*client)
-			.on_parent_block(client.chain_info().genesis_hash)
-			.with_parent_block_number(0)
-			.build()
-			.unwrap()
-			.build()
-			.unwrap()
-			.block;
-		let hash = block.header.hash();
-		futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
+		let (backend, client) = init_backend();
+
+		let hashes = produce_blocks(client, 1);
+		let hash = hashes[0];
 
 		let mut subs =
 			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
@@ -1190,37 +1182,10 @@ mod tests {
 
 	#[test]
 	fn subscription_remove_subscription() {
-		let (backend, mut client) = init_backend();
-		let block = BlockBuilderBuilder::new(&*client)
-			.on_parent_block(client.chain_info().genesis_hash)
-			.with_parent_block_number(0)
-			.build()
-			.unwrap()
-			.build()
-			.unwrap()
-			.block;
-		let hash_1 = block.header.hash();
-		futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
-		let block = BlockBuilderBuilder::new(&*client)
-			.on_parent_block(hash_1)
-			.with_parent_block_number(1)
-			.build()
-			.unwrap()
-			.build()
-			.unwrap()
-			.block;
-		let hash_2 = block.header.hash();
-		futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
-		let block = BlockBuilderBuilder::new(&*client)
-			.on_parent_block(hash_2)
-			.with_parent_block_number(2)
-			.build()
-			.unwrap()
-			.build()
-			.unwrap()
-			.block;
-		let hash_3 = block.header.hash();
-		futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
+		let (backend, client) = init_backend();
+
+		let hashes = produce_blocks(client, 3);
+		let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
 
 		let mut subs =
 			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
@@ -1256,37 +1221,10 @@ mod tests {
 
 	#[test]
 	fn subscription_check_limits() {
-		let (backend, mut client) = init_backend();
-		let block = BlockBuilderBuilder::new(&*client)
-			.on_parent_block(client.chain_info().genesis_hash)
-			.with_parent_block_number(0)
-			.build()
-			.unwrap()
-			.build()
-			.unwrap()
-			.block;
-		let hash_1 = block.header.hash();
-		futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
-		let block = BlockBuilderBuilder::new(&*client)
-			.on_parent_block(hash_1)
-			.with_parent_block_number(1)
-			.build()
-			.unwrap()
-			.build()
-			.unwrap()
-			.block;
-		let hash_2 = block.header.hash();
-		futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
-		let block = BlockBuilderBuilder::new(&*client)
-			.on_parent_block(hash_2)
-			.with_parent_block_number(2)
-			.build()
-			.unwrap()
-			.build()
-			.unwrap()
-			.block;
-		let hash_3 = block.header.hash();
-		futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
+		let (backend, client) = init_backend();
+
+		let hashes = produce_blocks(client, 3);
+		let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
 
 		// Maximum number of pinned blocks is 2.
 		let mut subs =
@@ -1328,37 +1266,10 @@ mod tests {
 
 	#[test]
 	fn subscription_check_limits_with_duration() {
-		let (backend, mut client) = init_backend();
-		let block = BlockBuilderBuilder::new(&*client)
-			.on_parent_block(client.chain_info().genesis_hash)
-			.with_parent_block_number(0)
-			.build()
-			.unwrap()
-			.build()
-			.unwrap()
-			.block;
-		let hash_1 = block.hash();
-		futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
-		let block = BlockBuilderBuilder::new(&*client)
-			.on_parent_block(hash_1)
-			.with_parent_block_number(1)
-			.build()
-			.unwrap()
-			.build()
-			.unwrap()
-			.block;
-		let hash_2 = block.header.hash();
-		futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
-		let block = BlockBuilderBuilder::new(&*client)
-			.on_parent_block(hash_2)
-			.with_parent_block_number(2)
-			.build()
-			.unwrap()
-			.build()
-			.unwrap()
-			.block;
-		let hash_3 = block.header.hash();
-		futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
+		let (backend, client) = init_backend();
+
+		let hashes = produce_blocks(client, 3);
+		let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
 
 		// Maximum number of pinned blocks is 2 and maximum pin duration is 5 second.
 		let mut subs =
@@ -1456,6 +1367,39 @@ mod tests {
 		assert_eq!(permit_three.num_ops, 1);
 	}
 
+	#[test]
+	fn stop_all_subscriptions() {
+		let (backend, client) = init_backend();
+
+		let hashes = produce_blocks(client, 3);
+		let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);
+
+		let mut subs =
+			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
+		let id_1 = "abc".to_string();
+		let id_2 = "abcd".to_string();
+
+		// Pin all blocks for the first subscription.
+		let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
+		assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
+		assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
+		assert_eq!(subs.pin_block(&id_1, hash_3).unwrap(), true);
+
+		// Pin only block 2 for the second subscription.
+		let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
+		assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
+
+		// Check reference count.
+		assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
+		assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
+		assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
+		assert_eq!(subs.global_blocks.len(), 3);
+
+		// Stop all active subscriptions.
+		subs.stop_all_subscriptions();
+		assert!(subs.global_blocks.is_empty());
+	}
+
 	#[test]
 	fn reserved_subscription_cleans_resources() {
 		let builder = TestClientBuilder::new();
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs
index 5b016af1aa49a36c22639593900e4fa7f2faae19..f266c9d8b34fc774e04a5825740204913851db82 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs
@@ -233,6 +233,15 @@ impl<Block: BlockT, BE: Backend<Block>> ReservedSubscription<Block, BE> {
 			},
 		}
 	}
+
+	/// Stop all active subscriptions.
+	///
+	/// For all active subscriptions, the internal data is discarded, blocks are unpinned and the
+	/// `Stop` event will be generated.
+	pub fn stop_all_subscriptions(&self) {
+		let mut inner = self.inner.write();
+		inner.stop_all_subscriptions()
+	}
 }
 
 impl<Block: BlockT, BE: Backend<Block>> Drop for ReservedSubscription<Block, BE> {
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
index c3f10a201c589d94e186739c0cdf9c4fd3b63890..c2bff7c50d5ebd951596f15e9e7423a77b889c0b 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
@@ -63,6 +63,7 @@ const MAX_PINNED_BLOCKS: usize = 32;
 const MAX_PINNED_SECS: u64 = 60;
 const MAX_OPERATIONS: usize = 16;
 const MAX_PAGINATION_LIMIT: usize = 5;
+const MAX_LAGGING_DISTANCE: usize = 128;
 const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4;
 
 const INVALID_HASH: [u8; 32] = [1; 32];
@@ -88,6 +89,7 @@ pub async fn run_server() -> std::net::SocketAddr {
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
 			max_follow_subscriptions_per_connection: 1,
+			max_lagging_distance: MAX_LAGGING_DISTANCE,
 		},
 	)
 	.into_rpc();
@@ -148,6 +150,7 @@ async fn setup_api() -> (
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
@@ -199,6 +202,8 @@ async fn follow_subscription_produces_blocks() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+
+			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
@@ -268,6 +273,8 @@ async fn follow_with_runtime() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+
+			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
@@ -581,6 +588,8 @@ async fn call_runtime_without_flag() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+
+			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
@@ -1240,6 +1249,8 @@ async fn separate_operation_ids_for_subscriptions() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+
+			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
@@ -1329,6 +1340,8 @@ async fn follow_generates_initial_blocks() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+
+			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
@@ -1485,6 +1498,8 @@ async fn follow_exceeding_pinned_blocks() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+
+			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
@@ -1562,6 +1577,8 @@ async fn follow_with_unpin() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+
+			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
@@ -1674,6 +1691,8 @@ async fn unpin_duplicate_hashes() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+
+			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
@@ -1777,6 +1796,8 @@ async fn follow_with_multiple_unpin_hashes() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+
+			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
@@ -1931,6 +1952,8 @@ async fn follow_prune_best_block() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+
+			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
@@ -2117,6 +2140,8 @@ async fn follow_forks_pruned_block() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+
+			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
@@ -2277,6 +2302,8 @@ async fn follow_report_multiple_pruned_block() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+
+			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
@@ -2523,6 +2550,8 @@ async fn pin_block_references() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+
+			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
@@ -2661,6 +2690,8 @@ async fn follow_finalized_before_new_block() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+
+			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
@@ -2776,6 +2807,8 @@ async fn ensure_operation_limits_works() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: 1,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+
+			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
@@ -2881,6 +2914,8 @@ async fn check_continue_operation() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: 1,
+
+			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
@@ -3064,6 +3099,8 @@ async fn stop_storage_operation() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: 1,
+
+			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
@@ -3351,6 +3388,88 @@ async fn storage_closest_merkle_value() {
 	);
 }
 
+#[tokio::test]
+async fn chain_head_stop_all_subscriptions() {
+	let builder = TestClientBuilder::new();
+	let backend = builder.backend();
+	let mut client = Arc::new(builder.build());
+
+	// Configure the chainHead to stop all subscriptions on lagging distance of 5 blocks.
+	let api = ChainHead::new(
+		client.clone(),
+		backend,
+		Arc::new(TaskExecutor::default()),
+		ChainHeadConfig {
+			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
+			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
+			subscription_max_ongoing_operations: MAX_OPERATIONS,
+			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+			max_lagging_distance: 5,
+			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
+		},
+	)
+	.into_rpc();
+
+	let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap();
+
+	// Ensure the imported block is propagated and pinned for this subscription.
+	assert_matches!(
+		get_next_event::<FollowEvent<String>>(&mut sub).await,
+		FollowEvent::Initialized(_)
+	);
+
+	// Import 6 blocks in total to trigger the suspension distance.
+	let mut parent_hash = client.chain_info().genesis_hash;
+	for i in 0..6 {
+		let block = BlockBuilderBuilder::new(&*client)
+			.on_parent_block(parent_hash)
+			.with_parent_block_number(i)
+			.build()
+			.unwrap()
+			.build()
+			.unwrap()
+			.block;
+
+		let hash = block.hash();
+		parent_hash = hash;
+		client.import(BlockOrigin::Own, block.clone()).await.unwrap();
+
+		assert_matches!(
+			get_next_event::<FollowEvent<String>>(&mut sub).await,
+			FollowEvent::NewBlock(_)
+		);
+		assert_matches!(
+			get_next_event::<FollowEvent<String>>(&mut sub).await,
+			FollowEvent::BestBlockChanged(_)
+		);
+	}
+
+	let mut second_sub =
+		api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap();
+	// Lagging detected, the stop event is delivered immediately.
+	assert_matches!(
+		get_next_event::<FollowEvent<String>>(&mut second_sub).await,
+		FollowEvent::Stop
+	);
+
+	// Ensure that all subscriptions are stopped.
+	assert_matches!(get_next_event::<FollowEvent<String>>(&mut sub).await, FollowEvent::Stop);
+
+	// Other subscriptions cannot be started until the suspension period is over.
+	let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap();
+	// Should receive the stop event immediately.
+	assert_matches!(get_next_event::<FollowEvent<String>>(&mut sub).await, FollowEvent::Stop);
+
+	// For the next subscription, lagging distance must be smaller.
+	client.finalize_block(parent_hash, None).unwrap();
+
+	let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap();
+	assert_matches!(
+		get_next_event::<FollowEvent<String>>(&mut sub).await,
+		FollowEvent::Initialized(_)
+	);
+}
+
 #[tokio::test]
 async fn chain_head_single_connection_context() {
 	let server_addr = run_server().await;
@@ -3500,12 +3619,14 @@ async fn chain_head_limit_reached() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: 1,
 		},
 	)
 	.into_rpc();
 
 	let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap();
+
 	// Initialized must always be reported first.
 	let _event: FollowEvent<String> = get_next_event(&mut sub).await;