Newer
Older
use crate::server::helpers::{send_error, send_response};
use futures_channel::{mpsc, oneshot};
use futures_util::{future::BoxFuture, FutureExt};
use jsonrpsee_types::error::{CallError, Error};
use jsonrpsee_types::v2::error::{JsonRpcErrorCode, JsonRpcErrorObject, CALL_EXECUTION_FAILED_CODE};
use jsonrpsee_types::v2::params::{Id, JsonRpcNotificationParams, OwnedId, OwnedRpcParams, RpcParams, TwoPointZero};
use jsonrpsee_types::v2::response::JsonRpcSubscriptionResponse;
use parking_lot::Mutex;
use rustc_hash::FxHashMap;
use serde::Serialize;
use serde_json::value::{to_raw_value, RawValue};
/// A `Method` is an RPC endpoint, callable with a standard JSON-RPC request,
/// implemented as a function pointer to a `Fn` function taking four arguments:
/// the `id`, `params`, a channel the function uses to communicate the result (or error)
/// back to `jsonrpsee`, and the connection ID (useful for the websocket transport).
pub type SyncMethod = Box<dyn Send + Sync + Fn(Id, RpcParams, &MethodSink, ConnectionId) -> Result<(), Error>>;
/// Similar to [`SyncMethod`], but represents an asynchronous handler.
pub type AsyncMethod = Box<
dyn Send + Sync + Fn(OwnedId, OwnedRpcParams, MethodSink, ConnectionId) -> BoxFuture<'static, Result<(), Error>>,
>;
/// A collection of registered [`SyncMethod`]s.
pub type SyncMethods = FxHashMap<&'static str, SyncMethod>;
/// A collection of registered [`AsyncMethod`]s.
pub type AsyncMethods = FxHashMap<&'static str, AsyncMethod>;
/// Connection ID, used for stateful protocol such as WebSockets.
/// For stateless protocols such as http it's unused, so feel free to set it some hardcoded value.
pub type ConnectionId = usize;
/// Subscription ID.
pub type SubscriptionId = u64;
/// Sink that is used to send back the result to the server for a specific method.
pub type MethodSink = mpsc::UnboundedSender<String>;
type Subscribers = Arc<Mutex<FxHashMap<(ConnectionId, SubscriptionId), (MethodSink, oneshot::Receiver<()>)>>>;
/// Identifier of the method type.
#[derive(Debug, Copy, Clone)]
pub enum MethodType {
/// Synchronous method handler.
Sync,
/// Asynchronous method handler.
Async,
}
/// Collection of synchronous and asynchronous methods.
#[derive(Default)]
method_types: FxHashMap<&'static str, MethodType>,
impl std::fmt::Debug for Methods {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Including types only is sufficient, as it contains information
// about the registered methods and if they are sync or async.
f.debug_struct("Methods").field("method_types", &self.method_types).finish()
}
}
impl Methods {
/// Creates a new empty [`Methods`].
pub fn new() -> Self {
Self::default()
}
fn verify_method_name(&mut self, name: &str) -> Result<(), Error> {
if self.sync_methods.contains_key(name) || self.async_methods.contains_key(name) {
return Err(Error::MethodAlreadyRegistered(name.into()));
}
Ok(())
}
/// Merge two [`Methods`]'s by adding all [`SyncMethod`]s and [`AsyncMethod`]s from `other` into `self`.
/// Fails if any of the methods in `other` is present already.
pub fn merge(&mut self, other: Methods) -> Result<(), Error> {
for name in other.method_types.keys() {
self.verify_method_name(name)?;
}
for (name, callback) in other.sync_methods {
self.sync_methods.insert(name, callback);
self.method_types.insert(name, MethodType::Sync);
}
for (name, callback) in other.async_methods {
self.async_methods.insert(name, callback);
self.method_types.insert(name, MethodType::Async);
}
Ok(())
}
/// Returns the type of the method handler, if any.
pub fn method_type(&self, method_name: &str) -> Option<&MethodType> {
self.method_types.get(method_name)
}
/// Returns the synchronous method.
pub fn sync_method(&self, method_name: &str) -> Option<&SyncMethod> {
self.sync_methods.get(method_name)
}
/// Returns the asynchronous method.
pub fn async_method(&self, method_name: &str) -> Option<&AsyncMethod> {
self.async_methods.get(method_name)
}
/// Returns a `Vec` with all the method names registered on this server.
pub fn method_names(&self) -> Vec<String> {
self.method_types.keys().map(|name| name.to_string()).collect()
}
}
/// Sets of JSON-RPC methods can be organized into a "module"s that are in turn registered on the server or,
/// alternatively, merged with other modules to construct a cohesive API. [`RpcModule`] wraps an additional context
/// argument that can be used to access data during call execution.
#[derive(Debug)]
pub struct RpcModule<Context> {
ctx: Arc<Context>,
subscribers: Subscribers,
}
impl<Context: Send + Sync + 'static> RpcModule<Context> {
/// Create a new module with a given shared `Context`.
pub fn new(ctx: Context) -> Self {
Self { ctx: Arc::new(ctx), methods: Default::default(), subscribers: Default::default() }
}
/// Register a new synchronous RPC method, which computes the response with the given callback.
pub fn register_method<R, F>(&mut self, method_name: &'static str, callback: F) -> Result<(), Error>
F: Fn(RpcParams, &Context) -> Result<R, CallError> + Send + Sync + 'static,
self.methods.verify_method_name(method_name)?;
method_name,
Box::new(move |id, params, tx, _| {
Ok(res) => send_response(id, tx, res),
Err(CallError::InvalidParams) => send_error(id, tx, JsonRpcErrorCode::InvalidParams.into()),
Err(CallError::Failed(err)) => {
let err = JsonRpcErrorObject {
code: JsonRpcErrorCode::ServerError(CALL_EXECUTION_FAILED_CODE),
message: &err.to_string(),
data: None,
};
send_error(id, tx, err)
}
self.methods.method_types.insert(method_name, MethodType::Sync);
Ok(())
}
/// Register a new asynchronous RPC method, which computes the response with the given callback.
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
pub fn register_async_method<R, F>(&mut self, method_name: &'static str, callback: F) -> Result<(), Error>
where
R: Serialize + Send + Sync + 'static,
F: Fn(RpcParams, Arc<Context>) -> BoxFuture<'static, Result<R, CallError>> + Copy + Send + Sync + 'static,
{
self.methods.verify_method_name(method_name)?;
let ctx = self.ctx.clone();
self.methods.async_methods.insert(
method_name,
Box::new(move |id, params, tx, _| {
let ctx = ctx.clone();
let future = async move {
let params = params.borrowed();
let id = id.borrowed();
match callback(params, ctx).await {
Ok(res) => send_response(id, &tx, res),
Err(CallError::InvalidParams) => send_error(id, &tx, JsonRpcErrorCode::InvalidParams.into()),
Err(CallError::Failed(err)) => {
log::error!("Call failed with: {}", err);
let err = JsonRpcErrorObject {
code: JsonRpcErrorCode::ServerError(CALL_EXECUTION_FAILED_CODE),
message: &err.to_string(),
data: None,
};
send_error(id, &tx, err)
}
};
Ok(())
};
future.boxed()
}),
);
self.methods.method_types.insert(method_name, MethodType::Async);
/// Register a new RPC subscription that invokes callback on every subscription request.
/// - RpcParams: JSONRPC parameters in the subscription request.
/// - SubscriptionSink: A sink to send messages to the subscriber.
/// - Context: Any type that can be embedded into the RpcContextModule.
///
/// # Examples
///
/// ```no_run
///
/// use jsonrpsee_utils::server::rpc_module::RpcModule;
///
/// let mut ctx = RpcModule::new(99_usize);
/// ctx.register_subscription("sub", "unsub", |params, sink, ctx| {
/// let x: usize = params.one()?;
/// std::thread::spawn(move || {
/// });
/// Ok(())
/// });
/// ```
pub fn register_subscription<F>(
&mut self,
subscribe_method_name: &'static str,
unsubscribe_method_name: &'static str,
callback: F,
) -> Result<(), Error>
where
Context: Send + Sync + 'static,
F: Fn(RpcParams, SubscriptionSink, Arc<Context>) -> Result<(), Error> + Send + Sync + 'static,
if subscribe_method_name == unsubscribe_method_name {
return Err(Error::SubscriptionNameConflict(subscribe_method_name.into()));
}
self.methods.verify_method_name(subscribe_method_name)?;
self.methods.verify_method_name(unsubscribe_method_name)?;
self.methods.method_types.insert(subscribe_method_name, MethodType::Sync);
self.methods.method_types.insert(unsubscribe_method_name, MethodType::Sync);
let subscribers = self.subscribers.clone();
Box::new(move |id, params, method_sink, conn| {
let (online_tx, online_rx) = oneshot::channel::<()>();
let sub_id = {
const JS_NUM_MASK: SubscriptionId = !0 >> 11;
let sub_id = rand::random::<SubscriptionId>() & JS_NUM_MASK;
subscribers.lock().insert((conn, sub_id), (method_sink.clone(), online_rx));
send_response(id, method_sink, sub_id);
let sink = SubscriptionSink {
inner: method_sink.clone(),
method: subscribe_method_name,
sub_id,
is_online: online_tx,
};
let subscribers = self.subscribers.clone();
unsubscribe_method_name,
Box::new(move |id, params, tx, conn| {
subscribers.lock().remove(&(conn, sub_id));
send_response(id, tx, "Unsubscribed");
Ok(())
}),
);
}
/// Merge two [`RpcModule`]'s by adding all [`Methods`] `other` into `self`.
/// Fails if any of the methods in `other` is present already.
pub fn merge<Context2>(&mut self, other: RpcModule<Context2>) -> Result<(), Error> {
self.methods.merge(other.methods)?;
/// Represents a single subscription.
#[derive(Debug)]
pub struct SubscriptionSink {
/// Sink.
inner: mpsc::UnboundedSender<String>,
/// Method.
/// SubscriptionID,
sub_id: SubscriptionId,
/// Whether the subscriber is still alive (to avoid send messages that the subscriber is not interested in).
is_online: oneshot::Sender<()>,
}
impl SubscriptionSink {
/// Send message on this subscription.
pub fn send<T: Serialize>(&self, result: &T) -> Result<(), Error> {
let result = to_raw_value(result)?;
self.send_raw_value(&result)
}
fn send_raw_value(&self, result: &RawValue) -> Result<(), Error> {
let msg = serde_json::to_string(&JsonRpcSubscriptionResponse {
jsonrpc: TwoPointZero,
method: self.method,
params: JsonRpcNotificationParams { subscription: self.sub_id, result: &*result },
})?;
self.inner_send(msg).map_err(Into::into)
}
fn inner_send(&self, msg: String) -> Result<(), Error> {
if self.is_online() {
self.inner.unbounded_send(msg).map_err(|e| Error::Internal(e.into_send_error()))
} else {
Err(Error::Custom("Subscription canceled".into()))
fn is_online(&self) -> bool {
!self.is_online.is_canceled()
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn rpc_modules_with_different_contexts_can_be_merged() {
let mut mod1 = RpcModule::new(cx);
mod1.register_method("bla with Vec context", |_: RpcParams, _| Ok(())).unwrap();
let mut mod2 = RpcModule::new(String::new());
mod2.register_method("bla with String context", |_: RpcParams, _| Ok(())).unwrap();
mod1.merge(mod2).unwrap();
let methods = mod1.into_methods();
assert!(methods.sync_method(&"bla with Vec context").is_some());
assert!(methods.sync_method(&"bla with String context").is_some());
}
#[test]
fn rpc_context_modules_can_register_subscriptions() {
let cx = ();
let mut cxmodule = RpcModule::new(cx);
let _subscription = cxmodule.register_subscription("hi", "goodbye", |_, _, _| Ok(()));
let methods = cxmodule.into_methods();
assert!(methods.sync_method(&"hi").is_some());
assert!(methods.sync_method(&"goodbye").is_some());