From 7430f413503f8008fe60eb2e4ebd76d14af12ea9 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Tue, 2 Apr 2024 16:12:34 +0300 Subject: [PATCH] chainHead: Allow methods to be called from within a single connection context and limit connections (#3481) This PR ensures that the chainHead RPC class can be called only from within the same connection context. The chainHead methods are now registered as raw methods. - https://github.com/paritytech/jsonrpsee/pull/1297 The concept of raw methods is introduced in jsonrpsee, which is an async method that exposes the connection ID: The raw method doesn't have the concept of a blocking method. Previously blocking methods are now spawning a blocking task to handle their blocking (ie DB) access. We spawn the same number of tasks as before, however we do that explicitly. Another approach would be implementing a RPC middleware that captures and decodes the method parameters: - https://github.com/paritytech/polkadot-sdk/pull/3343 However, that approach is prone to errors since the methods are hardcoded by name. Performace is affected by the double deserialization that needs to happen to extract the subscription ID we'd like to limit. Once from the middleware, and once from the methods itself. This PR paves the way to implement the chainHead connection limiter: - https://github.com/paritytech/polkadot-sdk/issues/1505 Registering tokens (subscription ID / operation ID) on the `RpcConnections` could be extended to return an error when the maximum number of operations is reached. While at it, have added an integration-test to ensure that chainHead methods can be called from within the same connection context. Before this is merged, a new JsonRPC release should be made to expose the `raw-methods`: - [x] Use jsonrpsee from crates io (blocked by: https://github.com/paritytech/jsonrpsee/pull/1297) Closes: https://github.com/paritytech/polkadot-sdk/issues/3207 cc @paritytech/subxt-team --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com> --- Cargo.lock | 34 +- substrate/client/rpc-spec-v2/Cargo.toml | 1 + .../client/rpc-spec-v2/src/chain_head/api.rs | 30 +- .../rpc-spec-v2/src/chain_head/chain_head.rs | 321 ++++++++++++------ .../src/chain_head/chain_head_follow.rs | 17 +- .../rpc-spec-v2/src/chain_head/error.rs | 7 + .../src/chain_head/subscription/inner.rs | 53 +++ .../src/chain_head/subscription/mod.rs | 132 ++++++- .../rpc-spec-v2/src/chain_head/tests.rs | 230 ++++++++++++- .../rpc-spec-v2/src/common/connections.rs | 262 ++++++++++++++ .../client/rpc-spec-v2/src/common/mod.rs | 1 + 11 files changed, 934 insertions(+), 154 deletions(-) create mode 100644 substrate/client/rpc-spec-v2/src/common/connections.rs diff --git a/Cargo.lock b/Cargo.lock index 9a8eff4691e..24612391d3f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6840,9 +6840,9 @@ checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd" [[package]] name = "jsonrpsee" -version = "0.22.0" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a95f7cc23d5fab0cdeeaf6bad8c8f5e7a3aa7f0d211957ea78232b327ab27b0" +checksum = "87f3ae45a64cfc0882934f963be9431b2a165d667f53140358181f262aca0702" dependencies = [ "jsonrpsee-core", "jsonrpsee-http-client", @@ -6856,9 +6856,9 @@ dependencies = [ [[package]] name = "jsonrpsee-client-transport" -version = "0.22.0" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b1736cfa3845fd9f8f43751f2b8e0e83f7b6081e754502f7d63b6587692cc83" +checksum = "455fc882e56f58228df2aee36b88a1340eafd707c76af2fa68cf94b37d461131" dependencies = [ "futures-util", "http", @@ -6877,9 +6877,9 @@ dependencies = [ [[package]] name = "jsonrpsee-core" -version = "0.22.0" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82030d038658974732103e623ba2e0abec03bbbe175b39c0a2fafbada60c5868" +checksum = "b75568f4f9696e3a47426e1985b548e1a9fcb13372a5e320372acaf04aca30d1" dependencies = [ "anyhow", "async-lock 3.3.0", @@ -6903,9 +6903,9 @@ dependencies = [ [[package]] name = "jsonrpsee-http-client" -version = "0.22.0" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36a06ef0de060005fddf772d54597bb6a8b0413da47dcffd304b0306147b9678" +checksum = "9e7a95e346f55df84fb167b7e06470e196e7d5b9488a21d69c5d9732043ba7ba" dependencies = [ "async-trait", "hyper", @@ -6923,22 +6923,22 @@ dependencies = [ [[package]] name = "jsonrpsee-proc-macros" -version = "0.22.0" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69fc56131589f82e57805f7338b87023db4aafef813555708b159787e34ad6bc" +checksum = "30ca066e73dd70294aebc5c2675d8ffae43be944af027c857ce0d4c51785f014" dependencies = [ "heck 0.4.1", "proc-macro-crate 3.0.0", "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.53", ] [[package]] name = "jsonrpsee-server" -version = "0.22.0" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d85be77fe5b2a94589e3164fb780017f7aff7d646b49278c0d0346af16975c8e" +checksum = "0e29c1bd1f9bba83c864977c73404e505f74f730fa0db89dd490ec174e36d7f0" dependencies = [ "futures-util", "http", @@ -6960,9 +6960,9 @@ dependencies = [ [[package]] name = "jsonrpsee-types" -version = "0.22.0" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a48fdc1202eafc51c63e00406575e59493284ace8b8b61aa16f3a6db5d64f1a" +checksum = "3467fd35feeee179f71ab294516bdf3a81139e7aeebdd860e46897c12e1a3368" dependencies = [ "anyhow", "beef", @@ -6973,9 +6973,9 @@ dependencies = [ [[package]] name = "jsonrpsee-ws-client" -version = "0.22.0" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5ce25d70a8e4d3cc574bbc3cad0137c326ad64b194793d5e7bbdd3fa4504181" +checksum = "68ca71e74983f624c0cb67828e480a981586074da8ad3a2f214c6a3f884edab9" dependencies = [ "http", "jsonrpsee-client-transport", diff --git a/substrate/client/rpc-spec-v2/Cargo.toml b/substrate/client/rpc-spec-v2/Cargo.toml index c62b3e789d3..937e5c6b626 100644 --- a/substrate/client/rpc-spec-v2/Cargo.toml +++ b/substrate/client/rpc-spec-v2/Cargo.toml @@ -44,6 +44,7 @@ futures-util = { version = "0.3.30", default-features = false } rand = "0.8.5" [dev-dependencies] +jsonrpsee = { version = "0.22", features = ["server", "ws-client"] } serde_json = { workspace = true, default-features = true } tokio = { version = "1.22.0", features = ["macros"] } substrate-test-runtime-client = { path = "../../test-utils/runtime/client" } diff --git a/substrate/client/rpc-spec-v2/src/chain_head/api.rs b/substrate/client/rpc-spec-v2/src/chain_head/api.rs index 00000e1fb27..3851adac264 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/api.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/api.rs @@ -27,7 +27,7 @@ use crate::{ common::events::StorageQuery, }; use jsonrpsee::{proc_macros::rpc, server::ResponsePayload}; -use sp_rpc::list::ListOrValue; +pub use sp_rpc::list::ListOrValue; #[rpc(client, server)] pub trait ChainHeadApi<Hash> { @@ -54,8 +54,8 @@ pub trait ChainHeadApi<Hash> { /// # Unstable /// /// This method is unstable and subject to change in the future. - #[method(name = "chainHead_unstable_body", blocking)] - fn chain_head_unstable_body( + #[method(name = "chainHead_unstable_body", raw_method)] + async fn chain_head_unstable_body( &self, follow_subscription: String, hash: Hash, @@ -73,8 +73,8 @@ pub trait ChainHeadApi<Hash> { /// # Unstable /// /// This method is unstable and subject to change in the future. - #[method(name = "chainHead_unstable_header", blocking)] - fn chain_head_unstable_header( + #[method(name = "chainHead_unstable_header", raw_method)] + async fn chain_head_unstable_header( &self, follow_subscription: String, hash: Hash, @@ -85,8 +85,8 @@ pub trait ChainHeadApi<Hash> { /// # Unstable /// /// This method is unstable and subject to change in the future. - #[method(name = "chainHead_unstable_storage", blocking)] - fn chain_head_unstable_storage( + #[method(name = "chainHead_unstable_storage", raw_method)] + async fn chain_head_unstable_storage( &self, follow_subscription: String, hash: Hash, @@ -99,8 +99,8 @@ pub trait ChainHeadApi<Hash> { /// # Unstable /// /// This method is unstable and subject to change in the future. - #[method(name = "chainHead_unstable_call", blocking)] - fn chain_head_unstable_call( + #[method(name = "chainHead_unstable_call", raw_method)] + async fn chain_head_unstable_call( &self, follow_subscription: String, hash: Hash, @@ -118,8 +118,8 @@ pub trait ChainHeadApi<Hash> { /// # Unstable /// /// This method is unstable and subject to change in the future. - #[method(name = "chainHead_unstable_unpin", blocking)] - fn chain_head_unstable_unpin( + #[method(name = "chainHead_unstable_unpin", raw_method)] + async fn chain_head_unstable_unpin( &self, follow_subscription: String, hash_or_hashes: ListOrValue<Hash>, @@ -131,8 +131,8 @@ pub trait ChainHeadApi<Hash> { /// # Unstable /// /// This method is unstable and subject to change in the future. - #[method(name = "chainHead_unstable_continue", blocking)] - fn chain_head_unstable_continue( + #[method(name = "chainHead_unstable_continue", raw_method)] + async fn chain_head_unstable_continue( &self, follow_subscription: String, operation_id: String, @@ -145,8 +145,8 @@ pub trait ChainHeadApi<Hash> { /// # Unstable /// /// This method is unstable and subject to change in the future. - #[method(name = "chainHead_unstable_stopOperation", blocking)] - fn chain_head_unstable_stop_operation( + #[method(name = "chainHead_unstable_stopOperation", raw_method)] + async fn chain_head_unstable_stop_operation( &self, follow_subscription: String, operation_id: String, diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs index 2bda22b4523..975abbca4b6 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -34,10 +34,10 @@ use crate::{ hex_string, SubscriptionTaskExecutor, }; use codec::Encode; -use futures::future::FutureExt; +use futures::{channel::oneshot, future::FutureExt}; use jsonrpsee::{ - core::async_trait, server::ResponsePayload, types::SubscriptionId, MethodResponseFuture, - PendingSubscriptionSink, SubscriptionSink, + core::async_trait, server::ResponsePayload, types::SubscriptionId, ConnectionDetails, + MethodResponseFuture, PendingSubscriptionSink, SubscriptionSink, }; use log::debug; use sc_client_api::{ @@ -65,6 +65,8 @@ pub struct ChainHeadConfig { /// The maximum number of items reported by the `chainHead_storage` before /// pagination is required. pub operation_max_storage_items: usize, + /// The maximum number of `chainHead_follow` subscriptions per connection. + pub max_follow_subscriptions_per_connection: usize, } /// Maximum pinned blocks across all connections. @@ -86,6 +88,9 @@ const MAX_ONGOING_OPERATIONS: usize = 16; /// before paginations is required. const MAX_STORAGE_ITER_ITEMS: usize = 5; +/// The maximum number of `chainHead_follow` subscriptions per connection. +const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4; + impl Default for ChainHeadConfig { fn default() -> Self { ChainHeadConfig { @@ -93,6 +98,7 @@ impl Default for ChainHeadConfig { subscription_max_pinned_duration: MAX_PINNED_DURATION, subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS, operation_max_storage_items: MAX_STORAGE_ITER_ITEMS, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, } } } @@ -106,7 +112,7 @@ pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> { /// Executor to spawn subscriptions. executor: SubscriptionTaskExecutor, /// Keep track of the pinned blocks for each subscription. - subscriptions: Arc<SubscriptionManagement<Block, BE>>, + subscriptions: SubscriptionManagement<Block, BE>, /// The maximum number of items reported by the `chainHead_storage` before /// pagination is required. operation_max_storage_items: usize, @@ -126,12 +132,13 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> { client, backend: backend.clone(), executor, - subscriptions: Arc::new(SubscriptionManagement::new( + subscriptions: SubscriptionManagement::new( config.global_max_pinned_blocks, config.subscription_max_pinned_duration, config.subscription_max_ongoing_operations, + config.max_follow_subscriptions_per_connection, backend, - )), + ), operation_max_storage_items: config.operation_max_storage_items, _phantom: PhantomData, } @@ -182,12 +189,23 @@ where let client = self.client.clone(); let fut = async move { + // Ensure the current connection ID has enough space to accept a new subscription. + let connection_id = pending.connection_id(); + // The RAII `reserved_subscription` will clean up resources on drop: + // - free the reserved subscription for the connection ID. + // - remove the subscription ID from the subscription management. + let Some(mut reserved_subscription) = subscriptions.reserve_subscription(connection_id) + else { + pending.reject(ChainHeadRpcError::ReachedLimits).await; + return + }; + let Ok(sink) = pending.accept().await else { return }; let sub_id = read_subscription_id_as_string(&sink); - // Keep track of the subscription. - let Some(sub_data) = subscriptions.insert_subscription(sub_id.clone(), with_runtime) + let Some(sub_data) = + reserved_subscription.insert_subscription(sub_id.clone(), with_runtime) else { // Inserting the subscription can only fail if the JsonRPSee // generated a duplicate subscription ID. @@ -201,91 +219,117 @@ where let mut chain_head_follow = ChainHeadFollower::new( client, backend, - subscriptions.clone(), + subscriptions, with_runtime, sub_id.clone(), ); chain_head_follow.generate_events(sink, sub_data).await; - subscriptions.remove_subscription(&sub_id); debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id); }; self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); } - fn chain_head_unstable_body( + async fn chain_head_unstable_body( &self, + connection_details: ConnectionDetails, follow_subscription: String, hash: Block::Hash, ) -> ResponsePayload<'static, MethodResponse> { - let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) { - Ok(block) => block, - Err(SubscriptionManagementError::SubscriptionAbsent) | - Err(SubscriptionManagementError::ExceededLimits) => - return ResponsePayload::success(MethodResponse::LimitReached), - Err(SubscriptionManagementError::BlockHashAbsent) => { - // Block is not part of the subscription. - return ResponsePayload::error(ChainHeadRpcError::InvalidBlock); - }, - Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock), - }; + if !self + .subscriptions + .contains_subscription(connection_details.id(), &follow_subscription) + { + // The spec says to return `LimitReached` if the follow subscription is invalid or + // stale. + return ResponsePayload::success(MethodResponse::LimitReached); + } - let operation_id = block_guard.operation().operation_id(); + let client = self.client.clone(); + let subscriptions = self.subscriptions.clone(); + let executor = self.executor.clone(); + + let result = spawn_blocking(&self.executor, async move { + let mut block_guard = match subscriptions.lock_block(&follow_subscription, hash, 1) { + Ok(block) => block, + Err(SubscriptionManagementError::SubscriptionAbsent) | + Err(SubscriptionManagementError::ExceededLimits) => + return ResponsePayload::success(MethodResponse::LimitReached), + Err(SubscriptionManagementError::BlockHashAbsent) => { + // Block is not part of the subscription. + return ResponsePayload::error(ChainHeadRpcError::InvalidBlock); + }, + Err(_) => return ResponsePayload::error(ChainHeadRpcError::InvalidBlock), + }; - let event = match self.client.block(hash) { - Ok(Some(signed_block)) => { - let extrinsics = signed_block - .block - .extrinsics() - .iter() - .map(|extrinsic| hex_string(&extrinsic.encode())) - .collect(); - FollowEvent::<Block::Hash>::OperationBodyDone(OperationBodyDone { + let operation_id = block_guard.operation().operation_id(); + + let event = match client.block(hash) { + Ok(Some(signed_block)) => { + let extrinsics = signed_block + .block + .extrinsics() + .iter() + .map(|extrinsic| hex_string(&extrinsic.encode())) + .collect(); + FollowEvent::<Block::Hash>::OperationBodyDone(OperationBodyDone { + operation_id: operation_id.clone(), + value: extrinsics, + }) + }, + Ok(None) => { + // The block's body was pruned. This subscription ID has become invalid. + debug!( + target: LOG_TARGET, + "[body][id={:?}] Stopping subscription because hash={:?} was pruned", + &follow_subscription, + hash + ); + subscriptions.remove_subscription(&follow_subscription); + return ResponsePayload::error(ChainHeadRpcError::InvalidBlock) + }, + Err(error) => FollowEvent::<Block::Hash>::OperationError(OperationError { operation_id: operation_id.clone(), - value: extrinsics, - }) - }, - Ok(None) => { - // The block's body was pruned. This subscription ID has become invalid. - debug!( - target: LOG_TARGET, - "[body][id={:?}] Stopping subscription because hash={:?} was pruned", - &follow_subscription, - hash - ); - self.subscriptions.remove_subscription(&follow_subscription); - return ResponsePayload::error(ChainHeadRpcError::InvalidBlock) - }, - Err(error) => FollowEvent::<Block::Hash>::OperationError(OperationError { - operation_id: operation_id.clone(), - error: error.to_string(), - }), - }; + error: error.to_string(), + }), + }; - let (rp, rp_fut) = method_started_response(operation_id, None); + let (rp, rp_fut) = method_started_response(operation_id, None); + let fut = async move { + // Wait for the server to send out the response and if it produces an error no event + // should be generated. + if rp_fut.await.is_err() { + return; + } - let fut = async move { - // Events should only by generated - // if the response was successfully propagated. - if rp_fut.await.is_err() { - return; - } - let _ = block_guard.response_sender().unbounded_send(event); - }; + let _ = block_guard.response_sender().unbounded_send(event); + }; + executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed()); - self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); + rp + }); - rp + result + .await + .unwrap_or_else(|_| ResponsePayload::success(MethodResponse::LimitReached)) } - fn chain_head_unstable_header( + async fn chain_head_unstable_header( &self, + connection_details: ConnectionDetails, follow_subscription: String, hash: Block::Hash, ) -> Result<Option<String>, ChainHeadRpcError> { - let _block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) { + if !self + .subscriptions + .contains_subscription(connection_details.id(), &follow_subscription) + { + return Ok(None); + } + + let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) { Ok(block) => block, Err(SubscriptionManagementError::SubscriptionAbsent) | Err(SubscriptionManagementError::ExceededLimits) => return Ok(None), @@ -296,19 +340,35 @@ where Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()), }; - self.client - .header(hash) - .map(|opt_header| opt_header.map(|h| hex_string(&h.encode()))) - .map_err(|err| ChainHeadRpcError::InternalError(err.to_string())) + let client = self.client.clone(); + let result = spawn_blocking(&self.executor, async move { + let _block_guard = block_guard; + + client + .header(hash) + .map(|opt_header| opt_header.map(|h| hex_string(&h.encode()))) + .map_err(|err| ChainHeadRpcError::InternalError(err.to_string())) + }); + result.await.unwrap_or_else(|_| Ok(None)) } - fn chain_head_unstable_storage( + async fn chain_head_unstable_storage( &self, + connection_details: ConnectionDetails, follow_subscription: String, hash: Block::Hash, items: Vec<StorageQuery<String>>, child_trie: Option<String>, ) -> ResponsePayload<'static, MethodResponse> { + if !self + .subscriptions + .contains_subscription(connection_details.id(), &follow_subscription) + { + // The spec says to return `LimitReached` if the follow subscription is invalid or + // stale. + return ResponsePayload::success(MethodResponse::LimitReached); + } + // Gain control over parameter parsing and returned error. let items = match items .into_iter() @@ -357,25 +417,25 @@ where let mut items = items; items.truncate(num_operations); - let (rp, rp_is_success) = method_started_response(operation_id, Some(discarded)); - + let (rp, rp_fut) = method_started_response(operation_id, Some(discarded)); let fut = async move { - // Events should only by generated - // if the response was successfully propagated. - if rp_is_success.await.is_err() { + // Wait for the server to send out the response and if it produces an error no event + // should be generated. + if rp_fut.await.is_err() { return; } + storage_client.generate_events(block_guard, hash, items, child_trie).await; }; - self.executor .spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed()); rp } - fn chain_head_unstable_call( + async fn chain_head_unstable_call( &self, + connection_details: ConnectionDetails, follow_subscription: String, hash: Block::Hash, function: String, @@ -386,6 +446,15 @@ where Err(err) => return ResponsePayload::error(err), }; + if !self + .subscriptions + .contains_subscription(connection_details.id(), &follow_subscription) + { + // The spec says to return `LimitReached` if the follow subscription is invalid or + // stale. + return ResponsePayload::success(MethodResponse::LimitReached); + } + let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) { Ok(block) => block, Err(SubscriptionManagementError::SubscriptionAbsent) | @@ -408,44 +477,53 @@ where } let operation_id = block_guard.operation().operation_id(); - let event = self - .client - .executor() - .call(hash, &function, &call_parameters, CallContext::Offchain) - .map(|result| { - FollowEvent::<Block::Hash>::OperationCallDone(OperationCallDone { - operation_id: operation_id.clone(), - output: hex_string(&result), - }) - }) - .unwrap_or_else(|error| { - FollowEvent::<Block::Hash>::OperationError(OperationError { - operation_id: operation_id.clone(), - error: error.to_string(), - }) - }); - - let (rp, rp_fut) = method_started_response(operation_id, None); + let client = self.client.clone(); + let (rp, rp_fut) = method_started_response(operation_id.clone(), None); let fut = async move { - // Events should only by generated - // if the response was successfully propagated. + // Wait for the server to send out the response and if it produces an error no event + // should be generated. if rp_fut.await.is_err() { - return; + return } + + let event = client + .executor() + .call(hash, &function, &call_parameters, CallContext::Offchain) + .map(|result| { + FollowEvent::<Block::Hash>::OperationCallDone(OperationCallDone { + operation_id: operation_id.clone(), + output: hex_string(&result), + }) + }) + .unwrap_or_else(|error| { + FollowEvent::<Block::Hash>::OperationError(OperationError { + operation_id: operation_id.clone(), + error: error.to_string(), + }) + }); + let _ = block_guard.response_sender().unbounded_send(event); }; - - self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); + self.executor + .spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed()); rp } - fn chain_head_unstable_unpin( + async fn chain_head_unstable_unpin( &self, + connection_details: ConnectionDetails, follow_subscription: String, hash_or_hashes: ListOrValue<Block::Hash>, ) -> Result<(), ChainHeadRpcError> { + if !self + .subscriptions + .contains_subscription(connection_details.id(), &follow_subscription) + { + return Ok(()); + } + let result = match hash_or_hashes { ListOrValue::Value(hash) => self.subscriptions.unpin_blocks(&follow_subscription, [hash]), @@ -469,11 +547,19 @@ where } } - fn chain_head_unstable_continue( + async fn chain_head_unstable_continue( &self, + connection_details: ConnectionDetails, follow_subscription: String, operation_id: String, ) -> Result<(), ChainHeadRpcError> { + if !self + .subscriptions + .contains_subscription(connection_details.id(), &follow_subscription) + { + return Ok(()) + } + let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id) else { return Ok(()) @@ -487,11 +573,19 @@ where } } - fn chain_head_unstable_stop_operation( + async fn chain_head_unstable_stop_operation( &self, + connection_details: ConnectionDetails, follow_subscription: String, operation_id: String, ) -> Result<(), ChainHeadRpcError> { + if !self + .subscriptions + .contains_subscription(connection_details.id(), &follow_subscription) + { + return Ok(()) + } + let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id) else { return Ok(()) @@ -510,3 +604,26 @@ fn method_started_response( let rp = MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items }); ResponsePayload::success(rp).notify_on_completion() } + +/// Spawn a blocking future on the provided executor and return the result on a oneshot channel. +/// +/// This is a wrapper to extract the result of a `executor.spawn_blocking` future. +fn spawn_blocking<R>( + executor: &SubscriptionTaskExecutor, + fut: impl std::future::Future<Output = R> + Send + 'static, +) -> oneshot::Receiver<R> +where + R: Send + 'static, +{ + let (tx, rx) = oneshot::channel(); + + let blocking_fut = async move { + let result = fut.await; + // Send the result back on the channel. + let _ = tx.send(result); + }; + + executor.spawn_blocking("substrate-rpc-subscription", Some("rpc"), blocking_fut.boxed()); + + rx +} diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs index afa99f3aa16..90cc62a36fa 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs @@ -60,7 +60,7 @@ pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> { /// Backend of the chain. backend: Arc<BE>, /// Subscriptions handle. - sub_handle: Arc<SubscriptionManagement<Block, BE>>, + sub_handle: SubscriptionManagement<Block, BE>, /// Subscription was started with the runtime updates flag. with_runtime: bool, /// Subscription ID. @@ -74,7 +74,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Cli pub fn new( client: Arc<Client>, backend: Arc<BE>, - sub_handle: Arc<SubscriptionManagement<Block, BE>>, + sub_handle: SubscriptionManagement<Block, BE>, with_runtime: bool, sub_id: String, ) -> Self { @@ -546,7 +546,12 @@ where EventStream: Stream<Item = NotificationType<Block>> + Unpin, { let mut stream_item = stream.next(); - let mut stop_event = rx_stop; + + // The stop event can be triggered by the chainHead logic when the pinned + // block guarantee cannot be hold. Or when the client is disconnected. + let connection_closed = sink.closed(); + tokio::pin!(connection_closed); + let mut stop_event = futures_util::future::select(rx_stop, connection_closed); while let Either::Left((Some(event), next_stop_event)) = futures_util::future::select(stream_item, stop_event).await @@ -594,8 +599,10 @@ where stop_event = next_stop_event; } - // If we got here either the substrate streams have closed - // or the `Stop` receiver was triggered. + // If we got here either: + // - the substrate streams have closed + // - the `Stop` receiver was triggered internally (cannot hold the pinned block guarantee) + // - the client disconnected. let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop); let _ = sink.send(msg).await; } diff --git a/substrate/client/rpc-spec-v2/src/chain_head/error.rs b/substrate/client/rpc-spec-v2/src/chain_head/error.rs index 8c50e445aa0..35604db0660 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/error.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/error.rs @@ -23,6 +23,9 @@ use jsonrpsee::types::error::ErrorObject; /// ChainHead RPC errors. #[derive(Debug, thiserror::Error)] pub enum Error { + /// Maximum number of chainHead_follow has been reached. + #[error("Maximum number of chainHead_follow has been reached")] + ReachedLimits, /// The provided block hash is invalid. #[error("Invalid block hash")] InvalidBlock, @@ -46,6 +49,8 @@ pub enum Error { /// Errors for `chainHead` RPC module, as defined in /// <https://github.com/paritytech/json-rpc-interface-spec>. pub mod rpc_spec_v2 { + /// Maximum number of chainHead_follow has been reached. + pub const REACHED_LIMITS: i32 = -32800; /// The provided block hash is invalid. pub const INVALID_BLOCK_ERROR: i32 = -32801; /// The follow subscription was started with `withRuntime` set to `false`. @@ -70,6 +75,8 @@ impl From<Error> for ErrorObject<'static> { let msg = e.to_string(); match e { + Error::ReachedLimits => + ErrorObject::owned(rpc_spec_v2::REACHED_LIMITS, msg, None::<()>), Error::InvalidBlock => ErrorObject::owned(rpc_spec_v2::INVALID_BLOCK_ERROR, msg, None::<()>), Error::InvalidRuntimeCall(_) => diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index d2879679501..1ebee3c80fc 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -1455,4 +1455,57 @@ mod tests { let permit_three = ops.reserve_at_most(1).unwrap(); assert_eq!(permit_three.num_ops, 1); } + + #[test] + fn reserved_subscription_cleans_resources() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let subs = Arc::new(parking_lot::RwLock::new(SubscriptionsInner::new( + 10, + Duration::from_secs(10), + MAX_OPERATIONS_PER_SUB, + backend, + ))); + + // Maximum 2 subscriptions per connection. + let rpc_connections = crate::common::connections::RpcConnections::new(2); + + let subscription_management = + crate::chain_head::subscription::SubscriptionManagement::_from_inner( + subs.clone(), + rpc_connections.clone(), + ); + + let reserved_sub_first = subscription_management.reserve_subscription(1).unwrap(); + let mut reserved_sub_second = subscription_management.reserve_subscription(1).unwrap(); + // Subscriptions reserved but not yet populated. + assert_eq!(subs.read().subs.len(), 0); + + // Cannot reserve anymore. + assert!(subscription_management.reserve_subscription(1).is_none()); + // Drop the first subscription. + drop(reserved_sub_first); + // Space is freed-up for the rpc connections. + let mut reserved_sub_first = subscription_management.reserve_subscription(1).unwrap(); + + // Insert subscriptions. + let _sub_data_first = + reserved_sub_first.insert_subscription("sub1".to_string(), true).unwrap(); + let _sub_data_second = + reserved_sub_second.insert_subscription("sub2".to_string(), true).unwrap(); + // Check we have 2 subscriptions under management. + assert_eq!(subs.read().subs.len(), 2); + + // Drop first reserved subscription. + drop(reserved_sub_first); + // Check that the subscription is removed. + assert_eq!(subs.read().subs.len(), 1); + // Space is freed-up for the rpc connections. + let reserved_sub_first = subscription_management.reserve_subscription(1).unwrap(); + + // Drop all subscriptions. + drop(reserved_sub_first); + drop(reserved_sub_second); + assert_eq!(subs.read().subs.len(), 0); + } } diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs index c830e662da2..5b016af1aa4 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs @@ -16,6 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. +use jsonrpsee::ConnectionId; use parking_lot::RwLock; use sc_client_api::Backend; use sp_runtime::traits::Block as BlockT; @@ -24,6 +25,11 @@ use std::{sync::Arc, time::Duration}; mod error; mod inner; +use crate::{ + chain_head::chain_head::LOG_TARGET, + common::connections::{RegisteredConnection, ReservedConnection, RpcConnections}, +}; + use self::inner::SubscriptionsInner; pub use self::inner::OperationState; @@ -34,7 +40,22 @@ pub use inner::{BlockGuard, InsertedSubscriptionData}; pub struct SubscriptionManagement<Block: BlockT, BE: Backend<Block>> { /// Manage subscription by mapping the subscription ID /// to a set of block hashes. - inner: RwLock<SubscriptionsInner<Block, BE>>, + inner: Arc<RwLock<SubscriptionsInner<Block, BE>>>, + + /// Ensures that chainHead methods can be called from a single connection context. + /// + /// For example, `chainHead_storage` cannot be called with a subscription ID that + /// was obtained from a different connection. + rpc_connections: RpcConnections, +} + +impl<Block: BlockT, BE: Backend<Block>> Clone for SubscriptionManagement<Block, BE> { + fn clone(&self) -> Self { + SubscriptionManagement { + inner: self.inner.clone(), + rpc_connections: self.rpc_connections.clone(), + } + } } impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> { @@ -43,30 +64,55 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> { global_max_pinned_blocks: usize, local_max_pin_duration: Duration, max_ongoing_operations: usize, + max_follow_subscriptions_per_connection: usize, backend: Arc<BE>, ) -> Self { SubscriptionManagement { - inner: RwLock::new(SubscriptionsInner::new( + inner: Arc::new(RwLock::new(SubscriptionsInner::new( global_max_pinned_blocks, local_max_pin_duration, max_ongoing_operations, backend, - )), + ))), + rpc_connections: RpcConnections::new(max_follow_subscriptions_per_connection), } } - /// Insert a new subscription ID. + /// Create a new instance from the inner state. /// - /// If the subscription was not previously inserted, returns the receiver that is - /// triggered upon the "Stop" event. Otherwise, if the subscription ID was already - /// inserted returns none. - pub fn insert_subscription( + /// # Note + /// + /// Used for testing. + #[cfg(test)] + pub(crate) fn _from_inner( + inner: Arc<RwLock<SubscriptionsInner<Block, BE>>>, + rpc_connections: RpcConnections, + ) -> Self { + SubscriptionManagement { inner, rpc_connections } + } + + /// Reserve space for a subscriptions. + /// + /// Fails if the connection ID is has reached the maximum number of active subscriptions. + pub fn reserve_subscription( &self, - sub_id: String, - runtime_updates: bool, - ) -> Option<InsertedSubscriptionData<Block>> { - let mut inner = self.inner.write(); - inner.insert_subscription(sub_id, runtime_updates) + connection_id: ConnectionId, + ) -> Option<ReservedSubscription<Block, BE>> { + let reserved_token = self.rpc_connections.reserve_space(connection_id)?; + + Some(ReservedSubscription { + state: ConnectionState::Reserved(reserved_token), + inner: self.inner.clone(), + }) + } + + /// Check if the given connection contains the given subscription. + pub fn contains_subscription( + &self, + connection_id: ConnectionId, + subscription_id: &str, + ) -> bool { + self.rpc_connections.contains_identifier(connection_id, subscription_id) } /// Remove the subscription ID with associated pinned blocks. @@ -136,3 +182,63 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> { inner.get_operation(sub_id, operation_id) } } + +/// The state of the connection. +/// +/// The state starts in a [`ConnectionState::Reserved`] state and then transitions to +/// [`ConnectionState::Registered`] when the subscription is inserted. +enum ConnectionState { + Reserved(ReservedConnection), + Registered { _unregister_on_drop: RegisteredConnection, sub_id: String }, + Empty, +} + +/// RAII wrapper that removes the subscription from internal mappings and +/// gives back the reserved space for the connection. +pub struct ReservedSubscription<Block: BlockT, BE: Backend<Block>> { + state: ConnectionState, + inner: Arc<RwLock<SubscriptionsInner<Block, BE>>>, +} + +impl<Block: BlockT, BE: Backend<Block>> ReservedSubscription<Block, BE> { + /// Insert a new subscription ID. + /// + /// If the subscription was not previously inserted, returns the receiver that is + /// triggered upon the "Stop" event. Otherwise, if the subscription ID was already + /// inserted returns none. + /// + /// # Note + /// + /// This method should be called only once. + pub fn insert_subscription( + &mut self, + sub_id: String, + runtime_updates: bool, + ) -> Option<InsertedSubscriptionData<Block>> { + match std::mem::replace(&mut self.state, ConnectionState::Empty) { + ConnectionState::Reserved(reserved) => { + let registered_token = reserved.register(sub_id.clone())?; + self.state = ConnectionState::Registered { + _unregister_on_drop: registered_token, + sub_id: sub_id.clone(), + }; + + let mut inner = self.inner.write(); + inner.insert_subscription(sub_id, runtime_updates) + }, + // Cannot insert multiple subscriptions into one single reserved space. + ConnectionState::Registered { .. } | ConnectionState::Empty => { + log::error!(target: LOG_TARGET, "Called insert_subscription on a connection that is not reserved"); + None + }, + } + } +} + +impl<Block: BlockT, BE: Backend<Block>> Drop for ReservedSubscription<Block, BE> { + fn drop(&mut self) { + if let ConnectionState::Registered { sub_id, .. } = &self.state { + self.inner.write().remove_subscription(sub_id); + } + } +} diff --git a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs index 30152efb5b6..c3f10a201c5 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs @@ -17,7 +17,7 @@ // along with this program. If not, see <https://www.gnu.org/licenses/>. use crate::{ - chain_head::{event::MethodResponse, test_utils::ChainHeadMockClient}, + chain_head::{api::ChainHeadApiClient, event::MethodResponse, test_utils::ChainHeadMockClient}, common::events::{StorageQuery, StorageQueryType, StorageResultType}, hex_string, }; @@ -27,8 +27,12 @@ use assert_matches::assert_matches; use codec::{Decode, Encode}; use futures::Future; use jsonrpsee::{ - core::server::Subscription as RpcSubscription, rpc_params, MethodsError as Error, RpcModule, + core::{ + client::Subscription as RpcClientSubscription, server::Subscription as RpcSubscription, + }, + rpc_params, MethodsError as Error, RpcModule, }; + use sc_block_builder::BlockBuilderBuilder; use sc_client_api::ChildInfo; use sc_service::client::new_in_mem; @@ -59,6 +63,8 @@ const MAX_PINNED_BLOCKS: usize = 32; const MAX_PINNED_SECS: u64 = 60; const MAX_OPERATIONS: usize = 16; const MAX_PAGINATION_LIMIT: usize = 5; +const MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION: usize = 4; + const INVALID_HASH: [u8; 32] = [1; 32]; const KEY: &[u8] = b":mock"; const VALUE: &[u8] = b"hello world"; @@ -66,6 +72,35 @@ const CHILD_STORAGE_KEY: &[u8] = b"child"; const CHILD_VALUE: &[u8] = b"child value"; const DOES_NOT_PRODUCE_EVENTS_SECONDS: u64 = 10; +/// Start an RPC server with the chainHead module. +pub async fn run_server() -> std::net::SocketAddr { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let client = Arc::new(builder.build()); + + let api = ChainHead::new( + client, + backend, + Arc::new(TaskExecutor::default()), + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: 1, + }, + ) + .into_rpc(); + + let server = jsonrpsee::server::ServerBuilder::default().build("127.0.0.1:0").await.unwrap(); + + let addr = server.local_addr().unwrap(); + let handle = server.start(api); + + tokio::spawn(handle.stopped()); + addr +} + async fn get_next_event<T: serde::de::DeserializeOwned>(sub: &mut RpcSubscription) -> T { let (event, _sub_id) = tokio::time::timeout(std::time::Duration::from_secs(60), sub.next()) .await @@ -113,6 +148,7 @@ async fn setup_api() -> ( subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -163,6 +199,7 @@ async fn follow_subscription_produces_blocks() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -231,6 +268,7 @@ async fn follow_with_runtime() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -543,6 +581,7 @@ async fn call_runtime_without_flag() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -1201,6 +1240,7 @@ async fn separate_operation_ids_for_subscriptions() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -1289,6 +1329,7 @@ async fn follow_generates_initial_blocks() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -1444,6 +1485,7 @@ async fn follow_exceeding_pinned_blocks() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -1520,6 +1562,7 @@ async fn follow_with_unpin() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -1631,6 +1674,7 @@ async fn unpin_duplicate_hashes() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -1733,6 +1777,7 @@ async fn follow_with_multiple_unpin_hashes() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -1886,6 +1931,7 @@ async fn follow_prune_best_block() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -2071,6 +2117,7 @@ async fn follow_forks_pruned_block() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -2230,6 +2277,7 @@ async fn follow_report_multiple_pruned_block() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -2475,6 +2523,7 @@ async fn pin_block_references() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -2612,6 +2661,7 @@ async fn follow_finalized_before_new_block() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -2726,6 +2776,7 @@ async fn ensure_operation_limits_works() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: 1, operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -2830,6 +2881,7 @@ async fn check_continue_operation() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: 1, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -3012,6 +3064,7 @@ async fn stop_storage_operation() { subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, operation_max_storage_items: 1, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, }, ) .into_rpc(); @@ -3297,3 +3350,176 @@ async fn storage_closest_merkle_value() { merkle_values_rhs.get(&hex_string(b":AAAA")).unwrap() ); } + +#[tokio::test] +async fn chain_head_single_connection_context() { + let server_addr = run_server().await; + let server_url = format!("ws://{}", server_addr); + let client = jsonrpsee::ws_client::WsClientBuilder::default() + .build(&server_url) + .await + .unwrap(); + // Calls cannot be made from a different connection context. + let second_client = jsonrpsee::ws_client::WsClientBuilder::default() + .build(&server_url) + .await + .unwrap(); + + let mut sub: RpcClientSubscription<FollowEvent<String>> = + ChainHeadApiClient::<String>::chain_head_unstable_follow(&client, true) + .await + .unwrap(); + + let event = tokio::time::timeout(std::time::Duration::from_secs(60), sub.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + let finalized_hash = match event { + FollowEvent::Initialized(init) => init.finalized_block_hashes.into_iter().last().unwrap(), + _ => panic!("Expected FollowEvent::Initialized"), + }; + + let first_sub_id = match sub.kind() { + jsonrpsee::core::client::SubscriptionKind::Subscription(id) => match id { + jsonrpsee::types::SubscriptionId::Num(num) => num.to_string(), + jsonrpsee::types::SubscriptionId::Str(s) => s.to_string(), + }, + _ => panic!("Unexpected subscription ID"), + }; + + // Trying to unpin from a different connection will have no effect. + let _response = ChainHeadApiClient::<String>::chain_head_unstable_unpin( + &second_client, + first_sub_id.clone(), + crate::chain_head::api::ListOrValue::Value(finalized_hash.clone()), + ) + .await + .unwrap(); + + // Body can still be fetched from the first subscription. + let response: MethodResponse = ChainHeadApiClient::<String>::chain_head_unstable_body( + &client, + first_sub_id.clone(), + finalized_hash.clone(), + ) + .await + .unwrap(); + assert_matches!(response, MethodResponse::Started(_started)); + + // Cannot make a call from a different connection context. + let response: MethodResponse = ChainHeadApiClient::<String>::chain_head_unstable_body( + &second_client, + first_sub_id.clone(), + finalized_hash.clone(), + ) + .await + .unwrap(); + assert_matches!(response, MethodResponse::LimitReached); + + let response: Option<String> = ChainHeadApiClient::<String>::chain_head_unstable_header( + &client, + first_sub_id.clone(), + finalized_hash.clone(), + ) + .await + .unwrap(); + assert!(response.is_some()); + // Cannot make a call from a different connection context. + let response: Option<String> = ChainHeadApiClient::<String>::chain_head_unstable_header( + &second_client, + first_sub_id.clone(), + finalized_hash.clone(), + ) + .await + .unwrap(); + assert!(response.is_none()); + + let key = hex_string(&KEY); + let response: MethodResponse = ChainHeadApiClient::<String>::chain_head_unstable_storage( + &client, + first_sub_id.clone(), + finalized_hash.clone(), + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Hash }], + None, + ) + .await + .unwrap(); + assert_matches!(response, MethodResponse::Started(_started)); + // Cannot make a call from a different connection context. + let response: MethodResponse = ChainHeadApiClient::<String>::chain_head_unstable_storage( + &second_client, + first_sub_id.clone(), + finalized_hash.clone(), + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Hash }], + None, + ) + .await + .unwrap(); + assert_matches!(response, MethodResponse::LimitReached); + + let alice_id = AccountKeyring::Alice.to_account_id(); + // Hex encoded scale encoded bytes representing the call parameters. + let call_parameters = hex_string(&alice_id.encode()); + let response: MethodResponse = ChainHeadApiClient::<String>::chain_head_unstable_call( + &client, + first_sub_id.clone(), + finalized_hash.clone(), + "AccountNonceApi_account_nonce".into(), + call_parameters.clone(), + ) + .await + .unwrap(); + assert_matches!(response, MethodResponse::Started(_started)); + // Cannot make a call from a different connection context. + let response: MethodResponse = ChainHeadApiClient::<String>::chain_head_unstable_call( + &second_client, + first_sub_id.clone(), + finalized_hash.clone(), + "AccountNonceApi_account_nonce".into(), + call_parameters.clone(), + ) + .await + .unwrap(); + assert_matches!(response, MethodResponse::LimitReached); +} + +#[tokio::test] +async fn chain_head_limit_reached() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let client = Arc::new(builder.build()); + + // Maximum of 1 chainHead_follow subscription. + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, + max_follow_subscriptions_per_connection: 1, + }, + ) + .into_rpc(); + + let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap(); + // Initialized must always be reported first. + let _event: FollowEvent<String> = get_next_event(&mut sub).await; + + let error = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap_err(); + assert!(error + .to_string() + .contains("Maximum number of chainHead_follow has been reached")); + + // After dropping the subscription, other subscriptions are allowed to be created. + drop(sub); + // Ensure the `chainHead_unfollow` is propagated to the server. + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [true]).await.unwrap(); + // Initialized must always be reported first. + let _event: FollowEvent<String> = get_next_event(&mut sub).await; +} diff --git a/substrate/client/rpc-spec-v2/src/common/connections.rs b/substrate/client/rpc-spec-v2/src/common/connections.rs new file mode 100644 index 00000000000..c16a80bf49d --- /dev/null +++ b/substrate/client/rpc-spec-v2/src/common/connections.rs @@ -0,0 +1,262 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use jsonrpsee::ConnectionId; +use parking_lot::Mutex; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; + +/// Connection state which keeps track whether a connection exist and +/// the number of concurrent operations. +#[derive(Default, Clone)] +pub struct RpcConnections { + /// The number of identifiers that can be registered for each connection. + /// + /// # Example + /// + /// This is used to limit how many `chainHead_follow` subscriptions are active at one time. + capacity: usize, + /// Map the connecton ID to a set of identifiers. + data: Arc<Mutex<HashMap<ConnectionId, ConnectionData>>>, +} + +#[derive(Default)] +struct ConnectionData { + /// The total number of identifiers for the given connection. + /// + /// An identifier for a connection might be: + /// - the subscription ID for chainHead_follow + /// - the operation ID for the transactionBroadcast API + /// - or simply how many times the transaction API has been called. + /// + /// # Note + /// + /// Because a pending subscription sink does not expose the future subscription ID, + /// we cannot register a subscription ID before the pending subscription is accepted. + /// This variable ensures that we have enough capacity to register an identifier, after + /// the subscription is accepted. Otherwise, a jsonrpc error object should be returned. + num_identifiers: usize, + /// Active registered identifiers for the given connection. + /// + /// # Note + /// + /// For chainHead, this represents the subscription ID. + /// For transactionBroadcast, this represents the operation ID. + /// For transaction, this is empty and the number of active calls is tracked by + /// [`Self::num_identifiers`]. + identifiers: HashSet<String>, +} + +impl RpcConnections { + /// Constructs a new instance of [`RpcConnections`]. + pub fn new(capacity: usize) -> Self { + RpcConnections { capacity, data: Default::default() } + } + + /// Reserve space for a new connection identifier. + /// + /// If the number of active identifiers for the given connection exceeds the capacity, + /// returns None. + pub fn reserve_space(&self, connection_id: ConnectionId) -> Option<ReservedConnection> { + let mut data = self.data.lock(); + + let entry = data.entry(connection_id).or_insert_with(ConnectionData::default); + if entry.num_identifiers >= self.capacity { + return None; + } + entry.num_identifiers = entry.num_identifiers.saturating_add(1); + + Some(ReservedConnection { connection_id, rpc_connections: Some(self.clone()) }) + } + + /// Gives back the reserved space before the connection identifier is registered. + /// + /// # Note + /// + /// This may happen if the pending subscription cannot be accepted (unlikely). + fn unreserve_space(&self, connection_id: ConnectionId) { + let mut data = self.data.lock(); + + let entry = data.entry(connection_id).or_insert_with(ConnectionData::default); + entry.num_identifiers = entry.num_identifiers.saturating_sub(1); + + if entry.num_identifiers == 0 { + data.remove(&connection_id); + } + } + + /// Register an identifier for the given connection. + /// + /// Users must call [`Self::reserve_space`] before calling this method to ensure enough + /// space is available. + /// + /// Returns true if the identifier was inserted successfully, false if the identifier was + /// already inserted or reached capacity. + fn register_identifier(&self, connection_id: ConnectionId, identifier: String) -> bool { + let mut data = self.data.lock(); + + let entry = data.entry(connection_id).or_insert_with(ConnectionData::default); + // Should be already checked `Self::reserve_space`. + if entry.identifiers.len() >= self.capacity { + return false; + } + + entry.identifiers.insert(identifier) + } + + /// Unregister an identifier for the given connection. + fn unregister_identifier(&self, connection_id: ConnectionId, identifier: &str) { + let mut data = self.data.lock(); + if let Some(connection_data) = data.get_mut(&connection_id) { + connection_data.identifiers.remove(identifier); + connection_data.num_identifiers = connection_data.num_identifiers.saturating_sub(1); + + if connection_data.num_identifiers == 0 { + data.remove(&connection_id); + } + } + } + + /// Check if the given connection contains the given identifier. + pub fn contains_identifier(&self, connection_id: ConnectionId, identifier: &str) -> bool { + let data = self.data.lock(); + data.get(&connection_id) + .map(|connection_data| connection_data.identifiers.contains(identifier)) + .unwrap_or(false) + } +} + +/// RAII wrapper that ensures the reserved space is given back if the object is +/// dropped before the identifier is registered. +pub struct ReservedConnection { + connection_id: ConnectionId, + rpc_connections: Option<RpcConnections>, +} + +impl ReservedConnection { + /// Register the identifier for the given connection. + pub fn register(mut self, identifier: String) -> Option<RegisteredConnection> { + let rpc_connections = self.rpc_connections.take()?; + + if rpc_connections.register_identifier(self.connection_id, identifier.clone()) { + Some(RegisteredConnection { + connection_id: self.connection_id, + identifier, + rpc_connections, + }) + } else { + None + } + } +} + +impl Drop for ReservedConnection { + fn drop(&mut self) { + if let Some(rpc_connections) = self.rpc_connections.take() { + rpc_connections.unreserve_space(self.connection_id); + } + } +} + +/// RAII wrapper that ensures the identifier is unregistered if the object is dropped. +pub struct RegisteredConnection { + connection_id: ConnectionId, + identifier: String, + rpc_connections: RpcConnections, +} + +impl Drop for RegisteredConnection { + fn drop(&mut self) { + self.rpc_connections.unregister_identifier(self.connection_id, &self.identifier); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn reserve_space() { + let rpc_connections = RpcConnections::new(2); + let reserved = rpc_connections.reserve_space(1); + assert!(reserved.is_some()); + assert_eq!(1, rpc_connections.data.lock().get(&1).unwrap().num_identifiers); + assert_eq!(rpc_connections.data.lock().len(), 1); + + let reserved = reserved.unwrap(); + let registered = reserved.register("identifier1".to_string()).unwrap(); + assert!(rpc_connections.contains_identifier(1, "identifier1")); + assert_eq!(1, rpc_connections.data.lock().get(&1).unwrap().num_identifiers); + drop(registered); + + // Data is dropped. + assert!(rpc_connections.data.lock().get(&1).is_none()); + assert!(rpc_connections.data.lock().is_empty()); + // Checks can still happen. + assert!(!rpc_connections.contains_identifier(1, "identifier1")); + } + + #[test] + fn reserve_space_capacity_reached() { + let rpc_connections = RpcConnections::new(2); + + // Reserve identifier for connection 1. + let reserved = rpc_connections.reserve_space(1); + assert!(reserved.is_some()); + assert_eq!(1, rpc_connections.data.lock().get(&1).unwrap().num_identifiers); + + // Add identifier for connection 1. + let reserved = reserved.unwrap(); + let registered = reserved.register("identifier1".to_string()).unwrap(); + assert!(rpc_connections.contains_identifier(1, "identifier1")); + assert_eq!(1, rpc_connections.data.lock().get(&1).unwrap().num_identifiers); + + // Reserve identifier for connection 1 again. + let reserved = rpc_connections.reserve_space(1); + assert!(reserved.is_some()); + assert_eq!(2, rpc_connections.data.lock().get(&1).unwrap().num_identifiers); + + // Add identifier for connection 1 again. + let reserved = reserved.unwrap(); + let registered_second = reserved.register("identifier2".to_string()).unwrap(); + assert!(rpc_connections.contains_identifier(1, "identifier2")); + assert_eq!(2, rpc_connections.data.lock().get(&1).unwrap().num_identifiers); + + // Cannot reserve more identifiers. + let reserved = rpc_connections.reserve_space(1); + assert!(reserved.is_none()); + + // Drop the first identifier. + drop(registered); + assert_eq!(1, rpc_connections.data.lock().get(&1).unwrap().num_identifiers); + assert!(rpc_connections.contains_identifier(1, "identifier2")); + assert!(!rpc_connections.contains_identifier(1, "identifier1")); + + // Can reserve again after clearing the space. + let reserved = rpc_connections.reserve_space(1); + assert!(reserved.is_some()); + assert_eq!(2, rpc_connections.data.lock().get(&1).unwrap().num_identifiers); + + // Ensure data is cleared. + drop(reserved); + drop(registered_second); + assert!(rpc_connections.data.lock().get(&1).is_none()); + } +} diff --git a/substrate/client/rpc-spec-v2/src/common/mod.rs b/substrate/client/rpc-spec-v2/src/common/mod.rs index ac1af8fce3c..3167561d649 100644 --- a/substrate/client/rpc-spec-v2/src/common/mod.rs +++ b/substrate/client/rpc-spec-v2/src/common/mod.rs @@ -13,5 +13,6 @@ //! Common types and functionality for the RPC-V2 spec. +pub mod connections; pub mod events; pub mod storage; -- GitLab