diff --git a/Cargo.lock b/Cargo.lock
index 9a8eff4691e5a7bd56d0cde87ab9912ca76a50b9..24612391d3fdc6af91b95162ead15a2cf4023804 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6840,9 +6840,9 @@ checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd"
 
 [[package]]
 name = "jsonrpsee"
-version = "0.22.0"
+version = "0.22.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4a95f7cc23d5fab0cdeeaf6bad8c8f5e7a3aa7f0d211957ea78232b327ab27b0"
+checksum = "87f3ae45a64cfc0882934f963be9431b2a165d667f53140358181f262aca0702"
 dependencies = [
  "jsonrpsee-core",
  "jsonrpsee-http-client",
@@ -6856,9 +6856,9 @@ dependencies = [
 
 [[package]]
 name = "jsonrpsee-client-transport"
-version = "0.22.0"
+version = "0.22.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b1736cfa3845fd9f8f43751f2b8e0e83f7b6081e754502f7d63b6587692cc83"
+checksum = "455fc882e56f58228df2aee36b88a1340eafd707c76af2fa68cf94b37d461131"
 dependencies = [
  "futures-util",
  "http",
@@ -6877,9 +6877,9 @@ dependencies = [
 
 [[package]]
 name = "jsonrpsee-core"
-version = "0.22.0"
+version = "0.22.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "82030d038658974732103e623ba2e0abec03bbbe175b39c0a2fafbada60c5868"
+checksum = "b75568f4f9696e3a47426e1985b548e1a9fcb13372a5e320372acaf04aca30d1"
 dependencies = [
  "anyhow",
  "async-lock 3.3.0",
@@ -6903,9 +6903,9 @@ dependencies = [
 
 [[package]]
 name = "jsonrpsee-http-client"
-version = "0.22.0"
+version = "0.22.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "36a06ef0de060005fddf772d54597bb6a8b0413da47dcffd304b0306147b9678"
+checksum = "9e7a95e346f55df84fb167b7e06470e196e7d5b9488a21d69c5d9732043ba7ba"
 dependencies = [
  "async-trait",
  "hyper",
@@ -6923,22 +6923,22 @@ dependencies = [
 
 [[package]]
 name = "jsonrpsee-proc-macros"
-version = "0.22.0"
+version = "0.22.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "69fc56131589f82e57805f7338b87023db4aafef813555708b159787e34ad6bc"
+checksum = "30ca066e73dd70294aebc5c2675d8ffae43be944af027c857ce0d4c51785f014"
 dependencies = [
  "heck 0.4.1",
  "proc-macro-crate 3.0.0",
  "proc-macro2",
  "quote",
- "syn 1.0.109",
+ "syn 2.0.53",
 ]
 
 [[package]]
 name = "jsonrpsee-server"
-version = "0.22.0"
+version = "0.22.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d85be77fe5b2a94589e3164fb780017f7aff7d646b49278c0d0346af16975c8e"
+checksum = "0e29c1bd1f9bba83c864977c73404e505f74f730fa0db89dd490ec174e36d7f0"
 dependencies = [
  "futures-util",
  "http",
@@ -6960,9 +6960,9 @@ dependencies = [
 
 [[package]]
 name = "jsonrpsee-types"
-version = "0.22.0"
+version = "0.22.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9a48fdc1202eafc51c63e00406575e59493284ace8b8b61aa16f3a6db5d64f1a"
+checksum = "3467fd35feeee179f71ab294516bdf3a81139e7aeebdd860e46897c12e1a3368"
 dependencies = [
  "anyhow",
  "beef",
@@ -6973,9 +6973,9 @@ dependencies = [
 
 [[package]]
 name = "jsonrpsee-ws-client"
-version = "0.22.0"
+version = "0.22.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c5ce25d70a8e4d3cc574bbc3cad0137c326ad64b194793d5e7bbdd3fa4504181"
+checksum = "68ca71e74983f624c0cb67828e480a981586074da8ad3a2f214c6a3f884edab9"
 dependencies = [
  "http",
  "jsonrpsee-client-transport",
diff --git a/substrate/client/rpc-spec-v2/Cargo.toml b/substrate/client/rpc-spec-v2/Cargo.toml
index c62b3e789d389047f66004f5f76a6bb947ce8b9e..937e5c6b626a0e16870321c43e7ec43fb43f1405 100644
--- a/substrate/client/rpc-spec-v2/Cargo.toml
+++ b/substrate/client/rpc-spec-v2/Cargo.toml
@@ -44,6 +44,7 @@ futures-util = { version = "0.3.30", default-features = false }
 rand = "0.8.5"
 
 [dev-dependencies]
+jsonrpsee = { version = "0.22", features = ["server", "ws-client"] }
 serde_json = { workspace = true, default-features = true }
 tokio = { version = "1.22.0", features = ["macros"] }
 substrate-test-runtime-client = { path = "../../test-utils/runtime/client" }
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/api.rs b/substrate/client/rpc-spec-v2/src/chain_head/api.rs
index 00000e1fb277bbc49aeb343d233b00d408fc45c4..3851adac2644d09b824383dba2e80d6877eff01a 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/api.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/api.rs
@@ -27,7 +27,7 @@ use crate::{
 	common::events::StorageQuery,
 };
 use jsonrpsee::{proc_macros::rpc, server::ResponsePayload};
-use sp_rpc::list::ListOrValue;
+pub use sp_rpc::list::ListOrValue;
 
 #[rpc(client, server)]
 pub trait ChainHeadApi<Hash> {
@@ -54,8 +54,8 @@ pub trait ChainHeadApi<Hash> {
 	/// # Unstable
 	///
 	/// This method is unstable and subject to change in the future.
-	#[method(name = "chainHead_unstable_body", blocking)]
-	fn chain_head_unstable_body(
+	#[method(name = "chainHead_unstable_body", raw_method)]
+	async fn chain_head_unstable_body(
 		&self,
 		follow_subscription: String,
 		hash: Hash,
@@ -73,8 +73,8 @@ pub trait ChainHeadApi<Hash> {
 	/// # Unstable
 	///
 	/// This method is unstable and subject to change in the future.
-	#[method(name = "chainHead_unstable_header", blocking)]
-	fn chain_head_unstable_header(
+	#[method(name = "chainHead_unstable_header", raw_method)]
+	async fn chain_head_unstable_header(
 		&self,
 		follow_subscription: String,
 		hash: Hash,
@@ -85,8 +85,8 @@ pub trait ChainHeadApi<Hash> {
 	/// # Unstable
 	///
 	/// This method is unstable and subject to change in the future.
-	#[method(name = "chainHead_unstable_storage", blocking)]
-	fn chain_head_unstable_storage(
+	#[method(name = "chainHead_unstable_storage", raw_method)]
+	async fn chain_head_unstable_storage(
 		&self,
 		follow_subscription: String,
 		hash: Hash,
@@ -99,8 +99,8 @@ pub trait ChainHeadApi<Hash> {
 	/// # Unstable
 	///
 	/// This method is unstable and subject to change in the future.
-	#[method(name = "chainHead_unstable_call", blocking)]
-	fn chain_head_unstable_call(
+	#[method(name = "chainHead_unstable_call", raw_method)]
+	async fn chain_head_unstable_call(
 		&self,
 		follow_subscription: String,
 		hash: Hash,
@@ -118,8 +118,8 @@ pub trait ChainHeadApi<Hash> {
 	/// # Unstable
 	///
 	/// This method is unstable and subject to change in the future.
-	#[method(name = "chainHead_unstable_unpin", blocking)]
-	fn chain_head_unstable_unpin(
+	#[method(name = "chainHead_unstable_unpin", raw_method)]
+	async fn chain_head_unstable_unpin(
 		&self,
 		follow_subscription: String,
 		hash_or_hashes: ListOrValue<Hash>,
@@ -131,8 +131,8 @@ pub trait ChainHeadApi<Hash> {
 	/// # Unstable
 	///
 	/// This method is unstable and subject to change in the future.
-	#[method(name = "chainHead_unstable_continue", blocking)]
-	fn chain_head_unstable_continue(
+	#[method(name = "chainHead_unstable_continue", raw_method)]
+	async fn chain_head_unstable_continue(
 		&self,
 		follow_subscription: String,
 		operation_id: String,
@@ -145,8 +145,8 @@ pub trait ChainHeadApi<Hash> {
 	/// # Unstable
 	///
 	/// This method is unstable and subject to change in the future.
-	#[method(name = "chainHead_unstable_stopOperation", blocking)]
-	fn chain_head_unstable_stop_operation(
+	#[method(name = "chainHead_unstable_stopOperation", raw_method)]
+	async fn chain_head_unstable_stop_operation(
 		&self,
 		follow_subscription: String,
 		operation_id: String,
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
index 2bda22b452391e5b3ef3f7367b43331079591aa3..975abbca4b68f7794fa4414f7cfee12a4163ee46 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
@@ -34,10 +34,10 @@ use crate::{
 	hex_string, SubscriptionTaskExecutor,
 };
 use codec::Encode;
-use futures::future::FutureExt;
+use futures::{channel::oneshot, future::FutureExt};
 use jsonrpsee::{
-	core::async_trait, server::ResponsePayload, types::SubscriptionId, MethodResponseFuture,
-	PendingSubscriptionSink, SubscriptionSink,
+	core::async_trait, server::ResponsePayload, types::SubscriptionId, ConnectionDetails,
+	MethodResponseFuture, PendingSubscriptionSink, SubscriptionSink,
 };
 use log::debug;
 use sc_client_api::{
@@ -65,6 +65,8 @@ pub struct ChainHeadConfig {
 	/// The maximum number of items reported by the `chainHead_storage` before
 	/// pagination is required.
 	pub operation_max_storage_items: usize,
+	/// The maximum number of `chainHead_follow` subscriptions per connection.
+	pub max_follow_subscriptions_per_connection: usize,
 }
 
 /// Maximum pinned blocks across all connections.
@@ -86,6 +88,9 @@ const MAX_ONGOING_OPERATIONS: usize = 16;
 /// before paginations is required.
 const MAX_STORAGE_ITER_ITEMS: usize = 5;
 
+/// The maximum number of `chainHead_follow` subscriptions per connection.
+const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4;
+
 impl Default for ChainHeadConfig {
 	fn default() -> Self {
 		ChainHeadConfig {
@@ -93,6 +98,7 @@ impl Default for ChainHeadConfig {
 			subscription_max_pinned_duration: MAX_PINNED_DURATION,
 			subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
 			operation_max_storage_items: MAX_STORAGE_ITER_ITEMS,
+			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		}
 	}
 }
@@ -106,7 +112,7 @@ pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
 	/// Executor to spawn subscriptions.
 	executor: SubscriptionTaskExecutor,
 	/// Keep track of the pinned blocks for each subscription.
-	subscriptions: Arc<SubscriptionManagement<Block, BE>>,
+	subscriptions: SubscriptionManagement<Block, BE>,
 	/// The maximum number of items reported by the `chainHead_storage` before
 	/// pagination is required.
 	operation_max_storage_items: usize,
@@ -126,12 +132,13 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
 			client,
 			backend: backend.clone(),
 			executor,
-			subscriptions: Arc::new(SubscriptionManagement::new(
+			subscriptions: SubscriptionManagement::new(
 				config.global_max_pinned_blocks,
 				config.subscription_max_pinned_duration,
 				config.subscription_max_ongoing_operations,
+				config.max_follow_subscriptions_per_connection,
 				backend,
-			)),
+			),
 			operation_max_storage_items: config.operation_max_storage_items,
 			_phantom: PhantomData,
 		}
@@ -182,12 +189,23 @@ where
 		let client = self.client.clone();
 
 		let fut = async move {
+			// Ensure the current connection ID has enough space to accept a new subscription.
+			let connection_id = pending.connection_id();
+			// The RAII `reserved_subscription` will clean up resources on drop:
+			// - free the reserved subscription for the connection ID.
+			// - remove the subscription ID from the subscription management.
+			let Some(mut reserved_subscription) = subscriptions.reserve_subscription(connection_id)
+			else {
+				pending.reject(ChainHeadRpcError::ReachedLimits).await;
+				return
+			};
+
 			let Ok(sink) = pending.accept().await else { return };
 
 			let sub_id = read_subscription_id_as_string(&sink);
-
 			// Keep track of the subscription.
-			let Some(sub_data) = subscriptions.insert_subscription(sub_id.clone(), with_runtime)
+			let Some(sub_data) =
+				reserved_subscription.insert_subscription(sub_id.clone(), with_runtime)
 			else {
 				// Inserting the subscription can only fail if the JsonRPSee
 				// generated a duplicate subscription ID.
@@ -201,91 +219,117 @@ where
 			let mut chain_head_follow = ChainHeadFollower::new(
 				client,
 				backend,
-				subscriptions.clone(),
+				subscriptions,
 				with_runtime,
 				sub_id.clone(),
 			);
 
 			chain_head_follow.generate_events(sink, sub_data).await;
 
-			subscriptions.remove_subscription(&sub_id);
 			debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id);
 		};
 
 		self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
 	}
 
-	fn chain_head_unstable_body(
+	async fn chain_head_unstable_body(
 		&self,
+		connection_details: ConnectionDetails,
 		follow_subscription: String,
 		hash: Block::Hash,
 	) -> ResponsePayload<'static, MethodResponse> {
-		let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
-			Ok(block) => block,
-			Err(SubscriptionManagementError::SubscriptionAbsent) |
-			Err(SubscriptionManagementError::ExceededLimits) =>
-				return ResponsePayload::success(MethodResponse::LimitReached),
-			Err(SubscriptionManagementError::BlockHashAbsent) => {
-				// Block is not part of the subscription.
-				return ResponsePayload::error(ChainHeadRpcError::InvalidBlock);
-			},
-			Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
-		};
+		if !self
+			.subscriptions
+			.contains_subscription(connection_details.id(), &follow_subscription)
+		{
+			// The spec says to return `LimitReached` if the follow subscription is invalid or
+			// stale.
+			return ResponsePayload::success(MethodResponse::LimitReached);
+		}
 
-		let operation_id = block_guard.operation().operation_id();
+		let client = self.client.clone();
+		let subscriptions = self.subscriptions.clone();
+		let executor = self.executor.clone();
+
+		let result = spawn_blocking(&self.executor, async move {
+			let mut block_guard = match subscriptions.lock_block(&follow_subscription, hash, 1) {
+				Ok(block) => block,
+				Err(SubscriptionManagementError::SubscriptionAbsent) |
+				Err(SubscriptionManagementError::ExceededLimits) =>
+					return ResponsePayload::success(MethodResponse::LimitReached),
+				Err(SubscriptionManagementError::BlockHashAbsent) => {
+					// Block is not part of the subscription.
+					return ResponsePayload::error(ChainHeadRpcError::InvalidBlock);
+				},
+				Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
+			};
 
-		let event = match self.client.block(hash) {
-			Ok(Some(signed_block)) => {
-				let extrinsics = signed_block
-					.block
-					.extrinsics()
-					.iter()
-					.map(|extrinsic| hex_string(&extrinsic.encode()))
-					.collect();
-				FollowEvent::<Block::Hash>::OperationBodyDone(OperationBodyDone {
+			let operation_id = block_guard.operation().operation_id();
+
+			let event = match client.block(hash) {
+				Ok(Some(signed_block)) => {
+					let extrinsics = signed_block
+						.block
+						.extrinsics()
+						.iter()
+						.map(|extrinsic| hex_string(&extrinsic.encode()))
+						.collect();
+					FollowEvent::<Block::Hash>::OperationBodyDone(OperationBodyDone {
+						operation_id: operation_id.clone(),
+						value: extrinsics,
+					})
+				},
+				Ok(None) => {
+					// The block's body was pruned. This subscription ID has become invalid.
+					debug!(
+						target: LOG_TARGET,
+						"[body][id={:?}] Stopping subscription because hash={:?} was pruned",
+						&follow_subscription,
+						hash
+					);
+					subscriptions.remove_subscription(&follow_subscription);
+					return ResponsePayload::error(ChainHeadRpcError::InvalidBlock)
+				},
+				Err(error) => FollowEvent::<Block::Hash>::OperationError(OperationError {
 					operation_id: operation_id.clone(),
-					value: extrinsics,
-				})
-			},
-			Ok(None) => {
-				// The block's body was pruned. This subscription ID has become invalid.
-				debug!(
-					target: LOG_TARGET,
-					"[body][id={:?}] Stopping subscription because hash={:?} was pruned",
-					&follow_subscription,
-					hash
-				);
-				self.subscriptions.remove_subscription(&follow_subscription);
-				return ResponsePayload::error(ChainHeadRpcError::InvalidBlock)
-			},
-			Err(error) => FollowEvent::<Block::Hash>::OperationError(OperationError {
-				operation_id: operation_id.clone(),
-				error: error.to_string(),
-			}),
-		};
+					error: error.to_string(),
+				}),
+			};
 
-		let (rp, rp_fut) = method_started_response(operation_id, None);
+			let (rp, rp_fut) = method_started_response(operation_id, None);
+			let fut = async move {
+				// Wait for the server to send out the response and if it produces an error no event
+				// should be generated.
+				if rp_fut.await.is_err() {
+					return;
+				}
 
-		let fut = async move {
-			// Events should only by generated
-			// if the response was successfully propagated.
-			if rp_fut.await.is_err() {
-				return;
-			}
-			let _ = block_guard.response_sender().unbounded_send(event);
-		};
+				let _ = block_guard.response_sender().unbounded_send(event);
+			};
+			executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
 
-		self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
+			rp
+		});
 
-		rp
+		result
+			.await
+			.unwrap_or_else(|_| ResponsePayload::success(MethodResponse::LimitReached))
 	}
 
-	fn chain_head_unstable_header(
+	async fn chain_head_unstable_header(
 		&self,
+		connection_details: ConnectionDetails,
 		follow_subscription: String,
 		hash: Block::Hash,
 	) -> Result<Option<String>, ChainHeadRpcError> {
-		let _block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
+		if !self
+			.subscriptions
+			.contains_subscription(connection_details.id(), &follow_subscription)
+		{
+			return Ok(None);
+		}
+
+		let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
 			Ok(block) => block,
 			Err(SubscriptionManagementError::SubscriptionAbsent) |
 			Err(SubscriptionManagementError::ExceededLimits) => return Ok(None),
@@ -296,19 +340,35 @@ where
 			Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
 		};
 
-		self.client
-			.header(hash)
-			.map(|opt_header| opt_header.map(|h| hex_string(&h.encode())))
-			.map_err(|err| ChainHeadRpcError::InternalError(err.to_string()))
+		let client = self.client.clone();
+		let result = spawn_blocking(&self.executor, async move {
+			let _block_guard = block_guard;
+
+			client
+				.header(hash)
+				.map(|opt_header| opt_header.map(|h| hex_string(&h.encode())))
+				.map_err(|err| ChainHeadRpcError::InternalError(err.to_string()))
+		});
+		result.await.unwrap_or_else(|_| Ok(None))
 	}
 
-	fn chain_head_unstable_storage(
+	async fn chain_head_unstable_storage(
 		&self,
+		connection_details: ConnectionDetails,
 		follow_subscription: String,
 		hash: Block::Hash,
 		items: Vec<StorageQuery<String>>,
 		child_trie: Option<String>,
 	) -> ResponsePayload<'static, MethodResponse> {
+		if !self
+			.subscriptions
+			.contains_subscription(connection_details.id(), &follow_subscription)
+		{
+			// The spec says to return `LimitReached` if the follow subscription is invalid or
+			// stale.
+			return ResponsePayload::success(MethodResponse::LimitReached);
+		}
+
 		// Gain control over parameter parsing and returned error.
 		let items = match items
 			.into_iter()
@@ -357,25 +417,25 @@ where
 		let mut items = items;
 		items.truncate(num_operations);
 
-		let (rp, rp_is_success) = method_started_response(operation_id, Some(discarded));
-
+		let (rp, rp_fut) = method_started_response(operation_id, Some(discarded));
 		let fut = async move {
-			// Events should only by generated
-			// if the response was successfully propagated.
-			if rp_is_success.await.is_err() {
+			// Wait for the server to send out the response and if it produces an error no event
+			// should be generated.
+			if rp_fut.await.is_err() {
 				return;
 			}
+
 			storage_client.generate_events(block_guard, hash, items, child_trie).await;
 		};
-
 		self.executor
 			.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
 
 		rp
 	}
 
-	fn chain_head_unstable_call(
+	async fn chain_head_unstable_call(
 		&self,
+		connection_details: ConnectionDetails,
 		follow_subscription: String,
 		hash: Block::Hash,
 		function: String,
@@ -386,6 +446,15 @@ where
 			Err(err) => return ResponsePayload::error(err),
 		};
 
+		if !self
+			.subscriptions
+			.contains_subscription(connection_details.id(), &follow_subscription)
+		{
+			// The spec says to return `LimitReached` if the follow subscription is invalid or
+			// stale.
+			return ResponsePayload::success(MethodResponse::LimitReached);
+		}
+
 		let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
 			Ok(block) => block,
 			Err(SubscriptionManagementError::SubscriptionAbsent) |
@@ -408,44 +477,53 @@ where
 		}
 
 		let operation_id = block_guard.operation().operation_id();
-		let event = self
-			.client
-			.executor()
-			.call(hash, &function, &call_parameters, CallContext::Offchain)
-			.map(|result| {
-				FollowEvent::<Block::Hash>::OperationCallDone(OperationCallDone {
-					operation_id: operation_id.clone(),
-					output: hex_string(&result),
-				})
-			})
-			.unwrap_or_else(|error| {
-				FollowEvent::<Block::Hash>::OperationError(OperationError {
-					operation_id: operation_id.clone(),
-					error: error.to_string(),
-				})
-			});
-
-		let (rp, rp_fut) = method_started_response(operation_id, None);
+		let client = self.client.clone();
 
+		let (rp, rp_fut) = method_started_response(operation_id.clone(), None);
 		let fut = async move {
-			// Events should only by generated
-			// if the response was successfully propagated.
+			// Wait for the server to send out the response and if it produces an error no event
+			// should be generated.
 			if rp_fut.await.is_err() {
-				return;
+				return
 			}
+
+			let event = client
+				.executor()
+				.call(hash, &function, &call_parameters, CallContext::Offchain)
+				.map(|result| {
+					FollowEvent::<Block::Hash>::OperationCallDone(OperationCallDone {
+						operation_id: operation_id.clone(),
+						output: hex_string(&result),
+					})
+				})
+				.unwrap_or_else(|error| {
+					FollowEvent::<Block::Hash>::OperationError(OperationError {
+						operation_id: operation_id.clone(),
+						error: error.to_string(),
+					})
+				});
+
 			let _ = block_guard.response_sender().unbounded_send(event);
 		};
-
-		self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
+		self.executor
+			.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
 
 		rp
 	}
 
-	fn chain_head_unstable_unpin(
+	async fn chain_head_unstable_unpin(
 		&self,
+		connection_details: ConnectionDetails,
 		follow_subscription: String,
 		hash_or_hashes: ListOrValue<Block::Hash>,
 	) -> Result<(), ChainHeadRpcError> {
+		if !self
+			.subscriptions
+			.contains_subscription(connection_details.id(), &follow_subscription)
+		{
+			return Ok(());
+		}
+
 		let result = match hash_or_hashes {
 			ListOrValue::Value(hash) =>
 				self.subscriptions.unpin_blocks(&follow_subscription, [hash]),
@@ -469,11 +547,19 @@ where
 		}
 	}
 
-	fn chain_head_unstable_continue(
+	async fn chain_head_unstable_continue(
 		&self,
+		connection_details: ConnectionDetails,
 		follow_subscription: String,
 		operation_id: String,
 	) -> Result<(), ChainHeadRpcError> {
+		if !self
+			.subscriptions
+			.contains_subscription(connection_details.id(), &follow_subscription)
+		{
+			return Ok(())
+		}
+
 		let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id)
 		else {
 			return Ok(())
@@ -487,11 +573,19 @@ where
 		}
 	}
 
-	fn chain_head_unstable_stop_operation(
+	async fn chain_head_unstable_stop_operation(
 		&self,
+		connection_details: ConnectionDetails,
 		follow_subscription: String,
 		operation_id: String,
 	) -> Result<(), ChainHeadRpcError> {
+		if !self
+			.subscriptions
+			.contains_subscription(connection_details.id(), &follow_subscription)
+		{
+			return Ok(())
+		}
+
 		let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id)
 		else {
 			return Ok(())
@@ -510,3 +604,26 @@ fn method_started_response(
 	let rp = MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items });
 	ResponsePayload::success(rp).notify_on_completion()
 }
+
+/// Spawn a blocking future on the provided executor and return the result on a oneshot channel.
+///
+/// This is a wrapper to extract the result of a `executor.spawn_blocking` future.
+fn spawn_blocking<R>(
+	executor: &SubscriptionTaskExecutor,
+	fut: impl std::future::Future<Output = R> + Send + 'static,
+) -> oneshot::Receiver<R>
+where
+	R: Send + 'static,
+{
+	let (tx, rx) = oneshot::channel();
+
+	let blocking_fut = async move {
+		let result = fut.await;
+		// Send the result back on the channel.
+		let _ = tx.send(result);
+	};
+
+	executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), blocking_fut.boxed());
+
+	rx
+}
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
index afa99f3aa1648823aee646304c7c2b2cd71da6e2..90cc62a36fa91957e499b5d32cb95e92f728da06 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
@@ -60,7 +60,7 @@ pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
 	/// Backend of the chain.
 	backend: Arc<BE>,
 	/// Subscriptions handle.
-	sub_handle: Arc<SubscriptionManagement<Block, BE>>,
+	sub_handle: SubscriptionManagement<Block, BE>,
 	/// Subscription was started with the runtime updates flag.
 	with_runtime: bool,
 	/// Subscription ID.
@@ -74,7 +74,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Cli
 	pub fn new(
 		client: Arc<Client>,
 		backend: Arc<BE>,
-		sub_handle: Arc<SubscriptionManagement<Block, BE>>,
+		sub_handle: SubscriptionManagement<Block, BE>,
 		with_runtime: bool,
 		sub_id: String,
 	) -> Self {
@@ -546,7 +546,12 @@ where
 		EventStream: Stream<Item = NotificationType<Block>> + Unpin,
 	{
 		let mut stream_item = stream.next();
-		let mut stop_event = rx_stop;
+
+		// The stop event can be triggered by the chainHead logic when the pinned
+		// block guarantee cannot be hold. Or when the client is disconnected.
+		let connection_closed = sink.closed();
+		tokio::pin!(connection_closed);
+		let mut stop_event = futures_util::future::select(rx_stop, connection_closed);
 
 		while let Either::Left((Some(event), next_stop_event)) =
 			futures_util::future::select(stream_item, stop_event).await
@@ -594,8 +599,10 @@ where
 			stop_event = next_stop_event;
 		}
 
-		// If we got here either the substrate streams have closed
-		// or the `Stop` receiver was triggered.
+		// If we got here either:
+		// - the substrate streams have closed
+		// - the `Stop` receiver was triggered internally (cannot hold the pinned block guarantee)
+		// - the client disconnected.
 		let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
 		let _ = sink.send(msg).await;
 	}
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/error.rs b/substrate/client/rpc-spec-v2/src/chain_head/error.rs
index 8c50e445aa0cf6480d3a672f7120cd7a492f4c99..35604db0660091b3bc8ea7053e3ccd5391f9a32a 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/error.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/error.rs
@@ -23,6 +23,9 @@ use jsonrpsee::types::error::ErrorObject;
 /// ChainHead RPC errors.
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
+	/// Maximum number of chainHead_follow has been reached.
+	#[error("Maximum number of chainHead_follow has been reached")]
+	ReachedLimits,
 	/// The provided block hash is invalid.
 	#[error("Invalid block hash")]
 	InvalidBlock,
@@ -46,6 +49,8 @@ pub enum Error {
 /// Errors for `chainHead` RPC module, as defined in
 /// <https://github.com/paritytech/json-rpc-interface-spec>.
 pub mod rpc_spec_v2 {
+	/// Maximum number of chainHead_follow has been reached.
+	pub const REACHED_LIMITS: i32 = -32800;
 	/// The provided block hash is invalid.
 	pub const INVALID_BLOCK_ERROR: i32 = -32801;
 	/// The follow subscription was started with `withRuntime` set to `false`.
@@ -70,6 +75,8 @@ impl From<Error> for ErrorObject<'static> {
 		let msg = e.to_string();
 
 		match e {
+			Error::ReachedLimits =>
+				ErrorObject::owned(rpc_spec_v2::REACHED_LIMITS, msg, None::<()>),
 			Error::InvalidBlock =>
 				ErrorObject::owned(rpc_spec_v2::INVALID_BLOCK_ERROR, msg, None::<()>),
 			Error::InvalidRuntimeCall(_) =>
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
index d2879679501fd17eada1b826ff01478cce24b7a3..1ebee3c80fc8ea9b11637560d067a376b6a6a1d3 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
@@ -1455,4 +1455,57 @@ mod tests {
 		let permit_three = ops.reserve_at_most(1).unwrap();
 		assert_eq!(permit_three.num_ops, 1);
 	}
+
+	#[test]
+	fn reserved_subscription_cleans_resources() {
+		let builder = TestClientBuilder::new();
+		let backend = builder.backend();
+		let subs = Arc::new(parking_lot::RwLock::new(SubscriptionsInner::new(
+			10,
+			Duration::from_secs(10),
+			MAX_OPERATIONS_PER_SUB,
+			backend,
+		)));
+
+		// Maximum 2 subscriptions per connection.
+		let rpc_connections = crate::common::connections::RpcConnections::new(2);
+
+		let subscription_management =
+			crate::chain_head::subscription::SubscriptionManagement::_from_inner(
+				subs.clone(),
+				rpc_connections.clone(),
+			);
+
+		let reserved_sub_first = subscription_management.reserve_subscription(1).unwrap();
+		let mut reserved_sub_second = subscription_management.reserve_subscription(1).unwrap();
+		// Subscriptions reserved but not yet populated.
+		assert_eq!(subs.read().subs.len(), 0);
+
+		// Cannot reserve anymore.
+		assert!(subscription_management.reserve_subscription(1).is_none());
+		// Drop the first subscription.
+		drop(reserved_sub_first);
+		// Space is freed-up for the rpc connections.
+		let mut reserved_sub_first = subscription_management.reserve_subscription(1).unwrap();
+
+		// Insert subscriptions.
+		let _sub_data_first =
+			reserved_sub_first.insert_subscription("sub1".to_string(), true).unwrap();
+		let _sub_data_second =
+			reserved_sub_second.insert_subscription("sub2".to_string(), true).unwrap();
+		// Check we have 2 subscriptions under management.
+		assert_eq!(subs.read().subs.len(), 2);
+
+		// Drop first reserved subscription.
+		drop(reserved_sub_first);
+		// Check that the subscription is removed.
+		assert_eq!(subs.read().subs.len(), 1);
+		// Space is freed-up for the rpc connections.
+		let reserved_sub_first = subscription_management.reserve_subscription(1).unwrap();
+
+		// Drop all subscriptions.
+		drop(reserved_sub_first);
+		drop(reserved_sub_second);
+		assert_eq!(subs.read().subs.len(), 0);
+	}
 }
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs
index c830e662da2e5c3499f86ae1ddc40bd3b6f4e40a..5b016af1aa49a36c22639593900e4fa7f2faae19 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs
@@ -16,6 +16,7 @@
 // You should have received a copy of the GNU General Public License
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
+use jsonrpsee::ConnectionId;
 use parking_lot::RwLock;
 use sc_client_api::Backend;
 use sp_runtime::traits::Block as BlockT;
@@ -24,6 +25,11 @@ use std::{sync::Arc, time::Duration};
 mod error;
 mod inner;
 
+use crate::{
+	chain_head::chain_head::LOG_TARGET,
+	common::connections::{RegisteredConnection, ReservedConnection, RpcConnections},
+};
+
 use self::inner::SubscriptionsInner;
 
 pub use self::inner::OperationState;
@@ -34,7 +40,22 @@ pub use inner::{BlockGuard, InsertedSubscriptionData};
 pub struct SubscriptionManagement<Block: BlockT, BE: Backend<Block>> {
 	/// Manage subscription by mapping the subscription ID
 	/// to a set of block hashes.
-	inner: RwLock<SubscriptionsInner<Block, BE>>,
+	inner: Arc<RwLock<SubscriptionsInner<Block, BE>>>,
+
+	/// Ensures that chainHead methods can be called from a single connection context.
+	///
+	/// For example, `chainHead_storage` cannot be called with a subscription ID that
+	/// was obtained from a different connection.
+	rpc_connections: RpcConnections,
+}
+
+impl<Block: BlockT, BE: Backend<Block>> Clone for SubscriptionManagement<Block, BE> {
+	fn clone(&self) -> Self {
+		SubscriptionManagement {
+			inner: self.inner.clone(),
+			rpc_connections: self.rpc_connections.clone(),
+		}
+	}
 }
 
 impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> {
@@ -43,30 +64,55 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> {
 		global_max_pinned_blocks: usize,
 		local_max_pin_duration: Duration,
 		max_ongoing_operations: usize,
+		max_follow_subscriptions_per_connection: usize,
 		backend: Arc<BE>,
 	) -> Self {
 		SubscriptionManagement {
-			inner: RwLock::new(SubscriptionsInner::new(
+			inner: Arc::new(RwLock::new(SubscriptionsInner::new(
 				global_max_pinned_blocks,
 				local_max_pin_duration,
 				max_ongoing_operations,
 				backend,
-			)),
+			))),
+			rpc_connections: RpcConnections::new(max_follow_subscriptions_per_connection),
 		}
 	}
 
-	/// Insert a new subscription ID.
+	/// Create a new instance from the inner state.
 	///
-	/// If the subscription was not previously inserted, returns the receiver that is
-	/// triggered upon the "Stop" event. Otherwise, if the subscription ID was already
-	/// inserted returns none.
-	pub fn insert_subscription(
+	/// # Note
+	///
+	/// Used for testing.
+	#[cfg(test)]
+	pub(crate) fn _from_inner(
+		inner: Arc<RwLock<SubscriptionsInner<Block, BE>>>,
+		rpc_connections: RpcConnections,
+	) -> Self {
+		SubscriptionManagement { inner, rpc_connections }
+	}
+
+	/// Reserve space for a subscriptions.
+	///
+	/// Fails if the connection ID is has reached the maximum number of active subscriptions.
+	pub fn reserve_subscription(
 		&self,
-		sub_id: String,
-		runtime_updates: bool,
-	) -> Option<InsertedSubscriptionData<Block>> {
-		let mut inner = self.inner.write();
-		inner.insert_subscription(sub_id, runtime_updates)
+		connection_id: ConnectionId,
+	) -> Option<ReservedSubscription<Block, BE>> {
+		let reserved_token = self.rpc_connections.reserve_space(connection_id)?;
+
+		Some(ReservedSubscription {
+			state: ConnectionState::Reserved(reserved_token),
+			inner: self.inner.clone(),
+		})
+	}
+
+	/// Check if the given connection contains the given subscription.
+	pub fn contains_subscription(
+		&self,
+		connection_id: ConnectionId,
+		subscription_id: &str,
+	) -> bool {
+		self.rpc_connections.contains_identifier(connection_id, subscription_id)
 	}
 
 	/// Remove the subscription ID with associated pinned blocks.
@@ -136,3 +182,63 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> {
 		inner.get_operation(sub_id, operation_id)
 	}
 }
+
+/// The state of the connection.
+///
+/// The state starts in a [`ConnectionState::Reserved`] state and then transitions to
+/// [`ConnectionState::Registered`] when the subscription is inserted.
+enum ConnectionState {
+	Reserved(ReservedConnection),
+	Registered { _unregister_on_drop: RegisteredConnection, sub_id: String },
+	Empty,
+}
+
+/// RAII wrapper that removes the subscription from internal mappings and
+/// gives back the reserved space for the connection.
+pub struct ReservedSubscription<Block: BlockT, BE: Backend<Block>> {
+	state: ConnectionState,
+	inner: Arc<RwLock<SubscriptionsInner<Block, BE>>>,
+}
+
+impl<Block: BlockT, BE: Backend<Block>> ReservedSubscription<Block, BE> {
+	/// Insert a new subscription ID.
+	///
+	/// If the subscription was not previously inserted, returns the receiver that is
+	/// triggered upon the "Stop" event. Otherwise, if the subscription ID was already
+	/// inserted returns none.
+	///
+	/// # Note
+	///
+	/// This method should be called only once.
+	pub fn insert_subscription(
+		&mut self,
+		sub_id: String,
+		runtime_updates: bool,
+	) -> Option<InsertedSubscriptionData<Block>> {
+		match std::mem::replace(&mut self.state, ConnectionState::Empty) {
+			ConnectionState::Reserved(reserved) => {
+				let registered_token = reserved.register(sub_id.clone())?;
+				self.state = ConnectionState::Registered {
+					_unregister_on_drop: registered_token,
+					sub_id: sub_id.clone(),
+				};
+
+				let mut inner = self.inner.write();
+				inner.insert_subscription(sub_id, runtime_updates)
+			},
+			// Cannot insert multiple subscriptions into one single reserved space.
+			ConnectionState::Registered { .. } | ConnectionState::Empty => {
+				log::error!(target: LOG_TARGET, "Called insert_subscription on a connection that is not reserved");
+				None
+			},
+		}
+	}
+}
+
+impl<Block: BlockT, BE: Backend<Block>> Drop for ReservedSubscription<Block, BE> {
+	fn drop(&mut self) {
+		if let ConnectionState::Registered { sub_id, .. } = &self.state {
+			self.inner.write().remove_subscription(sub_id);
+		}
+	}
+}
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
index 30152efb5b623d664845453c64873dc45cdbb278..c3f10a201c589d94e186739c0cdf9c4fd3b63890 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
@@ -17,7 +17,7 @@
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
 use crate::{
-	chain_head::{event::MethodResponse, test_utils::ChainHeadMockClient},
+	chain_head::{api::ChainHeadApiClient, event::MethodResponse, test_utils::ChainHeadMockClient},
 	common::events::{StorageQuery, StorageQueryType, StorageResultType},
 	hex_string,
 };
@@ -27,8 +27,12 @@ use assert_matches::assert_matches;
 use codec::{Decode, Encode};
 use futures::Future;
 use jsonrpsee::{
-	core::server::Subscription as RpcSubscription, rpc_params, MethodsError as Error, RpcModule,
+	core::{
+		client::Subscription as RpcClientSubscription, server::Subscription as RpcSubscription,
+	},
+	rpc_params, MethodsError as Error, RpcModule,
 };
+
 use sc_block_builder::BlockBuilderBuilder;
 use sc_client_api::ChildInfo;
 use sc_service::client::new_in_mem;
@@ -59,6 +63,8 @@ const MAX_PINNED_BLOCKS: usize = 32;
 const MAX_PINNED_SECS: u64 = 60;
 const MAX_OPERATIONS: usize = 16;
 const MAX_PAGINATION_LIMIT: usize = 5;
+const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4;
+
 const INVALID_HASH: [u8; 32] = [1; 32];
 const KEY: &[u8] = b":mock";
 const VALUE: &[u8] = b"hello world";
@@ -66,6 +72,35 @@ const CHILD_STORAGE_KEY: &[u8] = b"child";
 const CHILD_VALUE: &[u8] = b"child value";
 const DOES_NOT_PRODUCE_EVENTS_SECONDS: u64 = 10;
 
+/// Start an RPC server with the chainHead module.
+pub async fn run_server() -> std::net::SocketAddr {
+	let builder = TestClientBuilder::new();
+	let backend = builder.backend();
+	let client = Arc::new(builder.build());
+
+	let api = ChainHead::new(
+		client,
+		backend,
+		Arc::new(TaskExecutor::default()),
+		ChainHeadConfig {
+			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
+			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
+			subscription_max_ongoing_operations: MAX_OPERATIONS,
+			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+			max_follow_subscriptions_per_connection: 1,
+		},
+	)
+	.into_rpc();
+
+	let server = jsonrpsee::server::ServerBuilder::default().build("127.0.0.1:0").await.unwrap();
+
+	let addr = server.local_addr().unwrap();
+	let handle = server.start(api);
+
+	tokio::spawn(handle.stopped());
+	addr
+}
+
 async fn get_next_event<T: serde::de::DeserializeOwned>(sub: &mut RpcSubscription) -> T {
 	let (event, _sub_id) = tokio::time::timeout(std::time::Duration::from_secs(60), sub.next())
 		.await
@@ -113,6 +148,7 @@ async fn setup_api() -> (
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
 	.into_rpc();
@@ -163,6 +199,7 @@ async fn follow_subscription_produces_blocks() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
 	.into_rpc();
@@ -231,6 +268,7 @@ async fn follow_with_runtime() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
 	.into_rpc();
@@ -543,6 +581,7 @@ async fn call_runtime_without_flag() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
 	.into_rpc();
@@ -1201,6 +1240,7 @@ async fn separate_operation_ids_for_subscriptions() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
 	.into_rpc();
@@ -1289,6 +1329,7 @@ async fn follow_generates_initial_blocks() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
 	.into_rpc();
@@ -1444,6 +1485,7 @@ async fn follow_exceeding_pinned_blocks() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
 	.into_rpc();
@@ -1520,6 +1562,7 @@ async fn follow_with_unpin() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
 	.into_rpc();
@@ -1631,6 +1674,7 @@ async fn unpin_duplicate_hashes() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
 	.into_rpc();
@@ -1733,6 +1777,7 @@ async fn follow_with_multiple_unpin_hashes() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
 	.into_rpc();
@@ -1886,6 +1931,7 @@ async fn follow_prune_best_block() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
 	.into_rpc();
@@ -2071,6 +2117,7 @@ async fn follow_forks_pruned_block() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
 	.into_rpc();
@@ -2230,6 +2277,7 @@ async fn follow_report_multiple_pruned_block() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
 	.into_rpc();
@@ -2475,6 +2523,7 @@ async fn pin_block_references() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
 	.into_rpc();
@@ -2612,6 +2661,7 @@ async fn follow_finalized_before_new_block() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
 	.into_rpc();
@@ -2726,6 +2776,7 @@ async fn ensure_operation_limits_works() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: 1,
 			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
 	.into_rpc();
@@ -2830,6 +2881,7 @@ async fn check_continue_operation() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: 1,
+			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
 	.into_rpc();
@@ -3012,6 +3064,7 @@ async fn stop_storage_operation() {
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
 			operation_max_storage_items: 1,
+			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
 	)
 	.into_rpc();
@@ -3297,3 +3350,176 @@ async fn storage_closest_merkle_value() {
 		merkle_values_rhs.get(&hex_string(b":AAAA")).unwrap()
 	);
 }
+
+#[tokio::test]
+async fn chain_head_single_connection_context() {
+	let server_addr = run_server().await;
+	let server_url = format!("ws://{}", server_addr);
+	let client = jsonrpsee::ws_client::WsClientBuilder::default()
+		.build(&server_url)
+		.await
+		.unwrap();
+	// Calls cannot be made from a different connection context.
+	let second_client = jsonrpsee::ws_client::WsClientBuilder::default()
+		.build(&server_url)
+		.await
+		.unwrap();
+
+	let mut sub: RpcClientSubscription<FollowEvent<String>> =
+		ChainHeadApiClient::<String>::chain_head_unstable_follow(&client, true)
+			.await
+			.unwrap();
+
+	let event = tokio::time::timeout(std::time::Duration::from_secs(60), sub.next())
+		.await
+		.unwrap()
+		.unwrap()
+		.unwrap();
+	let finalized_hash = match event {
+		FollowEvent::Initialized(init) => init.finalized_block_hashes.into_iter().last().unwrap(),
+		_ => panic!("Expected FollowEvent::Initialized"),
+	};
+
+	let first_sub_id = match sub.kind() {
+		jsonrpsee::core::client::SubscriptionKind::Subscription(id) => match id {
+			jsonrpsee::types::SubscriptionId::Num(num) => num.to_string(),
+			jsonrpsee::types::SubscriptionId::Str(s) => s.to_string(),
+		},
+		_ => panic!("Unexpected subscription ID"),
+	};
+
+	// Trying to unpin from a different connection will have no effect.
+	let _response = ChainHeadApiClient::<String>::chain_head_unstable_unpin(
+		&second_client,
+		first_sub_id.clone(),
+		crate::chain_head::api::ListOrValue::Value(finalized_hash.clone()),
+	)
+	.await
+	.unwrap();
+
+	// Body can still be fetched from the first subscription.
+	let response: MethodResponse = ChainHeadApiClient::<String>::chain_head_unstable_body(
+		&client,
+		first_sub_id.clone(),
+		finalized_hash.clone(),
+	)
+	.await
+	.unwrap();
+	assert_matches!(response, MethodResponse::Started(_started));
+
+	// Cannot make a call from a different connection context.
+	let response: MethodResponse = ChainHeadApiClient::<String>::chain_head_unstable_body(
+		&second_client,
+		first_sub_id.clone(),
+		finalized_hash.clone(),
+	)
+	.await
+	.unwrap();
+	assert_matches!(response, MethodResponse::LimitReached);
+
+	let response: Option<String> = ChainHeadApiClient::<String>::chain_head_unstable_header(
+		&client,
+		first_sub_id.clone(),
+		finalized_hash.clone(),
+	)
+	.await
+	.unwrap();
+	assert!(response.is_some());
+	// Cannot make a call from a different connection context.
+	let response: Option<String> = ChainHeadApiClient::<String>::chain_head_unstable_header(
+		&second_client,
+		first_sub_id.clone(),
+		finalized_hash.clone(),
+	)
+	.await
+	.unwrap();
+	assert!(response.is_none());
+
+	let key = hex_string(&KEY);
+	let response: MethodResponse = ChainHeadApiClient::<String>::chain_head_unstable_storage(
+		&client,
+		first_sub_id.clone(),
+		finalized_hash.clone(),
+		vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Hash }],
+		None,
+	)
+	.await
+	.unwrap();
+	assert_matches!(response, MethodResponse::Started(_started));
+	// Cannot make a call from a different connection context.
+	let response: MethodResponse = ChainHeadApiClient::<String>::chain_head_unstable_storage(
+		&second_client,
+		first_sub_id.clone(),
+		finalized_hash.clone(),
+		vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Hash }],
+		None,
+	)
+	.await
+	.unwrap();
+	assert_matches!(response, MethodResponse::LimitReached);
+
+	let alice_id = AccountKeyring::Alice.to_account_id();
+	// Hex encoded scale encoded bytes representing the call parameters.
+	let call_parameters = hex_string(&alice_id.encode());
+	let response: MethodResponse = ChainHeadApiClient::<String>::chain_head_unstable_call(
+		&client,
+		first_sub_id.clone(),
+		finalized_hash.clone(),
+		"AccountNonceApi_account_nonce".into(),
+		call_parameters.clone(),
+	)
+	.await
+	.unwrap();
+	assert_matches!(response, MethodResponse::Started(_started));
+	// Cannot make a call from a different connection context.
+	let response: MethodResponse = ChainHeadApiClient::<String>::chain_head_unstable_call(
+		&second_client,
+		first_sub_id.clone(),
+		finalized_hash.clone(),
+		"AccountNonceApi_account_nonce".into(),
+		call_parameters.clone(),
+	)
+	.await
+	.unwrap();
+	assert_matches!(response, MethodResponse::LimitReached);
+}
+
+#[tokio::test]
+async fn chain_head_limit_reached() {
+	let builder = TestClientBuilder::new();
+	let backend = builder.backend();
+	let client = Arc::new(builder.build());
+
+	// Maximum of 1 chainHead_follow subscription.
+	let api = ChainHead::new(
+		client.clone(),
+		backend,
+		Arc::new(TaskExecutor::default()),
+		ChainHeadConfig {
+			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
+			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
+			subscription_max_ongoing_operations: MAX_OPERATIONS,
+			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+			max_follow_subscriptions_per_connection: 1,
+		},
+	)
+	.into_rpc();
+
+	let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap();
+	// Initialized must always be reported first.
+	let _event: FollowEvent<String> = get_next_event(&mut sub).await;
+
+	let error = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap_err();
+	assert!(error
+		.to_string()
+		.contains("Maximum number of chainHead_follow has been reached"));
+
+	// After dropping the subscription, other subscriptions are allowed to be created.
+	drop(sub);
+	// Ensure the `chainHead_unfollow` is propagated to the server.
+	tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+
+	let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap();
+	// Initialized must always be reported first.
+	let _event: FollowEvent<String> = get_next_event(&mut sub).await;
+}
diff --git a/substrate/client/rpc-spec-v2/src/common/connections.rs b/substrate/client/rpc-spec-v2/src/common/connections.rs
new file mode 100644
index 0000000000000000000000000000000000000000..c16a80bf49db93853ca223ab632dbc544082ae44
--- /dev/null
+++ b/substrate/client/rpc-spec-v2/src/common/connections.rs
@@ -0,0 +1,262 @@
+// This file is part of Substrate.
+
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use jsonrpsee::ConnectionId;
+use parking_lot::Mutex;
+use std::{
+	collections::{HashMap, HashSet},
+	sync::Arc,
+};
+
+/// Connection state which keeps track whether a connection exist and
+/// the number of concurrent operations.
+#[derive(Default, Clone)]
+pub struct RpcConnections {
+	/// The number of identifiers that can be registered for each connection.
+	///
+	/// # Example
+	///
+	/// This is used to limit how many `chainHead_follow` subscriptions are active at one time.
+	capacity: usize,
+	/// Map the connecton ID to a set of identifiers.
+	data: Arc<Mutex<HashMap<ConnectionId, ConnectionData>>>,
+}
+
+#[derive(Default)]
+struct ConnectionData {
+	/// The total number of identifiers for the given connection.
+	///
+	/// An identifier for a connection might be:
+	/// - the subscription ID for chainHead_follow
+	/// - the operation ID for the transactionBroadcast API
+	/// - or simply how many times the transaction API has been called.
+	///
+	/// # Note
+	///
+	/// Because a pending subscription sink does not expose the future subscription ID,
+	/// we cannot register a subscription ID before the pending subscription is accepted.
+	/// This variable ensures that we have enough capacity to register an identifier, after
+	/// the subscription is accepted. Otherwise, a jsonrpc error object should be returned.
+	num_identifiers: usize,
+	/// Active registered identifiers for the given connection.
+	///
+	/// # Note
+	///
+	/// For chainHead, this represents the subscription ID.
+	/// For transactionBroadcast, this represents the operation ID.
+	/// For transaction, this is empty and the number of active calls is tracked by
+	/// [`Self::num_identifiers`].
+	identifiers: HashSet<String>,
+}
+
+impl RpcConnections {
+	/// Constructs a new instance of [`RpcConnections`].
+	pub fn new(capacity: usize) -> Self {
+		RpcConnections { capacity, data: Default::default() }
+	}
+
+	/// Reserve space for a new connection identifier.
+	///
+	/// If the number of active identifiers for the given connection exceeds the capacity,
+	/// returns None.
+	pub fn reserve_space(&self, connection_id: ConnectionId) -> Option<ReservedConnection> {
+		let mut data = self.data.lock();
+
+		let entry = data.entry(connection_id).or_insert_with(ConnectionData::default);
+		if entry.num_identifiers >= self.capacity {
+			return None;
+		}
+		entry.num_identifiers = entry.num_identifiers.saturating_add(1);
+
+		Some(ReservedConnection { connection_id, rpc_connections: Some(self.clone()) })
+	}
+
+	/// Gives back the reserved space before the connection identifier is registered.
+	///
+	/// # Note
+	///
+	/// This may happen if the pending subscription cannot be accepted (unlikely).
+	fn unreserve_space(&self, connection_id: ConnectionId) {
+		let mut data = self.data.lock();
+
+		let entry = data.entry(connection_id).or_insert_with(ConnectionData::default);
+		entry.num_identifiers = entry.num_identifiers.saturating_sub(1);
+
+		if entry.num_identifiers == 0 {
+			data.remove(&connection_id);
+		}
+	}
+
+	/// Register an identifier for the given connection.
+	///
+	/// Users must call [`Self::reserve_space`] before calling this method to ensure enough
+	/// space is available.
+	///
+	/// Returns true if the identifier was inserted successfully, false if the identifier was
+	/// already inserted or reached capacity.
+	fn register_identifier(&self, connection_id: ConnectionId, identifier: String) -> bool {
+		let mut data = self.data.lock();
+
+		let entry = data.entry(connection_id).or_insert_with(ConnectionData::default);
+		// Should be already checked `Self::reserve_space`.
+		if entry.identifiers.len() >= self.capacity {
+			return false;
+		}
+
+		entry.identifiers.insert(identifier)
+	}
+
+	/// Unregister an identifier for the given connection.
+	fn unregister_identifier(&self, connection_id: ConnectionId, identifier: &str) {
+		let mut data = self.data.lock();
+		if let Some(connection_data) = data.get_mut(&connection_id) {
+			connection_data.identifiers.remove(identifier);
+			connection_data.num_identifiers = connection_data.num_identifiers.saturating_sub(1);
+
+			if connection_data.num_identifiers == 0 {
+				data.remove(&connection_id);
+			}
+		}
+	}
+
+	/// Check if the given connection contains the given identifier.
+	pub fn contains_identifier(&self, connection_id: ConnectionId, identifier: &str) -> bool {
+		let data = self.data.lock();
+		data.get(&connection_id)
+			.map(|connection_data| connection_data.identifiers.contains(identifier))
+			.unwrap_or(false)
+	}
+}
+
+/// RAII wrapper that ensures the reserved space is given back if the object is
+/// dropped before the identifier is registered.
+pub struct ReservedConnection {
+	connection_id: ConnectionId,
+	rpc_connections: Option<RpcConnections>,
+}
+
+impl ReservedConnection {
+	/// Register the identifier for the given connection.
+	pub fn register(mut self, identifier: String) -> Option<RegisteredConnection> {
+		let rpc_connections = self.rpc_connections.take()?;
+
+		if rpc_connections.register_identifier(self.connection_id, identifier.clone()) {
+			Some(RegisteredConnection {
+				connection_id: self.connection_id,
+				identifier,
+				rpc_connections,
+			})
+		} else {
+			None
+		}
+	}
+}
+
+impl Drop for ReservedConnection {
+	fn drop(&mut self) {
+		if let Some(rpc_connections) = self.rpc_connections.take() {
+			rpc_connections.unreserve_space(self.connection_id);
+		}
+	}
+}
+
+/// RAII wrapper that ensures the identifier is unregistered if the object is dropped.
+pub struct RegisteredConnection {
+	connection_id: ConnectionId,
+	identifier: String,
+	rpc_connections: RpcConnections,
+}
+
+impl Drop for RegisteredConnection {
+	fn drop(&mut self) {
+		self.rpc_connections.unregister_identifier(self.connection_id, &self.identifier);
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+
+	#[test]
+	fn reserve_space() {
+		let rpc_connections = RpcConnections::new(2);
+		let reserved = rpc_connections.reserve_space(1);
+		assert!(reserved.is_some());
+		assert_eq!(1, rpc_connections.data.lock().get(&1).unwrap().num_identifiers);
+		assert_eq!(rpc_connections.data.lock().len(), 1);
+
+		let reserved = reserved.unwrap();
+		let registered = reserved.register("identifier1".to_string()).unwrap();
+		assert!(rpc_connections.contains_identifier(1, "identifier1"));
+		assert_eq!(1, rpc_connections.data.lock().get(&1).unwrap().num_identifiers);
+		drop(registered);
+
+		// Data is dropped.
+		assert!(rpc_connections.data.lock().get(&1).is_none());
+		assert!(rpc_connections.data.lock().is_empty());
+		// Checks can still happen.
+		assert!(!rpc_connections.contains_identifier(1, "identifier1"));
+	}
+
+	#[test]
+	fn reserve_space_capacity_reached() {
+		let rpc_connections = RpcConnections::new(2);
+
+		// Reserve identifier for connection 1.
+		let reserved = rpc_connections.reserve_space(1);
+		assert!(reserved.is_some());
+		assert_eq!(1, rpc_connections.data.lock().get(&1).unwrap().num_identifiers);
+
+		// Add identifier for connection 1.
+		let reserved = reserved.unwrap();
+		let registered = reserved.register("identifier1".to_string()).unwrap();
+		assert!(rpc_connections.contains_identifier(1, "identifier1"));
+		assert_eq!(1, rpc_connections.data.lock().get(&1).unwrap().num_identifiers);
+
+		// Reserve identifier for connection 1 again.
+		let reserved = rpc_connections.reserve_space(1);
+		assert!(reserved.is_some());
+		assert_eq!(2, rpc_connections.data.lock().get(&1).unwrap().num_identifiers);
+
+		// Add identifier for connection 1 again.
+		let reserved = reserved.unwrap();
+		let registered_second = reserved.register("identifier2".to_string()).unwrap();
+		assert!(rpc_connections.contains_identifier(1, "identifier2"));
+		assert_eq!(2, rpc_connections.data.lock().get(&1).unwrap().num_identifiers);
+
+		// Cannot reserve more identifiers.
+		let reserved = rpc_connections.reserve_space(1);
+		assert!(reserved.is_none());
+
+		// Drop the first identifier.
+		drop(registered);
+		assert_eq!(1, rpc_connections.data.lock().get(&1).unwrap().num_identifiers);
+		assert!(rpc_connections.contains_identifier(1, "identifier2"));
+		assert!(!rpc_connections.contains_identifier(1, "identifier1"));
+
+		// Can reserve again after clearing the space.
+		let reserved = rpc_connections.reserve_space(1);
+		assert!(reserved.is_some());
+		assert_eq!(2, rpc_connections.data.lock().get(&1).unwrap().num_identifiers);
+
+		// Ensure data is cleared.
+		drop(reserved);
+		drop(registered_second);
+		assert!(rpc_connections.data.lock().get(&1).is_none());
+	}
+}
diff --git a/substrate/client/rpc-spec-v2/src/common/mod.rs b/substrate/client/rpc-spec-v2/src/common/mod.rs
index ac1af8fce3c9bc42a00fa747f62289aef0e04356..3167561d649a2f51f24b864ffa581df16b139202 100644
--- a/substrate/client/rpc-spec-v2/src/common/mod.rs
+++ b/substrate/client/rpc-spec-v2/src/common/mod.rs
@@ -13,5 +13,6 @@
 
 //! Common types and functionality for the RPC-V2 spec.
 
+pub mod connections;
 pub mod events;
 pub mod storage;