Newer
Older
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::server::resource_limiting::{ResourceGuard, ResourceTable, ResourceVec, Resources};
use futures_channel::{mpsc, oneshot};
use futures_util::{future::BoxFuture, FutureExt, StreamExt};
use jsonrpsee_types::to_json_raw_value;
use jsonrpsee_types::v2::error::{invalid_subscription_err, CALL_EXECUTION_FAILED_CODE};
error::{Error, SubscriptionClosedError},
ErrorCode, Id, Params, Request, Response, SubscriptionId as RpcSubscriptionId, SubscriptionPayload,
SubscriptionResponse, TwoPointZero,
use parking_lot::Mutex;
use rustc_hash::FxHashMap;
use serde::Serialize;
use serde_json::value::RawValue;
/// A `SyncMethod` is a synchronous RPC endpoint, callable with a standard JSON-RPC request,
/// implemented as a shared (`Arc`), `Send + Sync` `Fn` closure taking four arguments:
/// the `id`, `params`, a channel the function uses to communicate the result (or error)
/// back to `jsonrpsee`, and the connection ID (useful for the websocket transport).
///
/// NOTE(review): the meaning of the returned `bool` is not visible in this part of the file;
/// presumably it reports whether the call succeeded. Confirm against the callers.
pub type SyncMethod = Arc<dyn Send + Sync + Fn(Id, Params, &MethodSink, ConnectionId) -> bool>;
/// Similar to [`SyncMethod`], but represents an asynchronous handler.
///
/// Differences visible in the signature: the [`MethodSink`] is taken by value rather than by
/// reference, no [`ConnectionId`] is passed, and the last argument is an optional
/// [`ResourceGuard`] (present if resources are configured). The handler returns a boxed future
/// resolving to a `bool`.
pub type AsyncMethod<'a> =
Arc<dyn Send + Sync + Fn(Id<'a>, Params<'a>, MethodSink, Option<ResourceGuard>) -> BoxFuture<'a, bool>>;
/// Connection ID, used for stateful protocols such as WebSockets.
/// For stateless protocols such as HTTP it is unused, so feel free to set it to some hardcoded value.
pub type ConnectionId = usize;
/// Subscription ID.
///
/// Combined with a [`ConnectionId`] in `SubscriptionKey` to identify a subscription entry.
pub type SubscriptionId = u64;
/// Shared, mutex-protected map from `SubscriptionKey` to the subscriber's [`MethodSink`]
/// paired with a `oneshot::Receiver<()>`.
///
/// NOTE(review): the receiver presumably signals that the subscription was closed; confirm at the use sites.
type Subscribers = Arc<Mutex<FxHashMap<SubscriptionKey, (MethodSink, oneshot::Receiver<()>)>>>;
/// Represents a unique subscription entry based on [`SubscriptionId`] and [`ConnectionId`].
///
/// Derives `Eq` and `Hash`, so it can be used as the key of the `Subscribers` map.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
struct SubscriptionKey {
/// Connection the subscription belongs to.
conn_id: ConnectionId,
/// Identifier of the subscription.
sub_id: SubscriptionId,
}
/// Callback wrapper that can be either sync or async.
/// Information about resources the method uses during its execution. Initialized when the server starts.
#[derive(Clone, Debug)]
enum MethodResources {
/// Uninitialized resource table, mapping string label to units.
Uninitialized(Box<[(&'static str, u16)]>),
/// Initialized resource table containing units for each `ResourceId`.
Initialized(ResourceTable),
}
/// Method callback wrapper that contains a sync or async closure,
/// plus a table with resources it needs to claim to run.
#[derive(Clone, Debug)]
pub struct MethodCallback {
/// The sync or async closure that handles the call.
callback: MethodKind,
/// Resources the method needs; starts out as `MethodResources::Uninitialized`.
resources: MethodResources,
}
/// Result of a method, either a direct value or a future of one.
pub enum MethodResult<T> {
/// Result available immediately, by value.
Sync(T),
/// Boxed `'static` future that resolves to the value.
Async(BoxFuture<'static, T>),
}
impl<T: Debug> Debug for MethodResult<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
MethodResult::Sync(result) => result.fmt(f),
MethodResult::Async(_) => f.write_str("<future>"),
}
}
}
/// Builder for configuring resources used by a method.
///
/// The collected entries are written into the borrowed [`MethodCallback`] when the builder is dropped.
#[derive(Debug)]
pub struct MethodResourcesBuilder<'a> {
/// `(label, units)` pairs collected so far.
build: ResourceVec<(&'static str, u16)>,
/// Callback whose resource table is set when the builder is dropped.
callback: &'a mut MethodCallback,
}
impl<'a> MethodResourcesBuilder<'a> {
/// Define how many units of a given named resource the method uses during its execution.
pub fn resource(mut self, label: &'static str, units: u16) -> Result<Self, Error> {
self.build.try_push((label, units)).map_err(|_| Error::MaxResourcesReached)?;
Ok(self)
}
}
impl<'a> Drop for MethodResourcesBuilder<'a> {
    /// Stores the collected `(label, units)` entries in the callback as an
    /// uninitialized resource table when the builder goes out of scope.
    fn drop(&mut self) {
        let collected: Box<[(&'static str, u16)]> = self.build[..].into();
        self.callback.resources = MethodResources::Uninitialized(collected);
    }
}
/// Wrap a synchronous handler; the resource table starts out empty and uninitialized.
fn new_sync(callback: SyncMethod) -> Self {
    let resources = MethodResources::Uninitialized([].into());
    let callback = MethodKind::Sync(callback);
    MethodCallback { callback, resources }
}
/// Wrap an asynchronous handler; the resource table starts out empty and uninitialized.
fn new_async(callback: AsyncMethod<'static>) -> Self {
    let resources = MethodResources::Uninitialized([].into());
    let callback = MethodKind::Async(callback);
    MethodCallback { callback, resources }
}
/// Attempt to claim resources prior to executing a method. On success returns a guard that releases
/// claimed resources when dropped.
pub fn claim(&self, name: &str, resources: &Resources) -> Result<ResourceGuard, Error> {
match self.resources {
MethodResources::Uninitialized(_) => Err(Error::UninitializedMethod(name.into())),
MethodResources::Initialized(units) => resources.claim(units),
}
}
/// Execute the callback, sending the resulting JSON (success or error) to the specified sink.
req: Request<'_>,
conn_id: ConnectionId,
claimed: Option<ResourceGuard>,
let params = Params::new(req.params.map(|params| params.get()));
"[MethodCallback::execute] Executing sync callback, params={:?}, req.id={:?}, conn_id={:?}",
params,
id,
conn_id
);
let result = (callback)(id, params, sink, conn_id);
// Release claimed resources
drop(claimed);
"[MethodCallback::execute] Executing async callback, params={:?}, req.id={:?}, conn_id={:?}",
params,
id,
conn_id
);
MethodResult::Async((callback)(id, params, sink, claimed))
Loading full blame...