diff --git a/Cargo.lock b/Cargo.lock index 0d55487f8f19a7947831a47f0c27d5b963c7b194..e00cd343c50e35b936f963c451f035f76c400ae0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6187,6 +6187,24 @@ dependencies = [ "testnet-parachains-constants", ] +[[package]] +name = "governor" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "821239e5672ff23e2a7060901fa622950bbd80b649cdaadd78d1c1767ed14eb4" +dependencies = [ + "cfg-if", + "dashmap", + "futures", + "futures-timer", + "no-std-compat", + "nonzero_ext", + "parking_lot 0.12.1", + "quanta", + "rand", + "smallvec", +] + [[package]] name = "group" version = "0.13.0" @@ -7865,6 +7883,15 @@ dependencies = [ "libc", ] +[[package]] +name = "mach2" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19b955cdeb2a02b9117f121ce63aa52d08ade45de53e48fe6a38b39c10f6f709" +dependencies = [ + "libc", +] + [[package]] name = "macro_magic" version = "0.5.0" @@ -8531,6 +8558,12 @@ dependencies = [ "libc", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "no-std-net" version = "0.6.0" @@ -8781,6 +8814,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "normalize-line-endings" version = "0.3.0" @@ -14352,6 +14391,22 @@ dependencies = [ "thiserror", ] +[[package]] +name = "quanta" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab" +dependencies = [ + "crossbeam-utils", + "libc", + "mach2", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -14512,6 +14567,15 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "raw-cpuid" +version = "10.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "rawpointer" version = "0.2.1" @@ -16496,6 +16560,7 @@ name = "sc-rpc-server" version = "11.0.0" dependencies = [ "futures", + "governor", "http", "hyper", "jsonrpsee", diff --git a/cumulus/test/service/src/lib.rs b/cumulus/test/service/src/lib.rs index aa2c4af97dfddd83e447bbf92f3b133daa30ce75..229bde4d19c54c1077ef8d3f16f85426dd1b0397 100644 --- a/cumulus/test/service/src/lib.rs +++ b/cumulus/test/service/src/lib.rs @@ -801,6 +801,7 @@ pub fn node_config( rpc_max_subs_per_conn: Default::default(), rpc_port: 9945, rpc_message_buffer_capacity: Default::default(), + rpc_rate_limit: None, prometheus_config: None, telemetry_endpoints: None, default_heap_pages: None, diff --git a/polkadot/node/test/service/src/lib.rs b/polkadot/node/test/service/src/lib.rs index e4eec32baf2abc3d766f6daf444528ea3ad2a528..a3af977ab3e4a8a68fee7b4c269720ef1a2e8af7 100644 --- a/polkadot/node/test/service/src/lib.rs +++ b/polkadot/node/test/service/src/lib.rs @@ -186,6 +186,7 @@ pub fn node_config( rpc_max_subs_per_conn: Default::default(), rpc_port: 9944, rpc_message_buffer_capacity: Default::default(), + rpc_rate_limit: None, prometheus_config: None, telemetry_endpoints: None, default_heap_pages: None, diff --git a/prdoc/pr_3301.prdoc b/prdoc/pr_3301.prdoc new file mode 100644 index 0000000000000000000000000000000000000000..19d1c5f1c18907cd1996603aade66990c358efeb --- /dev/null +++ b/prdoc/pr_3301.prdoc @@ -0,0 +1,11 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: rpc server add rate limiting. + +doc: + - audience: Node Operator + description: | + Add rate limiting for RPC server which can be utilized by the CLI `--rpc-rate-limit <calls per minute>` + The rate-limiting is disabled by default. +crates: [ ] diff --git a/substrate/bin/node/cli/benches/block_production.rs b/substrate/bin/node/cli/benches/block_production.rs index c17c12dfef13e49662fe1ed8c73a9499e00535ec..d28cfbddfd222fa0400735e0a72090a2e814aa58 100644 --- a/substrate/bin/node/cli/benches/block_production.rs +++ b/substrate/bin/node/cli/benches/block_production.rs @@ -84,6 +84,7 @@ fn new_node(tokio_handle: Handle) -> node_cli::service::NewFullBase { rpc_max_subs_per_conn: Default::default(), rpc_port: 9944, rpc_message_buffer_capacity: Default::default(), + rpc_rate_limit: None, prometheus_config: None, telemetry_endpoints: None, default_heap_pages: None, diff --git a/substrate/bin/node/cli/benches/transaction_pool.rs b/substrate/bin/node/cli/benches/transaction_pool.rs index 0d0d3a072d89dd18ddf0362bc4b73dab54f8df51..1e25b7ce6fd876deff024808cf4694a0f1b5d82f 100644 --- a/substrate/bin/node/cli/benches/transaction_pool.rs +++ b/substrate/bin/node/cli/benches/transaction_pool.rs @@ -80,6 +80,7 @@ fn new_node(tokio_handle: Handle) -> node_cli::service::NewFullBase { rpc_max_subs_per_conn: Default::default(), rpc_port: 9944, rpc_message_buffer_capacity: Default::default(), + rpc_rate_limit: None, prometheus_config: None, telemetry_endpoints: None, default_heap_pages: None, diff --git a/substrate/client/cli/src/commands/run_cmd.rs b/substrate/client/cli/src/commands/run_cmd.rs index f7b0fc51049106306ccff6d0d8fedc4db48868a6..ecff7eead7b673ba0986e360ea46ff0be5bede93 100644 --- a/substrate/client/cli/src/commands/run_cmd.rs +++ b/substrate/client/cli/src/commands/run_cmd.rs @@ -34,7 +34,10 @@ use sc_service::{ ChainSpec, Role, }; use sc_telemetry::TelemetryEndpoints; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + num::NonZeroU32, +}; /// The `run` command used to run a node. #[derive(Debug, Clone, Parser)] @@ -82,6 +85,15 @@ pub struct RunCmd { )] pub rpc_methods: RpcMethods, + /// RPC rate limiting (calls/minute) for each connection. + /// + /// This is disabled by default. + /// + /// For example `--rpc-rate-limit 10` will maximum allow + /// 10 calls per minute per connection. + #[arg(long)] + pub rpc_rate_limit: Option<NonZeroU32>, + /// Set the maximum RPC request payload size for both HTTP and WS in megabytes. #[arg(long, default_value_t = RPC_DEFAULT_MAX_REQUEST_SIZE_MB)] pub rpc_max_request_size: u32, @@ -399,6 +411,10 @@ impl CliConfiguration for RunCmd { Ok(self.rpc_max_subscriptions_per_connection) } + fn rpc_rate_limit(&self) -> Result<Option<NonZeroU32>> { + Ok(self.rpc_rate_limit) + } + fn transaction_pool(&self, is_dev: bool) -> Result<TransactionPoolOptions> { Ok(self.pool_config.transaction_pool(is_dev)) } diff --git a/substrate/client/cli/src/config.rs b/substrate/client/cli/src/config.rs index defcc4a8a69078513ffacf667e6de7a7dd67f6dd..78015cf8373d1e44f020304d8e258cad742f3adc 100644 --- a/substrate/client/cli/src/config.rs +++ b/substrate/client/cli/src/config.rs @@ -33,7 +33,7 @@ use sc_service::{ BlocksPruning, ChainSpec, TracingReceiver, }; use sc_tracing::logging::LoggerBuilder; -use std::{net::SocketAddr, path::PathBuf}; +use std::{net::SocketAddr, num::NonZeroU32, path::PathBuf}; /// The maximum number of characters for a node name. pub(crate) const NODE_NAME_MAX_LENGTH: usize = 64; @@ -338,6 +338,11 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized { Ok(RPC_DEFAULT_MESSAGE_CAPACITY_PER_CONN) } + /// Rate limit calls per minute. + fn rpc_rate_limit(&self) -> Result<Option<NonZeroU32>> { + Ok(None) + } + /// Get the prometheus configuration (`None` if disabled) /// /// By default this is `None`. @@ -510,6 +515,7 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized { rpc_max_subs_per_conn: self.rpc_max_subscriptions_per_connection()?, rpc_port: DCV::rpc_listen_port(), rpc_message_buffer_capacity: self.rpc_buffer_capacity_per_connection()?, + rpc_rate_limit: self.rpc_rate_limit()?, prometheus_config: self .prometheus_config(DCV::prometheus_listen_port(), &chain_spec)?, telemetry_endpoints, diff --git a/substrate/client/cli/src/runner.rs b/substrate/client/cli/src/runner.rs index e37c8ab0e55163f72a4ec79dbc3c8c221205ac84..b4937db71e690bd02c89a4ee1dd4f0b389a7e366 100644 --- a/substrate/client/cli/src/runner.rs +++ b/substrate/client/cli/src/runner.rs @@ -271,6 +271,7 @@ mod tests { rpc_max_subs_per_conn: Default::default(), rpc_message_buffer_capacity: Default::default(), rpc_port: 9944, + rpc_rate_limit: None, prometheus_config: None, telemetry_endpoints: None, default_heap_pages: None, diff --git a/substrate/client/rpc-servers/Cargo.toml b/substrate/client/rpc-servers/Cargo.toml index 7dd525ada6533fe227402cf8f4318d6fc0d11d65..3e04b927a222f5c35d7afdefd63263e69fa666b1 100644 --- a/substrate/client/rpc-servers/Cargo.toml +++ b/substrate/client/rpc-servers/Cargo.toml @@ -27,3 +27,4 @@ http = "0.2.8" hyper = "0.14.27" futures = "0.3.29" pin-project = "1.1.3" +governor = "0.6.0" diff --git a/substrate/client/rpc-servers/src/lib.rs b/substrate/client/rpc-servers/src/lib.rs index 29b34b2945b13d2b5ed108e6507d92e6131ad7df..a22b7309ac1923cf58e99e0fbe59ec57d04feaf0 100644 --- a/substrate/client/rpc-servers/src/lib.rs +++ b/substrate/client/rpc-servers/src/lib.rs @@ -22,7 +22,9 @@ pub mod middleware; -use std::{convert::Infallible, error::Error as StdError, net::SocketAddr, time::Duration}; +use std::{ + convert::Infallible, error::Error as StdError, net::SocketAddr, num::NonZeroU32, time::Duration, +}; use http::header::HeaderValue; use hyper::{ @@ -31,10 +33,7 @@ use hyper::{ }; use jsonrpsee::{ server::{ - middleware::{ - http::{HostFilterLayer, ProxyGetRequestLayer}, - rpc::RpcServiceBuilder, - }, + middleware::http::{HostFilterLayer, ProxyGetRequestLayer}, stop_channel, ws, PingConfig, StopHandle, TowerServiceBuilder, }, Methods, RpcModule, @@ -43,11 +42,14 @@ use tokio::net::TcpListener; use tower::Service; use tower_http::cors::{AllowOrigin, CorsLayer}; -pub use jsonrpsee::core::{ - id_providers::{RandomIntegerIdProvider, RandomStringIdProvider}, - traits::IdProvider, +pub use jsonrpsee::{ + core::{ + id_providers::{RandomIntegerIdProvider, RandomStringIdProvider}, + traits::IdProvider, + }, + server::middleware::rpc::RpcServiceBuilder, }; -pub use middleware::{MetricsLayer, RpcMetrics}; +pub use middleware::{MetricsLayer, RateLimitLayer, RpcMetrics}; const MEGABYTE: u32 = 1024 * 1024; @@ -79,12 +81,26 @@ pub struct Config<'a, M: Send + Sync + 'static> { pub id_provider: Option<Box<dyn IdProvider>>, /// Tokio runtime handle. pub tokio_handle: tokio::runtime::Handle, + /// Rate limit calls per minute. + pub rate_limit: Option<NonZeroU32>, +} + +#[derive(Debug, Clone)] +struct PerConnection<RpcMiddleware, HttpMiddleware> { + methods: Methods, + stop_handle: StopHandle, + metrics: Option<RpcMetrics>, + tokio_handle: tokio::runtime::Handle, + service_builder: TowerServiceBuilder<RpcMiddleware, HttpMiddleware>, } /// Start RPC server listening on given address. -pub async fn start_server<M: Send + Sync + 'static>( +pub async fn start_server<M>( config: Config<'_, M>, -) -> Result<Server, Box<dyn StdError + Send + Sync>> { +) -> Result<Server, Box<dyn StdError + Send + Sync>> +where + M: Send + Sync, +{ let Config { addrs, cors, @@ -97,6 +113,7 @@ pub async fn start_server<M: Send + Sync + 'static>( id_provider, tokio_handle, rpc_api, + rate_limit, } = config; let std_listener = TcpListener::bind(addrs.as_slice()).await?.into_std()?; @@ -153,7 +170,13 @@ pub async fn start_server<M: Send + Sync + 'static>( let transport_label = if is_websocket { "ws" } else { "http" }; let metrics = metrics.map(|m| MetricsLayer::new(m, transport_label)); - let rpc_middleware = RpcServiceBuilder::new().option_layer(metrics.clone()); + let rate_limit = rate_limit.map(|r| RateLimitLayer::per_minute(r)); + + // NOTE: The metrics needs to run first to include rate-limited calls in the + // metrics. + let rpc_middleware = + RpcServiceBuilder::new().option_layer(metrics.clone()).option_layer(rate_limit); + let mut svc = service_builder.set_rpc_middleware(rpc_middleware).build(methods, stop_handle); @@ -245,12 +268,3 @@ fn format_cors(maybe_cors: Option<&Vec<String>>) -> String { format!("{:?}", ["*"]) } } - -#[derive(Clone)] -struct PerConnection<RpcMiddleware, HttpMiddleware> { - methods: Methods, - stop_handle: StopHandle, - metrics: Option<RpcMetrics>, - tokio_handle: tokio::runtime::Handle, - service_builder: TowerServiceBuilder<RpcMiddleware, HttpMiddleware>, -} diff --git a/substrate/client/rpc-servers/src/middleware/mod.rs b/substrate/client/rpc-servers/src/middleware/mod.rs index 1c1930582441b75d281c6916e6e3092ed0d5818e..cac516913d327f46ccf6ba1f81c7fb447873509b 100644 --- a/substrate/client/rpc-servers/src/middleware/mod.rs +++ b/substrate/client/rpc-servers/src/middleware/mod.rs @@ -18,6 +18,10 @@ //! JSON-RPC specific middleware. +/// Grafana metrics middleware. pub mod metrics; +/// Rate limit middleware. +pub mod rate_limit; pub use metrics::*; +pub use rate_limit::*; diff --git a/substrate/client/rpc-servers/src/middleware/rate_limit.rs b/substrate/client/rpc-servers/src/middleware/rate_limit.rs new file mode 100644 index 0000000000000000000000000000000000000000..cdcc3ebf66f7d0ab9d2ba515b0c128b55281f590 --- /dev/null +++ b/substrate/client/rpc-servers/src/middleware/rate_limit.rs @@ -0,0 +1,107 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! RPC rate limiting middleware. + +use std::{num::NonZeroU32, sync::Arc, time::Duration}; + +use futures::future::{BoxFuture, FutureExt}; +use governor::{ + clock::{Clock, DefaultClock, QuantaClock}, + middleware::NoOpMiddleware, + state::{InMemoryState, NotKeyed}, + Jitter, +}; +use jsonrpsee::{ + server::middleware::rpc::RpcServiceT, + types::{ErrorObject, Id, Request}, + MethodResponse, +}; + +type RateLimitInner = governor::RateLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware>; + +const MAX_JITTER: Duration = Duration::from_millis(50); +const MAX_RETRIES: usize = 10; + +/// JSON-RPC rate limit middleware layer. +#[derive(Debug, Clone)] +pub struct RateLimitLayer(governor::Quota); + +impl RateLimitLayer { + /// Create new rate limit enforced per minute. + pub fn per_minute(n: NonZeroU32) -> Self { + Self(governor::Quota::per_minute(n)) + } +} + +/// JSON-RPC rate limit middleware +pub struct RateLimit<S> { + service: S, + rate_limit: Arc<RateLimitInner>, + clock: QuantaClock, +} + +impl<S> tower::Layer<S> for RateLimitLayer { + type Service = RateLimit<S>; + + fn layer(&self, service: S) -> Self::Service { + let clock = QuantaClock::default(); + RateLimit { + service, + rate_limit: Arc::new(RateLimitInner::direct_with_clock(self.0, &clock)), + clock, + } + } +} + +impl<'a, S> RpcServiceT<'a> for RateLimit<S> +where + S: Send + Sync + RpcServiceT<'a> + Clone + 'static, +{ + type Future = BoxFuture<'a, MethodResponse>; + + fn call(&self, req: Request<'a>) -> Self::Future { + let service = self.service.clone(); + let rate_limit = self.rate_limit.clone(); + let clock = self.clock.clone(); + + async move { + let mut attempts = 0; + let jitter = Jitter::up_to(MAX_JITTER); + + loop { + if attempts >= MAX_RETRIES { + break reject_too_many_calls(req.id); + } + + if let Err(rejected) = rate_limit.check() { + tokio::time::sleep(jitter + rejected.wait_time_from(clock.now())).await; + } else { + break service.call(req).await; + } + + attempts += 1; + } + } + .boxed() + } +} + +fn reject_too_many_calls(id: Id) -> MethodResponse { + MethodResponse::error(id, ErrorObject::owned(-32999, "RPC rate limit exceeded", None::<()>)) +} diff --git a/substrate/client/service/src/config.rs b/substrate/client/service/src/config.rs index 3e68f5b58defc40b802b359e68b116a1fd734e15..74c5dd775b0eeca74e4cba35f5816448436cd68a 100644 --- a/substrate/client/service/src/config.rs +++ b/substrate/client/service/src/config.rs @@ -39,6 +39,7 @@ use sp_core::crypto::SecretString; use std::{ io, iter, net::SocketAddr, + num::NonZeroU32, path::{Path, PathBuf}, }; use tempfile::TempDir; @@ -102,6 +103,8 @@ pub struct Configuration { pub rpc_port: u16, /// The number of messages the JSON-RPC server is allowed to keep in memory. pub rpc_message_buffer_capacity: u32, + /// RPC rate limit per minute. + pub rpc_rate_limit: Option<NonZeroU32>, /// Prometheus endpoint configuration. `None` if disabled. pub prometheus_config: Option<PrometheusConfig>, /// Telemetry service URL. `None` if disabled. diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs index 1fbfd14c3beba8ce6cbffecc412fd9dda8403eec..d0adf4f7d5c561707d061855732c7262b838a8fc 100644 --- a/substrate/client/service/src/lib.rs +++ b/substrate/client/service/src/lib.rs @@ -403,6 +403,7 @@ where id_provider: rpc_id_provider, cors: config.rpc_cors.as_ref(), tokio_handle: config.tokio_handle.clone(), + rate_limit: config.rpc_rate_limit, }; // TODO: https://github.com/paritytech/substrate/issues/13773 diff --git a/substrate/client/service/test/src/lib.rs b/substrate/client/service/test/src/lib.rs index 9b88300bf53048502f2579891886f2832ba6d530..6148bb05fcfbf99323c8b92d2c369c201094e373 100644 --- a/substrate/client/service/test/src/lib.rs +++ b/substrate/client/service/test/src/lib.rs @@ -254,6 +254,7 @@ fn node_config< rpc_max_subs_per_conn: Default::default(), rpc_port: 9944, rpc_message_buffer_capacity: Default::default(), + rpc_rate_limit: None, prometheus_config: None, telemetry_endpoints: None, default_heap_pages: None,