server.rs 11.1 KiB
Newer Older
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

David's avatar
David committed
use crate::{response, AccessControl};
use futures_util::future::join_all;
use futures_util::{lock::Mutex, stream::StreamExt, SinkExt};
use hyper::{
	server::{conn::AddrIncoming, Builder as HyperBuilder},
	service::{make_service_fn, service_fn},
	Error as HyperError,
David's avatar
David committed
use jsonrpsee_types::{
	error::{Error, GenericTransportError},
David's avatar
David committed
	v2::{ErrorCode, Id, Notification, Request},
David's avatar
David committed
	TEN_MB_SIZE_BYTES,
};
use jsonrpsee_utils::http_helpers::read_body;
use jsonrpsee_utils::server::{
	helpers::{collect_batch_response, prepare_error, send_error},
Maciej Hirsz's avatar
Maciej Hirsz committed
	resource_limiting::Resources,
	rpc_module::Methods,
use serde_json::value::RawValue;
use socket2::{Domain, Socket, Type};
use std::{
	cmp,
	net::{SocketAddr, TcpListener},
	sync::Arc,
/// Builder to create JSON-RPC HTTP server.
pub struct Builder {
	access_control: AccessControl,
Maciej Hirsz's avatar
Maciej Hirsz committed
	resources: Resources,
	max_request_body_size: u32,
	keep_alive: bool,
impl Builder {
	/// Sets the maximum size of a request body in bytes (default is 10 MiB).
	pub fn max_request_body_size(mut self, size: u32) -> Self {
		self.max_request_body_size = size;
		self
	}
	/// Sets access control settings.
	pub fn set_access_control(mut self, acl: AccessControl) -> Self {
		self.access_control = acl;
		self
	}
	/// Enables or disables HTTP keep-alive.
	///
	/// Default is true.
	pub fn keep_alive(mut self, keep_alive: bool) -> Self {
		self.keep_alive = keep_alive;
		self
	}
Maciej Hirsz's avatar
Maciej Hirsz committed
	/// Register a new resource kind. Errors if `label` is already registered, or if number of
	/// registered resources would exceed 8.
	pub fn register_resource(mut self, label: &'static str, capacity: u16, default: u16) -> Result<Self, Error> {
		self.resources.register(label, capacity, default)?;

		Ok(self)
	}

	/// Finalizes the configuration of the server.
	pub fn build(self, addr: SocketAddr) -> Result<Server, Error> {
		let domain = Domain::for_address(addr);
		let socket = Socket::new(domain, Type::STREAM, None)?;
		socket.set_nodelay(true)?;
		socket.set_reuse_address(true)?;
		socket.set_nonblocking(true)?;
		socket.set_keepalive(self.keep_alive)?;
		let address = addr.into();
		socket.bind(&address)?;

		socket.listen(128)?;
		let listener: TcpListener = socket.into();
		let local_addr = listener.local_addr().ok();

		let listener = hyper::Server::from_tcp(listener)?;

		let stop_pair = mpsc::channel(1);
		Ok(Server {
			listener,
			local_addr,
			access_control: self.access_control,
			max_request_body_size: self.max_request_body_size,
			stop_pair,
			stop_handle: Arc::new(Mutex::new(())),
Maciej Hirsz's avatar
Maciej Hirsz committed
			resources: self.resources,
impl Default for Builder {
	fn default() -> Self {
Maciej Hirsz's avatar
Maciej Hirsz committed
		Self {
			max_request_body_size: TEN_MB_SIZE_BYTES,
			resources: Resources::default(),
			access_control: AccessControl::default(),
			keep_alive: true,
		}
/// Handle used to stop the running server.
#[derive(Debug, Clone)]
pub struct StopHandle {
	stop_sender: mpsc::Sender<()>,
	stop_handle: Arc<Mutex<()>>,
}

impl StopHandle {
	/// Requests server to stop. Returns an error if server was already stopped.
	pub async fn stop(&mut self) -> Result<(), Error> {
		self.stop_sender.send(()).await.map_err(|_| Error::AlreadyStopped)
	}

	/// Blocks indefinitely until the server is stopped.
	pub async fn wait_for_stop(&self) {
		self.stop_handle.lock().await;
	}
}

/// An HTTP JSON RPC server.
#[derive(Debug)]
pub struct Server {
	/// Hyper server.
	listener: HyperBuilder<AddrIncoming>,
	/// Local address
	local_addr: Option<SocketAddr>,
	/// Max request body size.
	max_request_body_size: u32,
	/// Access control
	access_control: AccessControl,
	/// Pair of channels to stop the server.
	stop_pair: (mpsc::Sender<()>, mpsc::Receiver<()>),
	/// Stop handle that indicates whether server has been stopped.
	stop_handle: Arc<Mutex<()>>,
Maciej Hirsz's avatar
Maciej Hirsz committed
	/// Tracker for currently used resources on the server
	resources: Resources,
impl Server {
	/// Returns socket address to which the server is bound.
	pub fn local_addr(&self) -> Result<SocketAddr, Error> {
		self.local_addr.ok_or_else(|| Error::Custom("Local address not found".into()))
	/// Returns the handle to stop the running server.
	pub fn stop_handle(&self) -> StopHandle {
		StopHandle { stop_sender: self.stop_pair.0.clone(), stop_handle: self.stop_handle.clone() }
	}

	/// Start the server.
Maciej Hirsz's avatar
Maciej Hirsz committed
	pub async fn start(self, methods: impl Into<Methods>) -> Result<(), Error> {
		// Lock the stop mutex so existing stop handles can wait for server to stop.
		// It will be unlocked once this function returns.
		let _stop_handle = self.stop_handle.lock().await;

		let max_request_body_size = self.max_request_body_size;
		let access_control = self.access_control;
Maciej Hirsz's avatar
Maciej Hirsz committed
		let resources = self.resources;
		let mut stop_receiver = self.stop_pair.1;
Maciej Hirsz's avatar
Maciej Hirsz committed
		let methods = methods.into().initialize_resources(&resources)?;

		let make_service = make_service_fn(move |_| {
			let methods = methods.clone();
			let access_control = access_control.clone();
Maciej Hirsz's avatar
Maciej Hirsz committed
			let resources = resources.clone();

			async move {
				Ok::<_, HyperError>(service_fn(move |request| {
					let methods = methods.clone();
					let access_control = access_control.clone();
Maciej Hirsz's avatar
Maciej Hirsz committed
					let resources = resources.clone();

					// Run some validation on the http request, then read the body and try to deserialize it into one of
					// two cases: a single RPC request or a batch of RPC requests.
					async move {
						if let Err(e) = access_control_is_valid(&access_control, &request) {
							return Ok::<_, HyperError>(e);
						}
						if let Err(e) = content_type_is_valid(&request) {
							return Ok::<_, HyperError>(e);
						}
						let (parts, body) = request.into_parts();

						let (body, mut is_single) = match read_body(&parts.headers, body, max_request_body_size).await {
							Ok(r) => r,
							Err(GenericTransportError::TooLarge) => return Ok::<_, HyperError>(response::too_large()),
							Err(GenericTransportError::Malformed) => return Ok::<_, HyperError>(response::malformed()),
							Err(GenericTransportError::Inner(e)) => {
								log::error!("Internal error reading request body: {}", e);
								return Ok::<_, HyperError>(response::internal_error());
							}
						};

						// NOTE(niklasad1): it's a channel because it's needed for batch requests.
David's avatar
David committed
						let (tx, mut rx) = mpsc::unbounded::<String>();
David's avatar
David committed
						type Notif<'a> = Notification<'a, Option<&'a RawValue>>;

						// Single request or notification
						if is_single {
David's avatar
David committed
							if let Ok(req) = serde_json::from_slice::<Request>(&body) {
								// NOTE: we don't need to track connection id on HTTP, so using hardcoded 0 here.
Maciej Hirsz's avatar
Maciej Hirsz committed
								if let Some(fut) = methods.execute_with_resources(&tx, req, 0, &resources) {
							} else if let Ok(_req) = serde_json::from_slice::<Notif>(&body) {
								return Ok::<_, HyperError>(response::ok_response("".into()));
							} else {
								let (id, code) = prepare_error(&body);
								send_error(id, &tx, code.into());

						// Batch of requests or notifications
David's avatar
David committed
						} else if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&body) {
							if !batch.is_empty() {
Maciej Hirsz's avatar
Maciej Hirsz committed
								join_all(
									batch
										.into_iter()
										.filter_map(|req| methods.execute_with_resources(&tx, req, 0, &resources)),
								)
								.await;
							} else {
								// "If the batch rpc call itself fails to be recognized as an valid JSON or as an
								// Array with at least one value, the response from the Server MUST be a single
								// Response object." – The Spec.
								is_single = true;
David's avatar
David committed
								send_error(Id::Null, &tx, ErrorCode::InvalidRequest.into());
						} else if let Ok(_batch) = serde_json::from_slice::<Vec<Notif>>(&body) {
							return Ok::<_, HyperError>(response::ok_response("".into()));
						} else {
							// "If the batch rpc call itself fails to be recognized as an valid JSON or as an
							// Array with at least one value, the response from the Server MUST be a single
							// Response object." – The Spec.
							is_single = true;
							let (id, code) = prepare_error(&body);
							send_error(id, &tx, code.into());
						// Closes the receiving half of a channel without dropping it. This prevents any further
						// messages from being sent on the channel.
						rx.close();
						let response = if is_single {
							rx.next().await.expect("Sender is still alive managed by us above; qed")
						} else {
David's avatar
David committed
							collect_batch_response(rx).await
						log::debug!("[service_fn] sending back: {:?}", &response[..cmp::min(response.len(), 1024)]);
						Ok::<_, HyperError>(response::ok_response(response))
					}
				}))
			}
		});
		let server = self.listener.serve(make_service);
		server
			.with_graceful_shutdown(async move {
				stop_receiver.next().await;
			})
			.await
			.map_err(Into::into)
// Checks to that access control of the received request is the same as configured.
fn access_control_is_valid(
	access_control: &AccessControl,
	request: &hyper::Request<hyper::Body>,
) -> Result<(), hyper::Response<hyper::Body>> {
	if access_control.deny_host(request) {
		return Err(response::host_not_allowed());
	if access_control.deny_cors_origin(request) {
		return Err(response::invalid_allow_origin());
	if access_control.deny_cors_header(request) {
		return Err(response::invalid_allow_headers());
/// Checks that content type of received request is valid for JSON-RPC.
fn content_type_is_valid(request: &hyper::Request<hyper::Body>) -> Result<(), hyper::Response<hyper::Body>> {
	match *request.method() {
		hyper::Method::POST if is_json(request.headers().get("content-type")) => Ok(()),
		_ => Err(response::method_not_allowed()),
/// Returns true if the `content_type` header indicates a valid JSON message.
fn is_json(content_type: Option<&hyper::header::HeaderValue>) -> bool {
	match content_type.and_then(|val| val.to_str().ok()) {
		Some(content)
			if content.eq_ignore_ascii_case("application/json")
				|| content.eq_ignore_ascii_case("application/json; charset=utf-8")
				|| content.eq_ignore_ascii_case("application/json;charset=utf-8") =>
		{
			true
		_ => false,