Unverified Commit 108794b5 authored by Niklas Adolfsson's avatar Niklas Adolfsson
Browse files

fix nits

parent 2784beef
Pipeline #199762 passed with stages
in 5 minutes and 23 seconds
......@@ -28,7 +28,7 @@ use std::io;
use std::sync::Arc;
use crate::tracing::tx_log_from_str;
use crate::{Error};
use crate::Error;
use futures_channel::mpsc;
use jsonrpsee_types::error::{ErrorCode, ErrorObject, ErrorResponse, OVERSIZED_RESPONSE_CODE, OVERSIZED_RESPONSE_MSG};
use jsonrpsee_types::{Id, InvalidRequest, Response};
......@@ -107,39 +107,6 @@ impl MethodSink {
self.tx.is_closed()
}
/// Send a JSON-RPC response to the client. If the serialization of `result` exceeds `max_response_size`,
/// an error will be sent instead.
pub fn send_response(&self, id: Id, result: impl Serialize) -> bool {
let mut writer = BoundedWriter::new(self.max_response_size as usize);
let json = match serde_json::to_writer(&mut writer, &Response::new(result, id.clone())) {
Ok(_) => {
// Safety - serde_json does not emit invalid UTF-8.
unsafe { String::from_utf8_unchecked(writer.into_bytes()) }
}
Err(err) => {
tracing::error!("Error serializing response: {:?}", err);
if err.is_io() {
let data = format!("Exceeded max limit of {}", self.max_response_size);
let err = ErrorObject::owned(OVERSIZED_RESPONSE_CODE, OVERSIZED_RESPONSE_MSG, Some(data));
return self.send_error(id, err);
} else {
return self.send_error(id, ErrorCode::InternalError.into());
}
}
};
tx_log_from_str(&json, self.max_log_length);
if let Err(err) = self.send_raw(json) {
tracing::warn!("Error sending response {:?}", err);
false
} else {
true
}
}
/// Send a JSON-RPC error to the client
pub fn send_error(&self, id: Id, error: ErrorObject) -> bool {
let json = match serde_json::to_string(&ErrorResponse::borrowed(error, id)) {
......@@ -167,9 +134,9 @@ impl MethodSink {
/// Send a raw JSON-RPC message to the client, `MethodSink` does not check verify the validity
/// of the JSON being sent.
pub fn send_raw(&self, raw_json: String) -> Result<(), mpsc::TrySendError<String>> {
tracing::trace!("send: {:?}", raw_json);
self.tx.unbounded_send(raw_json)
pub fn send_raw(&self, json: String) -> Result<(), mpsc::TrySendError<String>> {
tx_log_from_str(&json, self.max_log_length);
self.tx.unbounded_send(json)
}
/// Close the channel for any further messages.
......@@ -248,9 +215,9 @@ impl BoundedSubscriptions {
/// Represent the response to method call.
#[derive(Debug)]
pub struct MethodResponse {
/// Serialized response,
/// Serialized JSON-RPC response,
pub result: String,
/// Status indicates whether the call was successful or or.
/// Indicates whether the call was successful or not.
pub success: bool,
}
......@@ -294,7 +261,7 @@ impl MethodResponse {
/// Builder to build a `BatchResponse`.
#[derive(Debug)]
pub struct BatchResponseBuilder {
/// Formatted JSON-RPC response.
/// Serialized JSON-RPC response,
result: String,
/// Indicates whether the call was successful or not.
success: bool,
......
......@@ -425,6 +425,8 @@ impl<M: Middleware> Server<M> {
async move {
Ok::<_, HyperError>(service_fn(move |request| {
let request_start = middleware.on_request(remote_addr, &request.headers());
let methods = methods.clone();
let acl = acl.clone();
let resources = resources.clone();
......@@ -497,7 +499,7 @@ impl<M: Middleware> Server<M> {
max_response_body_size,
max_log_length,
batch_requests_supported,
remote_addr,
request_start,
)
.await?;
......@@ -513,8 +515,7 @@ impl<M: Middleware> Server<M> {
middleware,
methods,
max_response_body_size,
request.headers(),
remote_addr,
request_start,
max_log_length,
)
.await
......@@ -578,16 +579,16 @@ fn is_json(content_type: Option<&hyper::header::HeaderValue>) -> bool {
}
/// Process a verified request, it implies a POST request with content type JSON.
async fn process_validated_request(
async fn process_validated_request<M: Middleware>(
request: hyper::Request<hyper::Body>,
middleware: impl Middleware,
middleware: M,
methods: Methods,
resources: Resources,
max_request_body_size: u32,
max_response_body_size: u32,
max_log_length: u32,
batch_requests_supported: bool,
remote_addr: SocketAddr,
request_start: M::Instant,
) -> Result<hyper::Response<hyper::Body>, HyperError> {
let (parts, body) = request.into_parts();
......@@ -601,8 +602,6 @@ async fn process_validated_request(
}
};
let request_start = middleware.on_request(remote_addr, &parts.headers);
// Single request or notification
if is_single {
let call = CallData {
......@@ -610,6 +609,7 @@ async fn process_validated_request(
middleware: &middleware,
methods: &methods,
max_response_body_size,
max_log_length,
resources: &resources,
request_start,
};
......@@ -623,6 +623,7 @@ async fn process_validated_request(
Id::Null,
ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, &BATCHES_NOT_SUPPORTED_MSG, None),
);
middleware.on_response(&err.result, request_start);
Ok(response::ok_response(err.result))
}
// Batch of requests or notifications
......@@ -634,6 +635,7 @@ async fn process_validated_request(
middleware: &middleware,
methods: &methods,
max_response_body_size,
max_log_length,
resources: &resources,
request_start,
},
......@@ -644,16 +646,15 @@ async fn process_validated_request(
}
}
async fn process_health_request(
async fn process_health_request<M: Middleware>(
health_api: &HealthApi,
middleware: impl Middleware,
middleware: M,
methods: Methods,
max_response_body_size: u32,
headers: &HeaderMap,
remote_addr: SocketAddr,
request_start: M::Instant,
max_log_length: u32,
) -> Result<hyper::Response<hyper::Body>, HyperError> {
let r = match methods.method_with_name(&health_api.method) {
let response = match methods.method_with_name(&health_api.method) {
None => MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::MethodNotFound)),
Some((_name, method_callback)) => match method_callback.inner() {
MethodKind::Sync(callback) => (callback)(Id::Number(0), Params::new(None), max_response_body_size as usize),
......@@ -667,14 +668,17 @@ async fn process_health_request(
},
};
if r.success {
middleware.on_result(&health_api.method, response.success, request_start);
middleware.on_response(&response.result, request_start);
if response.success {
#[derive(serde::Deserialize)]
struct RpcPayload<'a> {
#[serde(borrow)]
result: &'a serde_json::value::RawValue,
}
let payload: RpcPayload = serde_json::from_str(&r.result)
let payload: RpcPayload = serde_json::from_str(&response.result)
.expect("valid JSON-RPC response must have a result field and be valid JSON; qed");
Ok(response::ok_response(payload.result.to_string()))
} else {
......@@ -694,6 +698,7 @@ struct CallData<'a, M: Middleware> {
middleware: &'a M,
methods: &'a Methods,
max_response_body_size: u32,
max_log_length: u32,
resources: &'a Resources,
request_start: M::Instant,
}
......@@ -716,8 +721,6 @@ where
let Batch { data, call } = b;
if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&data) {
tracing::debug!("recv batch len={}", batch.len());
tracing::trace!("recv: batch={:?}", batch);
return if !batch.is_empty() {
let batch = batch.into_iter().map(|req| (req, call.clone()));
......@@ -758,8 +761,6 @@ where
async fn process_single_request<M: Middleware>(data: Vec<u8>, call: CallData<'_, M>) -> MethodResponse {
if let Ok(req) = serde_json::from_slice::<Request>(&data) {
tracing::debug!("recv method call={}", req.method);
tracing::trace!("recv: req={:?}", req);
let params = Params::new(req.params.map(|params| params.get()));
let name = &req.method;
let id = req.id;
......@@ -775,7 +776,8 @@ async fn process_single_request<M: Middleware>(data: Vec<u8>, call: CallData<'_,
async fn execute_call<M: Middleware>(c: Call<'_, M>) -> MethodResponse {
let Call { name, id, params, call } = c;
let CallData { resources, methods, middleware, max_response_body_size, conn_id, request_start } = call;
let CallData { resources, methods, middleware, max_response_body_size, max_log_length, conn_id, request_start } =
call;
middleware.on_call(name, params.clone());
......
......@@ -488,6 +488,7 @@ async fn background_task<M: Middleware>(input: BackgroundTask<'_, M>) -> Result<
conn_id,
resources: &resources,
max_response_body_size,
max_log_length,
methods: &methods,
bounded_subscriptions,
sink: &sink,
......@@ -532,6 +533,7 @@ async fn background_task<M: Middleware>(input: BackgroundTask<'_, M>) -> Result<
conn_id,
resources,
max_response_body_size,
max_log_length,
methods,
bounded_subscriptions,
sink: &sink,
......@@ -828,6 +830,7 @@ struct CallData<'a, M: Middleware> {
middleware: &'a M,
methods: &'a Methods,
max_response_body_size: u32,
max_log_length: u32,
resources: &'a Resources,
sink: &'a MethodSink,
request_start: M::Instant,
......@@ -851,8 +854,6 @@ where
let Batch { data, call } = b;
if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&data) {
tracing::debug!("recv batch len={}", batch.len());
tracing::trace!("recv: batch={:?}", batch);
return if !batch.is_empty() {
let batch = batch.into_iter().map(|req| (req, call.clone()));
......@@ -890,8 +891,6 @@ async fn process_single_request<M: Middleware>(
call: CallData<'_, M>,
) -> (MethodResponse, Option<oneshot::Sender<()>>) {
if let Ok(req) = serde_json::from_slice::<Request>(&data) {
tracing::debug!("recv method call={}", req.method);
tracing::trace!("recv: req={:?}", req);
let params = Params::new(req.params.map(|params| params.get()));
let name = &req.method;
let id = req.id;
......@@ -918,6 +917,7 @@ async fn execute_call<M: Middleware>(c: Call<'_, M>) -> (MethodResponse, Option<
methods,
middleware,
max_response_body_size,
max_log_length,
conn_id,
bounded_subscriptions,
id_provider,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment