rpc_module.rs 33.1 KiB
Newer Older
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::server::helpers::MethodSink;
Maciej Hirsz's avatar
Maciej Hirsz committed
use crate::server::resource_limiting::{ResourceGuard, ResourceTable, ResourceVec, Resources};
use futures_channel::{mpsc, oneshot};
use futures_util::{future::BoxFuture, FutureExt, StreamExt};
use jsonrpsee_types::to_json_raw_value;
use jsonrpsee_types::v2::error::{invalid_subscription_err, CALL_EXECUTION_FAILED_CODE};
David's avatar
David committed
use jsonrpsee_types::{
	error::{Error, SubscriptionClosedError},
David's avatar
David committed
	traits::ToRpcParams,
	v2::{
		ErrorCode, Id, Params, Request, Response, SubscriptionId as RpcSubscriptionId, SubscriptionPayload,
		SubscriptionResponse, TwoPointZero,
David's avatar
David committed
	},
	DeserializeOwned,
use parking_lot::Mutex;
use rustc_hash::FxHashMap;
use serde::Serialize;
use serde_json::value::RawValue;
Maciej Hirsz's avatar
Maciej Hirsz committed
use std::collections::hash_map::Entry;
use std::fmt::{self, Debug};
Maciej Hirsz's avatar
Maciej Hirsz committed
use std::future::Future;
Maciej Hirsz's avatar
Maciej Hirsz committed
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

Maciej Hirsz's avatar
Maciej Hirsz committed
/// A `MethodCallback` is an RPC endpoint, callable with a standard JSON-RPC request,
/// implemented as a function pointer to a `Fn` function taking four arguments:
/// the `id`, `params`, a channel the function uses to communicate the result (or error)
/// back to `jsonrpsee`, and the connection ID (useful for the websocket transport).
pub type SyncMethod = Arc<dyn Send + Sync + Fn(Id, Params, &MethodSink, ConnectionId) -> bool>;
Maciej Hirsz's avatar
Maciej Hirsz committed
/// Similar to [`SyncMethod`], but represents an asynchronous handler and takes an additional argument containing a [`ResourceGuard`] if configured.
pub type AsyncMethod<'a> =
	Arc<dyn Send + Sync + Fn(Id<'a>, Params<'a>, MethodSink, Option<ResourceGuard>) -> BoxFuture<'a, bool>>;
/// Connection ID, used for stateful protocol such as WebSockets.
/// For stateless protocols such as http it's unused, so feel free to set it some hardcoded value.
pub type ConnectionId = usize;
/// Subscription ID.
pub type SubscriptionId = u64;

type Subscribers = Arc<Mutex<FxHashMap<SubscriptionKey, (MethodSink, oneshot::Receiver<()>)>>>;

/// Represent a unique subscription entry based on [`SubscriptionId`] and [`ConnectionId`].
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
struct SubscriptionKey {
	conn_id: ConnectionId,
	sub_id: SubscriptionId,
}
/// Callback wrapper that can be either sync or async.
#[derive(Clone)]
Maciej Hirsz's avatar
Maciej Hirsz committed
enum MethodKind {
	/// Synchronous method handler.
	Sync(SyncMethod),
	/// Asynchronous method handler.
	Async(AsyncMethod<'static>),
Maciej Hirsz's avatar
Maciej Hirsz committed
/// Information about resources the method uses during its execution. Initialized when the the server starts.
#[derive(Clone, Debug)]
enum MethodResources {
	/// Uninitialized resource table, mapping string label to units.
	Uninitialized(Box<[(&'static str, u16)]>),
	/// Initialized resource table containing units for each `ResourceId`.
	Initialized(ResourceTable),
}

/// Method callback wrapper that contains a sync or async closure,
/// plus a table with resources it needs to claim to run
#[derive(Clone, Debug)]
pub struct MethodCallback {
	callback: MethodKind,
	resources: MethodResources,
}

/// Result of a method, either direct value or a future of one.
pub enum MethodResult<T> {
	/// Result by value
	Sync(T),
	/// Future of a value
	Async(BoxFuture<'static, T>),
}

impl<T: Debug> Debug for MethodResult<T> {
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		match self {
			MethodResult::Sync(result) => result.fmt(f),
			MethodResult::Async(_) => f.write_str("<future>"),
		}
	}
}

Maciej Hirsz's avatar
Maciej Hirsz committed
/// Builder for configuring resources used by a method.
#[derive(Debug)]
pub struct MethodResourcesBuilder<'a> {
	build: ResourceVec<(&'static str, u16)>,
	callback: &'a mut MethodCallback,
}

impl<'a> MethodResourcesBuilder<'a> {
	/// Define how many units of a given named resource the method uses during its execution.
	pub fn resource(mut self, label: &'static str, units: u16) -> Result<Self, Error> {
		self.build.try_push((label, units)).map_err(|_| Error::MaxResourcesReached)?;
		Ok(self)
	}
}

impl<'a> Drop for MethodResourcesBuilder<'a> {
	fn drop(&mut self) {
		self.callback.resources = MethodResources::Uninitialized(self.build[..].into());
	}
}

impl MethodCallback {
Maciej Hirsz's avatar
Maciej Hirsz committed
	fn new_sync(callback: SyncMethod) -> Self {
		MethodCallback { callback: MethodKind::Sync(callback), resources: MethodResources::Uninitialized([].into()) }
	}

	fn new_async(callback: AsyncMethod<'static>) -> Self {
		MethodCallback { callback: MethodKind::Async(callback), resources: MethodResources::Uninitialized([].into()) }
	}

	/// Attempt to claim resources prior to executing a method. On success returns a guard that releases
	/// claimed resources when dropped.
	pub fn claim(&self, name: &str, resources: &Resources) -> Result<ResourceGuard, Error> {
		match self.resources {
			MethodResources::Uninitialized(_) => Err(Error::UninitializedMethod(name.into())),
			MethodResources::Initialized(units) => resources.claim(units),
		}
	}

	/// Execute the callback, sending the resulting JSON (success or error) to the specified sink.
Maciej Hirsz's avatar
Maciej Hirsz committed
	pub fn execute(
		&self,
		sink: &MethodSink,
Maciej Hirsz's avatar
Maciej Hirsz committed
		req: Request<'_>,
		conn_id: ConnectionId,
		claimed: Option<ResourceGuard>,
	) -> MethodResult<bool> {
		let id = req.id.clone();
David's avatar
David committed
		let params = Params::new(req.params.map(|params| params.get()));

		let result = match &self.callback {
Maciej Hirsz's avatar
Maciej Hirsz committed
			MethodKind::Sync(callback) => {
				tracing::trace!(
					"[MethodCallback::execute] Executing sync callback, params={:?}, req.id={:?}, conn_id={:?}",
					params,
					id,
					conn_id
				);

				let result = (callback)(id, params, sink, conn_id);
Maciej Hirsz's avatar
Maciej Hirsz committed
				// Release claimed resources
				drop(claimed);

				MethodResult::Sync(result)
Maciej Hirsz's avatar
Maciej Hirsz committed
			MethodKind::Async(callback) => {
				let sink = sink.clone();
				let params = params.into_owned();
				let id = id.into_owned();
				tracing::trace!(
					"[MethodCallback::execute] Executing async callback, params={:?}, req.id={:?}, conn_id={:?}",
					params,
					id,
					conn_id
				);
				MethodResult::Async((callback)(id, params, sink, claimed))
Loading full blame...