Newer
Older
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use std::collections::hash_map::Entry;
use std::fmt::{self, Debug};
use std::future::Future;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use crate::error::{Error, SubscriptionClosed};
use crate::id_providers::RandomIntegerIdProvider;
Niklas Adolfsson
committed
use crate::server::helpers::{BoundedSubscriptions, MethodSink, SubscriptionPermit};
use crate::server::resource_limiting::{ResourceGuard, ResourceTable, ResourceVec, Resources};
use crate::traits::{IdProvider, ToRpcParams};
use futures_util::future::Either;
Niklas Adolfsson
committed
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt, TryStream, TryStreamExt};
use jsonrpsee_types::error::{
CallError, ErrorCode, ErrorObject, ErrorObjectOwned, SubscriptionAcceptRejectError, INTERNAL_ERROR_CODE,
SUBSCRIPTION_CLOSED_WITH_ERROR,
use jsonrpsee_types::response::{SubscriptionError, SubscriptionPayloadError};
ErrorResponse, Id, Params, Request, Response, SubscriptionId as RpcSubscriptionId, SubscriptionPayload,
SubscriptionResponse, SubscriptionResult,
use parking_lot::Mutex;
use rustc_hash::FxHashMap;
Niklas Adolfsson
committed
use tokio::sync::watch;
/// A `MethodCallback` is an RPC endpoint, callable with a standard JSON-RPC request,
/// implemented as a function pointer to a `Fn` function taking four arguments:
/// the `id`, `params`, a channel the function uses to communicate the result (or error)
/// back to `jsonrpsee`, and the connection ID (useful for the websocket transport).
pub type SyncMethod = Arc<dyn Send + Sync + Fn(Id, Params, MaxResponseSize) -> MethodResponse>;
/// Similar to [`SyncMethod`], but represents an asynchronous handler and takes an additional argument containing a [`ResourceGuard`] if configured.
pub type AsyncMethod<'a> = Arc<
dyn Send
+ Sync
+ Fn(Id<'a>, Params<'a>, ConnectionId, MaxResponseSize, Option<ResourceGuard>) -> BoxFuture<'a, MethodResponse>,
/// Method callback for subscriptions.
pub type SubscriptionMethod<'a> = Arc<
dyn Send + Sync + Fn(Id, Params, MethodSink, ConnState, Option<ResourceGuard>) -> BoxFuture<'a, MethodResponse>,
>;
Niklas Adolfsson
committed
// Method callback to unsubscribe.
type UnsubscriptionMethod = Arc<dyn Send + Sync + Fn(Id, Params, ConnectionId, MaxResponseSize) -> MethodResponse>;
/// Connection ID, used for stateful protocol such as WebSockets.
/// For stateless protocols such as http it's unused, so feel free to set it some hardcoded value.
pub type ConnectionId = usize;
/// Max response size.
pub type MaxResponseSize = usize;
/// Raw response from an RPC
/// A 3-tuple containing:
/// - Call result as a `String`,
/// - a [`mpsc::UnboundedReceiver<String>`] to receive future subscription results
Niklas Adolfsson
committed
/// - a [`crate::server::helpers::SubscriptionPermit`] to allow subscribers to notify their [`SubscriptionSink`] when they disconnect.
pub type RawRpcResponse = (MethodResponse, mpsc::UnboundedReceiver<String>, SubscriptionPermit);
pub struct ConnState<'a> {
/// Connection ID
pub conn_id: ConnectionId,
/// Get notified when the connection to subscribers is closed.
Niklas Adolfsson
committed
pub close_notify: SubscriptionPermit,
/// ID provider.
pub id_provider: &'a dyn IdProvider,
}
/// Outcome of a successful terminated subscription.
#[derive(Debug)]
pub enum InnerSubscriptionResult {
/// The subscription stream was executed successfully.
Success,
/// The subscription was aborted by the remote peer.
Aborted,
}
impl<'a> std::fmt::Debug for ConnState<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ConnState").field("conn_id", &self.conn_id).field("close", &self.close_notify).finish()
Niklas Adolfsson
committed
type Subscribers = Arc<Mutex<FxHashMap<SubscriptionKey, (MethodSink, watch::Sender<()>)>>>;
/// Represent a unique subscription entry based on [`RpcSubscriptionId`] and [`ConnectionId`].
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct SubscriptionKey {
conn_id: ConnectionId,
sub_id: RpcSubscriptionId<'static>,
/// Callback wrapper that can be either sync or async.
pub enum MethodKind {
/// Asynchronous method handler.
Niklas Adolfsson
committed
/// Subscription method handler.
Subscription(SubscriptionMethod<'static>),
Niklas Adolfsson
committed
/// Unsubscription method handler.
Unsubscription(UnsubscriptionMethod),
/// Information about resources the method uses during its execution. Initialized when the the server starts.
#[derive(Clone, Debug)]
enum MethodResources {
/// Uninitialized resource table, mapping string label to units.
Uninitialized(Box<[(&'static str, u16)]>),
/// Initialized resource table containing units for each `ResourceId`.
Initialized(ResourceTable),
}
/// Method callback wrapper that contains a sync or async closure,
/// plus a table with resources it needs to claim to run
#[derive(Clone, Debug)]
pub struct MethodCallback {
callback: MethodKind,
resources: MethodResources,
}
/// Result of a method, either direct value or a future of one.
pub enum MethodResult<T> {
/// Result by value
Sync(T),
/// Future of a value
Async(BoxFuture<'static, T>),
}
impl<T: Debug> Debug for MethodResult<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
MethodResult::Sync(result) => result.fmt(f),
MethodResult::Async(_) => f.write_str("<future>"),
}
}
}
/// Builder for configuring resources used by a method.
#[derive(Debug)]
pub struct MethodResourcesBuilder<'a> {
build: ResourceVec<(&'static str, u16)>,
callback: &'a mut MethodCallback,
}
impl<'a> MethodResourcesBuilder<'a> {
/// Define how many units of a given named resource the method uses during its execution.
pub fn resource(mut self, label: &'static str, units: u16) -> Result<Self, Error> {
self.build.try_push((label, units)).map_err(|_| Error::MaxResourcesReached)?;
Ok(self)
}
}
impl<'a> Drop for MethodResourcesBuilder<'a> {
fn drop(&mut self) {
self.callback.resources = MethodResources::Uninitialized(self.build[..].into());
}
}
fn new_sync(callback: SyncMethod) -> Self {
MethodCallback { callback: MethodKind::Sync(callback), resources: MethodResources::Uninitialized([].into()) }
}
fn new_async(callback: AsyncMethod<'static>) -> Self {
MethodCallback { callback: MethodKind::Async(callback), resources: MethodResources::Uninitialized([].into()) }
}
fn new_subscription(callback: SubscriptionMethod<'static>) -> Self {
MethodCallback {
callback: MethodKind::Subscription(callback),
resources: MethodResources::Uninitialized([].into()),
}
}
Niklas Adolfsson
committed
fn new_unsubscription(callback: UnsubscriptionMethod) -> Self {
MethodCallback {
callback: MethodKind::Unsubscription(callback),
resources: MethodResources::Uninitialized([].into()),
}
}
/// Attempt to claim resources prior to executing a method. On success returns a guard that releases
/// claimed resources when dropped.
pub fn claim(&self, name: &str, resources: &Resources) -> Result<ResourceGuard, Error> {
match self.resources {
MethodResources::Uninitialized(_) => Err(Error::UninitializedMethod(name.into())),
MethodResources::Initialized(units) => resources.claim(units),
}
}
/// Get handle to the callback.
pub fn inner(&self) -> &MethodKind {
&self.callback
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Async(_) => write!(f, "Async"),
Self::Sync(_) => write!(f, "Sync"),
Self::Subscription(_) => write!(f, "Subscription"),
Niklas Adolfsson
committed
Self::Unsubscription(_) => write!(f, "Unsubscription"),
/// Reference-counted, clone-on-write collection of synchronous and asynchronous methods.
#[derive(Default, Debug, Clone)]
callbacks: Arc<FxHashMap<&'static str, MethodCallback>>,
impl Methods {
/// Creates a new empty [`Methods`].
pub fn new() -> Self {
Self::default()
fn verify_method_name(&mut self, name: &'static str) -> Result<(), Error> {
if self.callbacks.contains_key(name) {
return Err(Error::MethodAlreadyRegistered(name.into()));
}
Ok(())
}
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
/// Inserts the method callback for a given name, or returns an error if the name was already taken.
/// On success it returns a mut reference to the [`MethodCallback`] just inserted.
fn verify_and_insert(
&mut self,
name: &'static str,
callback: MethodCallback,
) -> Result<&mut MethodCallback, Error> {
match self.mut_callbacks().entry(name) {
Entry::Occupied(_) => Err(Error::MethodAlreadyRegistered(name.into())),
Entry::Vacant(vacant) => Ok(vacant.insert(callback)),
}
}
/// Initialize resources for all methods in this collection. This method has no effect if called more than once.
pub fn initialize_resources(mut self, resources: &Resources) -> Result<Self, Error> {
let callbacks = self.mut_callbacks();
for (&method_name, callback) in callbacks.iter_mut() {
if let MethodResources::Uninitialized(uninit) = &callback.resources {
let mut map = resources.defaults;
for &(label, units) in uninit.iter() {
let idx = match resources.labels.iter().position(|&l| l == label) {
Some(idx) => idx,
None => return Err(Error::ResourceNameNotFoundForMethod(label, method_name)),
};
// If resource capacity set to `0`, we ignore the unit value of the method
// and set it to `0` as well, effectively making the resource unlimited.
if resources.capacities[idx] == 0 {
map[idx] = 0;
} else {
map[idx] = units;
}
}
callback.resources = MethodResources::Initialized(map);
}
}
Ok(self)
}
/// Helper for obtaining a mut ref to the callbacks HashMap.
fn mut_callbacks(&mut self) -> &mut FxHashMap<&'static str, MethodCallback> {
Arc::make_mut(&mut self.callbacks)
}
/// Merge two [`Methods`]'s by adding all [`MethodCallback`]s from `other` into `self`.
/// Fails if any of the methods in `other` is present already.
pub fn merge(&mut self, other: impl Into<Methods>) -> Result<(), Error> {
let mut other = other.into();
self.verify_method_name(name)?;
}
let callbacks = self.mut_callbacks();
for (name, callback) in other.mut_callbacks().drain() {
callbacks.insert(name, callback);
/// Returns the method callback.
pub fn method(&self, method_name: &str) -> Option<&MethodCallback> {
self.callbacks.get(method_name)
/// Returns the method callback along with its name. The returned name is same as the
/// `method_name`, but its lifetime bound is `'static`.
pub fn method_with_name(&self, method_name: &str) -> Option<(&'static str, &MethodCallback)> {
self.callbacks.get_key_value(method_name).map(|(k, v)| (*k, v))
}
/// Helper to call a method on the `RPC module` without having to spin up a server.
Niklas Adolfsson
committed
///
/// The params must be serializable as JSON array, see [`ToRpcParams`] for further documentation.
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
///
/// Returns the decoded value of the `result field` in JSON-RPC response if succesful.
///
/// # Examples
///
/// ```
/// #[tokio::main]
/// async fn main() {
/// use jsonrpsee::RpcModule;
///
/// let mut module = RpcModule::new(());
/// module.register_method("echo_call", |params, _| {
/// params.one::<u64>().map_err(Into::into)
/// }).unwrap();
///
/// let echo: u64 = module.call("echo_call", [1_u64]).await.unwrap();
/// assert_eq!(echo, 1);
/// }
/// ```
pub async fn call<Params: ToRpcParams, T: DeserializeOwned>(
&self,
method: &str,
params: Params,
) -> Result<T, Error> {
let params = params.to_rpc_params()?;
let req = Request::new(method.into(), Some(¶ms), Id::Number(0));
tracing::trace!("[Methods::call] Method: {:?}, params: {:?}", method, params);
let (resp, _, _) = self.inner_call(req).await;
if resp.success {
serde_json::from_str::<Response<T>>(&resp.result).map(|r| r.result).map_err(Into::into)
} else {
match serde_json::from_str::<ErrorResponse>(&resp.result) {
Ok(err) => Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned()))),
Err(e) => Err(e.into()),
/// Make a request (JSON-RPC method call or subscription) by using raw JSON.
/// Returns the raw JSON response to the call and a stream to receive notifications if the call was a subscription.
/// # Examples
///
/// ```
/// #[tokio::main]
/// async fn main() {
/// use jsonrpsee::RpcModule;
/// use jsonrpsee::types::Response;
/// use futures_util::StreamExt;
///
/// let mut module = RpcModule::new(());
/// module.register_subscription("hi", "hi", "goodbye", |_, mut sink, _| {
/// sink.send(&"one answer").unwrap();
/// Ok(())
/// let (resp, mut stream) = module.raw_json_request(r#"{"jsonrpc":"2.0","method":"hi","id":0}"#).await.unwrap();
/// let resp = serde_json::from_str::<Response<u64>>(&resp.result).unwrap();
/// let sub_resp = stream.next().await.unwrap();
/// assert_eq!(
/// format!(r#"{{"jsonrpc":"2.0","method":"hi","params":{{"subscription":{},"result":"one answer"}}}}"#, resp.result),
/// sub_resp
/// );
pub async fn raw_json_request(
&self,
) -> Result<(MethodResponse, mpsc::UnboundedReceiver<String>), Error> {
tracing::trace!("[Methods::raw_json_request] Request: {:?}", request);
let req: Request = serde_json::from_str(request)?;
let (resp, rx, _) = self.inner_call(req).await;
Ok((resp, rx))
/// Execute a callback.
async fn inner_call(&self, req: Request<'_>) -> RawRpcResponse {
let (tx_sink, mut rx_sink) = mpsc::unbounded();
let sink = MethodSink::new(tx_sink);
let id = req.id.clone();
let params = Params::new(req.params.map(|params| params.get()));
Niklas Adolfsson
committed
let bounded_subs = BoundedSubscriptions::new(u32::MAX);
let close_notify = bounded_subs.acquire().expect("u32::MAX permits is sufficient; qed");
let notify = bounded_subs.acquire().expect("u32::MAX permits is sufficient; qed");
let response = match self.method(&req.method).map(|c| &c.callback) {
None => MethodResponse::error(req.id, ErrorObject::from(ErrorCode::MethodNotFound)),
Some(MethodKind::Sync(cb)) => (cb)(id, params, usize::MAX),
Some(MethodKind::Async(cb)) => (cb)(id.into_owned(), params.into_owned(), 0, usize::MAX, None).await,
Some(MethodKind::Subscription(cb)) => {
let conn_state = ConnState { conn_id: 0, close_notify, id_provider: &RandomIntegerIdProvider };
let res = (cb)(id, params, sink.clone(), conn_state, None).await;
// This message is not used because it's used for metrics so we discard in other to
// not read once this is used for subscriptions.
//
// The same information is part of `res` above.
let _ = rx_sink.next().await.expect("Every call must at least produce one reponse; qed");
res
Some(MethodKind::Unsubscription(cb)) => (cb)(id, params, 0, usize::MAX),
tracing::trace!("[Methods::inner_call] Method: {}, response: {:?}", req.method, response);
}
/// Helper to create a subscription on the `RPC module` without having to spin up a server.
///
/// The params must be serializable as JSON array, see [`ToRpcParams`] for further documentation.
///
/// Returns [`Subscription`] on success which can used to get results from the subscriptions.
///
/// # Examples
///
/// ```
/// #[tokio::main]
/// async fn main() {
/// use jsonrpsee::{RpcModule, types::EmptyParams};
///
/// let mut module = RpcModule::new(());
/// module.register_subscription("hi", "hi", "goodbye", |_, mut sink, _| {
/// sink.send(&"one answer").unwrap();
/// Ok(())
/// }).unwrap();
///
/// let mut sub = module.subscribe("hi", EmptyParams::new()).await.unwrap();
/// // In this case we ignore the subscription ID,
/// let (sub_resp, _sub_id) = sub.next::<String>().await.unwrap().unwrap();
/// assert_eq!(&sub_resp, "one answer");
/// }
/// ```
pub async fn subscribe(&self, sub_method: &str, params: impl ToRpcParams) -> Result<Subscription, Error> {
let params = params.to_rpc_params()?;
let req = Request::new(sub_method.into(), Some(¶ms), Id::Number(0));
tracing::trace!("[Methods::subscribe] Method: {}, params: {:?}", sub_method, params);
let (response, rx, close_notify) = self.inner_call(req).await;
let subscription_response = match serde_json::from_str::<Response<RpcSubscriptionId>>(&response.result) {
Ok(r) => r,
Err(_) => match serde_json::from_str::<ErrorResponse>(&response.result) {
Ok(err) => return Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned()))),
Err(err) => return Err(err.into()),
},
};
let sub_id = subscription_response.result.into_owned();
/// Returns an `Iterator` with all the method names registered on this server.
pub fn method_names(&self) -> impl Iterator<Item = &'static str> + '_ {
self.callbacks.keys().copied()
}
}
impl<Context> Deref for RpcModule<Context> {
type Target = Methods;
fn deref(&self) -> &Methods {
&self.methods
}
}
impl<Context> DerefMut for RpcModule<Context> {
fn deref_mut(&mut self) -> &mut Methods {
&mut self.methods
}
}
/// Sets of JSON-RPC methods can be organized into a "module"s that are in turn registered on the server or,
/// alternatively, merged with other modules to construct a cohesive API. [`RpcModule`] wraps an additional context
/// argument that can be used to access data during call execution.
pub struct RpcModule<Context> {
ctx: Arc<Context>,
impl<Context> RpcModule<Context> {
/// Create a new module with a given shared `Context`.
pub fn new(ctx: Context) -> Self {
Self { ctx: Arc::new(ctx), methods: Default::default() }
/// Transform a module into an `RpcModule<()>` (unit context).
pub fn remove_context(self) -> RpcModule<()> {
let mut module = RpcModule::new(());
module.methods = self.methods;
module
}
impl<Context> From<RpcModule<Context>> for Methods {
fn from(module: RpcModule<Context>) -> Methods {
module.methods
}
}
impl<Context: Send + Sync + 'static> RpcModule<Context> {
/// Register a new synchronous RPC method, which computes the response with the given callback.
pub fn register_method<R, F>(
&mut self,
method_name: &'static str,
callback: F,
) -> Result<MethodResourcesBuilder, Error>
F: Fn(Params, &Context) -> Result<R, Error> + Send + Sync + 'static,
MethodCallback::new_sync(Arc::new(move |id, params, max_response_size| match callback(params, &*ctx) {
Ok(res) => MethodResponse::response(id, res, max_response_size),
Err(err) => MethodResponse::error(id, err),
Ok(MethodResourcesBuilder { build: ResourceVec::new(), callback })
/// Register a new asynchronous RPC method, which computes the response with the given callback.
pub fn register_async_method<R, Fun, Fut>(
&mut self,
method_name: &'static str,
callback: Fun,
) -> Result<MethodResourcesBuilder, Error>
where
R: Serialize + Send + Sync + 'static,
Fut: Future<Output = Result<R, Error>> + Send,
Fun: (Fn(Params<'static>, Arc<Context>) -> Fut) + Copy + Send + Sync + 'static,
{
let ctx = self.ctx.clone();
MethodCallback::new_async(Arc::new(move |id, params, _, max_response_size, claimed| {
let ctx = ctx.clone();
let future = async move {
let result = match callback(params, ctx).await {
Ok(res) => MethodResponse::response(id, res, max_response_size),
Err(err) => MethodResponse::error(id, err),
// Release claimed resources
drop(claimed);
Ok(MethodResourcesBuilder { build: ResourceVec::new(), callback })
/// Register a new **blocking** synchronous RPC method, which computes the response with the given callback.
/// Unlike the regular [`register_method`](RpcModule::register_method), this method can block its thread and perform expensive computations.
pub fn register_blocking_method<R, F>(
&mut self,
method_name: &'static str,
callback: F,
) -> Result<MethodResourcesBuilder, Error>
where
Context: Send + Sync + 'static,
R: Serialize,
F: Fn(Params, Arc<Context>) -> Result<R, Error> + Copy + Send + Sync + 'static,
{
let ctx = self.ctx.clone();
let callback = self.methods.verify_and_insert(
method_name,
MethodCallback::new_async(Arc::new(move |id, params, _, max_response_size, claimed| {
let ctx = ctx.clone();
tokio::task::spawn_blocking(move || {
Ok(result) => MethodResponse::response(id, result, max_response_size),
Err(err) => MethodResponse::error(id, err),
};
// Release claimed resources
drop(claimed);
.map(|result| match result {
Ok(r) => r,
Err(err) => {
tracing::error!("Join error for blocking RPC method: {:?}", err);
MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InternalError))
})
.boxed()
})),
)?;
Ok(MethodResourcesBuilder { build: ResourceVec::new(), callback })
}
/// Register a new publish/subscribe interface using JSON-RPC notifications.
///
/// It implements the [ethereum pubsub specification](https://geth.ethereum.org/docs/rpc/pubsub)
/// with an option to choose custom subscription ID generation.
///
/// Furthermore, it generates the `unsubscribe implementation` where a `bool` is used as
/// the result to indicate whether the subscription was successfully unsubscribed to or not.
/// For instance an `unsubscribe call` may fail if a non-existent subscriptionID is used in the call.
///
/// This method ensures that the `subscription_method_name` and `unsubscription_method_name` are unique.
/// The `notif_method_name` argument sets the content of the `method` field in the JSON document that
/// the server sends back to the client. The uniqueness of this value is not machine checked and it's up to
/// the user to ensure it is not used in any other [`RpcModule`] used in the server.
///
/// # Arguments
///
/// * `subscription_method_name` - name of the method to call to initiate a subscription
/// * `notif_method_name` - name of method to be used in the subscription payload (technically a JSON-RPC notification)
/// * `unsubscription_method` - name of the method to call to terminate a subscription
/// * `callback` - A callback to invoke on each subscription; it takes three parameters:
/// - [`Params`]: JSON-RPC parameters in the subscription call.
/// - [`SubscriptionSink`]: A sink to send messages to the subscriber.
/// - Context: Any type that can be embedded into the [`RpcModule`].
///
/// # Examples
///
/// ```no_run
///
/// use jsonrpsee_core::server::rpc_module::{RpcModule, SubscriptionSink};
/// use jsonrpsee_core::Error;
/// ctx.register_subscription("sub", "notif_name", "unsub", |params, mut sink, ctx| {
/// let x = match params.one::<usize>() {
/// Ok(x) => x,
/// Err(e) => {
/// let err: Error = e.into();
/// sink.reject(err);
/// return Ok(());
/// }
/// };
/// // Sink is accepted on the first `send` call.
/// std::thread::spawn(move || {
/// let _ = sink.send(&sum);
/// });
/// ```
pub fn register_subscription<F>(
&mut self,
subscribe_method_name: &'static str,
notif_method_name: &'static str,
unsubscribe_method_name: &'static str,
callback: F,
) -> Result<MethodResourcesBuilder, Error>
F: Fn(Params, SubscriptionSink, Arc<Context>) -> SubscriptionResult + Send + Sync + 'static,
if subscribe_method_name == unsubscribe_method_name {
return Err(Error::SubscriptionNameConflict(subscribe_method_name.into()));
}
self.methods.verify_method_name(subscribe_method_name)?;
self.methods.verify_method_name(unsubscribe_method_name)?;
let subscribers = Subscribers::default();
let subscribers = subscribers.clone();
self.methods.mut_callbacks().insert(
MethodCallback::new_unsubscription(Arc::new(move |id, params, conn_id, max_response_size| {
let sub_id = match params.one::<RpcSubscriptionId>() {
Ok(sub_id) => sub_id,
Err(_) => {
tracing::warn!(
"Unsubscribe call `{}` failed: couldn't parse subscription id={:?} request id={:?}",
return MethodResponse::response(id, false, max_response_size);
let key = SubscriptionKey { conn_id, sub_id: sub_id.into_owned() };
let result = subscribers.lock().remove(&key).is_some();
if !result {
tracing::warn!(
"Unsubscribe call `{}` subscription key={:?} not an active subscription",
unsubscribe_method_name,
key,
);
}
// TODO: register as failed in !result.
MethodResponse::response(id, result, max_response_size)
// Subscribe
let callback = {
self.methods.verify_and_insert(
subscribe_method_name,
MethodCallback::new_subscription(Arc::new(move |id, params, method_sink, conn, claimed| {
let uniq_sub = SubscriptionKey { conn_id: conn.conn_id, sub_id: conn.id_provider.next_id() };
// response to the subscription call.
let (tx, rx) = oneshot::channel();
let sink = SubscriptionSink {
close_notify: Some(conn.close_notify),
method: notif_method_name,
subscribers: subscribers.clone(),
uniq_sub,
id: Some((id.clone().into_owned(), tx)),
unsubscribe: None,
_claimed: claimed,
};
// The callback returns a `SubscriptionResult` for better ergonomics and is not propagated further.
tracing::warn!("Subscribe call `{}` failed", subscribe_method_name);
let id = id.clone().into_owned();
let result = async move {
match rx.await {
Ok(result) => result,
Err(_) => MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError)),
}
};
Box::pin(result)
})),
)?
};
Ok(MethodResourcesBuilder { build: ResourceVec::new(), callback })
/// Register an alias for an existing_method. Alias uniqueness is enforced.
pub fn register_alias(&mut self, alias: &'static str, existing_method: &'static str) -> Result<(), Error> {
self.methods.verify_method_name(alias)?;
let callback = match self.methods.callbacks.get(existing_method) {
Some(callback) => callback.clone(),
None => return Err(Error::MethodNotFound(existing_method.into())),
};
self.methods.mut_callbacks().insert(alias, callback);
/// Returns once the unsubscribe method has been called.
type UnsubscribeCall = Option<watch::Receiver<()>>;
/// Represents a single subscription.
#[derive(Debug)]
pub struct SubscriptionSink {
/// Sink.
/// Get notified when subscribers leave so we can exit
Niklas Adolfsson
committed
close_notify: Option<SubscriptionPermit>,
/// MethodCallback.
method: &'static str,
/// Shared Mutex of subscriptions for this method.
subscribers: Subscribers,
/// Unique subscription.
uniq_sub: SubscriptionKey,
/// Id of the `subscription call` (i.e. not the same as subscription id) which is used
/// to reply to subscription method call and must only be used once.
///
/// *Note*: Having some value means the subscription was not accepted or rejected yet.
id: Option<(Id<'static>, oneshot::Sender<MethodResponse>)>,
/// Having some value means the subscription was accepted.
unsubscribe: UnsubscribeCall,
/// Claimed resources.
_claimed: Option<ResourceGuard>,
}
impl SubscriptionSink {
/// Reject the subscription call from [`ErrorObject`].
pub fn reject(&mut self, err: impl Into<ErrorObjectOwned>) -> Result<(), SubscriptionAcceptRejectError> {
let (id, subscribe_call) = self.id.take().ok_or(SubscriptionAcceptRejectError::AlreadyCalled)?;
let err = MethodResponse::error(id, err.into());
if self.answer_subscription(err, subscribe_call) {
} else {
Err(SubscriptionAcceptRejectError::RemotePeerAborted)
}
}
/// Attempt to accept the subscription and respond the subscription method call.
///
/// Fails if the connection was closed, or if called multiple times.
pub fn accept(&mut self) -> Result<(), SubscriptionAcceptRejectError> {
let (id, subscribe_call) = self.id.take().ok_or(SubscriptionAcceptRejectError::AlreadyCalled)?;
let response = MethodResponse::response(id, &self.uniq_sub.sub_id, self.inner.max_response_size() as usize);
let success = response.success;
let sent = self.answer_subscription(response, subscribe_call);
Niklas Adolfsson
committed
let (tx, rx) = watch::channel(());
self.subscribers.lock().insert(self.uniq_sub.clone(), (self.inner.clone(), tx));
self.unsubscribe = Some(rx);
Ok(())
} else {
Err(SubscriptionAcceptRejectError::RemotePeerAborted)
}
}
/// Returns
/// - `Ok(true)` if the message could be send.
/// - `Ok(false)` if the sink was closed (either because the subscription was closed or the connection was terminated),
/// or the subscription could not be accepted.
/// - `Err(err)` if the message could not be serialized.
pub fn send<T: Serialize>(&mut self, result: &T) -> Result<bool, serde_json::Error> {
// Cannot accept the subscription.
if let Err(SubscriptionAcceptRejectError::RemotePeerAborted) = self.accept() {
return Ok(false);
}
// Only possible to trigger when the connection is dropped.
return Ok(false);
let msg = self.build_message(result)?;
Niklas Adolfsson
committed
Ok(self.inner.send_raw(msg).is_ok())
/// Reads data from the `stream` and sends back data on the subscription
/// when items gets produced by the stream.
Niklas Adolfsson
committed
/// The underlying stream must produce `Result values, see [`futures_util::TryStream`] for further information.
///
/// Returns `Ok(())` if the stream or connection was terminated.
Niklas Adolfsson
committed
/// Returns `Err(_)` immediately if the underlying stream returns an error or if an item from the stream could not be serialized.
///
/// # Examples
///
/// ```no_run
///
/// use jsonrpsee_core::server::rpc_module::RpcModule;
/// use jsonrpsee_core::error::{Error, SubscriptionClosed};
/// use jsonrpsee_types::ErrorObjectOwned;
Niklas Adolfsson
committed
/// use anyhow::anyhow;
///
/// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| {
Niklas Adolfsson
committed
/// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]);
/// // This will return send `[Ok(1_u32), Ok(2_u32), Err(Error::SubscriptionClosed))]` to the subscriber
/// // because after the `Err(_)` the stream is terminated.
/// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]);
///
/// tokio::spawn(async move {
/// // jsonrpsee doesn't send an error notification unless `close` is explicitly called.
/// // If we pipe messages to the sink, we can inspect why it ended:
/// match sink.pipe_from_try_stream(stream).await {
/// SubscriptionClosed::Success => {
/// let err_obj: ErrorObjectOwned = SubscriptionClosed::Success.into();
/// sink.close(err_obj);
/// }
/// // we don't want to send close reason when the client is unsubscribed or disconnected.
/// SubscriptionClosed::RemotePeerAborted => (),
/// SubscriptionClosed::Failed(e) => {
/// sink.close(e);
/// }
Niklas Adolfsson
committed
/// }
/// });
/// });
/// ```
pub async fn pipe_from_try_stream<S, T, E>(&mut self, mut stream: S) -> SubscriptionClosed
Niklas Adolfsson
committed
S: TryStream<Ok = T, Error = E> + Unpin,
T: Serialize,
Niklas Adolfsson
committed
E: std::fmt::Display,
if let Err(SubscriptionAcceptRejectError::RemotePeerAborted) = self.accept() {
return SubscriptionClosed::RemotePeerAborted;
}
Niklas Adolfsson
committed
let conn_closed = match self.close_notify.as_ref().map(|cn| cn.handle()) {
Some(cn) => cn,
None => return SubscriptionClosed::RemotePeerAborted,
};
let mut sub_closed = match self.unsubscribe.as_ref() {
Some(rx) => rx.clone(),
_ => {
return SubscriptionClosed::Failed(ErrorObject::owned(
INTERNAL_ERROR_CODE,
"Unsubscribe watcher not set after accepting the subscription".to_string(),
};
Niklas Adolfsson
committed
let sub_closed_fut = sub_closed.changed();
let conn_closed_fut = conn_closed.notified();
pin_mut!(conn_closed_fut);
pin_mut!(sub_closed_fut);
let mut stream_item = stream.try_next();
Niklas Adolfsson
committed
let mut closed_fut = futures_util::future::select(conn_closed_fut, sub_closed_fut);
loop {
match futures_util::future::select(stream_item, closed_fut).await {
// The app sent us a value to send back to the subscribers
Either::Left((Ok(Some(result)), next_closed_fut)) => {
match self.send(&result) {
Ok(true) => (),
Ok(false) => {
break SubscriptionClosed::RemotePeerAborted;
}
Err(err) => {
let err = ErrorObject::owned(SUBSCRIPTION_CLOSED_WITH_ERROR, err.to_string(), None::<()>);
break SubscriptionClosed::Failed(err);
}
};
stream_item = stream.try_next();
closed_fut = next_closed_fut;
}
// Stream canceled because of error.
Either::Left((Err(err), _)) => {
let err = ErrorObject::owned(SUBSCRIPTION_CLOSED_WITH_ERROR, err.to_string(), None::<()>);
break SubscriptionClosed::Failed(err);
}
Either::Left((Ok(None), _)) => break SubscriptionClosed::Success,
Niklas Adolfsson
committed
Either::Right((_, _)) => {
break SubscriptionClosed::RemotePeerAborted;