server.rs
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use crate::future::{FutureDriver, ServerHandle, StopMonitor};
use crate::types::error::{ErrorCode, ErrorObject, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG};
use crate::types::{Id, Request};
use futures_channel::mpsc;
use futures_util::future::{Either, FutureExt};
use futures_util::io::{BufReader, BufWriter};
use futures_util::stream::StreamExt;
use futures_util::TryStreamExt;
use http::header::{HOST, ORIGIN};
use http::{HeaderMap, HeaderValue};
use jsonrpsee_core::id_providers::RandomIntegerIdProvider;
use jsonrpsee_core::middleware::WsMiddleware as Middleware;
use jsonrpsee_core::server::access_control::AccessControl;
use jsonrpsee_core::server::helpers::{
	prepare_error, BatchResponse, BatchResponseBuilder, BoundedSubscriptions, MethodResponse, MethodSink,
};
use jsonrpsee_core::server::resource_limiting::Resources;
use jsonrpsee_core::server::rpc_module::{ConnState, ConnectionId, MethodKind, Methods};
use jsonrpsee_core::tracing::{rx_log_from_json, rx_log_from_str, tx_log_from_str, RpcTracing};
use jsonrpsee_core::traits::IdProvider;
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::error::{reject_too_big_request, reject_too_many_subscriptions};
use jsonrpsee_types::Params;
use soketto::connection::Error as SokettoError;
use soketto::data::ByteSlice125;
use soketto::handshake::WebSocketKey;
use soketto::handshake::{server::Response, Server as SokettoServer};
use soketto::Sender;
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
use tokio_stream::wrappers::IntervalStream;
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
use tracing_futures::Instrument;

/// Default maximum connections allowed.
const MAX_CONNECTIONS: u64 = 100;

/// A WebSocket JSON RPC server.
pub struct Server<M> {
	listener: TcpListener,
	cfg: Settings,
	stop_monitor: StopMonitor,
	resources: Resources,
	middleware: M,
	id_provider: Arc<dyn IdProvider>,
}

impl<M> std::fmt::Debug for Server<M> {
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		f.debug_struct("Server")
			.field("listener", &self.listener)
			.field("cfg", &self.cfg)
			.field("stop_monitor", &self.stop_monitor)
			.field("id_provider", &self.id_provider)
			.field("resources", &self.resources)
impl<M: Middleware> Server<M> {
	/// Returns socket address to which the server is bound.
	pub fn local_addr(&self) -> Result<SocketAddr, Error> {
		self.listener.local_addr().map_err(Into::into)
	}

	/// Returns the handle to stop the running server.
	pub fn server_handle(&self) -> ServerHandle {
		self.stop_monitor.handle()
	}

	/// Start responding to connection requests. This will run on the tokio runtime until the server is stopped.
	pub fn start(mut self, methods: impl Into<Methods>) -> Result<ServerHandle, Error> {
		let methods = methods.into().initialize_resources(&self.resources)?;
		let handle = self.server_handle();
		match self.cfg.tokio_runtime.take() {
			Some(rt) => rt.spawn(self.start_inner(methods)),
			None => tokio::spawn(self.start_inner(methods)),
		};

		Ok(handle)
	}

	async fn start_inner(self, methods: Methods) {
		let stop_monitor = self.stop_monitor;
		let resources = self.resources;
		let middleware = self.middleware;
		let mut id = 0;
		let mut connections = FutureDriver::default();
		let mut incoming = Monitored::new(Incoming(self.listener), &stop_monitor);

		loop {
			match connections.select_with(&mut incoming).await {
				Ok((socket, _addr)) => {
					if let Err(e) = socket.set_nodelay(true) {
						tracing::error!("Could not set NODELAY on socket: {:?}", e);
					if connections.count() >= self.cfg.max_connections as usize {
						tracing::warn!("Too many connections. Try again in a while.");
						connections.add(Box::pin(handshake(socket, HandshakeResponse::Reject { status_code: 429 })));
						continue;
					}

					let methods = &methods;
					let cfg = &self.cfg;
					let id_provider = self.id_provider.clone();
					connections.add(Box::pin(handshake(
						socket,
						HandshakeResponse::Accept {
							conn_id: id,
							methods,
							resources: &resources,
							cfg,
							stop_monitor: &stop_monitor,
							middleware: middleware.clone(),
							id_provider,
						},
					)));

					tracing::info!("Accepting new connection {}/{}", connections.count(), self.cfg.max_connections);

					id = id.wrapping_add(1);
				}
				Err(MonitoredError::Selector(err)) => {
					tracing::error!("Error while awaiting a new connection: {:?}", err);
				}
				Err(MonitoredError::Shutdown) => break,
			}
		}

		connections.await
	}
}

/// This is a glorified select listening for new messages, while also checking the `stop_receiver` signal.
struct Monitored<'a, F> {
	future: F,
	stop_monitor: &'a StopMonitor,
}

impl<'a, F> Monitored<'a, F> {
	fn new(future: F, stop_monitor: &'a StopMonitor) -> Self {
		Monitored { future, stop_monitor }
	}
}

enum MonitoredError<E> {
	Shutdown,
	Selector(E),
}

struct Incoming(TcpListener);

impl<'a> Future for Monitored<'a, Incoming> {
	type Output = Result<(TcpStream, SocketAddr), MonitoredError<std::io::Error>>;

	fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
		let this = Pin::into_inner(self);

		if this.stop_monitor.shutdown_requested() {
			return Poll::Ready(Err(MonitoredError::Shutdown));
		}

		this.future.0.poll_accept(cx).map_err(MonitoredError::Selector)
	}
}

impl<'a, 'f, F, T, E> Future for Monitored<'a, Pin<&'f mut F>>
where
	F: Future<Output = Result<T, E>>,
{
	type Output = Result<T, MonitoredError<E>>;

	fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
		let this = Pin::into_inner(self);

		if this.stop_monitor.shutdown_requested() {
			return Poll::Ready(Err(MonitoredError::Shutdown));
		}

		this.future.poll_unpin(cx).map_err(MonitoredError::Selector)
	}
}

enum HandshakeResponse<'a, M> {
	Reject {
		status_code: u16,
	},
	Accept {
		conn_id: ConnectionId,
		methods: &'a Methods,
		resources: &'a Resources,
		cfg: &'a Settings,
		stop_monitor: &'a StopMonitor,
		middleware: M,
		id_provider: Arc<dyn IdProvider>,
	},
}

async fn handshake<M: Middleware>(socket: tokio::net::TcpStream, mode: HandshakeResponse<'_, M>) -> Result<(), Error> {
	let remote_addr = socket.peer_addr()?;

	// For each incoming connection we perform a handshake.
	let mut server = SokettoServer::new(BufReader::new(BufWriter::new(socket.compat())));
	match mode {
		HandshakeResponse::Reject { status_code } => {
			// Forced rejection, don't need to read anything from the socket
			let reject = Response::Reject { status_code };
			server.send_response(&reject).await?;
			let (mut sender, _) = server.into_builder().finish();
			// Gracefully shut down the connection
			sender.close().await?;

			Ok(())
		}
		HandshakeResponse::Accept { conn_id, methods, resources, cfg, stop_monitor, middleware, id_provider } => {
			tracing::debug!("Accepting new connection: {}", conn_id);

			let key_and_headers = get_key_and_headers(&mut server, cfg).await;
			match key_and_headers {
				Ok((key, headers)) => {
					middleware.on_connect(remote_addr, &headers);
					let accept = Response::Accept { key, protocol: None };
					server.send_response(&accept).await?;
				}
				Err(err) => {
					tracing::warn!("Rejected connection: {:?}", err);
					let reject = Response::Reject { status_code: 403 };
					server.send_response(&reject).await?;

					return Ok(());
				}
			}

			let join_result = tokio::spawn(background_task(BackgroundTask {
				server,
				conn_id,
				methods: methods.clone(),
				resources: resources.clone(),
				max_request_body_size: cfg.max_request_body_size,
				max_response_body_size: cfg.max_response_body_size,
				max_log_length: cfg.max_log_length,
				batch_requests_supported: cfg.batch_requests_supported,
				bounded_subscriptions: BoundedSubscriptions::new(cfg.max_subscriptions_per_connection),
				stop_server: stop_monitor.clone(),
				middleware,
				id_provider,
				ping_interval: cfg.ping_interval,
				remote_addr,
			}))
			.await;

			match join_result {
				Err(_) => Err(Error::Custom("Background task was aborted".into())),
				Ok(result) => result,
			}
		}
	}
}

struct BackgroundTask<'a, M> {
	server: SokettoServer<'a, BufReader<BufWriter<Compat<tokio::net::TcpStream>>>>,
	conn_id: ConnectionId,
	methods: Methods,
	resources: Resources,
	max_request_body_size: u32,
	max_response_body_size: u32,
	max_log_length: u32,
	batch_requests_supported: bool,
	bounded_subscriptions: BoundedSubscriptions,
	stop_server: StopMonitor,
	middleware: M,
	id_provider: Arc<dyn IdProvider>,
	ping_interval: Duration,
	remote_addr: SocketAddr,
}

async fn background_task<M: Middleware>(input: BackgroundTask<'_, M>) -> Result<(), Error> {
	let BackgroundTask {
		server,
		conn_id,
		methods,
		resources,
		max_request_body_size,
		max_response_body_size,
		max_log_length,
		batch_requests_supported,
		bounded_subscriptions,
		stop_server,
		middleware,
		id_provider,
		ping_interval,
		remote_addr,
	} = input;

	// And we can finally transition to the websocket background task.
	let mut builder = server.into_builder();
	builder.set_max_message_size(max_request_body_size as usize);
	let (mut sender, mut receiver) = builder.finish();
	let (tx, mut rx) = mpsc::unbounded::<String>();
	let bounded_subscriptions2 = bounded_subscriptions.clone();
	let stop_server2 = stop_server.clone();
	let sink = MethodSink::new_with_limit(tx, max_response_body_size, max_log_length);
	// Send results back to the client.
	tokio::spawn(async move {
		// Responses waiting to be sent back over the WebSocket.
		let mut rx_item = rx.next();

		// Interval at which `Ping` frames are continuously sent out.
		let ping_interval = IntervalStream::new(tokio::time::interval(ping_interval));
		tokio::pin!(ping_interval);
		let mut next_ping = ping_interval.next();
		while !stop_server2.shutdown_requested() {
			// Ensure the select is cancel-safe by storing whichever future did not complete yet.
			// Note: this is already cancel-safe, but avoid the `select!` macro for future-proofing.
			match futures_util::future::select(rx_item, next_ping).await {
				Either::Left((Some(response), ping)) => {
					// If sending the websocket message fails, terminate the connection.
					if let Err(err) = send_ws_message(&mut sender, response).await {
						tracing::warn!("WS send error: {}; terminate connection", err);
						break;
					}
					rx_item = rx.next();
					next_ping = ping;
				}
				// Nothing else to receive.
				Either::Left((None, _)) => break,

				// Handle timer intervals.
				Either::Right((_, next_rx)) => {
					if let Err(err) = send_ws_ping(&mut sender).await {
						tracing::warn!("WS send ping error: {}; terminate connection", err);
						break;
					}
					rx_item = next_rx;
					next_ping = ping_interval.next();
				}
			}
		}

		// Terminate connection and send close message.
		let _ = sender.close().await;

		// Notify all listeners and close down associated tasks.
		bounded_subscriptions2.close();
	});

	// Buffer for incoming data.
	let mut data = Vec::with_capacity(100);
	let mut method_executors = FutureDriver::default();
	let middleware = &middleware;
	let result = loop {
		data.clear();
		{
			// Need the extra scope to drop this pinned future and reclaim access to `data`
			let receive = async {
				// Identical loop to `soketto::receive_data` with debug logs for `Pong` frames.
				loop {
					match receiver.receive(&mut data).await? {
						soketto::Incoming::Data(d) => break Ok(d),
						soketto::Incoming::Pong(_) => tracing::debug!("recv pong"),
						soketto::Incoming::Closed(_) => {
							// The closing reason is already logged by `soketto` trace log level.
							// Return the `Closed` error to avoid logging unnecessary warnings on clean shutdown.
							break Err(SokettoError::Closed);
						}
					}
				}
			};

			tokio::pin!(receive);

			if let Err(err) = method_executors.select_with(Monitored::new(receive, &stop_server)).await {
				match err {
					MonitoredError::Selector(SokettoError::Closed) => {
						tracing::debug!("WS transport: remote peer terminated the connection: {}", conn_id);
						sink.close();
						break Ok(());
					}
					MonitoredError::Selector(SokettoError::MessageTooLarge { current, maximum }) => {
						tracing::warn!(
							"WS transport error: outgoing message is too big error ({} bytes, max is {})",
							current,
							maximum
						);
						sink.send_error(Id::Null, reject_too_big_request(max_request_body_size));
						continue;
					}
					// These errors can not be gracefully handled, so just log them and terminate the connection.
					MonitoredError::Selector(err) => {
						tracing::debug!("WS error: {}; terminate connection {}", err, conn_id);
						sink.close();
						break Err(err.into());
					}
					MonitoredError::Shutdown => break Ok(()),
				}
			}
		}

		let request_start = middleware.on_request();

		let first_non_whitespace = data.iter().find(|byte| !byte.is_ascii_whitespace());
		match first_non_whitespace {
			Some(b'{') => {
				let data = std::mem::take(&mut data);
				let sink = sink.clone();
				let resources = &resources;
				let methods = &methods;
				let bounded_subscriptions = bounded_subscriptions.clone();
				let id_provider = &*id_provider;
				let fut = async move {
					let call = CallData {
						conn_id,
						resources,
						max_response_body_size,
						max_log_length,
						methods,
						bounded_subscriptions,
						sink: &sink,
						id_provider: &*id_provider,
						middleware,
						request_start,
					};

					match process_single_request(data, call).await {
						MethodResult::JustMiddleware(r) => {
							middleware.on_response(&r.result, request_start);
						}
						MethodResult::SendAndMiddleware(r) => {
							middleware.on_response(&r.result, request_start);
							let _ = sink.send_raw(r.result);
						}
					};
				}
				.boxed();

				method_executors.add(fut);
			}
			Some(b'[') if !batch_requests_supported => {
				let response = MethodResponse::error(
					Id::Null,
					ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, &BATCHES_NOT_SUPPORTED_MSG, None),
				);
				middleware.on_response(&response.result, request_start);
				let _ = sink.send_raw(response.result);
			}
			Some(b'[') => {
				// Make sure the following variables are not moved into the async closure below.
				let resources = &resources;
				let methods = &methods;
				let bounded_subscriptions = bounded_subscriptions.clone();
				let sink = sink.clone();
				let id_provider = id_provider.clone();
				let data = std::mem::take(&mut data);

				let fut = async move {
					let response = process_batch_request(Batch {
						data,
						call: CallData {
							conn_id,
							resources,
							max_response_body_size,
							max_log_length,
							methods,
							bounded_subscriptions,
							sink: &sink,
							id_provider: &*id_provider,
							middleware,
							request_start,
						},
					})
					.await;

					tx_log_from_str(&response.result, max_log_length);
					middleware.on_response(&response.result, request_start);
					let _ = sink.send_raw(response.result);
				}
				.boxed();

				method_executors.add(fut);
			}
			_ => {
				sink.send_error(Id::Null, ErrorCode::ParseError.into());
			}
		}
	};

	middleware.on_disconnect(remote_addr);

	// Drive all running methods to completion.
	// **NOTE** Do not return early in this function. This `await` needs to run to guarantee
	// proper drop behaviour.
	method_executors.await;

	result
}

/// JSON-RPC Websocket server settings.
#[derive(Debug, Clone)]
struct Settings {
	/// Maximum size in bytes of a request.
	max_request_body_size: u32,
	/// Maximum size in bytes of a response.
	max_response_body_size: u32,
	/// Maximum number of incoming connections allowed.
	max_connections: u64,
	/// Maximum number of subscriptions per connection.
	max_subscriptions_per_connection: u32,
	/// Maximum log length for requests and responses.
	///
	/// Logs bigger than this limit will be truncated.
	max_log_length: u32,
	/// Access control based on HTTP headers
	access_control: AccessControl,
	/// Whether batch requests are supported by this server or not.
	batch_requests_supported: bool,
	/// Custom tokio runtime to run the server on.
	tokio_runtime: Option<tokio::runtime::Handle>,
	/// The interval at which `Ping` frames are submitted.
	ping_interval: Duration,
}

impl Default for Settings {
	fn default() -> Self {
		Self {
			max_request_body_size: TEN_MB_SIZE_BYTES,
			max_response_body_size: TEN_MB_SIZE_BYTES,
			max_subscriptions_per_connection: 1024,
			max_connections: MAX_CONNECTIONS,
			batch_requests_supported: true,
			access_control: AccessControl::default(),
			ping_interval: Duration::from_secs(60),
			max_log_length: 4096,
			tokio_runtime: None,
		}
	}
}

/// Builder to configure and create a JSON-RPC Websocket server
pub struct Builder<M = ()> {
	settings: Settings,
	resources: Resources,
	middleware: M,
	id_provider: Arc<dyn IdProvider>,
}

impl Default for Builder {
	fn default() -> Self {
		Builder {
			settings: Settings::default(),
			resources: Resources::default(),
			middleware: (),
			id_provider: Arc::new(RandomIntegerIdProvider),
		}
	}
}

impl Builder {
	/// Create a default server builder.
	pub fn new() -> Self {
		Self::default()
	}
}

impl<M> Builder<M> {
	/// Set the maximum size of a request body in bytes. Default is 10 MiB.
	pub fn max_request_body_size(mut self, size: u32) -> Self {
		self.settings.max_request_body_size = size;
		self
	}

	/// Set the maximum size of a response body in bytes. Default is 10 MiB.
	pub fn max_response_body_size(mut self, size: u32) -> Self {
		self.settings.max_response_body_size = size;
		self
	}

	/// Set the maximum number of connections allowed. Default is 100.
	pub fn max_connections(mut self, max: u64) -> Self {
		self.settings.max_connections = max;
		self
	}

	/// Enables or disables support of [batch requests](https://www.jsonrpc.org/specification#batch).
	/// By default, support is enabled.
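	///
	/// A minimal sketch of disabling batch support; incoming batch calls are then answered
	/// with a "batches not supported" error response:
	///
	/// ```
	/// use jsonrpsee_ws_server::WsServerBuilder;
	///
	/// let builder = WsServerBuilder::default().batch_requests_supported(false);
	/// ```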
	pub fn batch_requests_supported(mut self, supported: bool) -> Self {
		self.settings.batch_requests_supported = supported;
		self
	}

	/// Set the maximum number of subscriptions per connection. Default is 1024.
	pub fn max_subscriptions_per_connection(mut self, max: u32) -> Self {
		self.settings.max_subscriptions_per_connection = max;
		self
	}

	/// Register a new resource kind. Errors if `label` is already registered, or if the number of
	/// registered resources on this server instance would exceed 8.
	///
	/// See the module documentation for [`resource_limiting`](../jsonrpsee_utils/server/resource_limiting/index.html#resource-limiting)
	/// for details.
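	///
	/// A minimal sketch, using a hypothetical `"cpu"` resource with a total capacity of 10 units
	/// and a default claim of 1 unit per method call:
	///
	/// ```
	/// use jsonrpsee_ws_server::WsServerBuilder;
	///
	/// let builder = WsServerBuilder::default()
	///     .register_resource("cpu", 10, 1)
	///     .unwrap();
	/// ```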
	pub fn register_resource(mut self, label: &'static str, capacity: u16, default: u16) -> Result<Self, Error> {
		self.resources.register(label, capacity, default)?;
		Ok(self)
	}

	/// Add a middleware to the builder [`Middleware`](../jsonrpsee_core/middleware/trait.Middleware.html).
	///
	/// ```
	/// use std::{time::Instant, net::SocketAddr};
	/// use jsonrpsee_core::middleware::{WsMiddleware, Headers, Params};
	/// use jsonrpsee_ws_server::WsServerBuilder;
	///
	/// #[derive(Clone)]
	/// struct MyMiddleware;
	///
	/// impl WsMiddleware for MyMiddleware {
	///     type Instant = Instant;
	///
	///     fn on_connect(&self, remote_addr: SocketAddr, headers: &Headers) {
	///          println!("[MyMiddleware::on_call] remote_addr: {}, headers: {:?}", remote_addr, headers);
	///     }
	///
	///     fn on_request(&self) -> Self::Instant {
	///          Instant::now()
	///     }
	///
	///     fn on_call(&self, method_name: &str, params: Params) {
	///          println!("[MyMiddleware::on_call] method: '{}' params: {:?}", method_name, params);
	///     }
	///
	///     fn on_result(&self, method_name: &str, success: bool, started_at: Self::Instant) {
	///          println!("[MyMiddleware::on_result] '{}', worked? {}, time elapsed {:?}", method_name, success, started_at.elapsed());
	///     }
	///
	///     fn on_response(&self, result: &str, started_at: Self::Instant) {
	///          println!("[MyMiddleware::on_response] result: {}, time elapsed {:?}", result, started_at.elapsed());
	///     }
	///
	///     fn on_disconnect(&self, remote_addr: SocketAddr) {
	///          println!("[MyMiddleware::on_disconnect] remote_addr: {}", remote_addr);
	///     }
	/// }
	///
	/// let builder = WsServerBuilder::new().set_middleware(MyMiddleware);
	/// ```
	pub fn set_middleware<T: Middleware>(self, middleware: T) -> Builder<T> {
		Builder { settings: self.settings, resources: self.resources, middleware, id_provider: self.id_provider }
	}

	/// Configure a custom [`tokio::runtime::Handle`] to run the server on.
	///
	/// Default: [`tokio::spawn`]
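	///
	/// A minimal sketch, reusing the handle of the runtime the caller is already running on
	/// (any other [`tokio::runtime::Handle`] works the same way):
	///
	/// ```
	/// #[tokio::main]
	/// async fn main() {
	///   // Reuse the handle of the current runtime for spawning the server's tasks.
	///   let handle = tokio::runtime::Handle::current();
	///   let builder = jsonrpsee_ws_server::WsServerBuilder::default().custom_tokio_runtime(handle);
	/// }
	/// ```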
	pub fn custom_tokio_runtime(mut self, rt: tokio::runtime::Handle) -> Self {
		self.settings.tokio_runtime = Some(rt);
		self
	}

	/// Configure the interval at which pings are submitted.
	///
	/// This option keeps the connection alive by periodically submitting `Ping` frames,
	/// without making any assumptions about when a `Pong` frame should be received.
	///
	/// Default: 60 seconds.
	///
	/// # Examples
	///
	/// ```rust
	/// use std::time::Duration;
	/// use jsonrpsee_ws_server::WsServerBuilder;
	///
	/// // Set the ping interval to 10 seconds.
	/// let builder = WsServerBuilder::default().ping_interval(Duration::from_secs(10));
	/// ```
	pub fn ping_interval(mut self, interval: Duration) -> Self {
		self.settings.ping_interval = interval;
		self
	}

	/// Configure a custom `subscription ID` provider for the server to use
	/// when accepting new subscription calls.
	///
	/// You may choose static dispatch or dynamic dispatch because
	/// `IdProvider` is implemented for `Box<T>`.
	///
	/// Default: [`RandomIntegerIdProvider`].
	///
	/// # Examples
	///
	/// ```rust
	/// use jsonrpsee_ws_server::{WsServerBuilder, RandomStringIdProvider, IdProvider};
	///
	/// // static dispatch
	/// let builder1 = WsServerBuilder::default().set_id_provider(RandomStringIdProvider::new(16));
	///
	/// // or dynamic dispatch
	/// let builder2 = WsServerBuilder::default().set_id_provider(Box::new(RandomStringIdProvider::new(16)));
	/// ```
	pub fn set_id_provider<I: IdProvider + 'static>(mut self, id_provider: I) -> Self {
		self.id_provider = Arc::new(id_provider);
		self
	}

	/// Sets access control settings.
	pub fn set_access_control(mut self, acl: AccessControl) -> Self {
		self.settings.access_control = acl;
		self
	}

	/// Finalize the configuration of the server. Consumes the [`Builder`].
	///
	/// ```rust
	/// #[tokio::main]
	/// async fn main() {
	///   let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
	///   let occupied_addr = listener.local_addr().unwrap();
	///   let addrs: &[std::net::SocketAddr] = &[
	///       occupied_addr,
	///       "127.0.0.1:0".parse().unwrap(),
	///   ];
	///   assert!(jsonrpsee_ws_server::WsServerBuilder::default().build(occupied_addr).await.is_err());
	///   assert!(jsonrpsee_ws_server::WsServerBuilder::default().build(addrs).await.is_ok());
	/// }
	/// ```
	///
	pub async fn build(self, addrs: impl ToSocketAddrs) -> Result<Server<M>, Error> {
		let listener = TcpListener::bind(addrs).await?;
		let stop_monitor = StopMonitor::new();
		let resources = self.resources;
		Ok(Server {
			listener,
			cfg: self.settings,
			stop_monitor,
			resources,
			middleware: self.middleware,
			id_provider: self.id_provider,
		})
	}
}

async fn send_ws_message(
	sender: &mut Sender<BufReader<BufWriter<Compat<TcpStream>>>>,
	response: String,
) -> Result<(), Error> {
	sender.send_text_owned(response).await?;
	sender.flush().await.map_err(Into::into)
}

async fn send_ws_ping(sender: &mut Sender<BufReader<BufWriter<Compat<TcpStream>>>>) -> Result<(), Error> {
	tracing::debug!("send ping");
	// Submit empty slice as "optional" parameter.
	let slice: &[u8] = &[];
	// Byte slice fails if the provided slice is larger than 125 bytes.
	let byte_slice = ByteSlice125::try_from(slice).expect("Empty slice should fit into ByteSlice125");
	sender.send_ping(byte_slice).await?;
	sender.flush().await.map_err(Into::into)
}

#[derive(Debug, Clone)]
struct Batch<'a, M: Middleware> {
	data: Vec<u8>,
	call: CallData<'a, M>,
}

#[derive(Debug, Clone)]
struct CallData<'a, M: Middleware> {
	conn_id: usize,
	bounded_subscriptions: BoundedSubscriptions,
	id_provider: &'a dyn IdProvider,
	middleware: &'a M,
	methods: &'a Methods,
	max_response_body_size: u32,
	max_log_length: u32,
	resources: &'a Resources,
	sink: &'a MethodSink,
	request_start: M::Instant,
}

#[derive(Debug, Clone)]
struct Call<'a, M: Middleware> {
	params: Params<'a>,
	name: &'a str,
	call: CallData<'a, M>,
	id: Id<'a>,
}

enum MethodResult {
	JustMiddleware(MethodResponse),
	SendAndMiddleware(MethodResponse),
}

impl MethodResult {
	fn as_inner(&self) -> &MethodResponse {
		match &self {
			Self::JustMiddleware(r) => r,
			Self::SendAndMiddleware(r) => r,
		}
	}
}

// Batch responses must be sent back as a single message, so the response for each request in
// the batch is accumulated in a `BatchResponseBuilder` and the finished batch is returned to
// the caller, which sends it back to the client in one go.
async fn process_batch_request<M>(b: Batch<'_, M>) -> BatchResponse
where
	M: Middleware,
{
	let Batch { data, call } = b;

	if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&data) {
		return if !batch.is_empty() {
			let batch = batch.into_iter().map(|req| Ok((req, call.clone())));
			let batch_stream = futures_util::stream::iter(batch);

			let trace = RpcTracing::batch();
			let _enter = trace.span().enter();
			let max_response_size = call.max_response_body_size;

			let batch_response = batch_stream
				.try_fold(
					BatchResponseBuilder::new_with_limit(max_response_size as usize),
					|batch_response, (req, call)| async move {
						let params = Params::new(req.params.map(|params| params.get()));

						let response =
							execute_call(Call { name: &req.method, params, id: req.id, call }).in_current_span().await;

						batch_response.append(response.as_inner())
					},
				)
				.await;

			return match batch_response {
				Ok(batch) => batch.finish(),
				Err(batch_err) => batch_err,
			};
		} else {
			BatchResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest))
		};
	}

	let (id, code) = prepare_error(&data);
	BatchResponse::error(id, ErrorObject::from(code))
}

async fn process_single_request<M: Middleware>(data: Vec<u8>, call: CallData<'_, M>) -> MethodResult {
	if let Ok(req) = serde_json::from_slice::<Request>(&data) {
		let trace = RpcTracing::method_call(&req.method);
		let _enter = trace.span().enter();

		rx_log_from_json(&req, call.max_log_length);

		let params = Params::new(req.params.map(|params| params.get()));
		let name = &req.method;
		let id = req.id;

		execute_call(Call { name, params, id, call }).in_current_span().await
	} else {
		let (id, code) = prepare_error(&data);
		MethodResult::SendAndMiddleware(MethodResponse::error(id, ErrorObject::from(code)))
	}
}

/// Execute a call and return the result wrapped in a [`MethodResult`].
///
/// Returns `MethodResult::SendAndMiddleware` for calls whose response still needs to be sent
/// over the sink, and `MethodResult::JustMiddleware` for subscription calls, whose responses
/// are sent by the subscription callback itself.
async fn execute_call<M: Middleware>(c: Call<'_, M>) -> MethodResult {
	let Call { name, id, params, call } = c;
	let CallData {
		resources,
		methods,
		middleware,
		max_response_body_size,
		max_log_length,
		conn_id,
		bounded_subscriptions,
		id_provider,
		sink,
		request_start,
	} = call;

	middleware.on_call(name, params.clone());

	let response = match methods.method_with_name(name) {
		None => {
			let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound));
			MethodResult::SendAndMiddleware(response)
		}
		Some((name, method)) => match &method.inner() {
			MethodKind::Sync(callback) => match method.claim(name, resources) {
				Ok(guard) => {
					let r = (callback)(id, params, max_response_body_size as usize);
					drop(guard);
					MethodResult::SendAndMiddleware(r)
				}
				Err(err) => {
					tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
					let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy));
					MethodResult::SendAndMiddleware(response)
				}
			},
			MethodKind::Async(callback) => match method.claim(name, resources) {
				Ok(guard) => {
					let id = id.into_owned();
					let params = params.into_owned();

					let response = (callback)(id, params, conn_id, max_response_body_size as usize, Some(guard)).await;
					MethodResult::SendAndMiddleware(response)
				}
				Err(err) => {
					tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
					let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy));
					MethodResult::SendAndMiddleware(response)
				}
			},
			MethodKind::Subscription(callback) => match method.claim(name, resources) {
				Ok(guard) => {
					if let Some(cn) = bounded_subscriptions.acquire() {
						let conn_state = ConnState { conn_id, close_notify: cn, id_provider };
						let response = callback(id.clone(), params, sink.clone(), conn_state, Some(guard)).await;
						MethodResult::JustMiddleware(response)
					} else {
						let response =
							MethodResponse::error(id, reject_too_many_subscriptions(bounded_subscriptions.max()));
						MethodResult::SendAndMiddleware(response)
					}
				}
				Err(err) => {
					tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
					let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy));
					MethodResult::SendAndMiddleware(response)
				}
			},
			MethodKind::Unsubscription(callback) => {
				// Don't adhere to any resource or subscription limits; always let unsubscribing happen!
				let result = callback(id, params, conn_id, max_response_body_size as usize);
				MethodResult::SendAndMiddleware(result)
			}
		},
	};

	let r = response.as_inner();

	rx_log_from_str(&r.result, max_log_length);
	middleware.on_result(name, r.success, request_start);
	response
}

/// Helper to fetch the `WebSocketKey` and `Headers` from the WebSocket handshake.
async fn get_key_and_headers(