// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::server::helpers::{send_call_error, send_error, send_response};
use crate::server::resource_limiting::{ResourceGuard, ResourceTable, ResourceVec, Resources};
use beef::Cow;
use futures_channel::{mpsc, oneshot};
use futures_util::{future::BoxFuture, FutureExt, StreamExt};
use jsonrpsee_types::to_json_raw_value;
use jsonrpsee_types::v2::error::{invalid_subscription_err, CALL_EXECUTION_FAILED_CODE};
use jsonrpsee_types::{
    error::{Error, SubscriptionClosedError},
    traits::ToRpcParams,
    v2::{
        ErrorCode, Id, Params, Request, Response, SubscriptionId as RpcSubscriptionId, SubscriptionPayload,
        SubscriptionResponse, TwoPointZero,
    },
    DeserializeOwned,
};

use parking_lot::Mutex;
use rustc_hash::FxHashMap;
use serde::Serialize;
use serde_json::value::RawValue;
use std::collections::hash_map::Entry;
use std::fmt::Debug;
use std::future::Future;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

/// A `SyncMethod` is an RPC endpoint, callable with a standard JSON-RPC request,
/// implemented as a function pointer to a `Fn` function taking five arguments:
/// the `id`, the `params`, a channel the function uses to communicate the result (or error)
/// back to `jsonrpsee`, the connection ID (useful for the websocket transport)
/// and the maximum allowed response size.
// NOTE(review): the generic arguments of this alias were lost in extraction; they are reconstructed
// from the call site `(callback)(id, params, tx, conn_id, max_response_size)` — confirm upstream.
pub type SyncMethod = Arc<dyn Send + Sync + Fn(Id, Params, &MethodSink, ConnectionId, MaxResponseSize)>;

/// Similar to [`SyncMethod`], but represents an asynchronous handler and takes an additional argument
/// containing a [`ResourceGuard`] if configured.
pub type AsyncMethod<'a> = Arc<
    dyn Send + Sync + Fn(Id<'a>, Params<'a>, MethodSink, Option<ResourceGuard>, MaxResponseSize) -> BoxFuture<'a, ()>,
>;

/// Connection ID, used for stateful protocols such as WebSockets.
/// For stateless protocols such as HTTP it's unused, so feel free to set it to some hardcoded value.
pub type ConnectionId = usize;

/// Subscription ID.
pub type SubscriptionId = u64;

/// Sink that is used to send back the result to the server for a specific method.
pub type MethodSink = mpsc::UnboundedSender<String>;

/// Max response size in bytes for an executed call.
pub type MaxResponseSize = u32; type Subscribers = Arc)>>>; /// Represent a unique subscription entry based on [`SubscriptionId`] and [`ConnectionId`]. #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] struct SubscriptionKey { conn_id: ConnectionId, sub_id: SubscriptionId, } /// Callback wrapper that can be either sync or async. #[derive(Clone)] enum MethodKind { /// Synchronous method handler. Sync(SyncMethod), /// Asynchronous method handler. Async(AsyncMethod<'static>), } /// Information about resources the method uses during its execution. Initialized when the the server starts. #[derive(Clone, Debug)] enum MethodResources { /// Uninitialized resource table, mapping string label to units. Uninitialized(Box<[(&'static str, u16)]>), /// Initialized resource table containing units for each `ResourceId`. Initialized(ResourceTable), } /// Method callback wrapper that contains a sync or async closure, /// plus a table with resources it needs to claim to run #[derive(Clone, Debug)] pub struct MethodCallback { callback: MethodKind, resources: MethodResources, } /// Builder for configuring resources used by a method. #[derive(Debug)] pub struct MethodResourcesBuilder<'a> { build: ResourceVec<(&'static str, u16)>, callback: &'a mut MethodCallback, } impl<'a> MethodResourcesBuilder<'a> { /// Define how many units of a given named resource the method uses during its execution. 
pub fn resource(mut self, label: &'static str, units: u16) -> Result { self.build.try_push((label, units)).map_err(|_| Error::MaxResourcesReached)?; Ok(self) } } impl<'a> Drop for MethodResourcesBuilder<'a> { fn drop(&mut self) { self.callback.resources = MethodResources::Uninitialized(self.build[..].into()); } } impl MethodCallback { fn new_sync(callback: SyncMethod) -> Self { MethodCallback { callback: MethodKind::Sync(callback), resources: MethodResources::Uninitialized([].into()) } } fn new_async(callback: AsyncMethod<'static>) -> Self { MethodCallback { callback: MethodKind::Async(callback), resources: MethodResources::Uninitialized([].into()) } } /// Attempt to claim resources prior to executing a method. On success returns a guard that releases /// claimed resources when dropped. pub fn claim(&self, name: &str, resources: &Resources) -> Result { match self.resources { MethodResources::Uninitialized(_) => Err(Error::UninitializedMethod(name.into())), MethodResources::Initialized(units) => resources.claim(units), } } /// Execute the callback, sending the resulting JSON (success or error) to the specified sink. 
pub fn execute( &self, tx: &MethodSink, req: Request<'_>, conn_id: ConnectionId, claimed: Option, max_response_size: MaxResponseSize, ) -> Option> { let id = req.id.clone(); let params = Params::new(req.params.map(|params| params.get())); match &self.callback { MethodKind::Sync(callback) => { tracing::trace!( "[MethodCallback::execute] Executing sync callback, params={:?}, req.id={:?}, conn_id={:?}", params, id, conn_id ); (callback)(id, params, tx, conn_id, max_response_size); // Release claimed resources drop(claimed); None } MethodKind::Async(callback) => { let tx = tx.clone(); let params = params.into_owned(); let id = id.into_owned(); tracing::trace!( "[MethodCallback::execute] Executing async callback, params={:?}, req.id={:?}, conn_id={:?}", params, id, conn_id ); Some((callback)(id, params, tx, claimed, max_response_size)) } } } } impl Debug for MethodKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Async(_) => write!(f, "Async"), Self::Sync(_) => write!(f, "Sync"), } } } /// Reference-counted, clone-on-write collection of synchronous and asynchronous methods. #[derive(Default, Debug, Clone)] pub struct Methods { callbacks: Arc>, } impl Methods { /// Creates a new empty [`Methods`]. pub fn new() -> Self { Self::default() } fn verify_method_name(&mut self, name: &'static str) -> Result<(), Error> { if self.callbacks.contains_key(name) { return Err(Error::MethodAlreadyRegistered(name.into())); } Ok(()) } /// Inserts the method callback for a given name, or returns an error if the name was already taken. /// On success it returns a mut reference to the [`MethodCallback`] just inserted. 
fn verify_and_insert( &mut self, name: &'static str, callback: MethodCallback, ) -> Result<&mut MethodCallback, Error> { match self.mut_callbacks().entry(name) { Entry::Occupied(_) => Err(Error::MethodAlreadyRegistered(name.into())), Entry::Vacant(vacant) => Ok(vacant.insert(callback)), } } /// Initialize resources for all methods in this collection. This method has no effect if called more than once. pub fn initialize_resources(mut self, resources: &Resources) -> Result { let callbacks = self.mut_callbacks(); for (&method_name, callback) in callbacks.iter_mut() { if let MethodResources::Uninitialized(uninit) = &callback.resources { let mut map = resources.defaults; for &(label, units) in uninit.iter() { let idx = match resources.labels.iter().position(|&l| l == label) { Some(idx) => idx, None => return Err(Error::ResourceNameNotFoundForMethod(label, method_name)), }; // If resource capacity set to `0`, we ignore the unit value of the method // and set it to `0` as well, effectively making the resource unlimited. if resources.capacities[idx] == 0 { map[idx] = 0; } else { map[idx] = units; } } callback.resources = MethodResources::Initialized(map); } } Ok(self) } /// Helper for obtaining a mut ref to the callbacks HashMap. fn mut_callbacks(&mut self) -> &mut FxHashMap<&'static str, MethodCallback> { Arc::make_mut(&mut self.callbacks) } /// Merge two [`Methods`]'s by adding all [`MethodCallback`]s from `other` into `self`. /// Fails if any of the methods in `other` is present already. pub fn merge(&mut self, other: impl Into) -> Result<(), Error> { let mut other = other.into(); for name in other.callbacks.keys() { self.verify_method_name(name)?; } let callbacks = self.mut_callbacks(); for (name, callback) in other.mut_callbacks().drain() { callbacks.insert(name, callback); } Ok(()) } /// Returns the method callback. 
pub fn method(&self, method_name: &str) -> Option<&MethodCallback> { self.callbacks.get(method_name) } /// Attempt to execute a callback, sending the resulting JSON (success or error) to the specified sink. pub fn execute( &self, tx: &MethodSink, req: Request, conn_id: ConnectionId, max_response_size: MaxResponseSize, ) -> Option> { tracing::trace!("[Methods::execute] Executing request: {:?}", req); match self.callbacks.get(&*req.method) { Some(callback) => callback.execute(tx, req, conn_id, None, max_response_size), None => { send_error(req.id, tx, ErrorCode::MethodNotFound.into()); None } } } /// Attempt to execute a callback while checking that the call does not exhaust the available resources, sending the resulting JSON (success or error) to the specified sink. pub fn execute_with_resources( &self, tx: &MethodSink, req: Request, conn_id: ConnectionId, resources: &Resources, max_response_size: MaxResponseSize, ) -> Option> { tracing::trace!("[Methods::execute_with_resources] Executing request: {:?}", req); match self.callbacks.get(&*req.method) { Some(callback) => match callback.claim(&req.method, resources) { Ok(guard) => callback.execute(tx, req, conn_id, Some(guard), max_response_size), Err(err) => { tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err); send_error(req.id, tx, ErrorCode::ServerIsBusy.into()); None } }, None => { send_error(req.id, tx, ErrorCode::MethodNotFound.into()); None } } } /// Helper to call a method on the `RPC module` without having to spin up a server. /// /// The params must be serializable as JSON array, see [`ToRpcParams`] for further documentation. pub async fn call_with(&self, method: &str, params: Params) -> Option { let params = params.to_rpc_params().ok(); self.call(method, params).await } /// Helper alternative to `execute`, useful for writing unit tests without having to spin /// a server up. 
pub async fn call(&self, method: &str, params: Option>) -> Option { let req = Request { jsonrpc: TwoPointZero, id: Id::Number(0), method: Cow::borrowed(method), params: params.as_deref(), }; let (tx, mut rx) = mpsc::unbounded(); if let Some(fut) = self.execute(&tx, req, 0, MaxResponseSize::MAX) { fut.await; } rx.next().await } /// Test helper that sets up a subscription using the given `method`. Returns a tuple of the /// [`SubscriptionId`] and a channel on which subscription JSON payloads can be received. pub async fn test_subscription(&self, method: &str, params: impl ToRpcParams) -> TestSubscription { let params = params.to_rpc_params().expect("valid JSON-RPC params"); tracing::trace!("[Methods::test_subscription] Calling subscription method: {:?}, params: {:?}", method, params); let req = Request { jsonrpc: TwoPointZero, id: Id::Number(0), method: Cow::borrowed(method), params: Some(¶ms) }; let (tx, mut rx) = mpsc::unbounded(); if let Some(fut) = self.execute(&tx, req, 0, MaxResponseSize::MAX) { fut.await; } let response = rx.next().await.expect("Could not establish subscription."); let subscription_response = serde_json::from_str::>(&response) .unwrap_or_else(|_| panic!("Could not deserialize subscription response {:?}", response)); let sub_id = subscription_response.result; TestSubscription { tx, rx, sub_id } } /// Returns an `Iterator` with all the method names registered on this server. pub fn method_names(&self) -> impl Iterator + '_ { self.callbacks.keys().copied() } } impl Deref for RpcModule { type Target = Methods; fn deref(&self) -> &Methods { &self.methods } } impl DerefMut for RpcModule { fn deref_mut(&mut self) -> &mut Methods { &mut self.methods } } /// Sets of JSON-RPC methods can be organized into a "module"s that are in turn registered on the server or, /// alternatively, merged with other modules to construct a cohesive API. [`RpcModule`] wraps an additional context /// argument that can be used to access data during call execution. 
#[derive(Debug, Clone)] pub struct RpcModule { ctx: Arc, methods: Methods, } impl RpcModule { /// Create a new module with a given shared `Context`. pub fn new(ctx: Context) -> Self { Self { ctx: Arc::new(ctx), methods: Default::default() } } } impl From> for Methods { fn from(module: RpcModule) -> Methods { module.methods } } impl RpcModule { /// Register a new synchronous RPC method, which computes the response with the given callback. pub fn register_method( &mut self, method_name: &'static str, callback: F, ) -> Result where Context: Send + Sync + 'static, R: Serialize, F: Fn(Params, &Context) -> Result + Send + Sync + 'static, { let ctx = self.ctx.clone(); let callback = self.methods.verify_and_insert( method_name, MethodCallback::new_sync(Arc::new(move |id, params, tx, _, max_response_size| { match callback(params, &*ctx) { Ok(res) => send_response(id, tx, res, max_response_size), Err(err) => send_call_error(id, tx, err), }; })), )?; Ok(MethodResourcesBuilder { build: ResourceVec::new(), callback }) } /// Register a new asynchronous RPC method, which computes the response with the given callback. pub fn register_async_method( &mut self, method_name: &'static str, callback: Fun, ) -> Result where R: Serialize + Send + Sync + 'static, Fut: Future> + Send, Fun: (Fn(Params<'static>, Arc) -> Fut) + Copy + Send + Sync + 'static, { let ctx = self.ctx.clone(); let callback = self.methods.verify_and_insert( method_name, MethodCallback::new_async(Arc::new(move |id, params, tx, claimed, max_response_size| { let ctx = ctx.clone(); let future = async move { match callback(params, ctx).await { Ok(res) => send_response(id, &tx, res, max_response_size), Err(err) => send_call_error(id, &tx, err), }; // Release claimed resources drop(claimed); }; future.boxed() })), )?; Ok(MethodResourcesBuilder { build: ResourceVec::new(), callback }) } /// Register a new **blocking** synchronous RPC method, which computes the response with the given callback. 
/// Unlike the regular [`register_method`](RpcModule::register_method), this method can block its thread and perform expensive computations. pub fn register_blocking_method( &mut self, method_name: &'static str, callback: F, ) -> Result where Context: Send + Sync + 'static, R: Serialize, F: Fn(Params, Arc) -> Result + Copy + Send + Sync + 'static, { let ctx = self.ctx.clone(); let callback = self.methods.verify_and_insert( method_name, MethodCallback::new_async(Arc::new(move |id, params, tx, claimed, max_response_size| { let ctx = ctx.clone(); tokio::task::spawn_blocking(move || { match callback(params, ctx) { Ok(res) => send_response(id, &tx, res, max_response_size), Err(err) => send_call_error(id, &tx, err), }; // Release claimed resources drop(claimed); }) .map(|err| { tracing::error!("Join error for blocking RPC method: {:?}", err); }) .boxed() })), )?; Ok(MethodResourcesBuilder { build: ResourceVec::new(), callback }) } /// Register a new RPC subscription that invokes callback on every subscription request. /// The callback itself takes three parameters: /// - [`Params`]: JSONRPC parameters in the subscription request. /// - [`SubscriptionSink`]: A sink to send messages to the subscriber. /// - Context: Any type that can be embedded into the [`RpcModule`]. 
/// /// # Examples /// /// ```no_run /// /// use jsonrpsee_utils::server::rpc_module::RpcModule; /// /// let mut ctx = RpcModule::new(99_usize); /// ctx.register_subscription("sub", "unsub", |params, mut sink, ctx| { /// let x: usize = params.one()?; /// std::thread::spawn(move || { /// let sum = x + (*ctx); /// sink.send(&sum) /// }); /// Ok(()) /// }); /// ``` pub fn register_subscription( &mut self, subscribe_method_name: &'static str, unsubscribe_method_name: &'static str, callback: F, ) -> Result<(), Error> where Context: Send + Sync + 'static, F: Fn(Params, SubscriptionSink, Arc) -> Result<(), Error> + Send + Sync + 'static, { if subscribe_method_name == unsubscribe_method_name { return Err(Error::SubscriptionNameConflict(subscribe_method_name.into())); } self.methods.verify_method_name(subscribe_method_name)?; self.methods.verify_method_name(unsubscribe_method_name)?; let ctx = self.ctx.clone(); let subscribers = Subscribers::default(); { let subscribers = subscribers.clone(); self.methods.mut_callbacks().insert( subscribe_method_name, MethodCallback::new_sync(Arc::new(move |id, params, method_sink, conn_id, max_response_size| { let (conn_tx, conn_rx) = oneshot::channel::<()>(); let sub_id = { const JS_NUM_MASK: SubscriptionId = !0 >> 11; let sub_id = rand::random::() & JS_NUM_MASK; let uniq_sub = SubscriptionKey { conn_id, sub_id }; subscribers.lock().insert(uniq_sub, (method_sink.clone(), conn_rx)); sub_id }; send_response(id.clone(), method_sink, sub_id, max_response_size); let sink = SubscriptionSink { inner: method_sink.clone(), method: subscribe_method_name, subscribers: subscribers.clone(), uniq_sub: SubscriptionKey { conn_id, sub_id }, is_connected: Some(conn_tx), }; if let Err(err) = callback(params, sink, ctx.clone()) { tracing::error!( "subscribe call '{}' failed: {:?}, request id={:?}", subscribe_method_name, err, id ); send_error(id, method_sink, ErrorCode::ServerError(CALL_EXECUTION_FAILED_CODE).into()); } })), ); } { 
self.methods.mut_callbacks().insert( unsubscribe_method_name, MethodCallback::new_sync(Arc::new(move |id, params, tx, conn_id, max_response_size| { let sub_id = match params.one() { Ok(sub_id) => sub_id, Err(_) => { tracing::error!( "unsubscribe call '{}' failed: couldn't parse subscription id, request id={:?}", unsubscribe_method_name, id ); let err = to_json_raw_value(&"Invalid subscription ID type, must be integer").ok(); send_error(id, tx, invalid_subscription_err(err.as_deref())); return; } }; if subscribers.lock().remove(&SubscriptionKey { conn_id, sub_id }).is_some() { send_response(id, tx, "Unsubscribed", max_response_size); } else { let err = to_json_raw_value(&format!("Invalid subscription ID={}", sub_id)).ok(); send_error(id, tx, invalid_subscription_err(err.as_deref())) } })), ); } Ok(()) } /// Register an alias for an existing_method. Alias uniqueness is enforced. pub fn register_alias(&mut self, alias: &'static str, existing_method: &'static str) -> Result<(), Error> { self.methods.verify_method_name(alias)?; let callback = match self.methods.callbacks.get(existing_method) { Some(callback) => callback.clone(), None => return Err(Error::MethodNotFound(existing_method.into())), }; self.methods.mut_callbacks().insert(alias, callback); Ok(()) } } /// Represents a single subscription. #[derive(Debug)] pub struct SubscriptionSink { /// Sink. inner: mpsc::UnboundedSender, /// MethodCallback. method: &'static str, /// Unique subscription. uniq_sub: SubscriptionKey, /// Shared Mutex of subscriptions for this method. subscribers: Subscribers, /// A type to track whether the subscription is active (the subscriber is connected). /// /// None - implies that the subscription as been closed. is_connected: Option>, } impl SubscriptionSink { /// Send a message back to subscribers. 
pub fn send(&mut self, result: &T) -> Result<(), Error> { let msg = self.build_message(result)?; self.inner_send(msg).map_err(Into::into) } fn build_message(&self, result: &T) -> Result { serde_json::to_string(&SubscriptionResponse { jsonrpc: TwoPointZero, method: self.method, params: SubscriptionPayload { subscription: RpcSubscriptionId::Num(self.uniq_sub.sub_id), result }, }) .map_err(Into::into) } fn inner_send(&mut self, msg: String) -> Result<(), Error> { let res = match self.is_connected.as_ref() { Some(conn) if !conn.is_canceled() => { // unbounded send only fails if the receiver has been dropped. self.inner.unbounded_send(msg).map_err(|_| { Some(SubscriptionClosedError::new("Closed by the client (connection reset)", self.uniq_sub.sub_id)) }) } Some(_) => Err(Some(SubscriptionClosedError::new("Closed by unsubscribe call", self.uniq_sub.sub_id))), // NOTE(niklasad1): this should be unreachble, after the first error is detected the subscription is closed. None => Err(None), }; if let Err(Some(e)) = &res { self.inner_close(e); } res.map_err(|e| { let err = e.unwrap_or_else(|| SubscriptionClosedError::new("Close reason unknown", self.uniq_sub.sub_id)); Error::SubscriptionClosed(err) }) } /// Close the subscription sink with a customized error message. pub fn close(&mut self, msg: &str) { let err = SubscriptionClosedError::new(msg, self.uniq_sub.sub_id); self.inner_close(&err); } fn inner_close(&mut self, err: &SubscriptionClosedError) { self.is_connected.take(); if let Some((sink, _)) = self.subscribers.lock().remove(&self.uniq_sub) { tracing::debug!("Closing subscription: {:?}", self.uniq_sub.sub_id); let msg = self.build_message(err).expect("valid json infallible; qed"); let _ = sink.unbounded_send(msg); } } } impl Drop for SubscriptionSink { fn drop(&mut self) { let err = SubscriptionClosedError::new("Closed by the server", self.uniq_sub.sub_id); self.inner_close(&err); } } /// Wrapper struct that maintains a subscription for testing. 
#[derive(Debug)] pub struct TestSubscription { tx: mpsc::UnboundedSender, rx: mpsc::UnboundedReceiver, sub_id: u64, } impl TestSubscription { /// Close the subscription channel. pub fn close(&mut self) { self.tx.close_channel(); } /// Get the subscription ID pub fn subscription_id(&self) -> u64 { self.sub_id } /// Returns `Some((val, sub_id))` for the next element of type T from the underlying stream, /// otherwise `None` if the subscruption was closed. /// /// # Panics /// /// If the decoding the value as `T` fails. pub async fn next(&mut self) -> Option<(T, jsonrpsee_types::v2::SubscriptionId)> { let raw = self.rx.next().await?; let val: SubscriptionResponse = serde_json::from_str(&raw).expect("valid response in TestSubscription::next()"); Some((val.params.result, val.params.subscription)) } } impl Drop for TestSubscription { fn drop(&mut self) { self.close(); } } #[cfg(test)] mod tests { use super::*; use jsonrpsee_types::v2; use serde::Deserialize; use std::collections::HashMap; #[test] fn rpc_modules_with_different_contexts_can_be_merged() { let cx = Vec::::new(); let mut mod1 = RpcModule::new(cx); mod1.register_method("bla with Vec context", |_: Params, _| Ok(())).unwrap(); let mut mod2 = RpcModule::new(String::new()); mod2.register_method("bla with String context", |_: Params, _| Ok(())).unwrap(); mod1.merge(mod2).unwrap(); assert!(mod1.method("bla with Vec context").is_some()); assert!(mod1.method("bla with String context").is_some()); } #[test] fn rpc_context_modules_can_register_subscriptions() { let cx = (); let mut cxmodule = RpcModule::new(cx); let _subscription = cxmodule.register_subscription("hi", "goodbye", |_, _, _| Ok(())); assert!(cxmodule.method("hi").is_some()); assert!(cxmodule.method("goodbye").is_some()); } #[test] fn rpc_register_alias() { let mut module = RpcModule::new(()); module.register_method("hello_world", |_: Params, _| Ok(())).unwrap(); module.register_alias("hello_foobar", "hello_world").unwrap(); 
assert!(module.method("hello_world").is_some()); assert!(module.method("hello_foobar").is_some()); } #[tokio::test] async fn calling_method_without_server() { // Call sync method with no params let mut module = RpcModule::new(()); module.register_method("boo", |_: Params, _| Ok(String::from("boo!"))).unwrap(); let result = module.call("boo", None).await.unwrap(); assert_eq!(result, r#"{"jsonrpc":"2.0","result":"boo!","id":0}"#); // Call sync method with params module .register_method("foo", |params, _| { let n: u16 = params.one()?; Ok(n * 2) }) .unwrap(); let result = module.call_with("foo", [3]).await.unwrap(); assert_eq!(result, r#"{"jsonrpc":"2.0","result":6,"id":0}"#); // Call sync method with bad param let result = module.call_with("foo", (false,)).await.unwrap(); assert_eq!( result, r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"invalid type: boolean `false`, expected u16 at line 1 column 6"},"id":0}"# ); // Call async method with params and context struct MyContext; impl MyContext { fn roo(&self, things: Vec) -> u16 { things.iter().sum::().into() } } let mut module = RpcModule::new(MyContext); module .register_async_method("roo", |params, ctx| { let ns: Vec = params.parse().expect("valid params please"); async move { Ok(ctx.roo(ns)) } }) .unwrap(); let result = module.call_with("roo", vec![12, 13]).await.unwrap(); assert_eq!(result, r#"{"jsonrpc":"2.0","result":25,"id":0}"#); } #[tokio::test] async fn calling_method_without_server_using_proc_macro() { use jsonrpsee::{proc_macros::rpc, types::async_trait}; // Setup #[derive(Debug, Deserialize, Serialize)] #[allow(unreachable_pub)] pub struct Gun { shoots: bool, } #[derive(Debug, Deserialize, Serialize)] #[allow(unreachable_pub)] pub struct Beverage { ice: bool, } #[rpc(server)] pub trait Cool { /// Sync method, no params. #[method(name = "rebel_without_cause")] fn rebel_without_cause(&self) -> Result; /// Sync method. 
#[method(name = "rebel")] fn rebel(&self, gun: Gun, map: HashMap) -> Result; /// Async method. #[method(name = "revolution")] async fn can_have_any_name(&self, beverage: Beverage, some_bytes: Vec) -> Result; } struct CoolServerImpl; #[async_trait] impl CoolServer for CoolServerImpl { fn rebel_without_cause(&self) -> Result { Ok(false) } fn rebel(&self, gun: Gun, map: HashMap) -> Result { Ok(format!("{} {:?}", map.values().len(), gun)) } async fn can_have_any_name(&self, beverage: Beverage, some_bytes: Vec) -> Result { Ok(format!("drink: {:?}, phases: {:?}", beverage, some_bytes)) } } let module = CoolServerImpl.into_rpc(); // Call sync method with no params let result = module.call("rebel_without_cause", None).await.unwrap(); assert_eq!(result, r#"{"jsonrpc":"2.0","result":false,"id":0}"#); // Call sync method with no params, alternative way. let result = module.call_with::<[u8; 0]>("rebel_without_cause", []).await.unwrap(); assert_eq!(result, r#"{"jsonrpc":"2.0","result":false,"id":0}"#); // Call sync method with params let result = module.call_with("rebel", (Gun { shoots: true }, HashMap::::default())).await.unwrap(); assert_eq!(result, r#"{"jsonrpc":"2.0","result":"0 Gun { shoots: true }","id":0}"#); // Call sync method with bad params let result = module.call_with("rebel", (Gun { shoots: true }, false)).await.unwrap(); assert_eq!( result, r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"invalid type: boolean `false`, expected a map at line 1 column 5"},"id":0}"# ); // Call async method with params and context let result = module.call_with("revolution", (Beverage { ice: true }, vec![1, 2, 3])).await.unwrap(); assert_eq!(result, r#"{"jsonrpc":"2.0","result":"drink: Beverage { ice: true }, phases: [1, 2, 3]","id":0}"#); } #[tokio::test] async fn subscribing_without_server() { let mut module = RpcModule::new(()); module .register_subscription("my_sub", "my_unsub", |_, mut sink, _| { let mut stream_data = vec!['0', '1', '2']; std::thread::spawn(move || loop { 
tracing::debug!("This is your friendly subscription sending data."); if let Some(letter) = stream_data.pop() { if let Err(Error::SubscriptionClosed(_)) = sink.send(&letter) { return; } } else { return; } std::thread::sleep(std::time::Duration::from_millis(500)); }); Ok(()) }) .unwrap(); let mut my_sub: TestSubscription = module.test_subscription("my_sub", Vec::<()>::new()).await; for i in (0..=2).rev() { let (val, id) = my_sub.next::().await.unwrap(); assert_eq!(val, std::char::from_digit(i, 10).unwrap()); assert_eq!(id, v2::params::SubscriptionId::Num(my_sub.subscription_id())); } // The subscription is now closed by the server. let (sub_closed_err, _) = my_sub.next::().await.unwrap(); assert_eq!(sub_closed_err.subscription_id(), my_sub.subscription_id()); assert_eq!(sub_closed_err.close_reason(), "Closed by the server"); } #[tokio::test] async fn close_test_subscribing_without_server() { let mut module = RpcModule::new(()); module .register_subscription("my_sub", "my_unsub", |_, mut sink, _| { std::thread::spawn(move || loop { if let Err(Error::SubscriptionClosed(_)) = sink.send(&"lo") { return; } std::thread::sleep(std::time::Duration::from_millis(500)); }); Ok(()) }) .unwrap(); let mut my_sub: TestSubscription = module.test_subscription("my_sub", Vec::<()>::new()).await; let (val, id) = my_sub.next::().await.unwrap(); assert_eq!(&val, "lo"); assert_eq!(id, v2::params::SubscriptionId::Num(my_sub.subscription_id())); // close the subscription to ensure it doesn't return any items. my_sub.close(); assert_eq!(None, my_sub.next::().await); } }