core.rs 6.86 KiB
Newer Older
// Copyright (c) 2019 Parity Technologies Limited
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::transport::{HttpTransportServer, TransportServerEvent};
use jsonrpsee_types::jsonrpc::{
	self,
	wrapped::{batches, Notification, Params},
};

use core::{fmt, hash::Hash};

pub type RequestId = u64;

/// Wraps around a "raw server" and adds capabilities.
///
/// See the module-level documentation for more information.
pub struct RawServer {
	/// Internal "raw" server.
	raw: HttpTransportServer,

	/// List of requests that are in the progress of being answered. Each batch is associated with
	/// the raw request ID, or with `None` if this raw request has been closed.
	///
	/// See the documentation of [`BatchesState`][batches::BatchesState] for more information.
	batches: batches::BatchesState<Option<RequestId>>,
}

/// Identifier of a request within a `RawServer`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct RawServerRequestId {
	inner: batches::BatchesRequestId,
}

/// Identifier of a subscription within a [`RawServer`](crate::server::RawServer).
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct RawServerSubscriptionId([u8; 32]);

/// Event generated by a [`RawServer`](crate::server::RawServer).
///
/// > **Note**: Holds a borrow of the `RawServer`. Therefore, must be dropped before the `RawServer` can
/// >           be dropped.
#[derive(Debug)]
pub enum RawServerEvent<'a> {
	/// Request is a notification.
	Notification(Notification),

	/// Request is a method call.
	Request(RawServerRequest<'a>),
}

/// Request received by a [`RawServer`](crate::raw::RawServer).
pub struct RawServerRequest<'a> {
	/// Reference to the request within `self.batches`.
	inner: batches::BatchesRequest<'a, Option<RequestId>>,
}

impl RawServer {
	/// Starts a [`RawServer`](crate::raw::RawServer) using the given raw server internally.
	pub fn new(raw: HttpTransportServer) -> RawServer {
		RawServer { raw, batches: batches::BatchesState::new() }
	}
}

impl RawServer {
	/// Returns a `Future` resolving to the next event that this server generates.
	pub async fn next_event<'a>(&'a mut self) -> RawServerEvent<'a> {
		let request_id = loop {
			match self.batches.next_event() {
				None => {}
				Some(batches::BatchesEvent::Notification { notification, .. }) => {
					return RawServerEvent::Notification(notification)
				}
				Some(batches::BatchesEvent::Request(inner)) => {
					break RawServerRequestId { inner: inner.id() };
				}
				Some(batches::BatchesEvent::ReadyToSend { response, user_param: Some(raw_request_id) }) => {
					let _ = self.raw.finish(&raw_request_id, Some(&response)).await;
					continue;
				}
				Some(batches::BatchesEvent::ReadyToSend { response: _, user_param: None }) => {
					// This situation happens if the connection has been closed by the client.
					// Clients who close their connection.
					continue;
				}
			};

			match self.raw.next_request().await {
				TransportServerEvent::Request { id, request } => self.batches.inject(request, Some(id)),
				TransportServerEvent::Closed(raw_id) => {
					// The client has a closed their connection. We eliminate all traces of the
					// raw request ID from our state.
					// TODO: this has an O(n) complexity; make sure that this is not attackable
					for ud in self.batches.batches() {
						if ud.as_ref() == Some(&raw_id) {
							*ud = None;
						}
					}
				}
			};
		};
		RawServerEvent::Request(self.request_by_id(&request_id).unwrap())
	}

	/// Returns a request previously returned by [`next_event`](crate::raw::RawServer::next_event)
	/// by its id.
	///
	/// Note that previous notifications don't have an ID and can't be accessed with this method.
	///
	/// Returns `None` if the request ID is invalid or if the request has already been answered in
	/// the past.
	pub fn request_by_id<'a>(&'a mut self, id: &RawServerRequestId) -> Option<RawServerRequest<'a>> {
		Some(RawServerRequest { inner: self.batches.request_by_id(id.inner)? })
	}
}

impl From<HttpTransportServer> for RawServer {
	fn from(inner: HttpTransportServer) -> Self {
		RawServer::new(inner)
	}
}

impl<'a> RawServerRequest<'a> {
	/// Returns the id of the request.
	///
	/// If this request object is dropped, you can retrieve it again later by calling
	/// [`request_by_id`](crate::raw::RawServer::request_by_id).
	pub fn id(&self) -> RawServerRequestId {
		RawServerRequestId { inner: self.inner.id() }
	}

	/// Returns the id that the client sent out.
	// TODO: can return None, which is wrong
	pub fn request_id(&self) -> &jsonrpc::Id {
		self.inner.request_id()
	}

	/// Returns the method of this request.
	pub fn method(&self) -> &str {
		self.inner.method()
	}

	/// Returns the parameters of the request, as a `jsonrpc::Params`.
	pub fn params(&self) -> Params {
		self.inner.params()
	}
}

impl<'a> RawServerRequest<'a> {
	/// Send back a response.
	///
	/// If this request is part of a batch:
	///
	/// - If all requests of the batch have been responded to, then the response is actively
	///   sent out.
	/// - Otherwise, this response is buffered.
	///
	/// > **Note**: This method is implemented in a way that doesn't wait for long to send the
	/// >           response. While calling this method will block your entire server, it
	/// >           should only block it for a short amount of time. See also [the equivalent
	/// >           method](crate::transport::TransportServer::finish) on the
	/// >           [`TransportServer`](crate::transport::TransportServer) trait.
	///
	pub fn respond(self, response: Result<jsonrpc::JsonValue, jsonrpc::Error>) {
		self.inner.set_response(response);
		//unimplemented!();
		// TODO: actually send out response?
	}
}

impl<'a> fmt::Debug for RawServerRequest<'a> {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		f.debug_struct("RawServerRequest")
			.field("request_id", &self.request_id())
			.field("method", &self.method())
			.field("params", &self.params())
			.finish()
	}
}