diff --git a/Cargo.lock b/Cargo.lock
index db33c59f803c19d5a4e1af6e158b5f13363db4cf..46c8b607e8b472a1f6db7f0348a76ee2a3ae204b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7149,9 +7149,9 @@ dependencies = [
 
 [[package]]
 name = "hermit-abi"
-version = "0.3.2"
+version = "0.3.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b"
+checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
 
 [[package]]
 name = "hex"
@@ -7753,7 +7753,7 @@ version = "1.0.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
 dependencies = [
- "hermit-abi 0.3.2",
+ "hermit-abi 0.3.9",
  "libc",
  "windows-sys 0.48.0",
 ]
@@ -7811,7 +7811,7 @@ version = "0.4.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b"
 dependencies = [
- "hermit-abi 0.3.2",
+ "hermit-abi 0.3.9",
  "rustix 0.38.21",
  "windows-sys 0.48.0",
 ]
@@ -9622,13 +9622,14 @@ dependencies = [
 
 [[package]]
 name = "mio"
-version = "0.8.11"
+version = "1.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
+checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec"
 dependencies = [
+ "hermit-abi 0.3.9",
  "libc",
  "wasi",
- "windows-sys 0.48.0",
+ "windows-sys 0.52.0",
 ]
 
 [[package]]
@@ -10410,7 +10411,7 @@ version = "1.16.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
 dependencies = [
- "hermit-abi 0.3.2",
+ "hermit-abi 0.3.9",
  "libc",
 ]
 
@@ -24894,21 +24895,20 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
 
 [[package]]
 name = "tokio"
-version = "1.37.0"
+version = "1.40.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787"
+checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998"
 dependencies = [
  "backtrace",
  "bytes",
  "libc",
  "mio",
- "num_cpus",
  "parking_lot 0.12.3",
  "pin-project-lite",
  "signal-hook-registry",
  "socket2 0.5.7",
  "tokio-macros",
- "windows-sys 0.48.0",
+ "windows-sys 0.52.0",
 ]
 
 [[package]]
@@ -24923,9 +24923,9 @@ dependencies = [
 
 [[package]]
 name = "tokio-macros"
-version = "2.2.0"
+version = "2.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
+checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
 dependencies = [
  "proc-macro2 1.0.86",
  "quote 1.0.37",
diff --git a/Cargo.toml b/Cargo.toml
index e2c6d6c8dedc80dc34e5c1885a38aeeee419f2cd..70eda144b5c691090555b18fb4a9db0fdfa3dd0c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1322,7 +1322,7 @@ tikv-jemalloc-ctl = { version = "0.5.0" }
 tikv-jemallocator = { version = "0.5.0" }
 time = { version = "0.3" }
 tiny-keccak = { version = "2.0.2" }
-tokio = { version = "1.37.0", default-features = false }
+tokio = { version = "1.40.0", default-features = false }
 tokio-retry = { version = "0.3.0" }
 tokio-stream = { version = "0.1.14" }
 tokio-test = { version = "0.4.2" }
diff --git a/prdoc/pr_5741.prdoc b/prdoc/pr_5741.prdoc
new file mode 100644
index 0000000000000000000000000000000000000000..5eafbc90ee85e07c8fb230a5db6cb8a73f5c4e07
--- /dev/null
+++ b/prdoc/pr_5741.prdoc
@@ -0,0 +1,25 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: make RPC endpoint `chainHead_v1_storage` faster
+
+doc:
+  - audience: Node Operator
+    description: |
+      The RPC endpoint `chainHead_v1_storage` now relies solely on backpressure to
+      determine how quickly to serve back values instead of handing back a fixed number
+      of entries and then expecting the client to ask for more. This should significantly
+      improve the throughput for larger storage queries.
+
+      Benchmarks using subxt on localhost:
+        - Iterate over 10 accounts on westend-dev -> ~2-3x faster
+        - Fetch 1024 storage values (i.e., not descendant values) -> ~50x faster
+        - Fetch 1024 descendant values -> ~500x faster
+
+crates:
+  - name: sc-rpc-spec-v2
+    bump: major
+  - name: sc-rpc-server
+    bump: patch
+  - name: sc-service
+    bump: major
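
As an illustration of the backpressure model described in the prdoc above (a minimal standalone sketch, not part of this patch, with hypothetical names): the producer pushes results into a small bounded tokio channel and awaits `send`, so a slow consumer throttles production instead of the server handing out a fixed page and waiting for an explicit continue.

use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Small buffer: the producer can run ahead of the consumer by at most 16 items.
    let (tx, mut rx) = mpsc::channel::<u32>(16);

    let producer = tokio::spawn(async move {
        for value in 0..1024u32 {
            // `send` awaits while the buffer is full, so production is paced
            // by how fast the consumer drains items.
            if tx.send(value).await.is_err() {
                break; // receiver dropped, stop producing
            }
        }
    });

    while let Some(_value) = rx.recv().await {
        // Simulate a slow consumer (e.g. a slow RPC client).
        tokio::time::sleep(Duration::from_millis(1)).await;
    }

    let _ = producer.await;
}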
diff --git a/substrate/client/rpc-servers/src/lib.rs b/substrate/client/rpc-servers/src/lib.rs
index 756e2a08c6d4d8771119ee43d4cbd7e48ee29d0b..31e4042d81f27342a80b48587bb7ad130607edcc 100644
--- a/substrate/client/rpc-servers/src/lib.rs
+++ b/substrate/client/rpc-servers/src/lib.rs
@@ -255,8 +255,9 @@ where
 							),
 						};
 
-						let rpc_middleware =
-							RpcServiceBuilder::new().option_layer(middleware_layer.clone());
+						let rpc_middleware = RpcServiceBuilder::new()
+							.rpc_logger(1024)
+							.option_layer(middleware_layer.clone());
 						let mut svc = service_builder
 							.set_rpc_middleware(rpc_middleware)
 							.build(methods, stop_handle);
diff --git a/substrate/client/rpc-spec-v2/Cargo.toml b/substrate/client/rpc-spec-v2/Cargo.toml
index ae21895de38d19c1deb25f0103bf8a899d6fd72a..58dd8b830bebf5b25ceae5c36953b3ad286eb833 100644
--- a/substrate/client/rpc-spec-v2/Cargo.toml
+++ b/substrate/client/rpc-spec-v2/Cargo.toml
@@ -28,7 +28,6 @@ sp-rpc = { workspace = true, default-features = true }
 sp-blockchain = { workspace = true, default-features = true }
 sp-version = { workspace = true, default-features = true }
 sc-client-api = { workspace = true, default-features = true }
-sc-utils = { workspace = true, default-features = true }
 sc-rpc = { workspace = true, default-features = true }
 codec = { workspace = true, default-features = true }
 thiserror = { workspace = true }
@@ -56,6 +55,8 @@ sp-externalities = { workspace = true, default-features = true }
 sp-maybe-compressed-blob = { workspace = true, default-features = true }
 sc-block-builder = { workspace = true, default-features = true }
 sc-service = { features = ["test-helpers"], workspace = true, default-features = true }
+sc-rpc = { workspace = true, default-features = true, features = ["test-helpers"] }
 assert_matches = { workspace = true }
 pretty_assertions = { workspace = true }
 sc-transaction-pool = { workspace = true, default-features = true }
+sc-utils = { workspace = true, default-features = true }
diff --git a/substrate/client/rpc-spec-v2/src/archive/archive.rs b/substrate/client/rpc-spec-v2/src/archive/archive.rs
index 82c6b2cacc2f4b02991dc89faa8e28a43318b223..dd6c566a76ed475f41acf189dc5e6734e68f58bc 100644
--- a/substrate/client/rpc-spec-v2/src/archive/archive.rs
+++ b/substrate/client/rpc-spec-v2/src/archive/archive.rs
@@ -275,6 +275,7 @@ where
 			self.storage_max_descendant_responses,
 			self.storage_max_queried_items,
 		);
+
 		Ok(storage_client.handle_query(hash, items, child_trie))
 	}
 }
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
index 1bc5cecb205bbd58a1cc6ebb5f0da9c94f6e26b7..a88e7f2a0b3a40269d41459591af9842029d9dd4 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
@@ -27,14 +27,15 @@ use crate::{
 		api::ChainHeadApiServer,
 		chain_head_follow::ChainHeadFollower,
 		error::Error as ChainHeadRpcError,
-		event::{FollowEvent, MethodResponse, OperationError},
-		subscription::{SubscriptionManagement, SubscriptionManagementError},
+		event::{FollowEvent, MethodResponse, OperationError, OperationId, OperationStorageItems},
+		subscription::{StopHandle, SubscriptionManagement, SubscriptionManagementError},
+		FollowEventSendError, FollowEventSender,
 	},
-	common::events::StorageQuery,
+	common::{events::StorageQuery, storage::QueryResult},
 	hex_string, SubscriptionTaskExecutor,
 };
 use codec::Encode;
-use futures::{channel::oneshot, future::FutureExt};
+use futures::{channel::oneshot, future::FutureExt, SinkExt};
 use jsonrpsee::{
 	core::async_trait, server::ResponsePayload, types::SubscriptionId, ConnectionId, Extensions,
 	MethodResponseFuture, PendingSubscriptionSink,
@@ -51,9 +52,16 @@ use sp_core::{traits::CallContext, Bytes};
 use sp_rpc::list::ListOrValue;
 use sp_runtime::traits::Block as BlockT;
 use std::{marker::PhantomData, sync::Arc, time::Duration};
+use tokio::sync::mpsc;
 
 pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";
 
+/// The buffer capacity for each storage query.
+///
+/// This is small because the underlying JSON-RPC server also
+/// has its own per-connection buffer capacity.
+const STORAGE_QUERY_BUF: usize = 16;
+
 /// The configuration of [`ChainHead`].
 pub struct ChainHeadConfig {
 	/// The maximum number of pinned blocks across all subscriptions.
@@ -65,9 +73,6 @@ pub struct ChainHeadConfig {
 	/// Stop all subscriptions if the distance between the leaves and the current finalized
 	/// block is larger than this value.
 	pub max_lagging_distance: usize,
-	/// The maximum number of items reported by the `chainHead_storage` before
-	/// pagination is required.
-	pub operation_max_storage_items: usize,
 	/// The maximum number of `chainHead_follow` subscriptions per connection.
 	pub max_follow_subscriptions_per_connection: usize,
 }
@@ -87,10 +92,6 @@ const MAX_PINNED_DURATION: Duration = Duration::from_secs(60);
 /// Note: The lower limit imposed by the spec is 16.
 const MAX_ONGOING_OPERATIONS: usize = 16;
 
-/// The maximum number of items the `chainHead_storage` can return
-/// before paginations is required.
-const MAX_STORAGE_ITER_ITEMS: usize = 5;
-
 /// Stop all subscriptions if the distance between the leaves and the current finalized
 /// block is larger than this value.
 const MAX_LAGGING_DISTANCE: usize = 128;
@@ -105,7 +106,6 @@ impl Default for ChainHeadConfig {
 			subscription_max_pinned_duration: MAX_PINNED_DURATION,
 			subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
 			max_lagging_distance: MAX_LAGGING_DISTANCE,
-			operation_max_storage_items: MAX_STORAGE_ITER_ITEMS,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		}
 	}
@@ -121,9 +121,6 @@ pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
 	executor: SubscriptionTaskExecutor,
 	/// Keep track of the pinned blocks for each subscription.
 	subscriptions: SubscriptionManagement<Block, BE>,
-	/// The maximum number of items reported by the `chainHead_storage` before
-	/// pagination is required.
-	operation_max_storage_items: usize,
 	/// Stop all subscriptions if the distance between the leaves and the current finalized
 	/// block is larger than this value.
 	max_lagging_distance: usize,
@@ -150,7 +147,6 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
 				config.max_follow_subscriptions_per_connection,
 				backend,
 			),
-			operation_max_storage_items: config.operation_max_storage_items,
 			max_lagging_distance: config.max_lagging_distance,
 			_phantom: PhantomData,
 		}
@@ -314,7 +310,7 @@ where
 				}),
 			};
 
-			let (rp, rp_fut) = method_started_response(operation_id, None);
+			let (rp, rp_fut) = method_started_response(operation_id);
 			let fut = async move {
 				// Wait for the server to send out the response and if it produces an error no event
 				// should be generated.
@@ -322,7 +318,7 @@ where
 					return;
 				}
 
-				let _ = block_guard.response_sender().unbounded_send(event);
+				let _ = block_guard.response_sender().send(event).await;
 			};
 			executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
 
@@ -426,20 +422,10 @@ where
 				Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock),
 			};
 
-		let mut storage_client = ChainHeadStorage::<Client, Block, BE>::new(
-			self.client.clone(),
-			self.operation_max_storage_items,
-		);
-		let operation = block_guard.operation();
-		let operation_id = operation.operation_id();
+		let mut storage_client = ChainHeadStorage::<Client, Block, BE>::new(self.client.clone());
 
-		// The number of operations we are allowed to execute.
-		let num_operations = operation.num_reserved();
-		let discarded = items.len().saturating_sub(num_operations);
-		let mut items = items;
-		items.truncate(num_operations);
+		let (rp, rp_fut) = method_started_response(block_guard.operation().operation_id());
 
-		let (rp, rp_fut) = method_started_response(operation_id, Some(discarded));
 		let fut = async move {
 			// Wait for the server to send out the response and if it produces an error no event
 			// should be generated.
@@ -447,10 +433,20 @@ where
 				return;
 			}
 
-			storage_client.generate_events(block_guard, hash, items, child_trie).await;
+			let (tx, rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF);
+			let operation_id = block_guard.operation().operation_id();
+			let stop_handle = block_guard.operation().stop_handle().clone();
+			let response_sender = block_guard.response_sender();
+
+			// This may fail if the channel or the connection is closed,
+			// which is okay to ignore.
+			let _ = futures::future::join(
+				storage_client.generate_events(hash, items, child_trie, tx),
+				process_storage_items(rx, response_sender, operation_id, &stop_handle),
+			)
+			.await;
 		};
-		self.executor
-			.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
+		self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
 
 		rp
 	}
@@ -503,7 +499,7 @@ where
 		let operation_id = block_guard.operation().operation_id();
 		let client = self.client.clone();
 
-		let (rp, rp_fut) = method_started_response(operation_id.clone(), None);
+		let (rp, rp_fut) = method_started_response(operation_id.clone());
 		let fut = async move {
 			// Wait for the server to send out the response and if it produces an error no event
 			// should be generated.
@@ -527,7 +523,7 @@ where
 					})
 				});
 
-			let _ = block_guard.response_sender().unbounded_send(event);
+			let _ = block_guard.response_sender().send(event).await;
 		};
 		self.executor
 			.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
@@ -588,13 +584,9 @@ where
 			return Ok(())
 		}
 
-		let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id)
-		else {
-			return Ok(())
-		};
-
-		if !operation.submit_continue() {
-			// Continue called without generating a `WaitingForContinue` event.
+		// The `WaitingForContinue` event is never emitted anymore; in that
+		// case an `InvalidContinue` error is returned.
+		if self.subscriptions.get_operation(&follow_subscription, &operation_id).is_some() {
 			Err(ChainHeadRpcError::InvalidContinue.into())
 		} else {
 			Ok(())
@@ -616,12 +608,13 @@ where
 			return Ok(())
 		}
 
-		let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id)
+		let Some(mut operation) =
+			self.subscriptions.get_operation(&follow_subscription, &operation_id)
 		else {
 			return Ok(())
 		};
 
-		operation.stop_operation();
+		operation.stop();
 
 		Ok(())
 	}
@@ -629,9 +622,8 @@ where
 
 fn method_started_response(
 	operation_id: String,
-	discarded_items: Option<usize>,
 ) -> (ResponsePayload<'static, MethodResponse>, MethodResponseFuture) {
-	let rp = MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items });
+	let rp = MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items: None });
 	ResponsePayload::success(rp).notify_on_completion()
 }
 
@@ -657,3 +649,46 @@ where
 
 	rx
 }
+
+async fn process_storage_items<Hash>(
+	mut storage_query_stream: mpsc::Receiver<QueryResult>,
+	mut sender: FollowEventSender<Hash>,
+	operation_id: String,
+	stop_handle: &StopHandle,
+) -> Result<(), FollowEventSendError> {
+	loop {
+		tokio::select! {
+			_ = stop_handle.stopped() => {
+				break;
+			},
+
+			maybe_storage = storage_query_stream.recv() => {
+				let Some(storage) = maybe_storage else {
+					break;
+				};
+
+				let item = match storage {
+					QueryResult::Err(error) => {
+						return sender
+						.send(FollowEvent::OperationError(OperationError { operation_id, error }))
+						.await
+					}
+					QueryResult::Ok(Some(v)) => v,
+					QueryResult::Ok(None) => continue,
+				};
+
+				sender
+					.send(FollowEvent::OperationStorageItems(OperationStorageItems {
+						operation_id: operation_id.clone(),
+						items: vec![item],
+				})).await?;
+			},
+		}
+	}
+
+	sender
+		.send(FollowEvent::OperationStorageDone(OperationId { operation_id }))
+		.await?;
+
+	Ok(())
+}
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs
index ee39ec253a30cdca26a95ba78174b52b68ee202a..936117e66f98128425b4020e06a4ed7d758cb709 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs
@@ -18,45 +18,34 @@
 
 //! Implementation of the `chainHead_storage` method.
 
-use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
+use std::{marker::PhantomData, sync::Arc};
 
 use sc_client_api::{Backend, ChildInfo, StorageKey, StorageProvider};
-use sc_utils::mpsc::TracingUnboundedSender;
 use sp_runtime::traits::Block as BlockT;
+use tokio::sync::mpsc;
 
-use crate::{
-	chain_head::{
-		event::{OperationError, OperationId, OperationStorageItems},
-		subscription::BlockGuard,
-		FollowEvent,
-	},
-	common::{
-		events::{StorageQuery, StorageQueryType},
-		storage::{IterQueryType, QueryIter, QueryIterResult, Storage},
-	},
+use crate::common::{
+	events::{StorageQuery, StorageQueryType},
+	storage::{IterQueryType, QueryIter, QueryResult, Storage},
 };
 
 /// Generates the events of the `chainHead_storage` method.
 pub struct ChainHeadStorage<Client, Block, BE> {
 	/// Storage client.
 	client: Storage<Client, Block, BE>,
-	/// Queue of operations that may require pagination.
-	iter_operations: VecDeque<QueryIter>,
-	/// The maximum number of items reported by the `chainHead_storage` before
-	/// pagination is required.
-	operation_max_storage_items: usize,
 	_phandom: PhantomData<(BE, Block)>,
 }
 
+impl<Client, Block, BE> Clone for ChainHeadStorage<Client, Block, BE> {
+	fn clone(&self) -> Self {
+		Self { client: self.client.clone(), _phandom: PhantomData }
+	}
+}
+
 impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE> {
 	/// Constructs a new [`ChainHeadStorage`].
-	pub fn new(client: Arc<Client>, operation_max_storage_items: usize) -> Self {
-		Self {
-			client: Storage::new(client),
-			iter_operations: VecDeque::new(),
-			operation_max_storage_items,
-			_phandom: PhantomData,
-		}
+	pub fn new(client: Arc<Client>) -> Self {
+		Self { client: Storage::new(client), _phandom: PhantomData }
 	}
 }
 
@@ -64,146 +53,71 @@ impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE>
 where
 	Block: BlockT + 'static,
 	BE: Backend<Block> + 'static,
-	Client: StorageProvider<Block, BE> + 'static,
+	Client: StorageProvider<Block, BE> + Send + Sync + 'static,
 {
-	/// Iterate over (key, hash) and (key, value) generating the `WaitingForContinue` event if
-	/// necessary.
-	async fn generate_storage_iter_events(
-		&mut self,
-		mut block_guard: BlockGuard<Block, BE>,
-		hash: Block::Hash,
-		child_key: Option<ChildInfo>,
-	) {
-		let sender = block_guard.response_sender();
-		let operation = block_guard.operation();
-
-		while let Some(query) = self.iter_operations.pop_front() {
-			if operation.was_stopped() {
-				return
-			}
-
-			let result = self.client.query_iter_pagination(
-				query,
-				hash,
-				child_key.as_ref(),
-				self.operation_max_storage_items,
-			);
-			let (events, maybe_next_query) = match result {
-				QueryIterResult::Ok(result) => result,
-				QueryIterResult::Err(error) => {
-					send_error::<Block>(&sender, operation.operation_id(), error.to_string());
-					return
-				},
-			};
-
-			if !events.is_empty() {
-				// Send back the results of the iteration produced so far.
-				let _ = sender.unbounded_send(FollowEvent::<Block::Hash>::OperationStorageItems(
-					OperationStorageItems { operation_id: operation.operation_id(), items: events },
-				));
-			}
-
-			if let Some(next_query) = maybe_next_query {
-				let _ =
-					sender.unbounded_send(FollowEvent::<Block::Hash>::OperationWaitingForContinue(
-						OperationId { operation_id: operation.operation_id() },
-					));
-
-				// The operation might be continued or cancelled only after the
-				// `OperationWaitingForContinue` is generated above.
-				operation.wait_for_continue().await;
-
-				// Give a chance for the other items to advance next time.
-				self.iter_operations.push_back(next_query);
-			}
-		}
-
-		if operation.was_stopped() {
-			return
-		}
-
-		let _ =
-			sender.unbounded_send(FollowEvent::<Block::Hash>::OperationStorageDone(OperationId {
-				operation_id: operation.operation_id(),
-			}));
-	}
-
 	/// Generate the block events for the `chainHead_storage` method.
 	pub async fn generate_events(
 		&mut self,
-		mut block_guard: BlockGuard<Block, BE>,
 		hash: Block::Hash,
 		items: Vec<StorageQuery<StorageKey>>,
 		child_key: Option<ChildInfo>,
-	) {
-		let sender = block_guard.response_sender();
-		let operation = block_guard.operation();
-
-		let mut storage_results = Vec::with_capacity(items.len());
-		for item in items {
-			match item.query_type {
-				StorageQueryType::Value => {
-					match self.client.query_value(hash, &item.key, child_key.as_ref()) {
-						Ok(Some(value)) => storage_results.push(value),
-						Ok(None) => continue,
-						Err(error) => {
-							send_error::<Block>(&sender, operation.operation_id(), error);
-							return
-						},
-					}
-				},
-				StorageQueryType::Hash =>
-					match self.client.query_hash(hash, &item.key, child_key.as_ref()) {
-						Ok(Some(value)) => storage_results.push(value),
-						Ok(None) => continue,
-						Err(error) => {
-							send_error::<Block>(&sender, operation.operation_id(), error);
-							return
-						},
+		tx: mpsc::Sender<QueryResult>,
+	) -> Result<(), tokio::task::JoinError> {
+		let this = self.clone();
+
+		tokio::task::spawn_blocking(move || {
+			for item in items {
+				match item.query_type {
+					StorageQueryType::Value => {
+						let rp = this.client.query_value(hash, &item.key, child_key.as_ref());
+						if tx.blocking_send(rp).is_err() {
+							break;
+						}
 					},
-				StorageQueryType::ClosestDescendantMerkleValue =>
-					match self.client.query_merkle_value(hash, &item.key, child_key.as_ref()) {
-						Ok(Some(value)) => storage_results.push(value),
-						Ok(None) => continue,
-						Err(error) => {
-							send_error::<Block>(&sender, operation.operation_id(), error);
-							return
-						},
+					StorageQueryType::Hash => {
+						let rp = this.client.query_hash(hash, &item.key, child_key.as_ref());
+						if tx.blocking_send(rp).is_err() {
+							break;
+						}
 					},
-				StorageQueryType::DescendantsValues => self.iter_operations.push_back(QueryIter {
-					query_key: item.key,
-					ty: IterQueryType::Value,
-					pagination_start_key: None,
-				}),
-				StorageQueryType::DescendantsHashes => self.iter_operations.push_back(QueryIter {
-					query_key: item.key,
-					ty: IterQueryType::Hash,
-					pagination_start_key: None,
-				}),
-			};
-		}
-
-		if !storage_results.is_empty() {
-			let _ = sender.unbounded_send(FollowEvent::<Block::Hash>::OperationStorageItems(
-				OperationStorageItems {
-					operation_id: operation.operation_id(),
-					items: storage_results,
-				},
-			));
-		}
+					StorageQueryType::ClosestDescendantMerkleValue => {
+						let rp =
+							this.client.query_merkle_value(hash, &item.key, child_key.as_ref());
+						if tx.blocking_send(rp).is_err() {
+							break;
+						}
+					},
+					StorageQueryType::DescendantsValues => {
+						let query = QueryIter {
+							query_key: item.key,
+							ty: IterQueryType::Value,
+							pagination_start_key: None,
+						};
+						this.client.query_iter_pagination_with_producer(
+							query,
+							hash,
+							child_key.as_ref(),
+							&tx,
+						)
+					},
+					StorageQueryType::DescendantsHashes => {
+						let query = QueryIter {
+							query_key: item.key,
+							ty: IterQueryType::Hash,
+							pagination_start_key: None,
+						};
+						this.client.query_iter_pagination_with_producer(
+							query,
+							hash,
+							child_key.as_ref(),
+							&tx,
+						)
+					},
+				}
+			}
+		})
+		.await?;
 
-		self.generate_storage_iter_events(block_guard, hash, child_key).await
+		Ok(())
 	}
 }
-
-/// Build and send the opaque error back to the `chainHead_follow` method.
-fn send_error<Block: BlockT>(
-	sender: &TracingUnboundedSender<FollowEvent<Block::Hash>>,
-	operation_id: String,
-	error: String,
-) {
-	let _ = sender.unbounded_send(FollowEvent::<Block::Hash>::OperationError(OperationError {
-		operation_id,
-		error,
-	}));
-}
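
The rewritten `generate_events` above moves the synchronous storage reads onto the blocking thread pool and pushes each result through `blocking_send` on a bounded channel, so a full buffer parks the blocking thread rather than the async executor, and a dropped receiver ends the work early. A minimal standalone sketch of that producer pattern, assuming tokio (names are hypothetical):

use tokio::sync::mpsc;

// Stand-in for a synchronous storage/backend query.
fn expensive_sync_read(i: u32) -> u32 {
    i * 2
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(16);

    // Run the synchronous producer on the blocking thread pool so it cannot
    // stall the async executor.
    let producer = tokio::task::spawn_blocking(move || {
        for i in 0..100u32 {
            let item = expensive_sync_read(i);
            // `blocking_send` parks this blocking thread while the buffer is
            // full and errors out once the receiver has been dropped.
            if tx.blocking_send(item).is_err() {
                break;
            }
        }
    });

    while let Some(item) = rx.recv().await {
        println!("got {item}");
    }

    producer.await.expect("blocking task panicked");
}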
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs
index c9fe19aca2b1898da25e45019e1924256d732d9a..98ddfbbdc63f2dfae7bd90eef5e72b6d0fe21ccc 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs
@@ -42,3 +42,10 @@ pub use event::{
 	BestBlockChanged, ErrorEvent, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
 	RuntimeVersionEvent,
 };
+
+/// Follow event sender.
+pub(crate) type FollowEventSender<Hash> = futures::channel::mpsc::Sender<FollowEvent<Hash>>;
+/// Follow event receiver.
+pub(crate) type FollowEventReceiver<Hash> = futures::channel::mpsc::Receiver<FollowEvent<Hash>>;
+/// Follow event send error.
+pub(crate) type FollowEventSendError = futures::channel::mpsc::SendError;
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
index 14325b4fbb9807d25b7292aa143cbdb2840d0753..95a7c7fe183205d376d9c6d0da797cae1eaa967c 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
@@ -19,18 +19,25 @@
 use futures::channel::oneshot;
 use parking_lot::Mutex;
 use sc_client_api::Backend;
-use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
 use sp_runtime::traits::Block as BlockT;
 use std::{
 	collections::{hash_map::Entry, HashMap, HashSet},
-	sync::{atomic::AtomicBool, Arc},
+	sync::Arc,
 	time::{Duration, Instant},
 };
 
-use crate::chain_head::{subscription::SubscriptionManagementError, FollowEvent};
+use crate::chain_head::{
+	subscription::SubscriptionManagementError, FollowEventReceiver, FollowEventSender,
+};
+
+type NotifyOnDrop = tokio::sync::mpsc::Receiver<()>;
+type SharedOperations = Arc<Mutex<HashMap<String, (NotifyOnDrop, StopHandle)>>>;
 
-/// The queue size after which the `sc_utils::mpsc::tracing_unbounded` would produce warnings.
-const QUEUE_SIZE_WARNING: usize = 512;
+/// The buffer capacity for each subscription.
+///
+/// Beware that the JSON-RPC server also maintains a global
+/// per-connection buffer; this is an extra buffer on top of that.
+const BUF_CAP_PER_SUBSCRIPTION: usize = 16;
 
 /// The state machine of a block of a single subscription ID.
 ///
@@ -138,7 +145,7 @@ impl LimitOperations {
 			.try_acquire_many_owned(num_ops.try_into().ok()?)
 			.ok()?;
 
-		Some(PermitOperations { num_ops, _permit: permits })
+		Some(permits)
 	}
 }
 
@@ -148,79 +155,36 @@ impl LimitOperations {
 /// to guarantee the RPC server can execute the number of operations.
 ///
 /// The number of reserved items are given back to the [`LimitOperations`] on drop.
-struct PermitOperations {
-	/// The number of operations permitted (reserved).
-	num_ops: usize,
-	/// The permit for these operations.
-	_permit: tokio::sync::OwnedSemaphorePermit,
-}
+type PermitOperations = tokio::sync::OwnedSemaphorePermit;
 
-/// The state of one operation.
-///
-/// This is directly exposed to users via `chain_head_unstable_continue` and
-/// `chain_head_unstable_stop_operation`.
+/// Stop handle for the operation.
 #[derive(Clone)]
-pub struct OperationState {
-	/// The shared operation state that holds information about the
-	/// `waitingForContinue` event and cancellation.
-	shared_state: Arc<SharedOperationState>,
-	/// Send notifications when the user calls `chainHead_continue` method.
-	send_continue: tokio::sync::mpsc::Sender<()>,
-}
+pub struct StopHandle(tokio::sync::mpsc::Sender<()>);
 
-impl OperationState {
-	/// Returns true if `chainHead_continue` is called after the
-	/// `waitingForContinue` event was emitted for the associated
-	/// operation ID.
-	pub fn submit_continue(&self) -> bool {
-		// `waitingForContinue` not generated.
-		if !self.shared_state.requested_continue.load(std::sync::atomic::Ordering::Acquire) {
-			return false
-		}
-
-		// Has enough capacity for 1 message.
-		// Can fail if the `stop_operation` propagated the stop first.
-		self.send_continue.try_send(()).is_ok()
+impl StopHandle {
+	pub async fn stopped(&self) {
+		self.0.closed().await;
 	}
 
-	/// Stops the operation if `waitingForContinue` event was emitted for the associated
-	/// operation ID.
-	///
-	/// Returns nothing in accordance with `chainHead_v1_stopOperation`.
-	pub fn stop_operation(&self) {
-		// `waitingForContinue` not generated.
-		if !self.shared_state.requested_continue.load(std::sync::atomic::Ordering::Acquire) {
-			return
-		}
-
-		self.shared_state
-			.operation_stopped
-			.store(true, std::sync::atomic::Ordering::Release);
-
-		// Send might not have enough capacity if `submit_continue` was sent first.
-		// However, the `operation_stopped` boolean was set.
-		let _ = self.send_continue.try_send(());
+	pub fn is_stopped(&self) -> bool {
+		self.0.is_closed()
 	}
 }
 
 /// The shared operation state between the backend [`RegisteredOperation`] and frontend
 /// [`RegisteredOperation`].
-struct SharedOperationState {
-	/// True if the `chainHead` generated `waitingForContinue` event.
-	requested_continue: AtomicBool,
-	/// True if the operation was cancelled by the user.
-	operation_stopped: AtomicBool,
+#[derive(Clone)]
+pub struct OperationState {
+	stop: StopHandle,
+	operations: SharedOperations,
+	operation_id: String,
 }
 
-impl SharedOperationState {
-	/// Constructs a new [`SharedOperationState`].
-	///
-	/// This is efficiently cloned under a single heap allocation.
-	fn new() -> Arc<Self> {
-		Arc::new(SharedOperationState {
-			requested_continue: AtomicBool::new(false),
-			operation_stopped: AtomicBool::new(false),
-		})
+impl OperationState {
+	pub fn stop(&mut self) {
+		if !self.stop.is_stopped() {
+			self.operations.lock().remove(&self.operation_id);
+		}
 	}
 }
 
@@ -228,59 +192,31 @@ impl SharedOperationState {
 ///
 /// This is used internally by the `chainHead` methods.
 pub struct RegisteredOperation {
-	/// The shared operation state that holds information about the
-	/// `waitingForContinue` event and cancellation.
-	shared_state: Arc<SharedOperationState>,
-	/// Receive notifications when the user calls `chainHead_continue` method.
-	recv_continue: tokio::sync::mpsc::Receiver<()>,
+	/// Stop handle for the operation.
+	stop_handle: StopHandle,
+	/// Track the operations ID of this subscription.
+	operations: SharedOperations,
 	/// The operation ID of the request.
 	operation_id: String,
-	/// Track the operations ID of this subscription.
-	operations: Arc<Mutex<HashMap<String, OperationState>>>,
 	/// Permit a number of items to be executed by this operation.
-	permit: PermitOperations,
+	_permit: PermitOperations,
 }
 
 impl RegisteredOperation {
-	/// Wait until the user calls `chainHead_continue` or the operation
-	/// is cancelled via `chainHead_stopOperation`.
-	pub async fn wait_for_continue(&mut self) {
-		self.shared_state
-			.requested_continue
-			.store(true, std::sync::atomic::Ordering::Release);
-
-		// The sender part of this channel is around for as long as this object exists,
-		// because it is stored in the `OperationState` of the `operations` field.
-		// The sender part is removed from tracking when this object is dropped.
-		let _ = self.recv_continue.recv().await;
-
-		self.shared_state
-			.requested_continue
-			.store(false, std::sync::atomic::Ordering::Release);
-	}
-
-	/// Returns true if the current operation was stopped.
-	pub fn was_stopped(&self) -> bool {
-		self.shared_state.operation_stopped.load(std::sync::atomic::Ordering::Acquire)
+	/// Stop handle for the operation.
+	pub fn stop_handle(&self) -> &StopHandle {
+		&self.stop_handle
 	}
 
 	/// Get the operation ID.
 	pub fn operation_id(&self) -> String {
 		self.operation_id.clone()
 	}
-
-	/// Returns the number of reserved elements for this permit.
-	///
-	/// This can be smaller than the number of items requested via [`LimitOperations::reserve()`].
-	pub fn num_reserved(&self) -> usize {
-		self.permit.num_ops
-	}
 }
 
 impl Drop for RegisteredOperation {
 	fn drop(&mut self) {
-		let mut operations = self.operations.lock();
-		operations.remove(&self.operation_id);
+		self.operations.lock().remove(&self.operation_id);
 	}
 }
 
@@ -291,7 +227,7 @@ struct Operations {
 	/// Limit the number of ongoing operations.
 	limits: LimitOperations,
 	/// Track the operations ID of this subscription.
-	operations: Arc<Mutex<HashMap<String, OperationState>>>,
+	operations: SharedOperations,
 }
 
 impl Operations {
@@ -307,25 +243,25 @@ impl Operations {
 	/// Register a new operation.
 	pub fn register_operation(&mut self, to_reserve: usize) -> Option<RegisteredOperation> {
 		let permit = self.limits.reserve_at_most(to_reserve)?;
-
 		let operation_id = self.next_operation_id();
 
-		// At most one message can be sent.
-		let (send_continue, recv_continue) = tokio::sync::mpsc::channel(1);
-		let shared_state = SharedOperationState::new();
-
-		let state = OperationState { send_continue, shared_state: shared_state.clone() };
-
-		// Cloned operations for removing the current ID on drop.
+		let (tx, rx) = tokio::sync::mpsc::channel(1);
+		let stop_handle = StopHandle(tx);
 		let operations = self.operations.clone();
-		operations.lock().insert(operation_id.clone(), state);
+		operations.lock().insert(operation_id.clone(), (rx, stop_handle.clone()));
 
-		Some(RegisteredOperation { shared_state, operation_id, recv_continue, operations, permit })
+		Some(RegisteredOperation { stop_handle, operation_id, operations, _permit: permit })
 	}
 
 	/// Get the associated operation state with the ID.
 	pub fn get_operation(&self, id: &str) -> Option<OperationState> {
-		self.operations.lock().get(id).map(|state| state.clone())
+		let stop = self.operations.lock().get(id).map(|(_, stop)| stop.clone())?;
+
+		Some(OperationState {
+			stop,
+			operations: self.operations.clone(),
+			operation_id: id.to_string(),
+		})
 	}
 
 	/// Generate the next operation ID for this subscription.
@@ -352,7 +288,7 @@ struct SubscriptionState<Block: BlockT> {
 	/// The sender of message responses to the `chainHead_follow` events.
 	///
 	/// This object is cloned between methods.
-	response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
+	response_sender: FollowEventSender<Block::Hash>,
 	/// The ongoing operations of a subscription.
 	operations: Operations,
 	/// Track the block hashes available for this subscription.
@@ -486,7 +422,7 @@ impl<Block: BlockT> SubscriptionState<Block> {
 pub struct BlockGuard<Block: BlockT, BE: Backend<Block>> {
 	hash: Block::Hash,
 	with_runtime: bool,
-	response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
+	response_sender: FollowEventSender<Block::Hash>,
 	operation: RegisteredOperation,
 	backend: Arc<BE>,
 }
@@ -504,7 +440,7 @@ impl<Block: BlockT, BE: Backend<Block>> BlockGuard<Block, BE> {
 	fn new(
 		hash: Block::Hash,
 		with_runtime: bool,
-		response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
+		response_sender: FollowEventSender<Block::Hash>,
 		operation: RegisteredOperation,
 		backend: Arc<BE>,
 	) -> Result<Self, SubscriptionManagementError> {
@@ -521,7 +457,7 @@ impl<Block: BlockT, BE: Backend<Block>> BlockGuard<Block, BE> {
 	}
 
 	/// Send message responses from the `chainHead` methods to `chainHead_follow`.
-	pub fn response_sender(&self) -> TracingUnboundedSender<FollowEvent<Block::Hash>> {
+	pub fn response_sender(&self) -> FollowEventSender<Block::Hash> {
 		self.response_sender.clone()
 	}
 
@@ -543,7 +479,7 @@ pub struct InsertedSubscriptionData<Block: BlockT> {
 	/// Signal that the subscription must stop.
 	pub rx_stop: oneshot::Receiver<()>,
 	/// Receive message responses from the `chainHead` methods.
-	pub response_receiver: TracingUnboundedReceiver<FollowEvent<Block::Hash>>,
+	pub response_receiver: FollowEventReceiver<Block::Hash>,
 }
 
 pub struct SubscriptionsInner<Block: BlockT, BE: Backend<Block>> {
@@ -594,7 +530,7 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
 		if let Entry::Vacant(entry) = self.subs.entry(sub_id) {
 			let (tx_stop, rx_stop) = oneshot::channel();
 			let (response_sender, response_receiver) =
-				tracing_unbounded("chain-head-method-responses", QUEUE_SIZE_WARNING);
+				futures::channel::mpsc::channel(BUF_CAP_PER_SUBSCRIPTION);
 			let state = SubscriptionState::<Block> {
 				with_runtime,
 				tx_stop: Some(tx_stop),
@@ -972,8 +908,7 @@ mod tests {
 
 	#[test]
 	fn sub_state_register_twice() {
-		let (response_sender, _response_receiver) =
-			tracing_unbounded("test-chain-head-method-responses", QUEUE_SIZE_WARNING);
+		let (response_sender, _response_receiver) = futures::channel::mpsc::channel(1);
 		let mut sub_state = SubscriptionState::<Block> {
 			with_runtime: false,
 			tx_stop: None,
@@ -1001,8 +936,7 @@ mod tests {
 
 	#[test]
 	fn sub_state_register_unregister() {
-		let (response_sender, _response_receiver) =
-			tracing_unbounded("test-chain-head-method-responses", QUEUE_SIZE_WARNING);
+		let (response_sender, _response_receiver) = futures::channel::mpsc::channel(1);
 		let mut sub_state = SubscriptionState::<Block> {
 			with_runtime: false,
 			tx_stop: None,
@@ -1349,12 +1283,12 @@ mod tests {
 
 		// One operation is reserved.
 		let permit_one = ops.reserve_at_most(1).unwrap();
-		assert_eq!(permit_one.num_ops, 1);
+		assert_eq!(permit_one.num_permits(), 1);
 
 		// Request 2 operations, however there is capacity only for one.
 		let permit_two = ops.reserve_at_most(2).unwrap();
 		// Number of reserved permits is smaller than provided.
-		assert_eq!(permit_two.num_ops, 1);
+		assert_eq!(permit_two.num_permits(), 1);
 
 		// Try to reserve operations when there's no space.
 		let permit = ops.reserve_at_most(1);
@@ -1365,7 +1299,7 @@ mod tests {
 
 		// Can reserve again
 		let permit_three = ops.reserve_at_most(1).unwrap();
-		assert_eq!(permit_three.num_ops, 1);
+		assert_eq!(permit_three.num_permits(), 1);
 	}
 
 	#[test]
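
The new `StopHandle` above relies on `tokio::sync::mpsc::Sender::closed()` resolving once every matching receiver has been dropped, so removing (and thereby dropping) the receiver stored in the operations map doubles as the cancellation signal. A minimal standalone sketch of that idea, not part of this patch (names are hypothetical):

use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // No messages are ever sent on this channel; the receiver is kept alive
    // only to signal "not stopped yet".
    let (stop_tx, stop_rx) = mpsc::channel::<()>(1);

    let worker = tokio::spawn(async move {
        tokio::select! {
            // Resolves as soon as every receiver has been dropped.
            _ = stop_tx.closed() => println!("operation stopped"),
            _ = tokio::time::sleep(Duration::from_secs(60)) => println!("operation finished"),
        }
    });

    // Dropping the receiver is the "stop" call.
    drop(stop_rx);
    worker.await.unwrap();
}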
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs
index f266c9d8b34fc774e04a5825740204913851db82..84d1b8f8f9b71d17d7148ef867ce680064dc7067 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs
@@ -34,7 +34,7 @@ use self::inner::SubscriptionsInner;
 
 pub use self::inner::OperationState;
 pub use error::SubscriptionManagementError;
-pub use inner::{BlockGuard, InsertedSubscriptionData};
+pub use inner::{BlockGuard, InsertedSubscriptionData, StopHandle};
 
 /// Manage block pinning / unpinning for subscription IDs.
 pub struct SubscriptionManagement<Block: BlockT, BE: Backend<Block>> {
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
index 30a01b93b315120efbb0f88f623cce5eef55ae69..44a2849d91533abb55eccdf1eee97a198f3612fe 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
@@ -33,12 +33,12 @@ use jsonrpsee::{
 };
 use sc_block_builder::BlockBuilderBuilder;
 use sc_client_api::ChildInfo;
+use sc_rpc::testing::TokioTestExecutor;
 use sc_service::client::new_in_mem;
 use sp_blockchain::HeaderBackend;
 use sp_consensus::BlockOrigin;
 use sp_core::{
 	storage::well_known_keys::{self, CODE},
-	testing::TaskExecutor,
 	Blake2Hasher, Hasher,
 };
 use sp_runtime::traits::Block as BlockT;
@@ -60,7 +60,6 @@ type Block = substrate_test_runtime_client::runtime::Block;
 const MAX_PINNED_BLOCKS: usize = 32;
 const MAX_PINNED_SECS: u64 = 60;
 const MAX_OPERATIONS: usize = 16;
-const MAX_PAGINATION_LIMIT: usize = 5;
 const MAX_LAGGING_DISTANCE: usize = 128;
 const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4;
 
@@ -80,12 +79,11 @@ pub async fn run_server() -> std::net::SocketAddr {
 	let api = ChainHead::new(
 		client,
 		backend,
-		Arc::new(TaskExecutor::default()),
+		Arc::new(TokioTestExecutor::default()),
 		ChainHeadConfig {
 			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
-			operation_max_storage_items: MAX_PAGINATION_LIMIT,
 			max_follow_subscriptions_per_connection: 1,
 			max_lagging_distance: MAX_LAGGING_DISTANCE,
 		},
@@ -142,12 +140,11 @@ async fn setup_api() -> (
 	let api = ChainHead::new(
 		client.clone(),
 		backend,
-		Arc::new(TaskExecutor::default()),
+		Arc::new(TokioTestExecutor::default()),
 		ChainHeadConfig {
 			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
-			operation_max_storage_items: MAX_PAGINATION_LIMIT,
 			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
@@ -250,13 +247,11 @@ async fn follow_subscription_produces_blocks() {
 	let api = ChainHead::new(
 		client.clone(),
 		backend,
-		Arc::new(TaskExecutor::default()),
+		Arc::new(TokioTestExecutor::default()),
 		ChainHeadConfig {
 			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
-			operation_max_storage_items: MAX_PAGINATION_LIMIT,
-
 			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
@@ -321,13 +316,11 @@ async fn follow_with_runtime() {
 	let api = ChainHead::new(
 		client.clone(),
 		backend,
-		Arc::new(TaskExecutor::default()),
+		Arc::new(TokioTestExecutor::default()),
 		ChainHeadConfig {
 			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
-			operation_max_storage_items: MAX_PAGINATION_LIMIT,
-
 			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
@@ -631,13 +624,11 @@ async fn call_runtime_without_flag() {
 	let api = ChainHead::new(
 		client.clone(),
 		backend,
-		Arc::new(TaskExecutor::default()),
+		Arc::new(TokioTestExecutor::default()),
 		ChainHeadConfig {
 			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
-			operation_max_storage_items: MAX_PAGINATION_LIMIT,
-
 			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
@@ -1292,13 +1283,11 @@ async fn separate_operation_ids_for_subscriptions() {
 	let api = ChainHead::new(
 		client.clone(),
 		backend,
-		Arc::new(TaskExecutor::default()),
+		Arc::new(TokioTestExecutor::default()),
 		ChainHeadConfig {
 			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
-			operation_max_storage_items: MAX_PAGINATION_LIMIT,
-
 			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
@@ -1380,13 +1369,11 @@ async fn follow_generates_initial_blocks() {
 	let api = ChainHead::new(
 		client.clone(),
 		backend,
-		Arc::new(TaskExecutor::default()),
+		Arc::new(TokioTestExecutor::default()),
 		ChainHeadConfig {
 			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
-			operation_max_storage_items: MAX_PAGINATION_LIMIT,
-
 			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
@@ -1538,13 +1525,11 @@ async fn follow_exceeding_pinned_blocks() {
 	let api = ChainHead::new(
 		client.clone(),
 		backend,
-		Arc::new(TaskExecutor::default()),
+		Arc::new(TokioTestExecutor::default()),
 		ChainHeadConfig {
 			global_max_pinned_blocks: 2,
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
-			operation_max_storage_items: MAX_PAGINATION_LIMIT,
-
 			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
@@ -1617,13 +1602,11 @@ async fn follow_with_unpin() {
 	let api = ChainHead::new(
 		client.clone(),
 		backend,
-		Arc::new(TaskExecutor::default()),
+		Arc::new(TokioTestExecutor::default()),
 		ChainHeadConfig {
 			global_max_pinned_blocks: 2,
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
-			operation_max_storage_items: MAX_PAGINATION_LIMIT,
-
 			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
@@ -1725,13 +1708,11 @@ async fn unpin_duplicate_hashes() {
 	let api = ChainHead::new(
 		client.clone(),
 		backend,
-		Arc::new(TaskExecutor::default()),
+		Arc::new(TokioTestExecutor::default()),
 		ChainHeadConfig {
 			global_max_pinned_blocks: 3,
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
-			operation_max_storage_items: MAX_PAGINATION_LIMIT,
-
 			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
@@ -1830,13 +1811,11 @@ async fn follow_with_multiple_unpin_hashes() {
 	let api = ChainHead::new(
 		client.clone(),
 		backend,
-		Arc::new(TaskExecutor::default()),
+		Arc::new(TokioTestExecutor::default()),
 		ChainHeadConfig {
 			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
-			operation_max_storage_items: MAX_PAGINATION_LIMIT,
-
 			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
@@ -1977,13 +1956,11 @@ async fn follow_prune_best_block() {
 	let api = ChainHead::new(
 		client.clone(),
 		backend,
-		Arc::new(TaskExecutor::default()),
+		Arc::new(TokioTestExecutor::default()),
 		ChainHeadConfig {
 			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
-			operation_max_storage_items: MAX_PAGINATION_LIMIT,
-
 			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
@@ -2165,13 +2142,11 @@ async fn follow_forks_pruned_block() {
 	let api = ChainHead::new(
 		client.clone(),
 		backend,
-		Arc::new(TaskExecutor::default()),
+		Arc::new(TokioTestExecutor::default()),
 		ChainHeadConfig {
 			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
-			operation_max_storage_items: MAX_PAGINATION_LIMIT,
-
 			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
@@ -2327,13 +2302,11 @@ async fn follow_report_multiple_pruned_block() {
 	let api = ChainHead::new(
 		client.clone(),
 		backend,
-		Arc::new(TaskExecutor::default()),
+		Arc::new(TokioTestExecutor::default()),
 		ChainHeadConfig {
 			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
-			operation_max_storage_items: MAX_PAGINATION_LIMIT,
-
 			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
@@ -2566,7 +2539,7 @@ async fn pin_block_references() {
 			genesis_block_builder,
 			None,
 			None,
-			Box::new(TaskExecutor::new()),
+			Box::new(TokioTestExecutor::default()),
 			client_config,
 		)
 		.unwrap(),
@@ -2575,13 +2548,11 @@ async fn pin_block_references() {
 	let api = ChainHead::new(
 		client.clone(),
 		backend.clone(),
-		Arc::new(TaskExecutor::default()),
+		Arc::new(TokioTestExecutor::default()),
 		ChainHeadConfig {
 			global_max_pinned_blocks: 3,
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
-			operation_max_storage_items: MAX_PAGINATION_LIMIT,
-
 			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
@@ -2712,13 +2683,11 @@ async fn follow_finalized_before_new_block() {
 	let api = ChainHead::new(
 		client_mock.clone(),
 		backend,
-		Arc::new(TaskExecutor::default()),
+		Arc::new(TokioTestExecutor::default()),
 		ChainHeadConfig {
 			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
-			operation_max_storage_items: MAX_PAGINATION_LIMIT,
-
 			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
@@ -2829,13 +2798,11 @@ async fn ensure_operation_limits_works() {
 	let api = ChainHead::new(
 		client.clone(),
 		backend,
-		Arc::new(TaskExecutor::default()),
+		Arc::new(TokioTestExecutor::default()),
 		ChainHeadConfig {
 			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: 1,
-			operation_max_storage_items: MAX_PAGINATION_LIMIT,
-
 			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
@@ -2887,7 +2854,7 @@ async fn ensure_operation_limits_works() {
 	let operation_id = match response {
 		MethodResponse::Started(started) => {
 			// Check discarded items.
-			assert_eq!(started.discarded_items.unwrap(), 3);
+			assert!(started.discarded_items.is_none());
 			started.operation_id
 		},
 		MethodResponse::LimitReached => panic!("Expected started response"),
@@ -2922,7 +2889,7 @@ async fn ensure_operation_limits_works() {
 }
 
 #[tokio::test]
-async fn check_continue_operation() {
+async fn storage_is_backpressured() {
 	let child_info = ChildInfo::new_default(CHILD_STORAGE_KEY);
 	let builder = TestClientBuilder::new().add_extra_child_storage(
 		&child_info,
@@ -2936,13 +2903,11 @@ async fn check_continue_operation() {
 	let api = ChainHead::new(
 		client.clone(),
 		backend,
-		Arc::new(TaskExecutor::default()),
+		Arc::new(TokioTestExecutor::default()),
 		ChainHeadConfig {
 			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
-			operation_max_storage_items: 1,
-
 			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
@@ -3021,18 +2986,6 @@ async fn check_continue_operation() {
 			res.items[0].result == StorageResultType::Value(hex_string(b"a"))
 	);
 
-	// Pagination event.
-	assert_matches!(
-		get_next_event::<FollowEvent<String>>(&mut sub).await,
-		FollowEvent::OperationWaitingForContinue(res) if res.operation_id == operation_id
-	);
-
-	does_not_produce_event::<FollowEvent<String>>(
-		&mut sub,
-		std::time::Duration::from_secs(DOES_NOT_PRODUCE_EVENTS_SECONDS),
-	)
-	.await;
-	let _res: () = api.call("chainHead_v1_continue", [&sub_id, &operation_id]).await.unwrap();
 	assert_matches!(
 		get_next_event::<FollowEvent<String>>(&mut sub).await,
 		FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id &&
@@ -3041,17 +2994,6 @@ async fn check_continue_operation() {
 			res.items[0].result == StorageResultType::Value(hex_string(b"ab"))
 	);
 
-	// Pagination event.
-	assert_matches!(
-		get_next_event::<FollowEvent<String>>(&mut sub).await,
-		FollowEvent::OperationWaitingForContinue(res) if res.operation_id == operation_id
-	);
-	does_not_produce_event::<FollowEvent<String>>(
-		&mut sub,
-		std::time::Duration::from_secs(DOES_NOT_PRODUCE_EVENTS_SECONDS),
-	)
-	.await;
-	let _res: () = api.call("chainHead_v1_continue", [&sub_id, &operation_id]).await.unwrap();
 	assert_matches!(
 		get_next_event::<FollowEvent<String>>(&mut sub).await,
 		FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id &&
@@ -3060,18 +3002,6 @@ async fn check_continue_operation() {
 			res.items[0].result == StorageResultType::Value(hex_string(b"abcmoD"))
 	);
 
-	// Pagination event.
-	assert_matches!(
-		get_next_event::<FollowEvent<String>>(&mut sub).await,
-		FollowEvent::OperationWaitingForContinue(res) if res.operation_id == operation_id
-	);
-
-	does_not_produce_event::<FollowEvent<String>>(
-		&mut sub,
-		std::time::Duration::from_secs(DOES_NOT_PRODUCE_EVENTS_SECONDS),
-	)
-	.await;
-	let _res: () = api.call("chainHead_v1_continue", [&sub_id, &operation_id]).await.unwrap();
 	assert_matches!(
 		get_next_event::<FollowEvent<String>>(&mut sub).await,
 		FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id &&
@@ -3080,17 +3010,6 @@ async fn check_continue_operation() {
 			res.items[0].result == StorageResultType::Value(hex_string(b"abc"))
 	);
 
-	// Pagination event.
-	assert_matches!(
-		get_next_event::<FollowEvent<String>>(&mut sub).await,
-		FollowEvent::OperationWaitingForContinue(res) if res.operation_id == operation_id
-	);
-	does_not_produce_event::<FollowEvent<String>>(
-		&mut sub,
-		std::time::Duration::from_secs(DOES_NOT_PRODUCE_EVENTS_SECONDS),
-	)
-	.await;
-	let _res: () = api.call("chainHead_v1_continue", [&sub_id, &operation_id]).await.unwrap();
 	assert_matches!(
 		get_next_event::<FollowEvent<String>>(&mut sub).await,
 		FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id &&
@@ -3121,13 +3040,11 @@ async fn stop_storage_operation() {
 	let api = ChainHead::new(
 		client.clone(),
 		backend,
-		Arc::new(TaskExecutor::default()),
+		Arc::new(TokioTestExecutor::default()),
 		ChainHeadConfig {
 			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
-			operation_max_storage_items: 1,
-
 			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
@@ -3203,15 +3120,22 @@ async fn stop_storage_operation() {
 			res.items[0].result == StorageResultType::Value(hex_string(b"a"))
 	);
 
-	// Pagination event.
 	assert_matches!(
 		get_next_event::<FollowEvent<String>>(&mut sub).await,
-		FollowEvent::OperationWaitingForContinue(res) if res.operation_id == operation_id
+		FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id &&
+			res.items.len() == 1 &&
+			res.items[0].key == hex_string(b":mo") &&
+			res.items[0].result == StorageResultType::Value(hex_string(b"ab"))
 	);
 
 	// Stop the operation.
 	let _res: () = api.call("chainHead_v1_stopOperation", [&sub_id, &operation_id]).await.unwrap();
 
+	assert_matches!(
+		get_next_event::<FollowEvent<String>>(&mut sub).await,
+		FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
+	);
+
 	does_not_produce_event::<FollowEvent<String>>(
 		&mut sub,
 		std::time::Duration::from_secs(DOES_NOT_PRODUCE_EVENTS_SECONDS),
@@ -3289,30 +3213,23 @@ async fn storage_closest_merkle_value() {
 			MethodResponse::LimitReached => panic!("Expected started response"),
 		};
 
-		let event = get_next_event::<FollowEvent<String>>(&mut sub).await;
-		let merkle_values: HashMap<_, _> = match event {
-			FollowEvent::OperationStorageItems(res) => {
-				assert_eq!(res.operation_id, operation_id);
+		let mut merkle_values = HashMap::new();
 
-				res.items
-					.into_iter()
-					.map(|res| {
+		loop {
+			match get_next_event::<FollowEvent<String>>(&mut sub).await {
+				FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id =>
+					for res in res.items {
 						let value = match res.result {
 							StorageResultType::ClosestDescendantMerkleValue(value) => value,
 							_ => panic!("Unexpected StorageResultType"),
 						};
-						(res.key, value)
-					})
-					.collect()
-			},
-			_ => panic!("Expected OperationStorageItems event"),
-		};
-
-		// Finished.
-		assert_matches!(
-				get_next_event::<FollowEvent<String>>(&mut sub).await,
-				FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
-		);
+						merkle_values.insert(res.key, value);
+					},
+				FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id =>
+					break,
+				_ => panic!("Unexpected event"),
+			}
+		}
 
 		// Response for AAAA, AAAB, A and AA.
 		assert_eq!(merkle_values.len(), 4);
@@ -3420,12 +3337,11 @@ async fn chain_head_stop_all_subscriptions() {
 	let api = ChainHead::new(
 		client.clone(),
 		backend,
-		Arc::new(TaskExecutor::default()),
+		Arc::new(TokioTestExecutor::default()),
 		ChainHeadConfig {
 			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
-			operation_max_storage_items: MAX_PAGINATION_LIMIT,
 			max_lagging_distance: 5,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
@@ -3634,12 +3550,11 @@ async fn chain_head_limit_reached() {
 	let api = ChainHead::new(
 		client.clone(),
 		backend,
-		Arc::new(TaskExecutor::default()),
+		Arc::new(TokioTestExecutor::default()),
 		ChainHeadConfig {
 			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
-			operation_max_storage_items: MAX_PAGINATION_LIMIT,
 			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: 1,
 		},
@@ -3675,12 +3590,11 @@ async fn follow_unique_pruned_blocks() {
 	let api = ChainHead::new(
 		client.clone(),
 		backend,
-		Arc::new(TaskExecutor::default()),
+		Arc::new(TokioTestExecutor::default()),
 		ChainHeadConfig {
 			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
-			operation_max_storage_items: MAX_PAGINATION_LIMIT,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 			max_lagging_distance: MAX_LAGGING_DISTANCE,
 		},
@@ -3845,12 +3759,11 @@ async fn follow_report_best_block_of_a_known_block() {
 	let api = ChainHead::new(
 		client_mock.clone(),
 		backend,
-		Arc::new(TaskExecutor::default()),
+		Arc::new(TokioTestExecutor::default()),
 		ChainHeadConfig {
 			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
 			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
 			subscription_max_ongoing_operations: MAX_OPERATIONS,
-			operation_max_storage_items: MAX_PAGINATION_LIMIT,
 			max_lagging_distance: MAX_LAGGING_DISTANCE,
 			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
 		},
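
Note on the test hunks above: with pagination removed, a storage query now streams OperationStorageItems events and finishes with OperationStorageDone; the OperationWaitingForContinue events and the matching chainHead_v1_continue calls are gone. A minimal sketch of how a test could drain such an operation, mirroring the loop added in storage_closest_merkle_value (the helper name and the RpcSubscription alias are illustrative assumptions, not part of this patch):

	// Hypothetical test helper, not part of this patch.
	// `RpcSubscription` stands for whatever subscription type `get_next_event` takes in these tests.
	async fn collect_storage_items(
		sub: &mut RpcSubscription,
		operation_id: &str,
	) -> Vec<StorageResult> {
		// Accumulate every item reported for this operation.
		let mut items = Vec::new();
		loop {
			match get_next_event::<FollowEvent<String>>(sub).await {
				// Items may arrive split across several events; collect them all.
				FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id =>
					items.extend(res.items),
				// The operation is complete once the done event is observed.
				FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id =>
					break,
				_ => panic!("Unexpected event"),
			}
		}
		items
	}
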
diff --git a/substrate/client/rpc-spec-v2/src/common/storage.rs b/substrate/client/rpc-spec-v2/src/common/storage.rs
index bd249e033f8f9d27cdcab2e19a41a3323d560b50..2e24a8da8ca8451e93b91d0aa1e220a1829df7e4 100644
--- a/substrate/client/rpc-spec-v2/src/common/storage.rs
+++ b/substrate/client/rpc-spec-v2/src/common/storage.rs
@@ -22,6 +22,7 @@ use std::{marker::PhantomData, sync::Arc};
 
 use sc_client_api::{Backend, ChildInfo, StorageKey, StorageProvider};
 use sp_runtime::traits::Block as BlockT;
+use tokio::sync::mpsc;
 
 use super::events::{StorageResult, StorageResultType};
 use crate::hex_string;
@@ -33,6 +34,12 @@ pub struct Storage<Client, Block, BE> {
 	_phandom: PhantomData<(BE, Block)>,
 }
 
+impl<Client, Block, BE> Clone for Storage<Client, Block, BE> {
+	fn clone(&self) -> Self {
+		Self { client: self.client.clone(), _phandom: PhantomData }
+	}
+}
+
 impl<Client, Block, BE> Storage<Client, Block, BE> {
 	/// Constructs a new [`Storage`].
 	pub fn new(client: Arc<Client>) -> Self {
@@ -41,6 +48,7 @@ impl<Client, Block, BE> Storage<Client, Block, BE> {
 }
 
 /// Query to iterate over storage.
+#[derive(Debug)]
 pub struct QueryIter {
 	/// The key from which the iteration was started.
 	pub query_key: StorageKey,
@@ -51,6 +59,7 @@ pub struct QueryIter {
 }
 
 /// The query type of an iteration.
+#[derive(Debug)]
 pub enum IterQueryType {
 	/// Iterating over (key, value) pairs.
 	Value,
@@ -123,7 +132,7 @@ where
 		key: &StorageKey,
 		child_key: Option<&ChildInfo>,
 	) -> QueryResult {
-		let result = if let Some(child_key) = child_key {
+		let result = if let Some(ref child_key) = child_key {
 			self.client.child_closest_merkle_value(hash, child_key, key)
 		} else {
 			self.client.closest_merkle_value(hash, key)
@@ -146,6 +155,50 @@ where
 			.unwrap_or_else(|error| QueryResult::Err(error.to_string()))
 	}
 
+	/// Iterate over the storage keys and send the results to the provided sender.
+	///
+	/// Because this relies on a bounded channel, it will pause the storage iteration
+	/// if the channel becomes full, which in turn provides backpressure.
+	pub fn query_iter_pagination_with_producer(
+		&self,
+		query: QueryIter,
+		hash: Block::Hash,
+		child_key: Option<&ChildInfo>,
+		tx: &mpsc::Sender<QueryResult>,
+	) {
+		let QueryIter { ty, query_key, pagination_start_key } = query;
+
+		let maybe_storage = if let Some(child_key) = child_key {
+			self.client.child_storage_keys(
+				hash,
+				child_key.to_owned(),
+				Some(&query_key),
+				pagination_start_key.as_ref(),
+			)
+		} else {
+			self.client.storage_keys(hash, Some(&query_key), pagination_start_key.as_ref())
+		};
+
+		let keys_iter = match maybe_storage {
+			Ok(keys_iter) => keys_iter,
+			Err(error) => {
+				_ = tx.blocking_send(Err(error.to_string()));
+				return;
+			},
+		};
+
+		for key in keys_iter {
+			let result = match ty {
+				IterQueryType::Value => self.query_value(hash, &key, child_key),
+				IterQueryType::Hash => self.query_hash(hash, &key, child_key),
+			};
+
+			if tx.blocking_send(result).is_err() {
+				break;
+			}
+		}
+	}
+
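
For context, a hedged sketch of how the producer above could be driven from async code, illustrating the backpressure described in its doc comment: the producer runs on a blocking task, and blocking_send parks it whenever the bounded buffer is full until the receiver is drained. The helper, its trait bounds, and the buffer size are illustrative assumptions, not part of this patch; it relies on the imports already present in this module (mpsc, BlockT, Backend, StorageProvider).

	// Illustrative consumer, not part of this patch.
	async fn stream_storage_results<Client, Block, BE>(
		storage: Storage<Client, Block, BE>,
		hash: Block::Hash,
		query: QueryIter,
	) where
		Block: BlockT + 'static,
		BE: Backend<Block> + 'static,
		Client: StorageProvider<Block, BE> + Send + Sync + 'static,
	{
		// Small bound: the producer can run ahead of the consumer by at most 16 results.
		let (tx, mut rx) = mpsc::channel(16);

		// `blocking_send` must not be called from an async runtime thread,
		// so the producer is moved onto a dedicated blocking task.
		let producer = tokio::task::spawn_blocking(move || {
			storage.query_iter_pagination_with_producer(query, hash, None, &tx);
		});

		// Draining the receiver is what lets the producer make progress.
		while let Some(result) = rx.recv().await {
			// Forward each QueryResult to the subscriber here (placeholder).
			let _ = result;
		}

		let _ = producer.await;
	}
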
 	/// Iterate over at most the provided number of keys.
 	///
 	/// Returns the storage result with a potential next key to resume iteration.