server.rs 31.9 KiB
Newer Older
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
Maciej Hirsz's avatar
Maciej Hirsz committed
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::future::Future;
Maciej Hirsz's avatar
Maciej Hirsz committed
use std::net::SocketAddr;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use crate::future::{FutureDriver, ServerHandle, StopMonitor};
use crate::types::error::{ErrorCode, ErrorObject, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG};
use crate::types::{Id, Request};
use futures_util::future::{Either, FutureExt};
Maciej Hirsz's avatar
Maciej Hirsz committed
use futures_util::io::{BufReader, BufWriter};
use futures_util::stream::StreamExt;
use futures_util::TryStreamExt;
use http::header::{HOST, ORIGIN};
use http::{HeaderMap, HeaderValue};
use jsonrpsee_core::id_providers::RandomIntegerIdProvider;
use jsonrpsee_core::middleware::WsMiddleware as Middleware;
use jsonrpsee_core::server::access_control::AccessControl;
use jsonrpsee_core::server::helpers::{
	prepare_error, BatchResponse, BatchResponseBuilder, BoundedSubscriptions, MethodResponse, MethodSink,
};
Maciej Hirsz's avatar
Maciej Hirsz committed
use jsonrpsee_core::server::resource_limiting::Resources;
use jsonrpsee_core::server::rpc_module::{ConnState, ConnectionId, MethodKind, Methods};
use jsonrpsee_core::tracing::{rx_log_from_json, rx_log_from_str, tx_log_from_str, RpcTracing};
use jsonrpsee_core::traits::IdProvider;
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::error::{reject_too_big_request, reject_too_many_subscriptions};
use jsonrpsee_types::Params;
use soketto::connection::Error as SokettoError;
use soketto::data::ByteSlice125;
use soketto::handshake::WebSocketKey;
Maciej Hirsz's avatar
Maciej Hirsz committed
use soketto::handshake::{server::Response, Server as SokettoServer};
use soketto::Sender;
Maciej Hirsz's avatar
Maciej Hirsz committed
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
use tokio_stream::wrappers::IntervalStream;
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
use tracing_futures::Instrument;
/// Default maximum connections allowed.
const MAX_CONNECTIONS: u64 = 100;

/// A WebSocket JSON RPC server.
pub struct Server<M> {
	listener: TcpListener,
	cfg: Settings,
Maciej Hirsz's avatar
Maciej Hirsz committed
	stop_monitor: StopMonitor,
Maciej Hirsz's avatar
Maciej Hirsz committed
	resources: Resources,
	middleware: M,
	id_provider: Arc<dyn IdProvider>,
}

impl<M> std::fmt::Debug for Server<M> {
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		f.debug_struct("Server")
			.field("listener", &self.listener)
			.field("cfg", &self.cfg)
			.field("stop_monitor", &self.stop_monitor)
			.field("id_provider", &self.id_provider)
			.field("resources", &self.resources)
impl<M: Middleware> Server<M> {
	/// Returns socket address to which the server is bound.
	pub fn local_addr(&self) -> Result<SocketAddr, Error> {
Maciej Hirsz's avatar
Maciej Hirsz committed
		self.listener.local_addr().map_err(Into::into)
	/// Returns the handle to stop the running server.
	pub fn server_handle(&self) -> ServerHandle {
Maciej Hirsz's avatar
Maciej Hirsz committed
		self.stop_monitor.handle()
	/// Start responding to connections requests. This will run on the tokio runtime until the server is stopped.
	pub fn start(mut self, methods: impl Into<Methods>) -> Result<ServerHandle, Error> {
Maciej Hirsz's avatar
Maciej Hirsz committed
		let methods = methods.into().initialize_resources(&self.resources)?;
		let handle = self.server_handle();
		match self.cfg.tokio_runtime.take() {
			Some(rt) => rt.spawn(self.start_inner(methods)),
			None => tokio::spawn(self.start_inner(methods)),
		};
Maciej Hirsz's avatar
Maciej Hirsz committed

		Ok(handle)
	}

	async fn start_inner(self, methods: Methods) {
Maciej Hirsz's avatar
Maciej Hirsz committed
		let stop_monitor = self.stop_monitor;
Maciej Hirsz's avatar
Maciej Hirsz committed
		let resources = self.resources;
		let middleware = self.middleware;
Maciej Hirsz's avatar
Maciej Hirsz committed
		let mut id = 0;
		let mut connections = FutureDriver::default();
		let mut incoming = Monitored::new(Incoming(self.listener), &stop_monitor);
			match connections.select_with(&mut incoming).await {
				Ok((socket, _addr)) => {
					if let Err(e) = socket.set_nodelay(true) {
						tracing::error!("Could not set NODELAY on socket: {:?}", e);
					if connections.count() >= self.cfg.max_connections as usize {
						tracing::warn!("Too many connections. Try again in a while.");
						connections.add(Box::pin(handshake(socket, HandshakeResponse::Reject { status_code: 429 })));

					let methods = &methods;
					let cfg = &self.cfg;
					let id_provider = self.id_provider.clone();
					connections.add(Box::pin(handshake(
						socket,
Maciej Hirsz's avatar
Maciej Hirsz committed
						HandshakeResponse::Accept {
							conn_id: id,
							methods,
							resources: &resources,
							cfg,
							stop_monitor: &stop_monitor,
							middleware: middleware.clone(),
Maciej Hirsz's avatar
Maciej Hirsz committed
						},
					tracing::info!("Accepting new connection {}/{}", connections.count(), self.cfg.max_connections);
					tracing::error!("Error while awaiting a new connection: {:?}", err);
Maciej Hirsz's avatar
Maciej Hirsz committed

		connections.await
/// This is a glorified select listening for new messages, while also checking the `stop_receiver` signal.
struct Monitored<'a, F> {
	future: F,
Maciej Hirsz's avatar
Maciej Hirsz committed
	stop_monitor: &'a StopMonitor,
impl<'a, F> Monitored<'a, F> {
	fn new(future: F, stop_monitor: &'a StopMonitor) -> Self {
		Monitored { future, stop_monitor }
struct Incoming(TcpListener);

impl<'a> Future for Monitored<'a, Incoming> {
	type Output = Result<(TcpStream, SocketAddr), MonitoredError<std::io::Error>>;

	fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
		let this = Pin::into_inner(self);

Maciej Hirsz's avatar
Maciej Hirsz committed
		if this.stop_monitor.shutdown_requested() {
			return Poll::Ready(Err(MonitoredError::Shutdown));
		this.future.0.poll_accept(cx).map_err(MonitoredError::Selector)
Loading full blame...