From 744b783a7de835c679df6b835e4092c9caeff67c Mon Sep 17 00:00:00 2001
From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com>
Date: Tue, 15 Aug 2023 15:17:41 +0300
Subject: [PATCH] chainHead: Limit ongoing operations (#14699)

* chainHead/api: Make storage/body/call pure RPC methods

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Add mpsc channel between RPC methods

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/subscriptions: Extract mpsc::Sender via BlockGuard

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/subscriptions: Generate and provide the method operation ID

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Generate `chainHead_body` response

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Generate `chainHead_call` response

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Generate `chainHead_storage` responses

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Propagate responses of methods to chainHead_follow

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Adjust `chainHead_body` responses

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Adjust `chainHead_call` responses

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Adjust `chainHead_call` responses

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Ensure unique operation IDs across methods

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/events: Remove old method events

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/subscriptions: Add limit helper

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/subscription: Expose limits to `BlockGuard`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Adjust testing to ongoing operations

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Make limits configurable via `ChainHeadConfig`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Adjust testing to `ChainHeadConfig`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/tests: Ensure operation limits discards items

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Improve documentation

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Rename `OngoingOperations` -> `LimitOperations`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Rename reserve -> reserve_at_most

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead: Use duration const instead of u64

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* chainHead/subscription: Use tokio::sync::Semaphore for limits

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update client/rpc-spec-v2/src/chain_head/subscription/inner.rs

Co-authored-by: Sebastian Kunert <skunert49@gmail.com>

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: parity-processbot <>
Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
---
 substrate/client/rpc-spec-v2/Cargo.toml       |   1 +
 .../rpc-spec-v2/src/chain_head/chain_head.rs  |  96 ++++++---
 .../client/rpc-spec-v2/src/chain_head/mod.rs  |   2 +-
 .../src/chain_head/subscription/error.rs      |   8 +-
 .../src/chain_head/subscription/inner.rs      | 157 ++++++++++++--
 .../src/chain_head/subscription/mod.rs        |  13 +-
 .../rpc-spec-v2/src/chain_head/tests.rs       | 196 +++++++++++++++---
 substrate/client/service/src/builder.rs       |  21 +-
 8 files changed, 394 insertions(+), 100 deletions(-)

diff --git a/substrate/client/rpc-spec-v2/Cargo.toml b/substrate/client/rpc-spec-v2/Cargo.toml
index b1ab2a87997..599596777b7 100644
--- a/substrate/client/rpc-spec-v2/Cargo.toml
+++ b/substrate/client/rpc-spec-v2/Cargo.toml
@@ -32,6 +32,7 @@ hex = "0.4"
 futures = "0.3.21"
 parking_lot = "0.12.1"
 tokio-stream = { version = "0.1", features = ["sync"] }
+tokio = { version = "1.22.0", features = ["sync"] }
 array-bytes = "6.1"
 log = "0.4.17"
 futures-util = { version = "0.3.19", default-features = false }
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
index 16881b05fd7..79cf251f180 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
@@ -53,6 +53,41 @@ use std::{marker::PhantomData, sync::Arc, time::Duration};
 
 pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";
 
+/// The configuration of [`ChainHead`].
+pub struct ChainHeadConfig {
+	/// The maximum number of pinned blocks across all subscriptions.
+	pub global_max_pinned_blocks: usize,
+	/// The maximum duration that a block is allowed to be pinned per subscription.
+	pub subscription_max_pinned_duration: Duration,
+	/// The maximum number of ongoing operations per subscription.
+	pub subscription_max_ongoing_operations: usize,
+}
+
+/// Maximum pinned blocks across all connections.
+/// This number is large enough to consider immediate blocks.
+/// Note: This should never exceed the `PINNING_CACHE_SIZE` from client/db.
+const MAX_PINNED_BLOCKS: usize = 512;
+
+/// Any block of any subscription should not be pinned more than
+/// this constant. When a subscription contains a block older than this,
+/// the subscription becomes subject to termination.
+/// Note: This should be enough for immediate blocks.
+const MAX_PINNED_DURATION: Duration = Duration::from_secs(60);
+
+/// The maximum number of ongoing operations per subscription.
+/// Note: The lower limit imposed by the spec is 16.
+const MAX_ONGOING_OPERATIONS: usize = 16;
+
+impl Default for ChainHeadConfig {
+	fn default() -> Self {
+		ChainHeadConfig {
+			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
+			subscription_max_pinned_duration: MAX_PINNED_DURATION,
+			subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
+		}
+	}
+}
+
 /// An API for chain head RPC calls.
 pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
 	/// Substrate client.
@@ -76,8 +111,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
 		backend: Arc<BE>,
 		executor: SubscriptionTaskExecutor,
 		genesis_hash: GenesisHash,
-		max_pinned_blocks: usize,
-		max_pinned_duration: Duration,
+		config: ChainHeadConfig,
 	) -> Self {
 		let genesis_hash = hex_string(&genesis_hash.as_ref());
 		Self {
@@ -85,8 +119,9 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
 			backend: backend.clone(),
 			executor,
 			subscriptions: Arc::new(SubscriptionManagement::new(
-				max_pinned_blocks,
-				max_pinned_duration,
+				config.global_max_pinned_blocks,
+				config.subscription_max_pinned_duration,
+				config.subscription_max_ongoing_operations,
 				backend,
 			)),
 			genesis_hash,
@@ -197,12 +232,10 @@ where
 		follow_subscription: String,
 		hash: Block::Hash,
 	) -> RpcResult<MethodResponse> {
-		let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
+		let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
 			Ok(block) => block,
-			Err(SubscriptionManagementError::SubscriptionAbsent) => {
-				// Invalid invalid subscription ID.
-				return Ok(MethodResponse::LimitReached)
-			},
+			Err(SubscriptionManagementError::SubscriptionAbsent) |
+			Err(SubscriptionManagementError::ExceededLimits) => return Ok(MethodResponse::LimitReached),
 			Err(SubscriptionManagementError::BlockHashAbsent) => {
 				// Block is not part of the subscription.
 				return Err(ChainHeadRpcError::InvalidBlock.into())
@@ -252,12 +285,10 @@ where
 		follow_subscription: String,
 		hash: Block::Hash,
 	) -> RpcResult<Option<String>> {
-		let _block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
+		let _block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
 			Ok(block) => block,
-			Err(SubscriptionManagementError::SubscriptionAbsent) => {
-				// Invalid invalid subscription ID.
-				return Ok(None)
-			},
+			Err(SubscriptionManagementError::SubscriptionAbsent) |
+			Err(SubscriptionManagementError::ExceededLimits) => return Ok(None),
 			Err(SubscriptionManagementError::BlockHashAbsent) => {
 				// Block is not part of the subscription.
 				return Err(ChainHeadRpcError::InvalidBlock.into())
@@ -306,21 +337,27 @@ where
 			.transpose()?
 			.map(ChildInfo::new_default_from_vec);
 
-		let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
-			Ok(block) => block,
-			Err(SubscriptionManagementError::SubscriptionAbsent) => {
-				// Invalid invalid subscription ID.
-				return Ok(MethodResponse::LimitReached)
-			},
-			Err(SubscriptionManagementError::BlockHashAbsent) => {
-				// Block is not part of the subscription.
-				return Err(ChainHeadRpcError::InvalidBlock.into())
-			},
-			Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
-		};
+		let block_guard =
+			match self.subscriptions.lock_block(&follow_subscription, hash, items.len()) {
+				Ok(block) => block,
+				Err(SubscriptionManagementError::SubscriptionAbsent) |
+				Err(SubscriptionManagementError::ExceededLimits) => return Ok(MethodResponse::LimitReached),
+				Err(SubscriptionManagementError::BlockHashAbsent) => {
+					// Block is not part of the subscription.
+					return Err(ChainHeadRpcError::InvalidBlock.into())
+				},
+				Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
+			};
 
 		let storage_client = ChainHeadStorage::<Client, Block, BE>::new(self.client.clone());
 		let operation_id = block_guard.operation_id();
+
+		// The number of operations we are allowed to execute.
+		let num_operations = block_guard.num_reserved();
+		let discarded = items.len().saturating_sub(num_operations);
+		let mut items = items;
+		items.truncate(num_operations);
+
 		let fut = async move {
 			storage_client.generate_events(block_guard, hash, items, child_trie);
 		};
@@ -329,7 +366,7 @@ where
 			.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
 		Ok(MethodResponse::Started(MethodResponseStarted {
 			operation_id,
-			discarded_items: Some(0),
+			discarded_items: Some(discarded),
 		}))
 	}
 
@@ -342,9 +379,10 @@ where
 	) -> RpcResult<MethodResponse> {
 		let call_parameters = Bytes::from(parse_hex_param(call_parameters)?);
 
-		let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
+		let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
 			Ok(block) => block,
-			Err(SubscriptionManagementError::SubscriptionAbsent) => {
+			Err(SubscriptionManagementError::SubscriptionAbsent) |
+			Err(SubscriptionManagementError::ExceededLimits) => {
 				// Invalid invalid subscription ID.
 				return Ok(MethodResponse::LimitReached)
 			},
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs
index f0fa898f9f7..1bd22885780 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs
@@ -37,7 +37,7 @@ mod chain_head_storage;
 mod subscription;
 
 pub use api::ChainHeadApiServer;
-pub use chain_head::ChainHead;
+pub use chain_head::{ChainHead, ChainHeadConfig};
 pub use event::{
 	BestBlockChanged, ErrorEvent, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
 	RuntimeVersionEvent,
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/error.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/error.rs
index 443ee9fb87a..38e8fd7384f 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/error.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/error.rs
@@ -21,10 +21,10 @@ use sp_blockchain::Error;
 /// Subscription management error.
 #[derive(Debug, thiserror::Error)]
 pub enum SubscriptionManagementError {
-	/// The block cannot be pinned into memory because
-	/// the subscription has exceeded the maximum number
-	/// of blocks pinned.
-	#[error("Exceeded pinning limits")]
+	/// The subscription has exceeded the internal limits
+	/// regarding the number of pinned blocks in memory or
+	/// the number of ongoing operations.
+	#[error("Exceeded pinning or operation limits")]
 	ExceededLimits,
 	/// Error originated from the blockchain (client or backend).
 	#[error("Blockchain error {0}")]
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
index c0c2701c5e1..9f42be4a2f7 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
@@ -107,6 +107,62 @@ impl BlockStateMachine {
 	}
 }
 
+/// Limit the number of ongoing operations across methods.
+struct LimitOperations {
+	/// Limit the number of ongoing operations for this subscription.
+	semaphore: Arc<tokio::sync::Semaphore>,
+}
+
+impl LimitOperations {
+	/// Constructs a new [`LimitOperations`].
+	fn new(max_operations: usize) -> Self {
+		LimitOperations { semaphore: Arc::new(tokio::sync::Semaphore::new(max_operations)) }
+	}
+
+	/// Reserves capacity to execute at least one operation and at most the requested items.
+	///
+	/// Dropping [`PermitOperations`] without executing an operation will release
+	/// the reserved capacity.
+	///
+	/// Returns nothing if there's no space available, else returns a permit
+	/// that guarantees that at least one operation can be executed.
+	fn reserve_at_most(&self, to_reserve: usize) -> Option<PermitOperations> {
+		let num_ops = std::cmp::min(self.semaphore.available_permits(), to_reserve);
+
+		if num_ops == 0 {
+			return None
+		}
+
+		let permits = Arc::clone(&self.semaphore)
+			.try_acquire_many_owned(num_ops.try_into().ok()?)
+			.ok()?;
+
+		Some(PermitOperations { num_ops, _permit: permits })
+	}
+}
+
+/// Permits a number of operations to be executed.
+///
+/// [`PermitOperations`] are returned by [`LimitOperations::reserve()`] and are used
+/// to guarantee the RPC server can execute the number of operations.
+///
+/// The number of reserved items are given back to the [`LimitOperations`] on drop.
+struct PermitOperations {
+	/// The number of operations permitted (reserved).
+	num_ops: usize,
+	/// The permit for these operations.
+	_permit: tokio::sync::OwnedSemaphorePermit,
+}
+
+impl PermitOperations {
+	/// Returns the number of reserved elements for this permit.
+	///
+	/// This can be smaller than the number of items requested via [`LimitOperations::reserve()`].
+	fn num_reserved(&self) -> usize {
+		self.num_ops
+	}
+}
+
 struct BlockState {
 	/// The state machine of this block.
 	state_machine: BlockStateMachine,
@@ -124,6 +180,8 @@ struct SubscriptionState<Block: BlockT> {
 	///
 	/// This object is cloned between methods.
 	response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
+	/// Limit the number of ongoing operations.
+	limits: LimitOperations,
 	/// The next operation ID.
 	next_operation_id: usize,
 	/// Track the block hashes available for this subscription.
@@ -244,6 +302,13 @@ impl<Block: BlockT> SubscriptionState<Block> {
 		self.next_operation_id = self.next_operation_id.wrapping_add(1);
 		op_id
 	}
+
+	/// Reserves capacity to execute at least one operation and at most the requested items.
+	///
+	/// For more details see [`PermitOperations`].
+	fn reserve_at_most(&self, to_reserve: usize) -> Option<PermitOperations> {
+		self.limits.reserve_at_most(to_reserve)
+	}
 }
 
 /// Keeps a specific block pinned while the handle is alive.
@@ -254,6 +319,7 @@ pub struct BlockGuard<Block: BlockT, BE: Backend<Block>> {
 	with_runtime: bool,
 	response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
 	operation_id: String,
+	permit_operations: PermitOperations,
 	backend: Arc<BE>,
 }
 
@@ -272,6 +338,7 @@ impl<Block: BlockT, BE: Backend<Block>> BlockGuard<Block, BE> {
 		with_runtime: bool,
 		response_sender: TracingUnboundedSender<FollowEvent<Block::Hash>>,
 		operation_id: usize,
+		permit_operations: PermitOperations,
 		backend: Arc<BE>,
 	) -> Result<Self, SubscriptionManagementError> {
 		backend
@@ -283,6 +350,7 @@ impl<Block: BlockT, BE: Backend<Block>> BlockGuard<Block, BE> {
 			with_runtime,
 			response_sender,
 			operation_id: operation_id.to_string(),
+			permit_operations,
 			backend,
 		})
 	}
@@ -301,6 +369,13 @@ impl<Block: BlockT, BE: Backend<Block>> BlockGuard<Block, BE> {
 	pub fn operation_id(&self) -> String {
 		self.operation_id.clone()
 	}
+
+	/// Returns the number of reserved elements for this permit.
+	///
+	/// This can be smaller than the number of items requested.
+	pub fn num_reserved(&self) -> usize {
+		self.permit_operations.num_reserved()
+	}
 }
 
 impl<Block: BlockT, BE: Backend<Block>> Drop for BlockGuard<Block, BE> {
@@ -328,6 +403,8 @@ pub struct SubscriptionsInner<Block: BlockT, BE: Backend<Block>> {
 	global_max_pinned_blocks: usize,
 	/// The maximum duration that a block is allowed to be pinned per subscription.
 	local_max_pin_duration: Duration,
+	/// The maximum number of ongoing operations per subscription.
+	max_ongoing_operations: usize,
 	/// Map the subscription ID to internal details of the subscription.
 	subs: HashMap<String, SubscriptionState<Block>>,
 	/// Backend pinning / unpinning blocks.
@@ -341,12 +418,14 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
 	pub fn new(
 		global_max_pinned_blocks: usize,
 		local_max_pin_duration: Duration,
+		max_ongoing_operations: usize,
 		backend: Arc<BE>,
 	) -> Self {
 		SubscriptionsInner {
 			global_blocks: Default::default(),
 			global_max_pinned_blocks,
 			local_max_pin_duration,
+			max_ongoing_operations,
 			subs: Default::default(),
 			backend,
 		}
@@ -366,6 +445,7 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
 				with_runtime,
 				tx_stop: Some(tx_stop),
 				response_sender,
+				limits: LimitOperations::new(self.max_ongoing_operations),
 				next_operation_id: 0,
 				blocks: Default::default(),
 			};
@@ -541,6 +621,7 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
 		&mut self,
 		sub_id: &str,
 		hash: Block::Hash,
+		to_reserve: usize,
 	) -> Result<BlockGuard<Block, BE>, SubscriptionManagementError> {
 		let Some(sub) = self.subs.get_mut(sub_id) else {
 			return Err(SubscriptionManagementError::SubscriptionAbsent)
@@ -550,12 +631,18 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
 			return Err(SubscriptionManagementError::BlockHashAbsent)
 		}
 
+		let Some(permit_operations) = sub.reserve_at_most(to_reserve) else {
+			// Error when the server cannot execute at least one operation.
+			return Err(SubscriptionManagementError::ExceededLimits)
+		};
+
 		let operation_id = sub.next_operation_id();
 		BlockGuard::new(
 			hash,
 			sub.with_runtime,
 			sub.response_sender.clone(),
 			operation_id,
+			permit_operations,
 			self.backend.clone(),
 		)
 	}
@@ -574,6 +661,9 @@ mod tests {
 		Client, ClientBlockImportExt, GenesisInit,
 	};
 
+	/// Maximum number of ongoing operations per subscription ID.
+	const MAX_OPERATIONS_PER_SUB: usize = 16;
+
 	fn init_backend() -> (
 		Arc<sc_client_api::in_mem::Backend<Block>>,
 		Arc<Client<sc_client_api::in_mem::Backend<Block>>>,
@@ -669,6 +759,7 @@ mod tests {
 			tx_stop: None,
 			response_sender,
 			next_operation_id: 0,
+			limits: LimitOperations::new(MAX_OPERATIONS_PER_SUB),
 			blocks: Default::default(),
 		};
 
@@ -698,6 +789,7 @@ mod tests {
 			tx_stop: None,
 			response_sender,
 			next_operation_id: 0,
+			limits: LimitOperations::new(MAX_OPERATIONS_PER_SUB),
 			blocks: Default::default(),
 		};
 
@@ -730,13 +822,14 @@ mod tests {
 	fn subscription_lock_block() {
 		let builder = TestClientBuilder::new();
 		let backend = builder.backend();
-		let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend);
+		let mut subs =
+			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
 
 		let id = "abc".to_string();
 		let hash = H256::random();
 
 		// Subscription not inserted.
-		let err = subs.lock_block(&id, hash).unwrap_err();
+		let err = subs.lock_block(&id, hash, 1).unwrap_err();
 		assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
 
 		let _stop = subs.insert_subscription(id.clone(), true).unwrap();
@@ -744,13 +837,13 @@ mod tests {
 		assert!(subs.insert_subscription(id.clone(), true).is_none());
 
 		// No block hash.
-		let err = subs.lock_block(&id, hash).unwrap_err();
+		let err = subs.lock_block(&id, hash, 1).unwrap_err();
 		assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
 
 		subs.remove_subscription(&id);
 
 		// No subscription.
-		let err = subs.lock_block(&id, hash).unwrap_err();
+		let err = subs.lock_block(&id, hash, 1).unwrap_err();
 		assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
 	}
 
@@ -762,7 +855,8 @@ mod tests {
 		let hash = block.header.hash();
 		futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
 
-		let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend);
+		let mut subs =
+			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
 		let id = "abc".to_string();
 
 		let _stop = subs.insert_subscription(id.clone(), true).unwrap();
@@ -770,7 +864,7 @@ mod tests {
 		// First time we are pinning the block.
 		assert_eq!(subs.pin_block(&id, hash).unwrap(), true);
 
-		let block = subs.lock_block(&id, hash).unwrap();
+		let block = subs.lock_block(&id, hash, 1).unwrap();
 		// Subscription started with runtime updates
 		assert_eq!(block.has_runtime(), true);
 
@@ -780,7 +874,7 @@ mod tests {
 
 		// Unpin the block.
 		subs.unpin_block(&id, hash).unwrap();
-		let err = subs.lock_block(&id, hash).unwrap_err();
+		let err = subs.lock_block(&id, hash, 1).unwrap_err();
 		assert_eq!(err, SubscriptionManagementError::BlockHashAbsent);
 	}
 
@@ -791,7 +885,8 @@ mod tests {
 		let hash = block.header.hash();
 		futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
 
-		let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend);
+		let mut subs =
+			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
 		let id = "abc".to_string();
 
 		let _stop = subs.insert_subscription(id.clone(), true).unwrap();
@@ -839,7 +934,8 @@ mod tests {
 		let hash_3 = block.header.hash();
 		futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
 
-		let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend);
+		let mut subs =
+			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
 		let id_1 = "abc".to_string();
 		let id_2 = "abcd".to_string();
 
@@ -884,7 +980,8 @@ mod tests {
 		futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
 
 		// Maximum number of pinned blocks is 2.
-		let mut subs = SubscriptionsInner::new(2, Duration::from_secs(10), backend);
+		let mut subs =
+			SubscriptionsInner::new(2, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
 		let id_1 = "abc".to_string();
 		let id_2 = "abcd".to_string();
 
@@ -908,10 +1005,10 @@ mod tests {
 		assert_eq!(err, SubscriptionManagementError::ExceededLimits);
 
 		// Ensure both subscriptions are removed.
-		let err = subs.lock_block(&id_1, hash_1).unwrap_err();
+		let err = subs.lock_block(&id_1, hash_1, 1).unwrap_err();
 		assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
 
-		let err = subs.lock_block(&id_2, hash_1).unwrap_err();
+		let err = subs.lock_block(&id_2, hash_1, 1).unwrap_err();
 		assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
 
 		assert!(subs.global_blocks.get(&hash_1).is_none());
@@ -934,7 +1031,8 @@ mod tests {
 		futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
 
 		// Maximum number of pinned blocks is 2 and maximum pin duration is 5 second.
-		let mut subs = SubscriptionsInner::new(2, Duration::from_secs(5), backend);
+		let mut subs =
+			SubscriptionsInner::new(2, Duration::from_secs(5), MAX_OPERATIONS_PER_SUB, backend);
 		let id_1 = "abc".to_string();
 		let id_2 = "abcd".to_string();
 
@@ -958,10 +1056,10 @@ mod tests {
 		assert_eq!(err, SubscriptionManagementError::ExceededLimits);
 
 		// Ensure both subscriptions are removed.
-		let err = subs.lock_block(&id_1, hash_1).unwrap_err();
+		let err = subs.lock_block(&id_1, hash_1, 1).unwrap_err();
 		assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent);
 
-		let _block_guard = subs.lock_block(&id_2, hash_1).unwrap();
+		let _block_guard = subs.lock_block(&id_2, hash_1, 1).unwrap();
 
 		assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
 		assert!(subs.global_blocks.get(&hash_2).is_none());
@@ -983,7 +1081,8 @@ mod tests {
 	fn subscription_check_stop_event() {
 		let builder = TestClientBuilder::new();
 		let backend = builder.backend();
-		let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend);
+		let mut subs =
+			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
 
 		let id = "abc".to_string();
 
@@ -1000,4 +1099,30 @@ mod tests {
 		let res = sub_data.rx_stop.try_recv().unwrap();
 		assert!(res.is_some());
 	}
+
+	#[test]
+	fn ongoing_operations() {
+		// The object can hold at most 2 operations.
+		let ops = LimitOperations::new(2);
+
+		// One operation is reserved.
+		let permit_one = ops.reserve_at_most(1).unwrap();
+		assert_eq!(permit_one.num_reserved(), 1);
+
+		// Request 2 operations, however there is capacity only for one.
+		let permit_two = ops.reserve_at_most(2).unwrap();
+		// Number of reserved permits is smaller than provided.
+		assert_eq!(permit_two.num_reserved(), 1);
+
+		// Try to reserve operations when there's no space.
+		let permit = ops.reserve_at_most(1);
+		assert!(permit.is_none());
+
+		// Release capacity.
+		drop(permit_two);
+
+		// Can reserve again
+		let permit_three = ops.reserve_at_most(1).unwrap();
+		assert_eq!(permit_three.num_reserved(), 1);
+	}
 }
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs
index 3aece6575ef..39618ecfc1b 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs
@@ -40,12 +40,14 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> {
 	pub fn new(
 		global_max_pinned_blocks: usize,
 		local_max_pin_duration: Duration,
+		max_ongoing_operations: usize,
 		backend: Arc<BE>,
 	) -> Self {
 		SubscriptionManagement {
 			inner: RwLock::new(SubscriptionsInner::new(
 				global_max_pinned_blocks,
 				local_max_pin_duration,
+				max_ongoing_operations,
 				backend,
 			)),
 		}
@@ -110,15 +112,18 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> {
 
 	/// Ensure the block remains pinned until the return object is dropped.
 	///
-	/// Returns a [`BlockGuard`] that pins and unpins the block hash in RAII manner.
-	/// Returns an error if the block hash is not pinned for the subscription or
-	/// the subscription ID is invalid.
+	/// Returns a [`BlockGuard`] that pins and unpins the block hash in RAII manner
+	/// and reserves capacity for ogoing operations.
+	///
+	/// Returns an error if the block hash is not pinned for the subscription,
+	/// the subscription ID is invalid or the limit of ongoing operations was exceeded.
 	pub fn lock_block(
 		&self,
 		sub_id: &str,
 		hash: Block::Hash,
+		to_reserve: usize,
 	) -> Result<BlockGuard<Block, BE>, SubscriptionManagementError> {
 		let mut inner = self.inner.write();
-		inner.lock_block(sub_id, hash)
+		inner.lock_block(sub_id, hash, to_reserve)
 	}
 }
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
index 6c3c343a10b..4bda06d3cf0 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
@@ -36,6 +36,7 @@ type Header = substrate_test_runtime_client::runtime::Header;
 type Block = substrate_test_runtime_client::runtime::Block;
 const MAX_PINNED_BLOCKS: usize = 32;
 const MAX_PINNED_SECS: u64 = 60;
+const MAX_OPERATIONS: usize = 16;
 const CHAIN_GENESIS: [u8; 32] = [0; 32];
 const INVALID_HASH: [u8; 32] = [1; 32];
 const KEY: &[u8] = b":mock";
@@ -79,8 +80,11 @@ async fn setup_api() -> (
 		backend,
 		Arc::new(TaskExecutor::default()),
 		CHAIN_GENESIS,
-		MAX_PINNED_BLOCKS,
-		Duration::from_secs(MAX_PINNED_SECS),
+		ChainHeadConfig {
+			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
+			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
+			subscription_max_ongoing_operations: MAX_OPERATIONS,
+		},
 	)
 	.into_rpc();
 
@@ -119,8 +123,11 @@ async fn follow_subscription_produces_blocks() {
 		backend,
 		Arc::new(TaskExecutor::default()),
 		CHAIN_GENESIS,
-		MAX_PINNED_BLOCKS,
-		Duration::from_secs(MAX_PINNED_SECS),
+		ChainHeadConfig {
+			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
+			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
+			subscription_max_ongoing_operations: MAX_OPERATIONS,
+		},
 	)
 	.into_rpc();
 
@@ -177,8 +184,11 @@ async fn follow_with_runtime() {
 		backend,
 		Arc::new(TaskExecutor::default()),
 		CHAIN_GENESIS,
-		MAX_PINNED_BLOCKS,
-		Duration::from_secs(MAX_PINNED_SECS),
+		ChainHeadConfig {
+			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
+			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
+			subscription_max_ongoing_operations: MAX_OPERATIONS,
+		},
 	)
 	.into_rpc();
 
@@ -285,8 +295,11 @@ async fn get_genesis() {
 		backend,
 		Arc::new(TaskExecutor::default()),
 		CHAIN_GENESIS,
-		MAX_PINNED_BLOCKS,
-		Duration::from_secs(MAX_PINNED_SECS),
+		ChainHeadConfig {
+			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
+			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
+			subscription_max_ongoing_operations: MAX_OPERATIONS,
+		},
 	)
 	.into_rpc();
 
@@ -491,8 +504,11 @@ async fn call_runtime_without_flag() {
 		backend,
 		Arc::new(TaskExecutor::default()),
 		CHAIN_GENESIS,
-		MAX_PINNED_BLOCKS,
-		Duration::from_secs(MAX_PINNED_SECS),
+		ChainHeadConfig {
+			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
+			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
+			subscription_max_ongoing_operations: MAX_OPERATIONS,
+		},
 	)
 	.into_rpc();
 
@@ -1117,8 +1133,11 @@ async fn separate_operation_ids_for_subscriptions() {
 		backend,
 		Arc::new(TaskExecutor::default()),
 		CHAIN_GENESIS,
-		MAX_PINNED_BLOCKS,
-		Duration::from_secs(MAX_PINNED_SECS),
+		ChainHeadConfig {
+			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
+			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
+			subscription_max_ongoing_operations: MAX_OPERATIONS,
+		},
 	)
 	.into_rpc();
 
@@ -1194,8 +1213,11 @@ async fn follow_generates_initial_blocks() {
 		backend,
 		Arc::new(TaskExecutor::default()),
 		CHAIN_GENESIS,
-		MAX_PINNED_BLOCKS,
-		Duration::from_secs(MAX_PINNED_SECS),
+		ChainHeadConfig {
+			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
+			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
+			subscription_max_ongoing_operations: MAX_OPERATIONS,
+		},
 	)
 	.into_rpc();
 
@@ -1322,8 +1344,11 @@ async fn follow_exceeding_pinned_blocks() {
 		backend,
 		Arc::new(TaskExecutor::default()),
 		CHAIN_GENESIS,
-		2,
-		Duration::from_secs(MAX_PINNED_SECS),
+		ChainHeadConfig {
+			global_max_pinned_blocks: 2,
+			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
+			subscription_max_ongoing_operations: MAX_OPERATIONS,
+		},
 	)
 	.into_rpc();
 
@@ -1373,8 +1398,11 @@ async fn follow_with_unpin() {
 		backend,
 		Arc::new(TaskExecutor::default()),
 		CHAIN_GENESIS,
-		2,
-		Duration::from_secs(MAX_PINNED_SECS),
+		ChainHeadConfig {
+			global_max_pinned_blocks: 2,
+			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
+			subscription_max_ongoing_operations: MAX_OPERATIONS,
+		},
 	)
 	.into_rpc();
 
@@ -1454,8 +1482,11 @@ async fn follow_prune_best_block() {
 		backend,
 		Arc::new(TaskExecutor::default()),
 		CHAIN_GENESIS,
-		MAX_PINNED_BLOCKS,
-		Duration::from_secs(MAX_PINNED_SECS),
+		ChainHeadConfig {
+			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
+			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
+			subscription_max_ongoing_operations: MAX_OPERATIONS,
+		},
 	)
 	.into_rpc();
 
@@ -1611,8 +1642,11 @@ async fn follow_forks_pruned_block() {
 		backend,
 		Arc::new(TaskExecutor::default()),
 		CHAIN_GENESIS,
-		MAX_PINNED_BLOCKS,
-		Duration::from_secs(MAX_PINNED_SECS),
+		ChainHeadConfig {
+			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
+			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
+			subscription_max_ongoing_operations: MAX_OPERATIONS,
+		},
 	)
 	.into_rpc();
 
@@ -1725,8 +1759,11 @@ async fn follow_report_multiple_pruned_block() {
 		backend,
 		Arc::new(TaskExecutor::default()),
 		CHAIN_GENESIS,
-		MAX_PINNED_BLOCKS,
-		Duration::from_secs(MAX_PINNED_SECS),
+		ChainHeadConfig {
+			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
+			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
+			subscription_max_ongoing_operations: MAX_OPERATIONS,
+		},
 	)
 	.into_rpc();
 
@@ -1930,8 +1967,11 @@ async fn pin_block_references() {
 		backend.clone(),
 		Arc::new(TaskExecutor::default()),
 		CHAIN_GENESIS,
-		3,
-		Duration::from_secs(MAX_PINNED_SECS),
+		ChainHeadConfig {
+			global_max_pinned_blocks: 3,
+			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
+			subscription_max_ongoing_operations: MAX_OPERATIONS,
+		},
 	)
 	.into_rpc();
 
@@ -2040,8 +2080,11 @@ async fn follow_finalized_before_new_block() {
 		backend,
 		Arc::new(TaskExecutor::default()),
 		CHAIN_GENESIS,
-		MAX_PINNED_BLOCKS,
-		Duration::from_secs(MAX_PINNED_SECS),
+		ChainHeadConfig {
+			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
+			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
+			subscription_max_ongoing_operations: MAX_OPERATIONS,
+		},
 	)
 	.into_rpc();
 
@@ -2119,3 +2162,100 @@ async fn follow_finalized_before_new_block() {
 	});
 	assert_eq!(event, expected);
 }
+
+#[tokio::test]
+async fn ensure_operation_limits_works() {
+	let child_info = ChildInfo::new_default(CHILD_STORAGE_KEY);
+	let builder = TestClientBuilder::new().add_extra_child_storage(
+		&child_info,
+		KEY.to_vec(),
+		CHILD_VALUE.to_vec(),
+	);
+	let backend = builder.backend();
+	let mut client = Arc::new(builder.build());
+
+	// Configure the chainHead with maximum 1 ongoing operations.
+	let api = ChainHead::new(
+		client.clone(),
+		backend,
+		Arc::new(TaskExecutor::default()),
+		CHAIN_GENESIS,
+		ChainHeadConfig {
+			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
+			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
+			subscription_max_ongoing_operations: 1,
+		},
+	)
+	.into_rpc();
+
+	let mut sub = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap();
+	let sub_id = sub.subscription_id();
+	let sub_id = serde_json::to_string(&sub_id).unwrap();
+
+	let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
+	client.import(BlockOrigin::Own, block.clone()).await.unwrap();
+
+	// Ensure the imported block is propagated and pinned for this subscription.
+	assert_matches!(
+		get_next_event::<FollowEvent<String>>(&mut sub).await,
+		FollowEvent::Initialized(_)
+	);
+	assert_matches!(
+		get_next_event::<FollowEvent<String>>(&mut sub).await,
+		FollowEvent::NewBlock(_)
+	);
+	assert_matches!(
+		get_next_event::<FollowEvent<String>>(&mut sub).await,
+		FollowEvent::BestBlockChanged(_)
+	);
+
+	let block_hash = format!("{:?}", block.header.hash());
+	let key = hex_string(&KEY);
+
+	let items = vec![
+		StorageQuery { key: key.clone(), query_type: StorageQueryType::DescendantsHashes },
+		StorageQuery { key: key.clone(), query_type: StorageQueryType::DescendantsHashes },
+		StorageQuery { key: key.clone(), query_type: StorageQueryType::DescendantsValues },
+		StorageQuery { key: key.clone(), query_type: StorageQueryType::DescendantsValues },
+	];
+
+	let response: MethodResponse = api
+		.call("chainHead_unstable_storage", rpc_params![&sub_id, &block_hash, items])
+		.await
+		.unwrap();
+	let operation_id = match response {
+		MethodResponse::Started(started) => {
+			// Check discarded items.
+			assert_eq!(started.discarded_items.unwrap(), 3);
+			started.operation_id
+		},
+		MethodResponse::LimitReached => panic!("Expected started response"),
+	};
+	// No value associated with the provided key.
+	assert_matches!(
+			get_next_event::<FollowEvent<String>>(&mut sub).await,
+			FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id
+	);
+
+	// The storage is finished and capactiy must be released.
+	let alice_id = AccountKeyring::Alice.to_account_id();
+	// Hex encoded scale encoded bytes representing the call parameters.
+	let call_parameters = hex_string(&alice_id.encode());
+	let response: MethodResponse = api
+		.call(
+			"chainHead_unstable_call",
+			[&sub_id, &block_hash, "AccountNonceApi_account_nonce", &call_parameters],
+		)
+		.await
+		.unwrap();
+	let operation_id = match response {
+		MethodResponse::Started(started) => started.operation_id,
+		MethodResponse::LimitReached => panic!("Expected started response"),
+	};
+
+	// Response propagated to `chainHead_follow`.
+	assert_matches!(
+			get_next_event::<FollowEvent<String>>(&mut sub).await,
+			FollowEvent::OperationCallDone(done) if done.operation_id == operation_id && done.output == "0x0000000000000000"
+	);
+}
diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs
index d4cc575afec..b942ac58aa9 100644
--- a/substrate/client/service/src/builder.rs
+++ b/substrate/client/service/src/builder.rs
@@ -74,11 +74,7 @@ use sp_consensus::block_validation::{
 use sp_core::traits::{CodeExecutor, SpawnNamed};
 use sp_keystore::KeystorePtr;
 use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero};
-use std::{
-	str::FromStr,
-	sync::Arc,
-	time::{Duration, SystemTime},
-};
+use std::{str::FromStr, sync::Arc, time::SystemTime};
 
 /// Full client type.
 pub type TFullClient<TBl, TRtApi, TExec> =
@@ -636,24 +632,13 @@ where
 	)
 	.into_rpc();
 
-	// Maximum pinned blocks across all connections.
-	// This number is large enough to consider immediate blocks.
-	// Note: This should never exceed the `PINNING_CACHE_SIZE` from client/db.
-	const MAX_PINNED_BLOCKS: usize = 512;
-
-	// Any block of any subscription should not be pinned more than
-	// this constant. When a subscription contains a block older than this,
-	// the subscription becomes subject to termination.
-	// Note: This should be enough for immediate blocks.
-	const MAX_PINNED_SECONDS: u64 = 60;
-
 	let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new(
 		client.clone(),
 		backend.clone(),
 		task_executor.clone(),
 		client.info().genesis_hash,
-		MAX_PINNED_BLOCKS,
-		Duration::from_secs(MAX_PINNED_SECONDS),
+		// Defaults to sensible limits for the `ChainHead`.
+		sc_rpc_spec_v2::chain_head::ChainHeadConfig::default(),
 	)
 	.into_rpc();
 
-- 
GitLab