Unverified Commit 19aaf656 authored by Igor Aleksanov's avatar Igor Aleksanov Committed by GitHub
Browse files

Make it possible to disable batch requests support (#744)

parent 816ecca5
Pipeline #191778 passed with stages
in 5 minutes and 31 seconds
......@@ -45,7 +45,7 @@ use jsonrpsee_core::server::helpers::{collect_batch_response, prepare_error, Met
use jsonrpsee_core::server::resource_limiting::Resources;
use jsonrpsee_core::server::rpc_module::{MethodKind, Methods};
use jsonrpsee_core::TEN_MB_SIZE_BYTES;
use jsonrpsee_types::error::ErrorCode;
use jsonrpsee_types::error::{ErrorCode, ErrorObject, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG};
use jsonrpsee_types::{Id, Notification, Params, Request};
use serde_json::value::RawValue;
use tokio::net::{TcpListener, ToSocketAddrs};
......@@ -57,6 +57,7 @@ pub struct Builder<M = ()> {
resources: Resources,
max_request_body_size: u32,
max_response_body_size: u32,
batch_requests_supported: bool,
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
middleware: M,
......@@ -67,6 +68,7 @@ impl Default for Builder {
Self {
max_request_body_size: TEN_MB_SIZE_BYTES,
max_response_body_size: TEN_MB_SIZE_BYTES,
batch_requests_supported: true,
resources: Resources::default(),
access_control: AccessControl::default(),
tokio_runtime: None,
......@@ -112,6 +114,7 @@ impl<M> Builder<M> {
Builder {
max_request_body_size: self.max_request_body_size,
max_response_body_size: self.max_response_body_size,
batch_requests_supported: self.batch_requests_supported,
resources: self.resources,
access_control: self.access_control,
tokio_runtime: self.tokio_runtime,
......@@ -137,6 +140,13 @@ impl<M> Builder<M> {
self
}
/// Enables or disables support of [batch requests](https://www.jsonrpc.org/specification#batch).
/// By default, support is enabled.
pub fn batch_requests_supported(mut self, supported: bool) -> Self {
self.batch_requests_supported = supported;
self
}
/// Register a new resource kind. Errors if `label` is already registered, or if the number of
/// registered resources on this server instance would exceed 8.
///
......@@ -199,6 +209,7 @@ impl<M> Builder<M> {
access_control: self.access_control,
max_request_body_size: self.max_request_body_size,
max_response_body_size: self.max_response_body_size,
batch_requests_supported: self.batch_requests_supported,
resources: self.resources,
tokio_runtime: self.tokio_runtime,
middleware: self.middleware,
......@@ -241,6 +252,7 @@ impl<M> Builder<M> {
access_control: self.access_control,
max_request_body_size: self.max_request_body_size,
max_response_body_size: self.max_response_body_size,
batch_requests_supported: self.batch_requests_supported,
resources: self.resources,
tokio_runtime: self.tokio_runtime,
middleware: self.middleware,
......@@ -274,6 +286,7 @@ impl<M> Builder<M> {
access_control: self.access_control,
max_request_body_size: self.max_request_body_size,
max_response_body_size: self.max_response_body_size,
batch_requests_supported: self.batch_requests_supported,
resources: self.resources,
tokio_runtime: self.tokio_runtime,
middleware: self.middleware,
......@@ -323,6 +336,8 @@ pub struct Server<M = ()> {
max_request_body_size: u32,
/// Max response body size.
max_response_body_size: u32,
/// Whether batch requests are supported by this server or not.
batch_requests_supported: bool,
/// Access control
access_control: AccessControl,
/// Tracker for currently used resources on the server
......@@ -347,6 +362,7 @@ impl<M: Middleware> Server<M> {
let listener = self.listener;
let resources = self.resources;
let middleware = self.middleware;
let batch_requests_supported = self.batch_requests_supported;
let methods = methods.into().initialize_resources(&resources)?;
let make_service = make_service_fn(move |_| {
......@@ -405,6 +421,7 @@ impl<M: Middleware> Server<M> {
resources,
max_request_body_size,
max_response_body_size,
batch_requests_supported,
)
.await?;
......@@ -494,6 +511,7 @@ async fn process_validated_request(
resources: Resources,
max_request_body_size: u32,
max_response_body_size: u32,
batch_requests_supported: bool,
) -> Result<hyper::Response<hyper::Body>, HyperError> {
let (parts, body) = request.into_parts();
......@@ -570,7 +588,14 @@ async fn process_validated_request(
}
// Batch of requests or notifications
} else if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&body) {
if !batch.is_empty() {
if !batch_requests_supported {
// Server was configured to not support batches.
is_single = true;
sink.send_error(
Id::Null,
ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, &BATCHES_NOT_SUPPORTED_MSG, None),
);
} else if !batch.is_empty() {
let middleware = &middleware;
join_all(batch.into_iter().filter_map(move |req| {
......
......@@ -448,3 +448,25 @@ async fn can_set_the_max_response_size() {
handle.stop().unwrap();
}
/// The server must answer a well-formed batch with the "batches not supported"
/// error when batch support was switched off via the builder.
#[tokio::test]
async fn disabled_batches() {
	// Build a server with batch support explicitly disabled.
	let server = HttpServerBuilder::default()
		.batch_requests_supported(false)
		.build("127.0.0.1:0")
		.await
		.unwrap();

	let mut module = RpcModule::new(());
	module.register_method("should_ok", |_, _ctx| Ok("ok")).unwrap();

	let uri = to_http_uri(server.local_addr().unwrap());
	let handle = server.start(module).unwrap();

	// A syntactically valid batch of two calls — rejected solely because
	// batches are disabled, not because the payload is malformed.
	let batch = r#"[
{"jsonrpc":"2.0","method":"should_ok", "params":[],"id":1},
{"jsonrpc":"2.0","method":"should_ok", "params":[],"id":2}
]"#;
	let response = http_request(batch.into(), uri).with_default_timeout().await.unwrap().unwrap();
	assert_eq!(response.body, batches_not_supported());

	handle.stop().unwrap();
}
......@@ -75,6 +75,10 @@ pub fn oversized_request() -> String {
r#"{"jsonrpc":"2.0","error":{"code":-32701,"message":"Request is too big"},"id":null}"#.into()
}
/// Expected response body when a server with batch support disabled
/// receives a batch request (error code -32005, id is null).
pub fn batches_not_supported() -> String {
	String::from(
		r#"{"jsonrpc":"2.0","error":{"code":-32005,"message":"Batched requests are not supported by this server"},"id":null}"#,
	)
}
pub fn oversized_response(id: Id, max_limit: u32) -> String {
format!(
r#"{{"jsonrpc":"2.0","error":{{"code":-32702,"message":"Response is too big","data":"Exceeded max limit {}"}},"id":{}}}"#,
......
......@@ -180,6 +180,8 @@ pub const UNKNOWN_ERROR_CODE: i32 = -32001;
pub const SUBSCRIPTION_CLOSED: i32 = -32003;
/// Subscription got closed by the server.
pub const SUBSCRIPTION_CLOSED_WITH_ERROR: i32 = -32004;
/// Batched requests are not supported by the server.
pub const BATCHES_NOT_SUPPORTED_CODE: i32 = -32005;
/// Parse error message
pub const PARSE_ERROR_MSG: &str = "Parse error";
......@@ -199,6 +201,8 @@ pub const METHOD_NOT_FOUND_MSG: &str = "Method not found";
pub const SERVER_IS_BUSY_MSG: &str = "Server is busy, try again later";
/// Reserved for implementation-defined server-errors.
pub const SERVER_ERROR_MSG: &str = "Server error";
/// Batched requests not supported error message.
pub const BATCHES_NOT_SUPPORTED_MSG: &str = "Batched requests are not supported by this server";
/// JSONRPC error code
#[derive(Error, Debug, PartialEq, Copy, Clone)]
......
......@@ -31,7 +31,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use crate::future::{FutureDriver, ServerHandle, StopMonitor};
use crate::types::error::ErrorCode;
use crate::types::error::{ErrorCode, ErrorObject, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG};
use crate::types::{Id, Request};
use futures_channel::mpsc;
use futures_util::future::{join_all, FutureExt};
......@@ -270,6 +270,7 @@ where
resources.clone(),
cfg.max_request_body_size,
cfg.max_response_body_size,
cfg.batch_requests_supported,
BoundedSubscriptions::new(cfg.max_subscriptions_per_connection),
stop_monitor.clone(),
middleware,
......@@ -292,6 +293,7 @@ async fn background_task(
resources: Resources,
max_request_body_size: u32,
max_response_body_size: u32,
batch_requests_supported: bool,
bounded_subscriptions: BoundedSubscriptions,
stop_server: StopMonitor,
middleware: impl Middleware,
......@@ -490,7 +492,13 @@ async fn background_task(
if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&d) {
tracing::debug!("recv batch len={}", batch.len());
tracing::trace!("recv: batch={:?}", batch);
if !batch.is_empty() {
if !batch_requests_supported {
sink.send_error(
Id::Null,
ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, &BATCHES_NOT_SUPPORTED_MSG, None),
);
middleware.on_response(request_start);
} else if !batch.is_empty() {
join_all(batch.into_iter().filter_map(move |req| {
let id = req.id.clone();
let params = Params::new(req.params.map(|params| params.get()));
......@@ -656,6 +664,8 @@ struct Settings {
allowed_origins: AllowedValue,
/// Policy by which to accept or deny incoming requests based on the `Host` header.
allowed_hosts: AllowedValue,
/// Whether batch requests are supported by this server or not.
batch_requests_supported: bool,
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
}
......@@ -667,6 +677,7 @@ impl Default for Settings {
max_response_body_size: TEN_MB_SIZE_BYTES,
max_subscriptions_per_connection: 1024,
max_connections: MAX_CONNECTIONS,
batch_requests_supported: true,
allowed_origins: AllowedValue::Any,
allowed_hosts: AllowedValue::Any,
tokio_runtime: None,
......@@ -720,6 +731,13 @@ impl<M> Builder<M> {
self
}
/// Enables or disables support of [batch requests](https://www.jsonrpc.org/specification#batch).
/// By default, support is enabled.
pub fn batch_requests_supported(mut self, supported: bool) -> Self {
	// Stored in the server `Settings`; consulted when an incoming message
	// parses as a batch (`Vec<Request>`), where a disabled server replies
	// with the BATCHES_NOT_SUPPORTED error instead of dispatching the calls.
	self.settings.batch_requests_supported = supported;
	self
}
/// Set the maximum number of connections allowed. Default is 1024.
pub fn max_subscriptions_per_connection(mut self, max: u32) -> Self {
self.settings.max_subscriptions_per_connection = max;
......
......@@ -692,3 +692,32 @@ async fn custom_subscription_id_works() {
let unsub = client.send_request_text(call("unsubscribe_hello", vec!["0xdeadbeef"], Id::Num(1))).await.unwrap();
assert_eq!(&unsub, r#"{"jsonrpc":"2.0","result":true,"id":1}"#);
}
// Verifies that a WS server built with `batch_requests_supported(false)`
// rejects an otherwise valid batch request with the "batches not supported" error.
#[tokio::test]
async fn disabled_batches() {
	// Disable batches support.
	let server = WsServerBuilder::default()
		.batch_requests_supported(false)
		.build("127.0.0.1:0")
		.with_default_timeout()
		.await
		.unwrap()
		.unwrap();

	// Register a method that would succeed if the calls were actually dispatched.
	let mut module = RpcModule::new(());
	module.register_method("should_ok", |_, _ctx| Ok("ok")).unwrap();
	let addr = server.local_addr().unwrap();
	let handle = server.start(module).unwrap();

	// Send a valid batch.
	let mut client = WebSocketTestClient::new(addr).with_default_timeout().await.unwrap().unwrap();
	let req = r#"[
{"jsonrpc":"2.0","method":"should_ok", "params":[],"id":1},
{"jsonrpc":"2.0","method":"should_ok", "params":[],"id":2}
]"#;
	let response = client.send_request_text(req).with_default_timeout().await.unwrap().unwrap();
	// Expect a single error response with id null, not two per-call results.
	assert_eq!(response, batches_not_supported());
	handle.stop().unwrap();
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment