server.rs 7.32 KiB
Newer Older
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
Maciej Hirsz's avatar
Maciej Hirsz committed
// IN background_task WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use futures_channel::mpsc;
use futures_util::io::{BufReader, BufWriter};
David's avatar
David committed
use futures_util::stream::StreamExt;
use serde::Serialize;
Maciej Hirsz's avatar
Maciej Hirsz committed
use soketto::handshake::{server::Response, Server as SokettoServer};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::{TcpListener, ToSocketAddrs};
David's avatar
David committed
use tokio_stream::wrappers::TcpListenerStream;
Maciej Hirsz's avatar
Maciej Hirsz committed
use tokio_util::compat::TokioAsyncReadCompatExt;

David's avatar
David committed
use jsonrpsee_types::error::{CallError, Error};
use jsonrpsee_types::v2::error::JsonRpcErrorCode;
use jsonrpsee_types::v2::params::{Id, RpcParams};
use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest};
use jsonrpsee_utils::server::helpers::{collect_batch_response, send_error};
use jsonrpsee_utils::server::rpc_module::{ConnectionId, MethodSink, Methods, RpcModule, SubscriptionSink};
pub struct Server {
	root: RpcModule,
	listener: TcpListener,
}

Maciej Hirsz's avatar
Maciej Hirsz committed
impl Server {
	/// Create a new WebSocket RPC server, bound to the `addr`.
Maciej Hirsz's avatar
Maciej Hirsz committed
	pub async fn new(addr: impl ToSocketAddrs) -> anyhow::Result<Self> {
		let listener = TcpListener::bind(addr).await?;
		Ok(Server { listener, root: RpcModule::new() })
Maciej Hirsz's avatar
Maciej Hirsz committed
	}
Maciej Hirsz's avatar
Maciej Hirsz committed
	/// Register a new RPC method, which responds with a given callback.
	pub fn register_method<R, F>(&mut self, method_name: &'static str, callback: F) -> Result<(), Error>
Maciej Hirsz's avatar
Maciej Hirsz committed
	where
		R: Serialize,
David's avatar
David committed
		F: Fn(RpcParams) -> Result<R, CallError> + Send + Sync + 'static,
Maciej Hirsz's avatar
Maciej Hirsz committed
	{
		self.root.register_method(method_name, callback)
Maciej Hirsz's avatar
Maciej Hirsz committed
	/// Register a new RPC subscription, with subscribe and unsubscribe methods.
	pub fn register_subscription(
Maciej Hirsz's avatar
Maciej Hirsz committed
		&mut self,
		subscribe_method_name: &'static str,
		unsubscribe_method_name: &'static str,
	) -> Result<SubscriptionSink, Error> {
		self.root.register_subscription(subscribe_method_name, unsubscribe_method_name)
	}
	/// Register all methods from a module on this server.
	pub fn register_module(&mut self, module: RpcModule) -> Result<(), Error> {
		self.root.merge(module)
	/// Returns socket address to which the server is bound.
Maciej Hirsz's avatar
Maciej Hirsz committed
	pub fn local_addr(&self) -> anyhow::Result<SocketAddr> {
		self.listener.local_addr().map_err(Into::into)
Maciej Hirsz's avatar
Maciej Hirsz committed
	/// Start responding to connections requests. This will block current thread until the server is stopped.
	pub async fn start(self) {
		let mut incoming = TcpListenerStream::new(self.listener);
		let methods = Arc::new(self.root.into_methods());
Maciej Hirsz's avatar
Maciej Hirsz committed
		let mut id = 0;

		while let Some(socket) = incoming.next().await {
			if let Ok(socket) = socket {
				socket.set_nodelay(true).unwrap();

				let methods = methods.clone();

				tokio::spawn(async move { background_task(socket, methods, id).await });

				id += 1;
David's avatar
David committed
async fn background_task(
	socket: tokio::net::TcpStream,
	methods: Arc<Methods>,
	conn_id: ConnectionId,
) -> anyhow::Result<()> {
Maciej Hirsz's avatar
Maciej Hirsz committed
	// For each incoming background_task we perform a handshake.
	let mut server = SokettoServer::new(BufReader::new(BufWriter::new(socket.compat())));
Maciej Hirsz's avatar
Maciej Hirsz committed
	let websocket_key = {
		let req = server.receive_request().await?;
		req.into_key()
	};
Maciej Hirsz's avatar
Maciej Hirsz committed
	// Here we accept the client unconditionally.
	let accept = Response::Accept { key: &websocket_key, protocol: None };
	server.send_response(&accept).await?;

	// And we can finally transition to a websocket background_task.
	let (mut sender, mut receiver) = server.into_builder().finish();
	let (tx, mut rx) = mpsc::unbounded::<String>();
David's avatar
David committed
	// Send results back to the client.
Maciej Hirsz's avatar
Maciej Hirsz committed
	tokio::spawn(async move {
		while let Some(response) = rx.next().await {
			log::debug!("send: {}", response);
Maciej Hirsz's avatar
Maciej Hirsz committed
			let _ = sender.send_binary_mut(response.into_bytes()).await;
			let _ = sender.flush().await;
		}
	});
David's avatar
David committed
	let mut data = Vec::with_capacity(100);

	// Look up the "method" (i.e. function pointer) from the registered methods and run it passing in
	// the params from the request. The result of the computation is sent back over the `tx` channel and
	// the result(s) are collected into a `String` and sent back over the wire.
	let execute = move |tx: &MethodSink, req: JsonRpcRequest| {
David's avatar
David committed
		if let Some(method) = methods.get(&*req.method) {
			let params = RpcParams::new(req.params.map(|params| params.get()));
			if let Err(err) = (method)(req.id.clone(), params, &tx, conn_id) {
David's avatar
David committed
				log::error!("execution of method call '{}' failed: {:?}, request id={:?}", req.method, err, req.id);
				send_error(req.id, &tx, JsonRpcErrorCode::ServerError(-1).into());
			}
		} else {
			send_error(req.id, &tx, JsonRpcErrorCode::MethodNotFound.into());
		}
	};
Maciej Hirsz's avatar
Maciej Hirsz committed
		data.clear();
Maciej Hirsz's avatar
Maciej Hirsz committed
		receiver.receive_data(&mut data).await?;

David's avatar
David committed
		// For reasons outlined [here](https://github.com/serde-rs/json/issues/497), `RawValue` can't be used with
		// untagged enums at the moment. This means we can't use an `SingleOrBatch` untagged enum here and have to try
		// each case individually: first the single request case, then the batch case and lastly the error. For the
		// worst case – unparseable input – we make three calls to [`serde_json::from_slice`] which is pretty annoying.
		// Our [issue](https://github.com/paritytech/jsonrpsee/issues/296).
		if let Ok(req) = serde_json::from_slice::<JsonRpcRequest>(&data) {
			log::debug!("recv: {:?}", req);
David's avatar
David committed
			execute(&tx, req);
		} else if let Ok(batch) = serde_json::from_slice::<Vec<JsonRpcRequest>>(&data) {
			if !batch.is_empty() {
				// Batch responses must be sent back as a single message so we read the results from each request in the
				// batch and read the results off of a new channel, `rx_batch`, and then send the complete batch response
				// back to the client over `tx`.
				let (tx_batch, mut rx_batch) = mpsc::unbounded::<String>();
				for req in batch {
					execute(&tx_batch, req);
				}
				// Closes the receiving half of a channel without dropping it. This prevents any further messages from
				// being sent on the channel.
				rx_batch.close();
				let results = collect_batch_response(rx_batch).await;
				if let Err(err) = tx.unbounded_send(results) {
					log::error!("Error sending batch response to the client: {:?}", err)
David's avatar
David committed
			} else {
				send_error(Id::Null, &tx, JsonRpcErrorCode::InvalidRequest.into());
David's avatar
David committed
		} else {
			let (id, code) = match serde_json::from_slice::<JsonRpcInvalidRequest>(&data) {
				Ok(req) => (req.id, JsonRpcErrorCode::InvalidRequest),
				Err(_) => (Id::Null, JsonRpcErrorCode::ParseError),
David's avatar
David committed
			send_error(id, &tx, code.into());