// Copyright 2019-2020 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any // person obtaining a copy of this software and associated // documentation files (the "Software"), to deal in the // Software without restriction, including without // limitation the rights to use, copy, modify, merge, // publish, distribute, sublicense, and/or sell copies of // the Software, and to permit persons to whom the Software // is furnished to do so, subject to the following // conditions: // // The above copyright notice and this permission notice // shall be included in all copies or substantial portions // of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF // ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED // TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A // PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT // SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. use crate::{response, AccessControl}; use futures_channel::mpsc; use futures_util::{lock::Mutex, stream::StreamExt, SinkExt}; use hyper::{ server::{conn::AddrIncoming, Builder as HyperBuilder}, service::{make_service_fn, service_fn}, Error as HyperError, }; use jsonrpsee_types::{ error::{Error, GenericTransportError}, v2::{ error::JsonRpcErrorCode, params::Id, request::{JsonRpcNotification, JsonRpcRequest}, }, TEN_MB_SIZE_BYTES, }; use jsonrpsee_utils::http_helpers::read_body; use jsonrpsee_utils::server::{ helpers::{collect_batch_response, prepare_error, send_error}, rpc_module::Methods, }; use serde_json::value::RawValue; use socket2::{Domain, Socket, Type}; use std::{ cmp, net::{SocketAddr, TcpListener}, sync::Arc, }; /// Builder to create JSON-RPC HTTP server. 
#[derive(Debug)] pub struct Builder { access_control: AccessControl, max_request_body_size: u32, keep_alive: bool, } impl Builder { /// Sets the maximum size of a request body in bytes (default is 10 MiB). pub fn max_request_body_size(mut self, size: u32) -> Self { self.max_request_body_size = size; self } /// Sets access control settings. pub fn set_access_control(mut self, acl: AccessControl) -> Self { self.access_control = acl; self } /// Enables or disables HTTP keep-alive. /// /// Default is true. pub fn keep_alive(mut self, keep_alive: bool) -> Self { self.keep_alive = keep_alive; self } /// Finalizes the configuration of the server. pub fn build(self, addr: SocketAddr) -> Result { let domain = Domain::for_address(addr); let socket = Socket::new(domain, Type::STREAM, None)?; socket.set_nodelay(true)?; socket.set_reuse_address(true)?; socket.set_nonblocking(true)?; socket.set_keepalive(self.keep_alive)?; let address = addr.into(); socket.bind(&address)?; socket.listen(128)?; let listener: TcpListener = socket.into(); let local_addr = listener.local_addr().ok(); let listener = hyper::Server::from_tcp(listener)?; let stop_pair = mpsc::channel(1); Ok(Server { listener, local_addr, methods: Methods::default(), access_control: self.access_control, max_request_body_size: self.max_request_body_size, stop_pair, stop_handle: Arc::new(Mutex::new(())), }) } } impl Default for Builder { fn default() -> Self { Self { max_request_body_size: TEN_MB_SIZE_BYTES, access_control: AccessControl::default(), keep_alive: true } } } /// Handle used to stop the running server. #[derive(Debug, Clone)] pub struct StopHandle { stop_sender: mpsc::Sender<()>, stop_handle: Arc>, } impl StopHandle { /// Requests server to stop. Returns an error if server was already stopped. pub async fn stop(&mut self) -> Result<(), Error> { self.stop_sender.send(()).await.map_err(|_| Error::AlreadyStopped) } /// Blocks indefinitely until the server is stopped. 
pub async fn wait_for_stop(&self) { self.stop_handle.lock().await; } } /// An HTTP JSON RPC server. #[derive(Debug)] pub struct Server { /// Hyper server. listener: HyperBuilder, /// Local address local_addr: Option, /// Registered methods. methods: Methods, /// Max request body size. max_request_body_size: u32, /// Access control access_control: AccessControl, /// Pair of channels to stop the server. stop_pair: (mpsc::Sender<()>, mpsc::Receiver<()>), /// Stop handle that indicates whether server has been stopped. stop_handle: Arc>, } impl Server { /// Returns socket address to which the server is bound. pub fn local_addr(&self) -> Result { self.local_addr.ok_or_else(|| Error::Custom("Local address not found".into())) } /// Returns the handle to stop the running server. pub fn stop_handle(&self) -> StopHandle { StopHandle { stop_sender: self.stop_pair.0.clone(), stop_handle: self.stop_handle.clone() } } /// Start the server. pub async fn start(self, methods: impl Into) -> Result<(), Error> { // Lock the stop mutex so existing stop handles can wait for server to stop. // It will be unlocked once this function returns. let _stop_handle = self.stop_handle.lock().await; let max_request_body_size = self.max_request_body_size; let access_control = self.access_control; let mut stop_receiver = self.stop_pair.1; let methods = methods.into(); let make_service = make_service_fn(move |_| { let methods = methods.clone(); let access_control = access_control.clone(); async move { Ok::<_, HyperError>(service_fn(move |request| { let methods = methods.clone(); let access_control = access_control.clone(); // Run some validation on the http request, then read the body and try to deserialize it into one of // two cases: a single RPC request or a batch of RPC requests. 
async move { if let Err(e) = access_control_is_valid(&access_control, &request) { return Ok::<_, HyperError>(e); } if let Err(e) = content_type_is_valid(&request) { return Ok::<_, HyperError>(e); } let (parts, body) = request.into_parts(); let (body, mut is_single) = match read_body(&parts.headers, body, max_request_body_size).await { Ok(r) => r, Err(GenericTransportError::TooLarge) => return Ok::<_, HyperError>(response::too_large()), Err(GenericTransportError::Malformed) => return Ok::<_, HyperError>(response::malformed()), Err(GenericTransportError::Inner(e)) => { log::error!("Internal error reading request body: {}", e); return Ok::<_, HyperError>(response::internal_error()); } }; // NOTE(niklasad1): it's a channel because it's needed for batch requests. let (tx, mut rx) = mpsc::unbounded::(); type Notif<'a> = JsonRpcNotification<'a, Option<&'a RawValue>>; // Single request or notification if is_single { if let Ok(req) = serde_json::from_slice::(&body) { // NOTE: we don't need to track connection id on HTTP, so using hardcoded 0 here. methods.execute(&tx, req, 0).await; } else if let Ok(_req) = serde_json::from_slice::(&body) { return Ok::<_, HyperError>(response::ok_response("".into())); } else { let (id, code) = prepare_error(&body); send_error(id, &tx, code.into()); } // Batch of requests or notifications } else if let Ok(batch) = serde_json::from_slice::>(&body) { if !batch.is_empty() { for req in batch { methods.execute(&tx, req, 0).await; } } else { // "If the batch rpc call itself fails to be recognized as an valid JSON or as an // Array with at least one value, the response from the Server MUST be a single // Response object." – The Spec. 
is_single = true; send_error(Id::Null, &tx, JsonRpcErrorCode::InvalidRequest.into()); } } else if let Ok(_batch) = serde_json::from_slice::>(&body) { return Ok::<_, HyperError>(response::ok_response("".into())); } else { // "If the batch rpc call itself fails to be recognized as an valid JSON or as an // Array with at least one value, the response from the Server MUST be a single // Response object." – The Spec. is_single = true; let (id, code) = prepare_error(&body); send_error(id, &tx, code.into()); } // Closes the receiving half of a channel without dropping it. This prevents any further // messages from being sent on the channel. rx.close(); let response = if is_single { rx.next().await.expect("Sender is still alive managed by us above; qed") } else { collect_batch_response(rx).await }; log::debug!("[service_fn] sending back: {:?}", &response[..cmp::min(response.len(), 1024)]); Ok::<_, HyperError>(response::ok_response(response)) } })) } }); let server = self.listener.serve(make_service); server .with_graceful_shutdown(async move { stop_receiver.next().await; }) .await .map_err(Into::into) } } // Checks to that access control of the received request is the same as configured. fn access_control_is_valid( access_control: &AccessControl, request: &hyper::Request, ) -> Result<(), hyper::Response> { if access_control.deny_host(request) { return Err(response::host_not_allowed()); } if access_control.deny_cors_origin(request) { return Err(response::invalid_allow_origin()); } if access_control.deny_cors_header(request) { return Err(response::invalid_allow_headers()); } Ok(()) } /// Checks that content type of received request is valid for JSON-RPC. fn content_type_is_valid(request: &hyper::Request) -> Result<(), hyper::Response> { match *request.method() { hyper::Method::POST if is_json(request.headers().get("content-type")) => Ok(()), _ => Err(response::method_not_allowed()), } } /// Returns true if the `content_type` header indicates a valid JSON message. 
fn is_json(content_type: Option<&hyper::header::HeaderValue>) -> bool {
    // The only header values treated as JSON; each is compared ignoring ASCII case.
    const ACCEPTED: [&str; 3] =
        ["application/json", "application/json; charset=utf-8", "application/json;charset=utf-8"];

    // A missing header, or one `to_str` refuses to convert, counts as "not JSON".
    content_type
        .and_then(|value| value.to_str().ok())
        .map_or(false, |text| ACCEPTED.iter().any(|candidate| text.eq_ignore_ascii_case(candidate)))
}