From 4849b6e865a11a6335a278e3bcbb0795a3ed993e Mon Sep 17 00:00:00 2001
From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com>
Date: Tue, 8 Aug 2023 21:13:52 +0300
Subject: [PATCH] chainHead: Produce method responses on `chainHead_follow`
 (#14692)

* chainHead/api: Make storage/body/call pure RPC methods

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Add mpsc channel between RPC methods

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/subscriptions: Extract mpsc::Sender via BlockGuard

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/subscriptions: Generate and provide the method operation ID

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Generate `chainHead_body` response

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Generate `chainHead_call` response

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Generate `chainHead_storage` responses

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Propagate responses of methods to chainHead_follow

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Adjust `chainHead_body` responses

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Adjust `chainHead_call` responses

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Adjust `chainHead_call` responses

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Ensure unique operation IDs across methods

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/events: Remove old method events

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Return `InvalidBlock` error if pinning fails

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Wrap subscription IDs

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Ensure separate operation IDs across subscriptions

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: parity-processbot <>
---
 substrate/client/rpc-spec-v2/Cargo.toml       |   2 +-
 .../client/rpc-spec-v2/src/chain_head/api.rs  |  30 +-
 .../rpc-spec-v2/src/chain_head/chain_head.rs  | 225 ++++---
 .../src/chain_head/chain_head_follow.rs       |  14 +-
 .../src/chain_head/chain_head_storage.rs      |  78 ++-
 .../rpc-spec-v2/src/chain_head/event.rs       | 168 ------
 .../client/rpc-spec-v2/src/chain_head/mod.rs  |   4 +-
 .../src/chain_head/subscription/inner.rs      |  84 ++-
 .../src/chain_head/subscription/mod.rs        |   7 +-
 .../rpc-spec-v2/src/chain_head/tests.rs       | 547 +++++++++++++-----
 10 files changed, 664 insertions(+), 495 deletions(-)

diff --git a/substrate/client/rpc-spec-v2/Cargo.toml b/substrate/client/rpc-spec-v2/Cargo.toml
index 4f5c11212a9..b1ab2a87997 100644
--- a/substrate/client/rpc-spec-v2/Cargo.toml
+++ b/substrate/client/rpc-spec-v2/Cargo.toml
@@ -24,6 +24,7 @@ sp-api = { version = "4.0.0-dev", path = "../../primitives/api" }
 sp-blockchain = { version = "4.0.0-dev", path = "../../primitives/blockchain" }
 sp-version = { version = "22.0.0", path = "../../primitives/version" }
 sc-client-api = { version = "4.0.0-dev", path = "../api" }
+sc-utils = { version = "4.0.0-dev", path = "../utils" }
 codec = { package = "parity-scale-codec", version = "3.6.1" }
 thiserror = "1.0"
 serde = "1.0"
@@ -44,6 +45,5 @@ sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/comm
 sp-maybe-compressed-blob = { version = "4.1.0-dev", path = "../../primitives/maybe-compressed-blob" }
 sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" }
 sc-service = { version = "0.10.0-dev", features = ["test-helpers"], path = "../service" }
-sc-utils = { version = "4.0.0-dev", path = "../utils" }
 assert_matches = "1.3.0"
 pretty_assertions = "1.2.1"
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/api.rs b/substrate/client/rpc-spec-v2/src/chain_head/api.rs
index 8905e036877..c002b75efe0 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/api.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/api.rs
@@ -19,7 +19,7 @@
 #![allow(non_snake_case)]
 
 //! API trait of the chain head.
-use crate::chain_head::event::{ChainHeadEvent, FollowEvent, StorageQuery};
+use crate::chain_head::event::{FollowEvent, MethodResponse, StorageQuery};
 use jsonrpsee::{core::RpcResult, proc_macros::rpc};
 
 #[rpc(client, server)]
@@ -47,12 +47,12 @@ pub trait ChainHeadApi<Hash> {
 	/// # Unstable
 	///
 	/// This method is unstable and subject to change in the future.
-	#[subscription(
-		name = "chainHead_unstable_body",
-		unsubscribe = "chainHead_unstable_stopBody",
-		item = ChainHeadEvent<String>,
-	)]
-	fn chain_head_unstable_body(&self, follow_subscription: String, hash: Hash);
+	#[method(name = "chainHead_unstable_body", blocking)]
+	fn chain_head_unstable_body(
+		&self,
+		follow_subscription: String,
+		hash: Hash,
+	) -> RpcResult<MethodResponse>;
 
 	/// Retrieves the header of a pinned block.
 	///
@@ -86,36 +86,28 @@ pub trait ChainHeadApi<Hash> {
 	/// # Unstable
 	///
 	/// This method is unstable and subject to change in the future.
-	#[subscription(
-		name = "chainHead_unstable_storage",
-		unsubscribe = "chainHead_unstable_stopStorage",
-		item = ChainHeadEvent<String>,
-	)]
+	#[method(name = "chainHead_unstable_storage", blocking)]
 	fn chain_head_unstable_storage(
 		&self,
 		follow_subscription: String,
 		hash: Hash,
 		items: Vec<StorageQuery<String>>,
 		child_trie: Option<String>,
-	);
+	) -> RpcResult<MethodResponse>;
 
 	/// Call into the Runtime API at a specified block's state.
 	///
 	/// # Unstable
 	///
 	/// This method is unstable and subject to change in the future.
-	#[subscription(
-		name = "chainHead_unstable_call",
-		unsubscribe = "chainHead_unstable_stopCall",
-		item = ChainHeadEvent<String>,
-	)]
+	#[method(name = "chainHead_unstable_call", blocking)]
 	fn chain_head_unstable_call(
 		&self,
 		follow_subscription: String,
 		hash: Hash,
 		function: String,
 		call_parameters: String,
-	);
+	) -> RpcResult<MethodResponse>;
 
 	/// Unpin a block reported by the `follow` method.
 	///
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
index a2c9afc0349..16881b05fd7 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
@@ -18,12 +18,16 @@
 
 //! API implementation for `chainHead`.
 
+use super::{
+	chain_head_storage::ChainHeadStorage,
+	event::{MethodResponseStarted, OperationBodyDone, OperationCallDone},
+};
 use crate::{
 	chain_head::{
 		api::ChainHeadApiServer,
 		chain_head_follow::ChainHeadFollower,
 		error::Error as ChainHeadRpcError,
-		event::{ChainHeadEvent, ChainHeadResult, ErrorEvent, FollowEvent},
+		event::{FollowEvent, MethodResponse, OperationError, StorageQuery, StorageQueryType},
 		hex_string,
 		subscription::{SubscriptionManagement, SubscriptionManagementError},
 	},
@@ -47,11 +51,6 @@ use sp_core::{traits::CallContext, Bytes};
 use sp_runtime::traits::Block as BlockT;
 use std::{marker::PhantomData, sync::Arc, time::Duration};
 
-use super::{
-	chain_head_storage::ChainHeadStorage,
-	event::{ChainHeadStorageEvent, StorageQuery, StorageQueryType},
-};
-
 pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";
 
 /// An API for chain head RPC calls.
@@ -81,7 +80,6 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
 		max_pinned_duration: Duration,
 	) -> Self {
 		let genesis_hash = hex_string(&genesis_hash.as_ref());
-
 		Self {
 			client,
 			backend: backend.clone(),
@@ -121,11 +119,8 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
 
 /// Parse hex-encoded string parameter as raw bytes.
 ///
-/// If the parsing fails, the subscription is rejected.
-fn parse_hex_param(
-	sink: &mut SubscriptionSink,
-	param: String,
-) -> Result<Vec<u8>, SubscriptionEmptyError> {
+/// If the parsing fails, returns an error propagated to the RPC method.
+fn parse_hex_param(param: String) -> Result<Vec<u8>, ChainHeadRpcError> {
 	// Methods can accept empty parameters.
 	if param.is_empty() {
 		return Ok(Default::default())
@@ -133,10 +128,7 @@ fn parse_hex_param(
 
 	match array_bytes::hex2bytes(&param) {
 		Ok(bytes) => Ok(bytes),
-		Err(_) => {
-			let _ = sink.reject(ChainHeadRpcError::InvalidParam(param));
-			Err(SubscriptionEmptyError)
-		},
+		Err(_) => Err(ChainHeadRpcError::InvalidParam(param)),
 	}
 }
 
@@ -168,7 +160,7 @@ where
 			},
 		};
 		// Keep track of the subscription.
-		let Some(rx_stop) = self.subscriptions.insert_subscription(sub_id.clone(), with_runtime)
+		let Some(sub_data) = self.subscriptions.insert_subscription(sub_id.clone(), with_runtime)
 		else {
 			// Inserting the subscription can only fail if the JsonRPSee
 			// generated a duplicate subscription ID.
@@ -190,7 +182,7 @@ where
 				sub_id.clone(),
 			);
 
-			chain_head_follow.generate_events(sink, rx_stop).await;
+			chain_head_follow.generate_events(sink, sub_data).await;
 
 			subscriptions.remove_subscription(&sub_id);
 			debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id);
@@ -202,59 +194,57 @@ where
 
 	fn chain_head_unstable_body(
 		&self,
-		mut sink: SubscriptionSink,
 		follow_subscription: String,
 		hash: Block::Hash,
-	) -> SubscriptionResult {
-		let client = self.client.clone();
-		let subscriptions = self.subscriptions.clone();
-
-		let block_guard = match subscriptions.lock_block(&follow_subscription, hash) {
+	) -> RpcResult<MethodResponse> {
+		let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
 			Ok(block) => block,
 			Err(SubscriptionManagementError::SubscriptionAbsent) => {
 				// Invalid invalid subscription ID.
-				let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
-				return Ok(())
+				return Ok(MethodResponse::LimitReached)
 			},
 			Err(SubscriptionManagementError::BlockHashAbsent) => {
 				// Block is not part of the subscription.
-				let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
-				return Ok(())
-			},
-			Err(error) => {
-				let _ = sink.send(&ChainHeadEvent::<String>::Error(ErrorEvent {
-					error: error.to_string(),
-				}));
-				return Ok(())
+				return Err(ChainHeadRpcError::InvalidBlock.into())
 			},
+			Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
 		};
 
-		let fut = async move {
-			let _block_guard = block_guard;
-			let event = match client.block(hash) {
-				Ok(Some(signed_block)) => {
-					let extrinsics = signed_block.block.extrinsics();
-					let result = hex_string(&extrinsics.encode());
-					ChainHeadEvent::Done(ChainHeadResult { result })
-				},
-				Ok(None) => {
-					// The block's body was pruned. This subscription ID has become invalid.
-					debug!(
-						target: LOG_TARGET,
-						"[body][id={:?}] Stopping subscription because hash={:?} was pruned",
-						&follow_subscription,
-						hash
-					);
-					subscriptions.remove_subscription(&follow_subscription);
-					ChainHeadEvent::<String>::Disjoint
-				},
-				Err(error) => ChainHeadEvent::Error(ErrorEvent { error: error.to_string() }),
-			};
-			let _ = sink.send(&event);
+		let event = match self.client.block(hash) {
+			Ok(Some(signed_block)) => {
+				let extrinsics = signed_block
+					.block
+					.extrinsics()
+					.iter()
+					.map(|extrinsic| hex_string(&extrinsic.encode()))
+					.collect();
+				FollowEvent::<Block::Hash>::OperationBodyDone(OperationBodyDone {
+					operation_id: block_guard.operation_id(),
+					value: extrinsics,
+				})
+			},
+			Ok(None) => {
+				// The block's body was pruned. This subscription ID has become invalid.
+				debug!(
+					target: LOG_TARGET,
+					"[body][id={:?}] Stopping subscription because hash={:?} was pruned",
+					&follow_subscription,
+					hash
+				);
+				self.subscriptions.remove_subscription(&follow_subscription);
+				return Err(ChainHeadRpcError::InvalidBlock.into())
+			},
+			Err(error) => FollowEvent::<Block::Hash>::OperationError(OperationError {
+				operation_id: block_guard.operation_id(),
+				error: error.to_string(),
+			}),
 		};
 
-		self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
-		Ok(())
+		let _ = block_guard.response_sender().unbounded_send(event);
+		Ok(MethodResponse::Started(MethodResponseStarted {
+			operation_id: block_guard.operation_id(),
+			discarded_items: None,
+		}))
 	}
 
 	fn chain_head_unstable_header(
@@ -288,128 +278,113 @@ where
 
 	fn chain_head_unstable_storage(
 		&self,
-		mut sink: SubscriptionSink,
 		follow_subscription: String,
 		hash: Block::Hash,
 		items: Vec<StorageQuery<String>>,
 		child_trie: Option<String>,
-	) -> SubscriptionResult {
+	) -> RpcResult<MethodResponse> {
 		// Gain control over parameter parsing and returned error.
 		let items = items
 			.into_iter()
 			.map(|query| {
 				if query.query_type == StorageQueryType::ClosestDescendantMerkleValue {
 					// Note: remove this once all types are implemented.
-					let _ = sink.reject(ChainHeadRpcError::InvalidParam(
+					return Err(ChainHeadRpcError::InvalidParam(
 						"Storage query type not supported".into(),
-					));
-					return Err(SubscriptionEmptyError)
+					))
 				}
 
 				Ok(StorageQuery {
-					key: StorageKey(parse_hex_param(&mut sink, query.key)?),
+					key: StorageKey(parse_hex_param(query.key)?),
 					query_type: query.query_type,
 				})
 			})
 			.collect::<Result<Vec<_>, _>>()?;
 
 		let child_trie = child_trie
-			.map(|child_trie| parse_hex_param(&mut sink, child_trie))
+			.map(|child_trie| parse_hex_param(child_trie))
 			.transpose()?
 			.map(ChildInfo::new_default_from_vec);
 
-		let client = self.client.clone();
-		let subscriptions = self.subscriptions.clone();
-
-		let block_guard = match subscriptions.lock_block(&follow_subscription, hash) {
+		let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
 			Ok(block) => block,
 			Err(SubscriptionManagementError::SubscriptionAbsent) => {
 				// Invalid invalid subscription ID.
-				let _ = sink.send(&ChainHeadStorageEvent::Disjoint);
-				return Ok(())
+				return Ok(MethodResponse::LimitReached)
 			},
 			Err(SubscriptionManagementError::BlockHashAbsent) => {
 				// Block is not part of the subscription.
-				let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
-				return Ok(())
-			},
-			Err(error) => {
-				let _ = sink
-					.send(&ChainHeadStorageEvent::Error(ErrorEvent { error: error.to_string() }));
-				return Ok(())
+				return Err(ChainHeadRpcError::InvalidBlock.into())
 			},
+			Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
 		};
 
-		let storage_client = ChainHeadStorage::<Client, Block, BE>::new(client);
-
+		let storage_client = ChainHeadStorage::<Client, Block, BE>::new(self.client.clone());
+		let operation_id = block_guard.operation_id();
 		let fut = async move {
-			let _block_guard = block_guard;
-
-			storage_client.generate_events(sink, hash, items, child_trie);
+			storage_client.generate_events(block_guard, hash, items, child_trie);
 		};
 
-		self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
-		Ok(())
+		self.executor
+			.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
+		Ok(MethodResponse::Started(MethodResponseStarted {
+			operation_id,
+			discarded_items: Some(0),
+		}))
 	}
 
 	fn chain_head_unstable_call(
 		&self,
-		mut sink: SubscriptionSink,
 		follow_subscription: String,
 		hash: Block::Hash,
 		function: String,
 		call_parameters: String,
-	) -> SubscriptionResult {
-		let call_parameters = Bytes::from(parse_hex_param(&mut sink, call_parameters)?);
+	) -> RpcResult<MethodResponse> {
+		let call_parameters = Bytes::from(parse_hex_param(call_parameters)?);
 
-		let client = self.client.clone();
-		let subscriptions = self.subscriptions.clone();
-
-		let block_guard = match subscriptions.lock_block(&follow_subscription, hash) {
+		let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
 			Ok(block) => block,
 			Err(SubscriptionManagementError::SubscriptionAbsent) => {
 				// Invalid invalid subscription ID.
-				let _ = sink.send(&ChainHeadEvent::<String>::Disjoint);
-				return Ok(())
+				return Ok(MethodResponse::LimitReached)
 			},
 			Err(SubscriptionManagementError::BlockHashAbsent) => {
 				// Block is not part of the subscription.
-				let _ = sink.reject(ChainHeadRpcError::InvalidBlock);
-				return Ok(())
-			},
-			Err(error) => {
-				let _ = sink.send(&ChainHeadEvent::<String>::Error(ErrorEvent {
-					error: error.to_string(),
-				}));
-				return Ok(())
+				return Err(ChainHeadRpcError::InvalidBlock.into())
 			},
+			Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
 		};
 
-		let fut = async move {
-			// Reject subscription if with_runtime is false.
-			if !block_guard.has_runtime() {
-				let _ = sink.reject(ChainHeadRpcError::InvalidParam(
-					"The runtime updates flag must be set".into(),
-				));
-				return
-			}
-
-			let res = client
-				.executor()
-				.call(hash, &function, &call_parameters, CallContext::Offchain)
-				.map(|result| {
-					let result = hex_string(&result);
-					ChainHeadEvent::Done(ChainHeadResult { result })
-				})
-				.unwrap_or_else(|error| {
-					ChainHeadEvent::Error(ErrorEvent { error: error.to_string() })
-				});
+		// Reject subscription if with_runtime is false.
+		if !block_guard.has_runtime() {
+			return Err(ChainHeadRpcError::InvalidParam(
+				"The runtime updates flag must be set".to_string(),
+			)
+			.into())
+		}
 
-			let _ = sink.send(&res);
-		};
+		let event = self
+			.client
+			.executor()
+			.call(hash, &function, &call_parameters, CallContext::Offchain)
+			.map(|result| {
+				FollowEvent::<Block::Hash>::OperationCallDone(OperationCallDone {
+					operation_id: block_guard.operation_id(),
+					output: hex_string(&result),
+				})
+			})
+			.unwrap_or_else(|error| {
+				FollowEvent::<Block::Hash>::OperationError(OperationError {
+					operation_id: block_guard.operation_id(),
+					error: error.to_string(),
+				})
+			});
 
-		self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
-		Ok(())
+		let _ = block_guard.response_sender().unbounded_send(event);
+		Ok(MethodResponse::Started(MethodResponseStarted {
+			operation_id: block_guard.operation_id(),
+			discarded_items: None,
+		}))
 	}
 
 	fn chain_head_unstable_unpin(
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
index 799978be532..0fa995ce73a 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
@@ -24,7 +24,7 @@ use crate::chain_head::{
 		BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
 		RuntimeVersionEvent,
 	},
-	subscription::{SubscriptionManagement, SubscriptionManagementError},
+	subscription::{InsertedSubscriptionData, SubscriptionManagement, SubscriptionManagementError},
 };
 use futures::{
 	channel::oneshot,
@@ -80,6 +80,8 @@ enum NotificationType<Block: BlockT> {
 	NewBlock(BlockImportNotification<Block>),
 	/// The finalized block notification obtained from `finality_notification_stream`.
 	Finalized(FinalityNotification<Block>),
+	/// The response of `chainHead` method calls.
+	MethodResponse(FollowEvent<Block::Hash>),
 }
 
 /// The initial blocks that should be reported or ignored by the chainHead.
@@ -515,6 +517,7 @@ where
 					self.handle_import_blocks(notification, &startup_point),
 				NotificationType::Finalized(notification) =>
 					self.handle_finalized_blocks(notification, &mut to_ignore, &startup_point),
+				NotificationType::MethodResponse(notification) => Ok(vec![notification]),
 			};
 
 			let events = match events {
@@ -572,7 +575,7 @@ where
 	pub async fn generate_events(
 		&mut self,
 		mut sink: SubscriptionSink,
-		rx_stop: oneshot::Receiver<()>,
+		sub_data: InsertedSubscriptionData<Block>,
 	) {
 		// Register for the new block and finalized notifications.
 		let stream_import = self
@@ -585,6 +588,10 @@ where
 			.finality_notification_stream()
 			.map(|notification| NotificationType::Finalized(notification));
 
+		let stream_responses = sub_data
+			.response_receiver
+			.map(|response| NotificationType::MethodResponse(response));
+
 		let startup_point = StartupPoint::from(self.client.info());
 		let (initial_events, pruned_forks) = match self.generate_init_events(&startup_point) {
 			Ok(blocks) => blocks,
@@ -602,9 +609,10 @@ where
 
 		let initial = NotificationType::InitialEvents(initial_events);
 		let merged = tokio_stream::StreamExt::merge(stream_import, stream_finalized);
+		let merged = tokio_stream::StreamExt::merge(merged, stream_responses);
 		let stream = stream::once(futures::future::ready(initial)).chain(merged);
 
-		self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, rx_stop)
+		self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, sub_data.rx_stop)
 			.await;
 	}
 }
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs
index df1600628de..393e4489c8c 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs
@@ -20,17 +20,21 @@
 
 use std::{marker::PhantomData, sync::Arc};
 
-use jsonrpsee::SubscriptionSink;
 use sc_client_api::{Backend, ChildInfo, StorageKey, StorageProvider};
+use sc_utils::mpsc::TracingUnboundedSender;
 use sp_api::BlockT;
 use sp_core::storage::well_known_keys;
 
+use crate::chain_head::event::OperationStorageItems;
+
 use super::{
 	event::{
-		ChainHeadStorageEvent, ItemsEvent, StorageQuery, StorageQueryType, StorageResult,
+		OperationError, OperationId, StorageQuery, StorageQueryType, StorageResult,
 		StorageResultType,
 	},
-	hex_string, ErrorEvent,
+	hex_string,
+	subscription::BlockGuard,
+	FollowEvent,
 };
 
 /// The maximum number of items the `chainHead_storage` can return
@@ -70,10 +74,10 @@ fn is_key_queryable(key: &[u8]) -> bool {
 }
 
 /// The result of making a query call.
-type QueryResult = Result<Option<StorageResult>, ChainHeadStorageEvent>;
+type QueryResult = Result<Option<StorageResult>, String>;
 
 /// The result of iterating over keys.
-type QueryIterResult = Result<Vec<StorageResult>, ChainHeadStorageEvent>;
+type QueryIterResult = Result<Vec<StorageResult>, String>;
 
 impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE>
 where
@@ -101,11 +105,7 @@ where
 					result: StorageResultType::Value(hex_string(&storage_data.0)),
 				}))
 			})
-			.unwrap_or_else(|err| {
-				QueryResult::Err(ChainHeadStorageEvent::Error(ErrorEvent {
-					error: err.to_string(),
-				}))
-			})
+			.unwrap_or_else(|error| QueryResult::Err(error.to_string()))
 	}
 
 	/// Fetch the hash of a value from storage.
@@ -128,11 +128,7 @@ where
 					result: StorageResultType::Hash(hex_string(&storage_data.as_ref())),
 				}))
 			})
-			.unwrap_or_else(|err| {
-				QueryResult::Err(ChainHeadStorageEvent::Error(ErrorEvent {
-					error: err.to_string(),
-				}))
-			})
+			.unwrap_or_else(|error| QueryResult::Err(error.to_string()))
 	}
 
 	/// Handle iterating over (key, value) or (key, hash) pairs.
@@ -148,7 +144,7 @@ where
 		} else {
 			self.client.storage_keys(hash, Some(key), None)
 		}
-		.map_err(|err| ChainHeadStorageEvent::Error(ErrorEvent { error: err.to_string() }))?;
+		.map_err(|error| error.to_string())?;
 
 		let mut ret = Vec::with_capacity(MAX_ITER_ITEMS);
 		let mut keys_iter = keys_iter.take(MAX_ITER_ITEMS);
@@ -169,14 +165,31 @@ where
 	/// Generate the block events for the `chainHead_storage` method.
 	pub fn generate_events(
 		&self,
-		mut sink: SubscriptionSink,
+		block_guard: BlockGuard<Block, BE>,
 		hash: Block::Hash,
 		items: Vec<StorageQuery<StorageKey>>,
 		child_key: Option<ChildInfo>,
 	) {
+		/// Build and send the opaque error back to the `chainHead_follow` method.
+		fn send_error<Block: BlockT>(
+			sender: &TracingUnboundedSender<FollowEvent<Block::Hash>>,
+			operation_id: String,
+			error: String,
+		) {
+			let _ =
+				sender.unbounded_send(FollowEvent::<Block::Hash>::OperationError(OperationError {
+					operation_id,
+					error,
+				}));
+		}
+
+		let sender = block_guard.response_sender();
+
 		if let Some(child_key) = child_key.as_ref() {
 			if !is_key_queryable(child_key.storage_key()) {
-				let _ = sink.send(&ChainHeadStorageEvent::Done);
+				let _ = sender.unbounded_send(FollowEvent::<Block::Hash>::OperationStorageDone(
+					OperationId { operation_id: block_guard.operation_id() },
+				));
 				return
 			}
 		}
@@ -192,8 +205,8 @@ where
 					match self.query_storage_value(hash, &item.key, child_key.as_ref()) {
 						Ok(Some(value)) => storage_results.push(value),
 						Ok(None) => continue,
-						Err(err) => {
-							let _ = sink.send(&err);
+						Err(error) => {
+							send_error::<Block>(&sender, block_guard.operation_id(), error);
 							return
 						},
 					}
@@ -202,8 +215,8 @@ where
 					match self.query_storage_hash(hash, &item.key, child_key.as_ref()) {
 						Ok(Some(value)) => storage_results.push(value),
 						Ok(None) => continue,
-						Err(err) => {
-							let _ = sink.send(&err);
+						Err(error) => {
+							send_error::<Block>(&sender, block_guard.operation_id(), error);
 							return
 						},
 					},
@@ -214,8 +227,8 @@ where
 					IterQueryType::Value,
 				) {
 					Ok(values) => storage_results.extend(values),
-					Err(err) => {
-						let _ = sink.send(&err);
+					Err(error) => {
+						send_error::<Block>(&sender, block_guard.operation_id(), error);
 						return
 					},
 				},
@@ -226,8 +239,8 @@ where
 					IterQueryType::Hash,
 				) {
 					Ok(values) => storage_results.extend(values),
-					Err(err) => {
-						let _ = sink.send(&err);
+					Err(error) => {
+						send_error::<Block>(&sender, block_guard.operation_id(), error);
 						return
 					},
 				},
@@ -236,10 +249,17 @@ where
 		}
 
 		if !storage_results.is_empty() {
-			let event = ChainHeadStorageEvent::Items(ItemsEvent { items: storage_results });
-			let _ = sink.send(&event);
+			let _ = sender.unbounded_send(FollowEvent::<Block::Hash>::OperationStorageItems(
+				OperationStorageItems {
+					operation_id: block_guard.operation_id(),
+					items: storage_results,
+				},
+			));
 		}
 
-		let _ = sink.send(&ChainHeadStorageEvent::Done);
+		let _ =
+			sender.unbounded_send(FollowEvent::<Block::Hash>::OperationStorageDone(OperationId {
+				operation_id: block_guard.operation_id(),
+			}));
 	}
 }
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/event.rs b/substrate/client/rpc-spec-v2/src/chain_head/event.rs
index 971a0a9f46b..65bc8b247c8 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/event.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/event.rs
@@ -276,31 +276,6 @@ pub enum FollowEvent<Hash> {
 	Stop,
 }
 
-/// The result of a chain head method.
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct ChainHeadResult<T> {
-	/// Result of the method.
-	pub result: T,
-}
-
-/// The event generated by the body / call / storage methods.
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-#[serde(tag = "event")]
-pub enum ChainHeadEvent<T> {
-	/// The request completed successfully.
-	Done(ChainHeadResult<T>),
-	/// The resources requested are inaccessible.
-	///
-	/// Resubmitting the request later might succeed.
-	Inaccessible(ErrorEvent),
-	/// An error occurred. This is definitive.
-	Error(ErrorEvent),
-	/// The provided subscription ID is stale or invalid.
-	Disjoint,
-}
-
 /// The storage item received as paramter.
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
@@ -351,35 +326,6 @@ pub enum StorageResultType {
 	ClosestDescendantMerkleValue(String),
 }
 
-/// The event generated by storage method.
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
-#[serde(rename_all = "kebab-case")]
-#[serde(tag = "event")]
-pub enum ChainHeadStorageEvent {
-	/// The request produced multiple result items.
-	Items(ItemsEvent),
-	/// The request produced multiple result items.
-	WaitForContinue,
-	/// The request completed successfully and all the results were provided.
-	Done,
-	/// The resources requested are inaccessible.
-	///
-	/// Resubmitting the request later might succeed.
-	Inaccessible,
-	/// An error occurred. This is definitive.
-	Error(ErrorEvent),
-	/// The provided subscription ID is stale or invalid.
-	Disjoint,
-}
-
-/// The request produced multiple result items.
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct ItemsEvent {
-	/// The resulting items.
-	pub items: Vec<StorageResult>,
-}
-
 /// The method respose of `chainHead_body`, `chainHead_call` and `chainHead_storage`.
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
@@ -714,56 +660,6 @@ mod tests {
 		assert_eq!(event_dec, event);
 	}
 
-	#[test]
-	fn chain_head_done_event() {
-		let event: ChainHeadEvent<String> =
-			ChainHeadEvent::Done(ChainHeadResult { result: "A".into() });
-
-		let ser = serde_json::to_string(&event).unwrap();
-		let exp = r#"{"event":"done","result":"A"}"#;
-		assert_eq!(ser, exp);
-
-		let event_dec: ChainHeadEvent<String> = serde_json::from_str(exp).unwrap();
-		assert_eq!(event_dec, event);
-	}
-
-	#[test]
-	fn chain_head_inaccessible_event() {
-		let event: ChainHeadEvent<String> =
-			ChainHeadEvent::Inaccessible(ErrorEvent { error: "A".into() });
-
-		let ser = serde_json::to_string(&event).unwrap();
-		let exp = r#"{"event":"inaccessible","error":"A"}"#;
-		assert_eq!(ser, exp);
-
-		let event_dec: ChainHeadEvent<String> = serde_json::from_str(exp).unwrap();
-		assert_eq!(event_dec, event);
-	}
-
-	#[test]
-	fn chain_head_error_event() {
-		let event: ChainHeadEvent<String> = ChainHeadEvent::Error(ErrorEvent { error: "A".into() });
-
-		let ser = serde_json::to_string(&event).unwrap();
-		let exp = r#"{"event":"error","error":"A"}"#;
-		assert_eq!(ser, exp);
-
-		let event_dec: ChainHeadEvent<String> = serde_json::from_str(exp).unwrap();
-		assert_eq!(event_dec, event);
-	}
-
-	#[test]
-	fn chain_head_disjoint_event() {
-		let event: ChainHeadEvent<String> = ChainHeadEvent::Disjoint;
-
-		let ser = serde_json::to_string(&event).unwrap();
-		let exp = r#"{"event":"disjoint"}"#;
-		assert_eq!(ser, exp);
-
-		let event_dec: ChainHeadEvent<String> = serde_json::from_str(exp).unwrap();
-		assert_eq!(event_dec, event);
-	}
-
 	#[test]
 	fn chain_head_storage_query() {
 		// Item with Value.
@@ -855,68 +751,4 @@ mod tests {
 		let dec: StorageResult = serde_json::from_str(exp).unwrap();
 		assert_eq!(dec, item);
 	}
-
-	#[test]
-	fn chain_head_storage_event() {
-		// Event with Items.
-		let event = ChainHeadStorageEvent::Items(ItemsEvent {
-			items: vec![
-				StorageResult {
-					key: "0x1".into(),
-					result: StorageResultType::Value("first".into()),
-				},
-				StorageResult {
-					key: "0x2".into(),
-					result: StorageResultType::Hash("second".into()),
-				},
-			],
-		});
-		// Encode
-		let ser = serde_json::to_string(&event).unwrap();
-		let exp = r#"{"event":"items","items":[{"key":"0x1","value":"first"},{"key":"0x2","hash":"second"}]}"#;
-		assert_eq!(ser, exp);
-		// Decode
-		let dec: ChainHeadStorageEvent = serde_json::from_str(exp).unwrap();
-		assert_eq!(dec, event);
-
-		// Event with WaitForContinue.
-		let event = ChainHeadStorageEvent::WaitForContinue;
-		// Encode
-		let ser = serde_json::to_string(&event).unwrap();
-		let exp = r#"{"event":"wait-for-continue"}"#;
-		assert_eq!(ser, exp);
-		// Decode
-		let dec: ChainHeadStorageEvent = serde_json::from_str(exp).unwrap();
-		assert_eq!(dec, event);
-
-		// Event with Done.
-		let event = ChainHeadStorageEvent::Done;
-		// Encode
-		let ser = serde_json::to_string(&event).unwrap();
-		let exp = r#"{"event":"done"}"#;
-		assert_eq!(ser, exp);
-		// Decode
-		let dec: ChainHeadStorageEvent = serde_json::from_str(exp).unwrap();
-		assert_eq!(dec, event);
-
-		// Event with Inaccessible.
-		let event = ChainHeadStorageEvent::Inaccessible;
-		// Encode
-		let ser = serde_json::to_string(&event).unwrap();
-		let exp = r#"{"event":"inaccessible"}"#;
-		assert_eq!(ser, exp);
-		// Decode
-		let dec: ChainHeadStorageEvent = serde_json::from_str(exp).unwrap();
-		assert_eq!(dec, event);
-
-		// Event with Inaccessible.
-		let event = ChainHeadStorageEvent::Error(ErrorEvent { error: "reason".into() });
-		// Encode
-		let ser = serde_json::to_string(&event).unwrap();
-		let exp = r#"{"event":"error","error":"reason"}"#;
-		assert_eq!(ser, exp);
-		// Decode
-		let dec: ChainHeadStorageEvent = serde_json::from_str(exp).unwrap();
-		assert_eq!(dec, event);
-	}
 }
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs
index 604f565ce75..f0fa898f9f7 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs
@@ -39,8 +39,8 @@ mod subscription;
 pub use api::ChainHeadApiServer;
 pub use chain_head::ChainHead;
 pub use event::{
-	BestBlockChanged, ChainHeadEvent, ChainHeadResult, ErrorEvent, Finalized, FollowEvent,
-	Initialized, NewBlock, RuntimeEvent, RuntimeVersionEvent,
+	BestBlockChanged, ErrorEvent, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
+	RuntimeVersionEvent,
 };
 
 use sp_core::hexdisplay::{AsBytesRef, HexDisplay};
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
index be8b8f46a28..c0c2701c5e1 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
@@ -18,6 +18,7 @@
 
 use futures::channel::oneshot;
 use sc_client_api::Backend;
+use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
 use sp_runtime::traits::Block as BlockT;
 use std::{
 	collections::{hash_map::Entry, HashMap},
@@ -25,7 +26,10 @@ use std::{
 	time::{Duration, Instant},
 };
 
-use crate::chain_head::subscription::SubscriptionManagementError;
+use crate::chain_head::{subscription::SubscriptionManagementError, FollowEvent};
+
+/// The queue size after which the `sc_utils::mpsc::tracing_unbounded` would produce warnings.
+const QUEUE_SIZE_WARNING: usize = 512;
 
 /// The state machine of a block of a single subscription ID.
 ///
@@ -116,6 +120,12 @@ struct SubscriptionState<Block: BlockT> {
 	with_runtime: bool,
 	/// Signals the "Stop" event.
 	tx_stop: Option<oneshot::Sender<()>>,
+	/// The sender of message responses to the `chainHead_follow` events.
+	///
+	/// This object is cloned between methods.
+	response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
+	/// The next operation ID.
+	next_operation_id: usize,
 	/// Track the block hashes available for this subscription.
 	///
 	/// This implementation assumes:
@@ -227,6 +237,13 @@ impl<Block: BlockT> SubscriptionState<Block> {
 		}
 		timestamp
 	}
+
+	/// Generate the next operation ID for this subscription.
+	fn next_operation_id(&mut self) -> usize {
+		let op_id = self.next_operation_id;
+		self.next_operation_id = self.next_operation_id.wrapping_add(1);
+		op_id
+	}
 }
 
 /// Keeps a specific block pinned while the handle is alive.
@@ -235,6 +252,8 @@ impl<Block: BlockT> SubscriptionState<Block> {
 pub struct BlockGuard<Block: BlockT, BE: Backend<Block>> {
 	hash: Block::Hash,
 	with_runtime: bool,
+	response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
+	operation_id: String,
 	backend: Arc<BE>,
 }
 
@@ -251,19 +270,37 @@ impl<Block: BlockT, BE: Backend<Block>> BlockGuard<Block, BE> {
 	fn new(
 		hash: Block::Hash,
 		with_runtime: bool,
+		response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
+		operation_id: usize,
 		backend: Arc<BE>,
 	) -> Result<Self, SubscriptionManagementError> {
 		backend
 			.pin_block(hash)
 			.map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?;
 
-		Ok(Self { hash, with_runtime, backend })
+		Ok(Self {
+			hash,
+			with_runtime,
+			response_sender,
+			operation_id: operation_id.to_string(),
+			backend,
+		})
 	}
 
 	/// The `with_runtime` flag of the subscription.
 	pub fn has_runtime(&self) -> bool {
 		self.with_runtime
 	}
+
+	/// Send message responses from the `chainHead` methods to `chainHead_follow`.
+	pub fn response_sender(&self) -> TracingUnboundedSender<FollowEvent<Block::Hash>> {
+		self.response_sender.clone()
+	}
+
+	/// The operation ID of this method.
+	pub fn operation_id(&self) -> String {
+		self.operation_id.clone()
+	}
 }
 
 impl<Block: BlockT, BE: Backend<Block>> Drop for BlockGuard<Block, BE> {
@@ -272,6 +309,15 @@ impl<Block: BlockT, BE: Backend<Block>> Drop for BlockGuard<Block, BE> {
 	}
 }
 
+/// The data propagated back to the `chainHead_follow` method after
+/// the subscription is successfully inserted.
+pub struct InsertedSubscriptionData<Block: BlockT> {
+	/// Signal that the subscription must stop.
+	pub rx_stop: oneshot::Receiver<()>,
+	/// Receive message responses from the `chainHead` methods.
+	pub response_receiver: TracingUnboundedReceiver<FollowEvent<Block::Hash>>,
+}
+
 pub struct SubscriptionsInner<Block: BlockT, BE: Backend<Block>> {
 	/// Reference count the block hashes across all subscriptions.
 	///
@@ -311,16 +357,21 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
 		&mut self,
 		sub_id: String,
 		with_runtime: bool,
-	) -> Option<oneshot::Receiver<()>> {
+	) -> Option<InsertedSubscriptionData<Block>> {
 		if let Entry::Vacant(entry) = self.subs.entry(sub_id) {
 			let (tx_stop, rx_stop) = oneshot::channel();
+			let (response_sender, response_receiver) =
+				tracing_unbounded("chain-head-method-responses", QUEUE_SIZE_WARNING);
 			let state = SubscriptionState::<Block> {
 				with_runtime,
 				tx_stop: Some(tx_stop),
+				response_sender,
+				next_operation_id: 0,
 				blocks: Default::default(),
 			};
 			entry.insert(state);
-			Some(rx_stop)
+
+			Some(InsertedSubscriptionData { rx_stop, response_receiver })
 		} else {
 			None
 		}
@@ -491,7 +542,7 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
 		sub_id: &str,
 		hash: Block::Hash,
 	) -> Result<BlockGuard<Block, BE>, SubscriptionManagementError> {
-		let Some(sub) = self.subs.get(sub_id) else {
+		let Some(sub) = self.subs.get_mut(sub_id) else {
 			return Err(SubscriptionManagementError::SubscriptionAbsent)
 		};
 
@@ -499,7 +550,14 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
 			return Err(SubscriptionManagementError::BlockHashAbsent)
 		}
 
-		BlockGuard::new(hash, sub.with_runtime, self.backend.clone())
+		let operation_id = sub.next_operation_id();
+		BlockGuard::new(
+			hash,
+			sub.with_runtime,
+			sub.response_sender.clone(),
+			operation_id,
+			self.backend.clone(),
+		)
 	}
 }
 
@@ -604,9 +662,13 @@ mod tests {
 
 	#[test]
 	fn sub_state_register_twice() {
+		let (response_sender, _response_receiver) =
+			tracing_unbounded("test-chain-head-method-responses", QUEUE_SIZE_WARNING);
 		let mut sub_state = SubscriptionState::<Block> {
 			with_runtime: false,
 			tx_stop: None,
+			response_sender,
+			next_operation_id: 0,
 			blocks: Default::default(),
 		};
 
@@ -629,9 +691,13 @@ mod tests {
 
 	#[test]
 	fn sub_state_register_unregister() {
+		let (response_sender, _response_receiver) =
+			tracing_unbounded("test-chain-head-method-responses", QUEUE_SIZE_WARNING);
 		let mut sub_state = SubscriptionState::<Block> {
 			with_runtime: false,
 			tx_stop: None,
+			response_sender,
+			next_operation_id: 0,
 			blocks: Default::default(),
 		};
 
@@ -921,17 +987,17 @@ mod tests {
 
 		let id = "abc".to_string();
 
-		let mut rx_stop = subs.insert_subscription(id.clone(), true).unwrap();
+		let mut sub_data = subs.insert_subscription(id.clone(), true).unwrap();
 
 		// Check the stop signal was not received.
-		let res = rx_stop.try_recv().unwrap();
+		let res = sub_data.rx_stop.try_recv().unwrap();
 		assert!(res.is_none());
 
 		let sub = subs.subs.get_mut(&id).unwrap();
 		sub.stop();
 
 		// Check the signal was received.
-		let res = rx_stop.try_recv().unwrap();
+		let res = sub_data.rx_stop.try_recv().unwrap();
 		assert!(res.is_some());
 	}
 }
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs
index 86e55acc4c1..3aece6575ef 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs
@@ -16,7 +16,6 @@
 // You should have received a copy of the GNU General Public License
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
-use futures::channel::oneshot;
 use parking_lot::RwLock;
 use sc_client_api::Backend;
 use sp_runtime::traits::Block as BlockT;
@@ -25,9 +24,9 @@ use std::{sync::Arc, time::Duration};
 mod error;
 mod inner;
 
+use self::inner::SubscriptionsInner;
 pub use error::SubscriptionManagementError;
-pub use inner::BlockGuard;
-use inner::SubscriptionsInner;
+pub use inner::{BlockGuard, InsertedSubscriptionData};
 
 /// Manage block pinning / unpinning for subscription IDs.
 pub struct SubscriptionManagement<Block: BlockT, BE: Backend<Block>> {
@@ -61,7 +60,7 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> {
 		&self,
 		sub_id: String,
 		runtime_updates: bool,
-	) -> Option<oneshot::Receiver<()>> {
+	) -> Option<InsertedSubscriptionData<Block>> {
 		let mut inner = self.inner.write();
 		inner.insert_subscription(sub_id, runtime_updates)
 	}
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
index fc2f1e85b42..6c3c343a10b 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
@@ -1,5 +1,5 @@
 use crate::chain_head::{
-	event::{ChainHeadStorageEvent, StorageQuery, StorageQueryType, StorageResultType},
+	event::{MethodResponse, StorageQuery, StorageQueryType, StorageResultType},
 	test_utils::ChainHeadMockClient,
 };
 
@@ -25,7 +25,7 @@ use sp_core::{
 	Blake2Hasher, Hasher,
 };
 use sp_version::RuntimeVersion;
-use std::{sync::Arc, time::Duration};
+use std::{collections::HashSet, sync::Arc, time::Duration};
 use substrate_test_runtime::Transfer;
 use substrate_test_runtime_client::{
 	prelude::*, runtime, runtime::RuntimeApi, Backend, BlockBuilderExt, Client,
@@ -330,29 +330,34 @@ async fn get_body() {
 	let block_hash = format!("{:?}", block.header.hash());
 	let invalid_hash = hex_string(&INVALID_HASH);
 
-	// Subscription ID is stale the disjoint event is emitted.
-	let mut sub = api
-		.subscribe("chainHead_unstable_body", ["invalid_sub_id", &invalid_hash])
+	// Subscription ID is invalid.
+	let response: MethodResponse = api
+		.call("chainHead_unstable_body", ["invalid_sub_id", &invalid_hash])
 		.await
 		.unwrap();
-	let event: ChainHeadEvent<String> = get_next_event(&mut sub).await;
-	assert_eq!(event, ChainHeadEvent::<String>::Disjoint);
+	assert_matches!(response, MethodResponse::LimitReached);
 
-	// Valid subscription ID with invalid block hash will error.
+	// Block hash is invalid.
 	let err = api
-		.subscribe("chainHead_unstable_body", [&sub_id, &invalid_hash])
+		.call::<_, serde_json::Value>("chainHead_unstable_body", [&sub_id, &invalid_hash])
 		.await
 		.unwrap_err();
 	assert_matches!(err,
 		Error::Call(CallError::Custom(ref err)) if err.code() == 2001 && err.message() == "Invalid block hash"
 	);
 
-	// Obtain valid the body (list of extrinsics).
-	let mut sub = api.subscribe("chainHead_unstable_body", [&sub_id, &block_hash]).await.unwrap();
-	let event: ChainHeadEvent<String> = get_next_event(&mut sub).await;
-	// Block contains no extrinsics.
-	assert_matches!(event,
-		ChainHeadEvent::Done(done) if done.result == "0x00"
+	// Valid call.
+	let response: MethodResponse =
+		api.call("chainHead_unstable_body", [&sub_id, &block_hash]).await.unwrap();
+	let operation_id = match response {
+		MethodResponse::Started(started) => started.operation_id,
+		MethodResponse::LimitReached => panic!("Expected started response"),
+	};
+
+	// Response propagated to `chainHead_follow`.
+	assert_matches!(
+			get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+			FollowEvent::OperationBodyDone(done) if done.operation_id == operation_id && done.value.is_empty()
 	);
 
 	// Import a block with extrinsics.
@@ -378,35 +383,41 @@ async fn get_body() {
 		FollowEvent::BestBlockChanged(_)
 	);
 
-	let mut sub = api.subscribe("chainHead_unstable_body", [&sub_id, &block_hash]).await.unwrap();
-	let event: ChainHeadEvent<String> = get_next_event(&mut sub).await;
-	// Hex encoded scale encoded string for the vector of extrinsics.
-	let expected = hex_string(&block.extrinsics.encode());
-	assert_matches!(event,
-		ChainHeadEvent::Done(done) if done.result == expected
+	// Valid call to a block with extrinsics.
+	let response: MethodResponse =
+		api.call("chainHead_unstable_body", [&sub_id, &block_hash]).await.unwrap();
+	let operation_id = match response {
+		MethodResponse::Started(started) => started.operation_id,
+		MethodResponse::LimitReached => panic!("Expected started response"),
+	};
+
+	// Response propagated to `chainHead_follow`.
+	let expected_tx = hex_string(&block.extrinsics[0].encode());
+	assert_matches!(
+			get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+			FollowEvent::OperationBodyDone(done) if done.operation_id == operation_id && done.value == vec![expected_tx]
 	);
 }
 
 #[tokio::test]
 async fn call_runtime() {
-	let (_client, api, _sub, sub_id, block) = setup_api().await;
+	let (_client, api, mut block_sub, sub_id, block) = setup_api().await;
 	let block_hash = format!("{:?}", block.header.hash());
 	let invalid_hash = hex_string(&INVALID_HASH);
 
-	// Subscription ID is stale the disjoint event is emitted.
-	let mut sub = api
-		.subscribe(
+	// Subscription ID is invalid.
+	let response: MethodResponse = api
+		.call(
 			"chainHead_unstable_call",
 			["invalid_sub_id", &block_hash, "BabeApi_current_epoch", "0x00"],
 		)
 		.await
 		.unwrap();
-	let event: ChainHeadEvent<String> = get_next_event(&mut sub).await;
-	assert_eq!(event, ChainHeadEvent::<String>::Disjoint);
+	assert_matches!(response, MethodResponse::LimitReached);
 
-	// Valid subscription ID with invalid block hash will error.
+	// Block hash is invalid.
 	let err = api
-		.subscribe(
+		.call::<_, serde_json::Value>(
 			"chainHead_unstable_call",
 			[&sub_id, &invalid_hash, "BabeApi_current_epoch", "0x00"],
 		)
@@ -418,8 +429,9 @@ async fn call_runtime() {
 
 	// Pass an invalid parameters that cannot be decode.
 	let err = api
-		.subscribe(
+		.call::<_, serde_json::Value>(
 			"chainHead_unstable_call",
+			// 0x0 is invalid.
 			[&sub_id, &block_hash, "BabeApi_current_epoch", "0x0"],
 		)
 		.await
@@ -428,34 +440,43 @@ async fn call_runtime() {
 		Error::Call(CallError::Custom(ref err)) if err.code() == 2003 && err.message().contains("Invalid parameter")
 	);
 
+	// Valid call.
 	let alice_id = AccountKeyring::Alice.to_account_id();
 	// Hex encoded scale encoded bytes representing the call parameters.
 	let call_parameters = hex_string(&alice_id.encode());
-	let mut sub = api
-		.subscribe(
+	let response: MethodResponse = api
+		.call(
 			"chainHead_unstable_call",
 			[&sub_id, &block_hash, "AccountNonceApi_account_nonce", &call_parameters],
 		)
 		.await
 		.unwrap();
+	let operation_id = match response {
+		MethodResponse::Started(started) => started.operation_id,
+		MethodResponse::LimitReached => panic!("Expected started response"),
+	};
 
+	// Response propagated to `chainHead_follow`.
 	assert_matches!(
-			get_next_event::<ChainHeadEvent<String>>(&mut sub).await,
-			ChainHeadEvent::Done(done) if done.result == "0x0000000000000000"
+			get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+			FollowEvent::OperationCallDone(done) if done.operation_id == operation_id && done.output == "0x0000000000000000"
 	);
 
 	// The `current_epoch` takes no parameters and not draining the input buffer
 	// will cause the execution to fail.
-	let mut sub = api
-		.subscribe(
-			"chainHead_unstable_call",
-			[&sub_id, &block_hash, "BabeApi_current_epoch", "0x00"],
-		)
+	let response: MethodResponse = api
+		.call("chainHead_unstable_call", [&sub_id, &block_hash, "BabeApi_current_epoch", "0x00"])
 		.await
 		.unwrap();
+	let operation_id = match response {
+		MethodResponse::Started(started) => started.operation_id,
+		MethodResponse::LimitReached => panic!("Expected started response"),
+	};
+
+	// Error propagated to `chainHead_follow`.
 	assert_matches!(
-			get_next_event::<ChainHeadEvent<String>>(&mut sub).await,
-			ChainHeadEvent::Error(event) if event.error.contains("Execution failed")
+			get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+			FollowEvent::OperationError(error) if error.operation_id == operation_id && error.error.contains("Execution failed")
 	);
 }
 
@@ -501,7 +522,7 @@ async fn call_runtime_without_flag() {
 	let alice_id = AccountKeyring::Alice.to_account_id();
 	let call_parameters = hex_string(&alice_id.encode());
 	let err = api
-		.subscribe(
+		.call::<_, serde_json::Value>(
 			"chainHead_unstable_call",
 			[&sub_id, &block_hash, "AccountNonceApi_account_nonce", &call_parameters],
 		)
@@ -520,9 +541,9 @@ async fn get_storage_hash() {
 	let invalid_hash = hex_string(&INVALID_HASH);
 	let key = hex_string(&KEY);
 
-	// Subscription ID is stale the disjoint event is emitted.
-	let mut sub = api
-		.subscribe(
+	// Subscription ID is invalid.
+	let response: MethodResponse = api
+		.call(
 			"chainHead_unstable_storage",
 			rpc_params![
 				"invalid_sub_id",
@@ -532,12 +553,11 @@ async fn get_storage_hash() {
 		)
 		.await
 		.unwrap();
-	let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
-	assert_eq!(event, ChainHeadStorageEvent::Disjoint);
+	assert_matches!(response, MethodResponse::LimitReached);
 
-	// Valid subscription ID with invalid block hash will error.
+	// Block hash is invalid.
 	let err = api
-		.subscribe(
+		.call::<_, serde_json::Value>(
 			"chainHead_unstable_storage",
 			rpc_params![
 				&sub_id,
@@ -552,8 +572,8 @@ async fn get_storage_hash() {
 	);
 
 	// Valid call without storage at the key.
-	let mut sub = api
-		.subscribe(
+	let response: MethodResponse = api
+		.call(
 			"chainHead_unstable_storage",
 			rpc_params![
 				&sub_id,
@@ -563,9 +583,15 @@ async fn get_storage_hash() {
 		)
 		.await
 		.unwrap();
-	let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
+	let operation_id = match response {
+		MethodResponse::Started(started) => started.operation_id,
+		MethodResponse::LimitReached => panic!("Expected started response"),
+	};
 	// The `Done` event is generated directly since the key does not have any value associated.
-	assert_matches!(event, ChainHeadStorageEvent::Done);
+	assert_matches!(
+			get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+			FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
+	);
 
 	// Import a new block with storage changes.
 	let mut builder = client.new_block(Default::default()).unwrap();
@@ -585,9 +611,8 @@ async fn get_storage_hash() {
 	);
 
 	// Valid call with storage at the key.
-	let expected_hash = format!("{:?}", Blake2Hasher::hash(&VALUE));
-	let mut sub = api
-		.subscribe(
+	let response: MethodResponse = api
+		.call(
 			"chainHead_unstable_storage",
 			rpc_params![
 				&sub_id,
@@ -597,17 +622,30 @@ async fn get_storage_hash() {
 		)
 		.await
 		.unwrap();
-	let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
-	assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 1 && res.items[0].key == key && res.items[0].result == StorageResultType::Hash(expected_hash));
-	let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
-	assert_matches!(event, ChainHeadStorageEvent::Done);
+	let operation_id = match response {
+		MethodResponse::Started(started) => started.operation_id,
+		MethodResponse::LimitReached => panic!("Expected started response"),
+	};
+
+	let expected_hash = format!("{:?}", Blake2Hasher::hash(&VALUE));
+	assert_matches!(
+			get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+			FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id &&
+				res.items.len() == 1 &&
+				res.items[0].key == key && res.items[0].result == StorageResultType::Hash(expected_hash)
+	);
+	assert_matches!(
+			get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+			FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
+	);
 
 	// Child value set in `setup_api`.
 	let child_info = hex_string(&CHILD_STORAGE_KEY);
 	let genesis_hash = format!("{:?}", client.genesis_hash());
-	let expected_hash = format!("{:?}", Blake2Hasher::hash(&CHILD_VALUE));
-	let mut sub = api
-		.subscribe(
+
+	// Valid call with storage at the key.
+	let response: MethodResponse = api
+		.call(
 			"chainHead_unstable_storage",
 			rpc_params![
 				&sub_id,
@@ -618,10 +656,22 @@ async fn get_storage_hash() {
 		)
 		.await
 		.unwrap();
-	let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
-	assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 1 && res.items[0].key == key && res.items[0].result == StorageResultType::Hash(expected_hash));
-	let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
-	assert_matches!(event, ChainHeadStorageEvent::Done);
+	let operation_id = match response {
+		MethodResponse::Started(started) => started.operation_id,
+		MethodResponse::LimitReached => panic!("Expected started response"),
+	};
+
+	let expected_hash = format!("{:?}", Blake2Hasher::hash(&CHILD_VALUE));
+	assert_matches!(
+			get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+			FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id &&
+				res.items.len() == 1 &&
+				res.items[0].key == key && res.items[0].result == StorageResultType::Hash(expected_hash)
+	);
+	assert_matches!(
+			get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+			FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
+	);
 }
 
 #[tokio::test]
@@ -647,10 +697,8 @@ async fn get_storage_multi_query_iter() {
 	);
 
 	// Valid call with storage at the key.
-	let expected_hash = format!("{:?}", Blake2Hasher::hash(&VALUE));
-	let expected_value = hex_string(&VALUE);
-	let mut sub = api
-		.subscribe(
+	let response: MethodResponse = api
+		.call(
 			"chainHead_unstable_storage",
 			rpc_params![
 				&sub_id,
@@ -669,22 +717,34 @@ async fn get_storage_multi_query_iter() {
 		)
 		.await
 		.unwrap();
-	let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
-	assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 2 &&
-		res.items[0].key == key &&
-		res.items[1].key == key &&
-		res.items[0].result == StorageResultType::Hash(expected_hash) &&
-		res.items[1].result == StorageResultType::Value(expected_value));
-	let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
-	assert_matches!(event, ChainHeadStorageEvent::Done);
+	let operation_id = match response {
+		MethodResponse::Started(started) => started.operation_id,
+		MethodResponse::LimitReached => panic!("Expected started response"),
+	};
+
+	let expected_hash = format!("{:?}", Blake2Hasher::hash(&VALUE));
+	let expected_value = hex_string(&VALUE);
+	assert_matches!(
+		get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+		FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id &&
+			res.items.len() == 2 &&
+			res.items[0].key == key &&
+			res.items[1].key == key &&
+			res.items[0].result == StorageResultType::Hash(expected_hash) &&
+			res.items[1].result == StorageResultType::Value(expected_value)
+	);
+	assert_matches!(
+			get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+			FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
+	);
 
 	// Child value set in `setup_api`.
 	let child_info = hex_string(&CHILD_STORAGE_KEY);
 	let genesis_hash = format!("{:?}", client.genesis_hash());
 	let expected_hash = format!("{:?}", Blake2Hasher::hash(&CHILD_VALUE));
 	let expected_value = hex_string(&CHILD_VALUE);
-	let mut sub = api
-		.subscribe(
+	let response: MethodResponse = api
+		.call(
 			"chainHead_unstable_storage",
 			rpc_params![
 				&sub_id,
@@ -704,14 +764,24 @@ async fn get_storage_multi_query_iter() {
 		)
 		.await
 		.unwrap();
-	let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
-	assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 2 &&
-		res.items[0].key == key &&
-		res.items[1].key == key &&
-		res.items[0].result == StorageResultType::Hash(expected_hash) &&
-		res.items[1].result == StorageResultType::Value(expected_value));
-	let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
-	assert_matches!(event, ChainHeadStorageEvent::Done);
+	let operation_id = match response {
+		MethodResponse::Started(started) => started.operation_id,
+		MethodResponse::LimitReached => panic!("Expected started response"),
+	};
+
+	assert_matches!(
+		get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+		FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id &&
+			res.items.len() == 2 &&
+			res.items[0].key == key &&
+			res.items[1].key == key &&
+			res.items[0].result == StorageResultType::Hash(expected_hash) &&
+			res.items[1].result == StorageResultType::Value(expected_value)
+	);
+	assert_matches!(
+			get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+			FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
+	);
 }
 
 #[tokio::test]
@@ -721,9 +791,9 @@ async fn get_storage_value() {
 	let invalid_hash = hex_string(&INVALID_HASH);
 	let key = hex_string(&KEY);
 
-	// Subscription ID is stale the disjoint event is emitted.
-	let mut sub = api
-		.subscribe(
+	// Subscription ID is invalid.
+	let response: MethodResponse = api
+		.call(
 			"chainHead_unstable_storage",
 			rpc_params![
 				"invalid_sub_id",
@@ -733,12 +803,11 @@ async fn get_storage_value() {
 		)
 		.await
 		.unwrap();
-	let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
-	assert_eq!(event, ChainHeadStorageEvent::Disjoint);
+	assert_matches!(response, MethodResponse::LimitReached);
 
-	// Valid subscription ID with invalid block hash will error.
+	// Block hash is invalid.
 	let err = api
-		.subscribe(
+		.call::<_, serde_json::Value>(
 			"chainHead_unstable_storage",
 			rpc_params![
 				&sub_id,
@@ -753,8 +822,8 @@ async fn get_storage_value() {
 	);
 
 	// Valid call without storage at the key.
-	let mut sub = api
-		.subscribe(
+	let response: MethodResponse = api
+		.call(
 			"chainHead_unstable_storage",
 			rpc_params![
 				&sub_id,
@@ -764,9 +833,15 @@ async fn get_storage_value() {
 		)
 		.await
 		.unwrap();
-	let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
+	let operation_id = match response {
+		MethodResponse::Started(started) => started.operation_id,
+		MethodResponse::LimitReached => panic!("Expected started response"),
+	};
 	// The `Done` event is generated directly since the key does not have any value associated.
-	assert_matches!(event, ChainHeadStorageEvent::Done);
+	assert_matches!(
+			get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+			FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
+	);
 
 	// Import a new block with storage changes.
 	let mut builder = client.new_block(Default::default()).unwrap();
@@ -786,9 +861,8 @@ async fn get_storage_value() {
 	);
 
 	// Valid call with storage at the key.
-	let expected_value = hex_string(&VALUE);
-	let mut sub = api
-		.subscribe(
+	let response: MethodResponse = api
+		.call(
 			"chainHead_unstable_storage",
 			rpc_params![
 				&sub_id,
@@ -798,17 +872,29 @@ async fn get_storage_value() {
 		)
 		.await
 		.unwrap();
-	let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
-	assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 1 && res.items[0].key == key && res.items[0].result == StorageResultType::Value(expected_value));
-	let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
-	assert_matches!(event, ChainHeadStorageEvent::Done);
+	let operation_id = match response {
+		MethodResponse::Started(started) => started.operation_id,
+		MethodResponse::LimitReached => panic!("Expected started response"),
+	};
+
+	let expected_value = hex_string(&VALUE);
+	assert_matches!(
+			get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+			FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id &&
+				res.items.len() == 1 &&
+				res.items[0].key == key && res.items[0].result == StorageResultType::Value(expected_value)
+	);
+	assert_matches!(
+			get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+			FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
+	);
 
 	// Child value set in `setup_api`.
-	let child_info = hex_string(b"child");
+	let child_info = hex_string(&CHILD_STORAGE_KEY);
 	let genesis_hash = format!("{:?}", client.genesis_hash());
-	let expected_value = hex_string(&CHILD_VALUE);
-	let mut sub = api
-		.subscribe(
+
+	let response: MethodResponse = api
+		.call(
 			"chainHead_unstable_storage",
 			rpc_params![
 				&sub_id,
@@ -819,15 +905,28 @@ async fn get_storage_value() {
 		)
 		.await
 		.unwrap();
-	let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
-	assert_matches!(event, ChainHeadStorageEvent::Items(res) if res.items.len() == 1 && res.items[0].key == key && res.items[0].result == StorageResultType::Value(expected_value));
-	let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
-	assert_matches!(event, ChainHeadStorageEvent::Done);
+	let operation_id = match response {
+		MethodResponse::Started(started) => started.operation_id,
+		MethodResponse::LimitReached => panic!("Expected started response"),
+	};
+
+	let expected_value = hex_string(&CHILD_VALUE);
+
+	assert_matches!(
+			get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+			FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id &&
+				res.items.len() == 1 &&
+				res.items[0].key == key && res.items[0].result == StorageResultType::Value(expected_value)
+	);
+	assert_matches!(
+			get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+			FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
+	);
 }
 
 #[tokio::test]
-async fn get_storage_wrong_key() {
-	let (mut _client, api, mut _block_sub, sub_id, block) = setup_api().await;
+async fn get_storage_non_queryable_key() {
+	let (mut _client, api, mut block_sub, sub_id, block) = setup_api().await;
 	let block_hash = format!("{:?}", block.header.hash());
 	let key = hex_string(&KEY);
 
@@ -835,8 +934,9 @@ async fn get_storage_wrong_key() {
 	let mut prefixed_key = well_known_keys::CHILD_STORAGE_KEY_PREFIX.to_vec();
 	prefixed_key.extend_from_slice(&KEY);
 	let prefixed_key = hex_string(&prefixed_key);
-	let mut sub = api
-		.subscribe(
+
+	let response: MethodResponse = api
+		.call(
 			"chainHead_unstable_storage",
 			rpc_params![
 				&sub_id,
@@ -846,15 +946,22 @@ async fn get_storage_wrong_key() {
 		)
 		.await
 		.unwrap();
-	let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
-	assert_matches!(event, ChainHeadStorageEvent::Done);
+	let operation_id = match response {
+		MethodResponse::Started(started) => started.operation_id,
+		MethodResponse::LimitReached => panic!("Expected started response"),
+	};
+	// The `Done` event is generated directly since the key is not queryable.
+	assert_matches!(
+			get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+			FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
+	);
 
 	// Key is prefixed by DEFAULT_CHILD_STORAGE_KEY_PREFIX.
 	let mut prefixed_key = well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec();
 	prefixed_key.extend_from_slice(&KEY);
 	let prefixed_key = hex_string(&prefixed_key);
-	let mut sub = api
-		.subscribe(
+	let response: MethodResponse = api
+		.call(
 			"chainHead_unstable_storage",
 			rpc_params![
 				&sub_id,
@@ -864,15 +971,22 @@ async fn get_storage_wrong_key() {
 		)
 		.await
 		.unwrap();
-	let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
-	assert_matches!(event, ChainHeadStorageEvent::Done);
+	let operation_id = match response {
+		MethodResponse::Started(started) => started.operation_id,
+		MethodResponse::LimitReached => panic!("Expected started response"),
+	};
+	// The `Done` event is generated directly since the key is not queryable.
+	assert_matches!(
+			get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+			FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
+	);
 
 	// Child key is prefixed by CHILD_STORAGE_KEY_PREFIX.
 	let mut prefixed_key = well_known_keys::CHILD_STORAGE_KEY_PREFIX.to_vec();
-	prefixed_key.extend_from_slice(b"child");
+	prefixed_key.extend_from_slice(CHILD_STORAGE_KEY);
 	let prefixed_key = hex_string(&prefixed_key);
-	let mut sub = api
-		.subscribe(
+	let response: MethodResponse = api
+		.call(
 			"chainHead_unstable_storage",
 			rpc_params![
 				&sub_id,
@@ -883,15 +997,22 @@ async fn get_storage_wrong_key() {
 		)
 		.await
 		.unwrap();
-	let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
-	assert_matches!(event, ChainHeadStorageEvent::Done);
+	let operation_id = match response {
+		MethodResponse::Started(started) => started.operation_id,
+		MethodResponse::LimitReached => panic!("Expected started response"),
+	};
+	// The `Done` event is generated directly since the key is not queryable.
+	assert_matches!(
+			get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+			FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
+	);
 
 	// Child key is prefixed by DEFAULT_CHILD_STORAGE_KEY_PREFIX.
 	let mut prefixed_key = well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec();
-	prefixed_key.extend_from_slice(b"child");
+	prefixed_key.extend_from_slice(CHILD_STORAGE_KEY);
 	let prefixed_key = hex_string(&prefixed_key);
-	let mut sub = api
-		.subscribe(
+	let response: MethodResponse = api
+		.call(
 			"chainHead_unstable_storage",
 			rpc_params![
 				&sub_id,
@@ -902,8 +1023,164 @@ async fn get_storage_wrong_key() {
 		)
 		.await
 		.unwrap();
-	let event: ChainHeadStorageEvent = get_next_event(&mut sub).await;
-	assert_matches!(event, ChainHeadStorageEvent::Done);
+	let operation_id = match response {
+		MethodResponse::Started(started) => started.operation_id,
+		MethodResponse::LimitReached => panic!("Expected started response"),
+	};
+	// The `Done` event is generated directly since the key is not queryable.
+	assert_matches!(
+			get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+			FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
+	);
+}
+
+#[tokio::test]
+async fn unique_operation_ids() {
+	let (mut _client, api, mut block_sub, sub_id, block) = setup_api().await;
+	let block_hash = format!("{:?}", block.header.hash());
+
+	let mut op_ids = HashSet::new();
+
+	// Ensure that operation IDs are unique for multiple method calls.
+	for _ in 0..5 {
+		// Valid `chainHead_unstable_body` call.
+		let response: MethodResponse =
+			api.call("chainHead_unstable_body", [&sub_id, &block_hash]).await.unwrap();
+		let operation_id = match response {
+			MethodResponse::Started(started) => started.operation_id,
+			MethodResponse::LimitReached => panic!("Expected started response"),
+		};
+		assert_matches!(
+				get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+				FollowEvent::OperationBodyDone(done) if done.operation_id == operation_id && done.value.is_empty()
+		);
+		// Ensure uniqueness.
+		assert!(op_ids.insert(operation_id));
+
+		// Valid `chainHead_unstable_storage` call.
+		let key = hex_string(&KEY);
+		let response: MethodResponse = api
+			.call(
+				"chainHead_unstable_storage",
+				rpc_params![
+					&sub_id,
+					&block_hash,
+					vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Value }]
+				],
+			)
+			.await
+			.unwrap();
+		let operation_id = match response {
+			MethodResponse::Started(started) => started.operation_id,
+			MethodResponse::LimitReached => panic!("Expected started response"),
+		};
+		// The `Done` event is generated directly since the key does not have any value associated.
+		assert_matches!(
+				get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+				FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
+		);
+		// Ensure uniqueness.
+		assert!(op_ids.insert(operation_id));
+
+		// Valid `chainHead_unstable_call` call.
+		let alice_id = AccountKeyring::Alice.to_account_id();
+		let call_parameters = hex_string(&alice_id.encode());
+		let response: MethodResponse = api
+			.call(
+				"chainHead_unstable_call",
+				[&sub_id, &block_hash, "AccountNonceApi_account_nonce", &call_parameters],
+			)
+			.await
+			.unwrap();
+		let operation_id = match response {
+			MethodResponse::Started(started) => started.operation_id,
+			MethodResponse::LimitReached => panic!("Expected started response"),
+		};
+		// Response propagated to `chainHead_follow`.
+		assert_matches!(
+				get_next_event::<FollowEvent<String>>(&mut block_sub).await,
+				FollowEvent::OperationCallDone(done) if done.operation_id == operation_id && done.output == "0x0000000000000000"
+		);
+		// Ensure uniqueness.
+		assert!(op_ids.insert(operation_id));
+	}
+}
+
+#[tokio::test]
+async fn separate_operation_ids_for_subscriptions() {
+	let builder = TestClientBuilder::new();
+	let backend = builder.backend();
+	let mut client = Arc::new(builder.build());
+
+	let api = ChainHead::new(
+		client.clone(),
+		backend,
+		Arc::new(TaskExecutor::default()),
+		CHAIN_GENESIS,
+		MAX_PINNED_BLOCKS,
+		Duration::from_secs(MAX_PINNED_SECS),
+	)
+	.into_rpc();
+
+	// Create two separate subscriptions.
+	let mut sub_first = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap();
+	let sub_id_first = sub_first.subscription_id();
+	let sub_id_first = serde_json::to_string(&sub_id_first).unwrap();
+
+	let mut sub_second = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap();
+	let sub_id_second = sub_second.subscription_id();
+	let sub_id_second = serde_json::to_string(&sub_id_second).unwrap();
+
+	let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
+	client.import(BlockOrigin::Own, block.clone()).await.unwrap();
+	let block_hash = format!("{:?}", block.header.hash());
+
+	// Ensure the imported block is propagated and pinned.
+	assert_matches!(
+		get_next_event::<FollowEvent<String>>(&mut sub_first).await,
+		FollowEvent::Initialized(_)
+	);
+	assert_matches!(
+		get_next_event::<FollowEvent<String>>(&mut sub_first).await,
+		FollowEvent::NewBlock(_)
+	);
+	assert_matches!(
+		get_next_event::<FollowEvent<String>>(&mut sub_first).await,
+		FollowEvent::BestBlockChanged(_)
+	);
+
+	assert_matches!(
+		get_next_event::<FollowEvent<String>>(&mut sub_second).await,
+		FollowEvent::Initialized(_)
+	);
+	assert_matches!(
+		get_next_event::<FollowEvent<String>>(&mut sub_second).await,
+		FollowEvent::NewBlock(_)
+	);
+	assert_matches!(
+		get_next_event::<FollowEvent<String>>(&mut sub_second).await,
+		FollowEvent::BestBlockChanged(_)
+	);
+
+	// Each `chainHead_follow` subscription receives a separate operation ID.
+	let response: MethodResponse =
+		api.call("chainHead_unstable_body", [&sub_id_first, &block_hash]).await.unwrap();
+	let operation_id: String = match response {
+		MethodResponse::Started(started) => started.operation_id,
+		MethodResponse::LimitReached => panic!("Expected started response"),
+	};
+	assert_eq!(operation_id, "0");
+
+	let response: MethodResponse = api
+		.call("chainHead_unstable_body", [&sub_id_second, &block_hash])
+		.await
+		.unwrap();
+	let operation_id_second: String = match response {
+		MethodResponse::Started(started) => started.operation_id,
+		MethodResponse::LimitReached => panic!("Expected started response"),
+	};
+	// The second subscription does not increment the operation ID of the first one.
+	assert_eq!(operation_id_second, "0");
 }
 
 #[tokio::test]
-- 
GitLab