Unverified Commit 326d0c91 authored by David's avatar David Committed by GitHub
Browse files

Don't allocate until we know it's worth it (#420)

* Sniff the first byte to glean if the incoming request is a single or batch request

This works around the serde limitations around `untagged` enums and `RawValue`.

* fmt

* For http server, check first byte before allocating space for the body

Also, rework the way we return errors: prefer JSON-RPC errors according to spec (application/json) wherever sensible.

* Review feedback

* Don't assume there is a first byte to read

* ty clipyp

* Review concerns

* Cleanup
parent 32811d3c
......@@ -59,11 +59,11 @@ impl HttpTransportClient {
}
}
/// Send serialized message and wait until all bytes from the HTTP message body is read.
/// Send serialized message and wait until all bytes from the HTTP message body have been read.
pub(crate) async fn send_and_read_body(&self, body: String) -> Result<Vec<u8>, Error> {
let response = self.inner_send(body).await?;
let (parts, body) = response.into_parts();
let body = http_helpers::read_response_to_body(&parts.headers, body, self.max_request_body_size).await?;
let (body, _) = http_helpers::read_body(&parts.headers, body, self.max_request_body_size).await?;
Ok(body)
}
......@@ -95,6 +95,10 @@ pub(crate) enum Error {
/// Request body too large.
#[error("The request body was too large")]
RequestTooLarge,
/// Malformed request.
#[error("Malformed request")]
Malformed,
}
impl<T> From<GenericTransportError<T>> for Error
......@@ -104,6 +108,7 @@ where
fn from(err: GenericTransportError<T>) -> Self {
match err {
GenericTransportError::<T>::TooLarge => Self::RequestTooLarge,
GenericTransportError::<T>::Malformed => Self::Malformed,
GenericTransportError::<T>::Inner(e) => Self::Http(Box::new(e)),
}
}
......
......@@ -26,51 +26,92 @@
//! Contains common builders for hyper responses.
/// Create a response for plaintext internal error.
pub fn internal_error<T: Into<String>>(msg: T) -> hyper::Response<hyper::Body> {
from_template(hyper::StatusCode::INTERNAL_SERVER_ERROR, format!("Internal Server Error: {}", msg.into()))
use crate::types::v2::{
error::{JsonRpcError, JsonRpcErrorCode},
params::{Id, TwoPointZero},
};
const JSON: &str = "application/json; charset=utf-8";
const TEXT: &str = "text/plain";
/// Create a response for json internal error.
pub fn internal_error() -> hyper::Response<hyper::Body> {
let error = serde_json::to_string(&JsonRpcError {
jsonrpc: TwoPointZero,
error: JsonRpcErrorCode::InternalError.into(),
id: Id::Null,
})
.expect("built from known-good data; qed");
from_template(hyper::StatusCode::INTERNAL_SERVER_ERROR, error, JSON)
}
/// Create a response for not allowed hosts.
/// Create a text/plain response for not allowed hosts.
pub fn host_not_allowed() -> hyper::Response<hyper::Body> {
from_template(hyper::StatusCode::FORBIDDEN, "Provided Host header is not whitelisted.\n".to_owned())
from_template(hyper::StatusCode::FORBIDDEN, "Provided Host header is not whitelisted.\n".to_owned(), TEXT)
}
/// Create a response for disallowed method used.
/// Create a text/plain response for disallowed method used.
pub fn method_not_allowed() -> hyper::Response<hyper::Body> {
from_template(
hyper::StatusCode::METHOD_NOT_ALLOWED,
"Used HTTP Method is not allowed. POST or OPTIONS is required\n".to_owned(),
TEXT,
)
}
/// CORS invalid
/// Create a text/plain response for invalid CORS "Origin" headers.
pub fn invalid_allow_origin() -> hyper::Response<hyper::Body> {
from_template(
hyper::StatusCode::FORBIDDEN,
"Origin of the request is not whitelisted. CORS headers would not be sent and any side-effects were cancelled as well.\n".to_owned(),
TEXT,
)
}
/// CORS header invalid
/// Create a text/plain response for invalid CORS "Allow-*" headers.
pub fn invalid_allow_headers() -> hyper::Response<hyper::Body> {
from_template(
hyper::StatusCode::FORBIDDEN,
"Requested headers are not allowed for CORS. CORS headers would not be sent and any side-effects were cancelled as well.\n".to_owned(),
TEXT,
)
}
/// Create a response for too large (413)
pub fn too_large<S: Into<String>>(msg: S) -> hyper::Response<hyper::Body> {
from_template(hyper::StatusCode::PAYLOAD_TOO_LARGE, msg.into())
/// Create a json response for oversized requests (413)
pub fn too_large() -> hyper::Response<hyper::Body> {
let error = serde_json::to_string(&JsonRpcError {
jsonrpc: TwoPointZero,
error: JsonRpcErrorCode::OversizedRequest.into(),
id: Id::Null,
})
.expect("built from known-good data; qed");
from_template(hyper::StatusCode::PAYLOAD_TOO_LARGE, error, JSON)
}
/// Create a text response for a template.
fn from_template(status: hyper::StatusCode, body: String) -> hyper::Response<hyper::Body> {
/// Create a json response for empty or malformed requests (400)
pub fn malformed() -> hyper::Response<hyper::Body> {
let error = serde_json::to_string(&JsonRpcError {
jsonrpc: TwoPointZero,
error: JsonRpcErrorCode::ParseError.into(),
id: Id::Null,
})
.expect("built from known-good data; qed");
from_template(hyper::StatusCode::BAD_REQUEST, error, JSON)
}
/// Create a response body.
fn from_template<S: Into<hyper::Body>>(
status: hyper::StatusCode,
body: S,
content_type: &'static str,
) -> hyper::Response<hyper::Body> {
hyper::Response::builder()
.status(status)
.header("content-type", hyper::header::HeaderValue::from_static("text/plain; charset=utf-8"))
.body(hyper::Body::from(body))
.header("content-type", hyper::header::HeaderValue::from_static(content_type))
.body(body.into())
// Parsing `StatusCode` and `HeaderValue` is infalliable but
// parsing body content is not.
.expect("Unable to parse response body for type conversion")
......@@ -78,11 +119,5 @@ fn from_template(status: hyper::StatusCode, body: String) -> hyper::Response<hyp
/// Create a valid JSON response.
pub fn ok_response(body: String) -> hyper::Response<hyper::Body> {
hyper::Response::builder()
.status(hyper::StatusCode::OK)
.header("content-type", hyper::header::HeaderValue::from_static("application/json; charset=utf-8"))
.body(hyper::Body::from(body))
// Parsing `StatusCode` and `HeaderValue` is infalliable but
// parsing body content is not.
.expect("Unable to parse response body for type conversion")
from_template(hyper::StatusCode::OK, body, JSON)
}
......@@ -41,7 +41,7 @@ use jsonrpsee_types::{
},
TEN_MB_SIZE_BYTES,
};
use jsonrpsee_utils::http_helpers::read_response_to_body;
use jsonrpsee_utils::http_helpers::read_body;
use jsonrpsee_utils::server::{
helpers::{collect_batch_response, prepare_error, send_error},
rpc_module::Methods,
......@@ -201,60 +201,61 @@ impl Server {
}
let (parts, body) = request.into_parts();
let body = match read_response_to_body(&parts.headers, body, max_request_body_size).await {
Ok(body) => body,
Err(GenericTransportError::TooLarge) => {
return Ok::<_, HyperError>(response::too_large("The request was too large"))
}
let (body, mut is_single) = match read_body(&parts.headers, body, max_request_body_size).await {
Ok(r) => r,
Err(GenericTransportError::TooLarge) => return Ok::<_, HyperError>(response::too_large()),
Err(GenericTransportError::Malformed) => return Ok::<_, HyperError>(response::malformed()),
Err(GenericTransportError::Inner(e)) => {
return Ok::<_, HyperError>(response::internal_error(e.to_string()))
log::error!("Internal error reading request body: {}", e);
return Ok::<_, HyperError>(response::internal_error());
}
};
// NOTE(niklasad1): it's a channel because it's needed for batch requests.
let (tx, mut rx) = mpsc::unbounded::<String>();
// Is this a single request or a batch (or error)?
let mut single = true;
type Notif<'a> = JsonRpcNotification<'a, Option<&'a RawValue>>;
match body.get(0) {
// Single request or notification
Some(b'{') => {
if let Ok(req) = serde_json::from_slice::<JsonRpcRequest>(&body) {
// NOTE: we don't need to track connection id on HTTP, so using hardcoded 0 here.
methods.execute(&tx, req, 0).await;
} else if let Ok(_req) = serde_json::from_slice::<Notif>(&body) {
return Ok::<_, HyperError>(response::ok_response("".into()));
} else {
let (id, code) = prepare_error(&body);
send_error(id, &tx, code.into());
}
// Single request or notification
if is_single {
if let Ok(req) = serde_json::from_slice::<JsonRpcRequest>(&body) {
// NOTE: we don't need to track connection id on HTTP, so using hardcoded 0 here.
methods.execute(&tx, req, 0).await;
} else if let Ok(_req) = serde_json::from_slice::<Notif>(&body) {
return Ok::<_, HyperError>(response::ok_response("".into()));
} else {
let (id, code) = prepare_error(&body);
send_error(id, &tx, code.into());
}
// Bacth of requests or notifications
Some(b'[') => {
if let Ok(batch) = serde_json::from_slice::<Vec<JsonRpcRequest>>(&body) {
if !batch.is_empty() {
single = false;
for req in batch {
methods.execute(&tx, req, 0).await;
}
} else {
send_error(Id::Null, &tx, JsonRpcErrorCode::InvalidRequest.into());
}
} else if let Ok(_batch) = serde_json::from_slice::<Vec<Notif>>(&body) {
return Ok::<_, HyperError>(response::ok_response("".into()));
} else {
let (id, code) = prepare_error(&body);
send_error(id, &tx, code.into());
// Batch of requests or notifications
} else if let Ok(batch) = serde_json::from_slice::<Vec<JsonRpcRequest>>(&body) {
if !batch.is_empty() {
for req in batch {
methods.execute(&tx, req, 0).await;
}
} else {
// "If the batch rpc call itself fails to be recognized as an valid JSON or as an
// Array with at least one value, the response from the Server MUST be a single
// Response object." – The Spec.
is_single = true;
send_error(Id::Null, &tx, JsonRpcErrorCode::InvalidRequest.into());
}
// Garbage request
_ => send_error(Id::Null, &tx, JsonRpcErrorCode::ParseError.into()),
} else if let Ok(_batch) = serde_json::from_slice::<Vec<Notif>>(&body) {
return Ok::<_, HyperError>(response::ok_response("".into()));
} else {
// "If the batch rpc call itself fails to be recognized as an valid JSON or as an
// Array with at least one value, the response from the Server MUST be a single
// Response object." – The Spec.
is_single = true;
let (id, code) = prepare_error(&body);
send_error(id, &tx, code.into());
}
// Closes the receiving half of a channel without dropping it. This prevents any further
// messages from being sent on the channel.
rx.close();
let response = if single {
let response = if is_single {
rx.next().await.expect("Sender is still alive managed by us above; qed")
} else {
collect_batch_response(rx).await
......
......@@ -123,6 +123,9 @@ pub enum GenericTransportError<T: std::error::Error + Send + Sync> {
/// Request was too large.
#[error("The request was too big")]
TooLarge,
/// Malformed request
#[error("Malformed request")]
Malformed,
/// Concrete transport error.
#[error("Transport error: {0}")]
Inner(T),
......
......@@ -29,15 +29,16 @@
use futures_util::stream::StreamExt;
use jsonrpsee_types::error::GenericTransportError;
/// Read a hyper response with configured `HTTP` settings.
/// Read a data from a [`hyper::Body`] and return the data if it is valid and within the allowed size range.
///
/// Returns `Ok(bytes)` if the body was in valid size range.
/// Returns `Ok((bytes, single))` if the body was in valid size range; and a bool indicating whether the JSON-RPC
/// request is a single or a batch.
/// Returns `Err` if the body was too large or the body couldn't be read.
pub async fn read_response_to_body(
pub async fn read_body(
headers: &hyper::HeaderMap,
mut body: hyper::Body,
max_request_body_size: u32,
) -> Result<Vec<u8>, GenericTransportError<hyper::Error>> {
) -> Result<(Vec<u8>, bool), GenericTransportError<hyper::Error>> {
// NOTE(niklasad1): Values bigger than `u32::MAX` will be turned into zero here. This is unlikely to occur in practice
// and for that case we fallback to allocating in the while-loop below instead of pre-allocating.
let body_size = read_header_content_length(headers).unwrap_or(0);
......@@ -46,7 +47,21 @@ pub async fn read_response_to_body(
return Err(GenericTransportError::TooLarge);
}
let first_chunk =
body.next().await.ok_or(GenericTransportError::Malformed)?.map_err(GenericTransportError::Inner)?;
if first_chunk.len() > max_request_body_size as usize {
return Err(GenericTransportError::TooLarge);
}
let single = match first_chunk.get(0) {
Some(b'{') => true,
Some(b'[') => false,
_ => return Err(GenericTransportError::Malformed),
};
let mut received_data = Vec::with_capacity(body_size as usize);
received_data.extend_from_slice(&first_chunk);
while let Some(chunk) = body.next().await {
let chunk = chunk.map_err(GenericTransportError::Inner)?;
......@@ -56,7 +71,7 @@ pub async fn read_response_to_body(
}
received_data.extend_from_slice(&chunk);
}
Ok(received_data)
Ok((received_data, single))
}
/// Read the `Content-Length` HTTP Header. Must fit into a `u32`; returns `None` otherwise.
......@@ -90,13 +105,13 @@ pub fn read_header_values<'a>(
#[cfg(test)]
mod tests {
use super::{read_header_content_length, read_response_to_body};
use super::{read_body, read_header_content_length};
#[tokio::test]
async fn body_to_bytes_size_limit_works() {
let headers = hyper::header::HeaderMap::new();
let body = hyper::Body::from(vec![0; 128]);
assert!(read_response_to_body(&headers, body, 127).await.is_err());
assert!(read_body(&headers, body, 127).await.is_err());
}
#[test]
......
......@@ -42,7 +42,7 @@ pub fn send_error(id: Id, tx: &MethodSink, error: JsonRpcErrorObject) {
/// Figure out if this is a sufficiently complete request that we can extract an [`Id`] out of, or just plain
/// unparseable garbage.
pub fn prepare_error(data: &[u8]) -> (Id<'_>, JsonRpcErrorCode) {
match serde_json::from_slice::<JsonRpcInvalidRequest>(&data) {
match serde_json::from_slice::<JsonRpcInvalidRequest>(data) {
Ok(JsonRpcInvalidRequest { id }) => (id, JsonRpcErrorCode::InvalidRequest),
Err(_) => (Id::Null, JsonRpcErrorCode::ParseError),
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment