Unverified Commit 000492cb authored by Niklas Adolfsson's avatar Niklas Adolfsson
Browse files

refactor http server

parent e9252ac6
Pipeline #198749 failed with stages
in 61 minutes and 25 seconds
......@@ -67,7 +67,7 @@ pub type AsyncMethod<'a> = Arc<
pub type SubscriptionMethod<'a> =
Arc<dyn Send + Sync + Fn(Id, Params, MethodSink, ConnState) -> BoxFuture<'a, MethodResponse>>;
// Method callback to unsubscribe.
type UnsubscriptionMethod = Arc<dyn Send + Sync + Fn(Id, Params, ConnectionId) -> MethodResponse>;
type UnsubscriptionMethod = Arc<dyn Send + Sync + Fn(Id, Params, ConnectionId, MaxResponseSize) -> MethodResponse>;
/// Connection ID, used for stateful protocol such as WebSockets.
/// For stateless protocols such as http it's unused, so feel free to set it some hardcoded value.
......@@ -430,7 +430,7 @@ impl Methods {
let conn_state = ConnState { conn_id: 0, close_notify, id_provider: &RandomIntegerIdProvider };
(cb)(id, params, sink.clone(), conn_state).await
}
Some(MethodKind::Unsubscription(cb)) => (cb)(id, params, 0),
Some(MethodKind::Unsubscription(cb)) => (cb)(id, params, 0, usize::MAX),
};
tracing::trace!("[Methods::inner_call]: method: `{}` result: {:?}", req.method, result);
......@@ -745,7 +745,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
{
self.methods.mut_callbacks().insert(
unsubscribe_method_name,
MethodCallback::new_unsubscription(Arc::new(move |id, params, conn_id| {
MethodCallback::new_unsubscription(Arc::new(move |id, params, conn_id, max_response_size| {
let sub_id = match params.one::<RpcSubscriptionId>() {
Ok(sub_id) => sub_id,
Err(_) => {
......@@ -771,8 +771,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
);
}
MethodResponse::response(id, result, 999)
MethodResponse::response(id, result, max_response_size)
})),
);
}
......
......@@ -32,7 +32,7 @@ use std::task::{Context, Poll};
use crate::response;
use crate::response::{internal_error, malformed};
use futures_channel::mpsc;
use futures_util::{future, future::join_all, stream::StreamExt, FutureExt};
use futures_util::{stream::StreamExt, FutureExt};
use hyper::header::{HeaderMap, HeaderValue};
use hyper::server::conn::AddrStream;
use hyper::server::{conn::AddrIncoming, Builder as HyperBuilder};
......@@ -42,8 +42,8 @@ use jsonrpsee_core::error::{Error, GenericTransportError};
use jsonrpsee_core::http_helpers::{self, read_body};
use jsonrpsee_core::middleware::Middleware;
use jsonrpsee_core::server::access_control::AccessControl;
use jsonrpsee_core::server::helpers::{collect_batch_response, MethodSink};
use jsonrpsee_core::server::helpers::{prepare_error, MethodResponse};
use jsonrpsee_core::server::helpers::{BatchResponse, BatchResponseBuilder};
use jsonrpsee_core::server::resource_limiting::Resources;
use jsonrpsee_core::server::rpc_module::{MethodKind, Methods};
use jsonrpsee_core::TEN_MB_SIZE_BYTES;
......@@ -52,6 +52,8 @@ use jsonrpsee_types::{Id, Notification, Params, Request};
use serde_json::value::RawValue;
use tokio::net::{TcpListener, ToSocketAddrs};
type Notif<'a> = Notification<'a, Option<&'a RawValue>>;
/// Builder to create JSON-RPC HTTP server.
#[derive(Debug)]
pub struct Builder<M = ()> {
......@@ -584,150 +586,45 @@ async fn process_validated_request(
let request_start = middleware.on_request(remote_addr, &parts.headers);
type Notif<'a> = Notification<'a, Option<&'a RawValue>>;
// Single request or notification
if is_single {
if let Ok(req) = serde_json::from_slice::<Request>(&body) {
let method = req.method.as_ref();
let id = req.id.clone();
let params = Params::new(req.params.map(|params| params.get()));
middleware.on_call(method, params.clone());
let result = match methods.method_with_name(method) {
None => MethodResponse::error(req.id, ErrorObject::from(ErrorCode::MethodNotFound)),
Some((name, method_callback)) => match method_callback.inner() {
MethodKind::Sync(callback) => match method_callback.claim(&req.method, &resources) {
Ok(guard) => {
let result = (callback)(id, params, max_response_body_size as usize);
drop(guard);
result
}
Err(err) => {
tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
MethodResponse::error(req.id, ErrorObject::from(ErrorCode::ServerIsBusy))
}
},
MethodKind::Async(callback) => match method_callback.claim(name, &resources) {
Ok(guard) => {
let result = (callback)(
id.into_owned(),
params.into_owned(),
0,
max_response_body_size as usize,
Some(guard),
)
.await;
result
}
Err(err) => {
tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
MethodResponse::error(req.id, ErrorObject::from(ErrorCode::ServerIsBusy))
}
},
MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => {
tracing::error!("Subscriptions not supported on HTTP");
MethodResponse::error(req.id, ErrorObject::from(ErrorCode::InternalError))
}
},
};
middleware.on_response(&result.result, request_start);
middleware.on_result(&req.method, result.success, request_start);
Ok(response::ok_response(result.result))
} else if let Ok(_req) = serde_json::from_slice::<Notif>(&body) {
Ok(response::ok_response("".into()))
} else {
let (id, code) = prepare_error(&body);
let response = MethodResponse::error(id, ErrorObject::from(code));
Ok(response::ok_response(response.result))
}
let call = CallData {
conn_id: 0,
middleware: &middleware,
methods: &&methods,
max_response_body_size,
resources: &&resources,
request_start,
};
let response = process_single_request(body, call).await;
middleware.on_response(&response.result, request_start);
Ok(response::ok_response(response.result))
}
// Batch of requests or notifications
} else if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&body) {
if !batch_requests_supported {
let err = MethodResponse::error(
Id::Null,
ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, &BATCHES_NOT_SUPPORTED_MSG, None),
);
return Ok(response::ok_response(err.result));
} else if !batch.is_empty() {
let batch: Vec<_> = batch.into_iter().map(|b| (b, methods.clone(), resources.clone())).collect();
let batch_stream = futures_util::stream::iter(batch);
let mut result = batch_stream
.fold(String::from("["), |mut response, (req, methods, resources)| async move {
let id = req.id.clone().into_owned();
let params = Params::new(req.params.map(|params| params.get())).into_owned();
let r = match methods.method_with_name(&req.method) {
None => MethodResponse::error(req.id, ErrorObject::from(ErrorCode::MethodNotFound)),
Some((name, method_callback)) => match method_callback.inner() {
MethodKind::Sync(callback) => match method_callback.claim(&req.method, &resources) {
Ok(guard) => (callback)(id, params, max_response_body_size as usize),
Err(err) => {
tracing::error!(
"[Methods::execute_with_resources] failed to lock resources: {:?}",
err
);
MethodResponse::error(req.id, ErrorObject::from(ErrorCode::ServerIsBusy))
}
},
MethodKind::Async(callback) => match method_callback.claim(name, &resources) {
Ok(guard) => {
(callback)(
id.into_owned(),
params.into_owned(),
0,
max_response_body_size as usize,
Some(guard),
)
.await
}
Err(err) => {
tracing::error!(
"[Methods::execute_with_resources] failed to lock resources: {:?}",
err
);
MethodResponse::error(req.id, ErrorObject::from(ErrorCode::ServerIsBusy))
}
},
MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => {
tracing::error!("Subscriptions not supported on HTTP");
MethodResponse::error(req.id, ErrorObject::from(ErrorCode::InternalError))
}
},
};
response.push_str(&r.result);
response.push_str(",");
response
})
.await;
result.pop();
result.push(']');
Ok(response::ok_response(result))
} else {
// "If the batch rpc call itself fails to be recognized as an valid JSON or as an
// Array with at least one value, the response from the Server MUST be a single
// Response object." – The Spec.
let err = MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest));
Ok(response::ok_response(err.result))
}
} else if let Ok(_batch) = serde_json::from_slice::<Vec<Notif>>(&body) {
Ok(response::ok_response("".into()))
} else {
// "If the batch rpc call itself fails to be recognized as an valid JSON or as an
// Array with at least one value, the response from the Server MUST be a single
// Response object." – The Spec.
let (id, code) = prepare_error(&body);
let err = MethodResponse::error(id, ErrorObject::from(code));
else if !batch_requests_supported {
let err = MethodResponse::error(
Id::Null,
ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, &BATCHES_NOT_SUPPORTED_MSG, None),
);
Ok(response::ok_response(err.result))
}
// Batch of requests or notifications
else {
let response = process_batch_request(Batch {
data: body,
call: CallData {
conn_id: 0,
middleware: &middleware,
methods: &&methods,
max_response_body_size,
resources: &&resources,
request_start,
},
})
.await;
middleware.on_response(&response.result, request_start);
Ok(response::ok_response(response.result))
}
}
async fn process_health_request(
......@@ -742,7 +639,7 @@ async fn process_health_request(
let r = match methods.method_with_name(&health_api.method) {
None => MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::MethodNotFound)),
Some((name, method_callback)) => match method_callback.inner() {
Some((_name, method_callback)) => match method_callback.inner() {
MethodKind::Sync(callback) => (callback)(Id::Number(0), Params::new(None), max_response_body_size as usize),
MethodKind::Async(callback) => {
(callback)(Id::Number(0), Params::new(None), 0, max_response_body_size as usize, None).await
......@@ -771,3 +668,135 @@ async fn process_health_request(
Ok(response::internal_error())
}
}
#[derive(Debug, Clone)]
struct Batch<'a, M: Middleware> {
data: Vec<u8>,
call: CallData<'a, M>,
}
#[derive(Debug, Clone)]
struct CallData<'a, M: Middleware> {
conn_id: usize,
middleware: &'a M,
methods: &'a Methods,
max_response_body_size: u32,
resources: &'a Resources,
request_start: M::Instant,
}
#[derive(Debug, Clone)]
struct Call<'a, M: Middleware> {
params: Params<'a>,
name: &'a str,
call: CallData<'a, M>,
id: Id<'a>,
}
// Batch responses must be sent back as a single message so we read the results from each
// request in the batch and read the results off of a new channel, `rx_batch`, and then send the
// complete batch response back to the client over `tx`.
async fn process_batch_request<'a, M>(b: Batch<'a, M>) -> BatchResponse
where
M: Middleware,
{
let Batch { data, call } = b;
if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&data) {
tracing::debug!("recv batch len={}", batch.len());
tracing::trace!("recv: batch={:?}", batch);
return if !batch.is_empty() {
let batch = batch.into_iter().map(|req| (req, call.clone()));
let batch_stream = futures_util::stream::iter(batch);
let batch_response = batch_stream
.fold(BatchResponseBuilder::new(), |mut batch_response, (req, call)| async move {
let params = Params::new(req.params.map(|params| params.get()));
let response = execute_call(Call { name: &req.method, params, id: req.id, call }).await;
batch_response.append(&response);
batch_response
})
.await;
batch_response.finish()
} else {
BatchResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest))
};
}
if let Ok(batch) = serde_json::from_slice::<Vec<Notif>>(&data) {
if !batch.is_empty() {
return BatchResponse { result: "".to_string(), success: true };
}
}
// "If the batch rpc call itself fails to be recognized as an valid JSON or as an
// Array with at least one value, the response from the Server MUST be a single
// Response object." – The Spec.
let (id, code) = prepare_error(&data);
BatchResponse::error(id, ErrorObject::from(code))
}
async fn process_single_request<'a, M: Middleware>(data: Vec<u8>, call: CallData<'a, M>) -> MethodResponse {
if let Ok(req) = serde_json::from_slice::<Request>(&data) {
tracing::debug!("recv method call={}", req.method);
tracing::trace!("recv: req={:?}", req);
let params = Params::new(req.params.map(|params| params.get()));
let name = &req.method;
let id = req.id;
execute_call(Call { name, params, id, call }).await
} else if let Ok(_req) = serde_json::from_slice::<Notif>(&data) {
MethodResponse { result: String::new(), success: true }
} else {
let (id, code) = prepare_error(&data);
MethodResponse::error(id, ErrorObject::from(code))
}
}
async fn execute_call<'a, M: Middleware>(c: Call<'a, M>) -> MethodResponse {
let Call { name, id, params, call } = c;
let CallData { resources, methods, middleware, max_response_body_size, conn_id, request_start } = call;
middleware.on_call(name, params.clone());
let response = match methods.method_with_name(name) {
None => MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound)),
Some((name, method)) => match &method.inner() {
MethodKind::Sync(callback) => match method.claim(name, &resources) {
Ok(guard) => {
let r = (callback)(id, params, max_response_body_size as usize);
drop(guard);
r
}
Err(err) => {
tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy))
}
},
MethodKind::Async(callback) => match method.claim(name, &resources) {
Ok(guard) => {
let id = id.into_owned();
let params = params.into_owned();
(callback)(id, params, conn_id, max_response_body_size as usize, Some(guard)).await
}
Err(err) => {
tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy))
}
},
MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => {
tracing::error!("Subscriptions not supported on HTTP");
MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError))
}
},
};
middleware.on_result(name, response.success, request_start);
response
}
......@@ -35,15 +35,14 @@ use crate::future::{FutureDriver, ServerHandle, StopMonitor};
use crate::types::error::{ErrorCode, ErrorObject, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG};
use crate::types::{Id, Request};
use futures_channel::mpsc;
use futures_util::future::{join_all, Either, FutureExt};
use futures_util::future::{Either, FutureExt};
use futures_util::io::{BufReader, BufWriter};
use futures_util::stream::StreamExt;
use jsonrpsee_core::id_providers::RandomIntegerIdProvider;
use jsonrpsee_core::middleware::Middleware;
use jsonrpsee_core::server::access_control::AccessControl;
use jsonrpsee_core::server::helpers::{
collect_batch_response, prepare_error, BatchResponse, BatchResponseBuilder, BoundedSubscriptions, MethodResponse,
MethodSink,
prepare_error, BatchResponse, BatchResponseBuilder, BoundedSubscriptions, MethodResponse, MethodSink,
};
use jsonrpsee_core::server::resource_limiting::Resources;
use jsonrpsee_core::server::rpc_module::{ConnState, ConnectionId, MethodKind, Methods};
......@@ -905,7 +904,7 @@ async fn execute_call<'a, M: Middleware>(c: Call<'a, M>) -> MethodResponse {
},
MethodKind::Unsubscription(callback) => {
// Don't adhere to any resource or subscription limits; always let unsubscribing happen!
callback(id, params, conn_id)
callback(id, params, conn_id, max_response_body_size as usize)
}
},
};
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment