// Copyright 2019-2021 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any // person obtaining a copy of this software and associated // documentation files (the "Software"), to deal in the // Software without restriction, including without // limitation the rights to use, copy, modify, merge, // publish, distribute, sublicense, and/or sell copies of // the Software, and to permit persons to whom the Software // is furnished to do so, subject to the following // conditions: // // The above copyright notice and this permission notice // shall be included in all copies or substantial portions // of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF // ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED // TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A // PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT // SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. //! Example of using proc macro to generate working client and server. use std::collections::BTreeMap; use std::net::SocketAddr; use jsonrpsee::core::client::ClientT; use jsonrpsee::core::{client::SubscriptionClientT, Error}; use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::http_server::HttpServerBuilder; use jsonrpsee::rpc_params; use jsonrpsee::types::error::{CallError, ErrorCode}; use jsonrpsee::types::ParamsSer; use jsonrpsee::ws_client::*; use jsonrpsee::ws_server::WsServerBuilder; use serde_json::json; mod rpc_impl { use jsonrpsee::core::{async_trait, RpcResult}; use jsonrpsee::proc_macros::rpc; use jsonrpsee::types::SubscriptionResult; use jsonrpsee::SubscriptionSink; #[rpc(client, server, namespace = "foo")] pub trait Rpc { #[method(name = "foo")] async fn async_method(&self, param_a: u8, param_b: String) -> RpcResult; #[method(name = "bar")] fn sync_method(&self) -> RpcResult; #[subscription(name = "sub", unsubscribe = "unsub", item = String)] fn sub(&self); #[subscription(name = "echo", unsubscribe = "unsubscribe_echo", aliases = ["alias_echo"], item = u32)] fn sub_with_params(&self, val: u32); #[method(name = "params")] fn params(&self, a: u8, b: &str) -> RpcResult { Ok(format!("Called with: {}, {}", a, b)) } #[method(name = "optional_params")] fn optional_params(&self, a: u32, b: Option, c: Option) -> RpcResult { Ok(format!("Called with: {}, {:?}, {:?}", a, b, c)) } #[method(name = "lifetimes")] fn lifetimes( &self, a: &str, b: &'_ str, c: std::borrow::Cow<'_, str>, d: Option>, ) -> RpcResult { Ok(format!("Called with: {}, {}, {}, {:?}", a, b, c, d)) } #[method(name = "zero_copy_cow")] fn zero_copy_cow(&self, a: std::borrow::Cow<'_, str>, b: beef::Cow<'_, str>) -> RpcResult { Ok(format!("Zero copy params: {}, {}", matches!(a, std::borrow::Cow::Borrowed(_)), b.is_borrowed())) } #[method(name = "blocking_call", blocking)] fn blocking_call(&self) -> RpcResult { std::thread::sleep(std::time::Duration::from_millis(50)); Ok(42) } } #[rpc(client, server, namespace = "chain")] pub trait ChainApi { /// Get header of a relay chain block. #[method(name = "getHeader")] fn header(&self, hash: Option) -> RpcResult>; /// Get header and body of a relay chain block. #[method(name = "getBlock")] async fn block(&self, hash: Option) -> RpcResult>; /// Get hash of the n-th block in the canon chain. /// /// By default returns latest block hash. #[method(name = "getBlockHash")] fn block_hash(&self, hash: Hash) -> RpcResult>; /// Get hash of the last finalized block in the canon chain. #[method(name = "getFinalizedHead")] fn finalized_head(&self) -> RpcResult; /// All head subscription #[subscription(name = "subscribeAllHeads", item = Header)] fn subscribe_all_heads(&self, hash: Hash); } /// Trait to ensure that the trait bounds are correct. #[rpc(client, server, namespace = "generic_call")] pub trait OnlyGenericCall { #[method(name = "getHeader")] fn call(&self, input: I) -> RpcResult; } /// Trait to ensure that the trait bounds are correct. #[rpc(client, server, namespace = "generic_sub")] pub trait OnlyGenericSubscription { /// Get header of a relay chain block. #[subscription(name = "sub", unsubscribe = "unsub", item = Vec)] fn sub(&self, hash: Input); } /// Trait to ensure that the trait bounds are correct. #[rpc(client, server, namespace = "generic_with_where_clause")] pub trait GenericWhereClause where I: std::fmt::Debug, R: Copy + Clone, { #[method(name = "getHeader")] fn call(&self, input: I) -> RpcResult; } /// Trait to ensure that the trait bounds are correct. #[rpc(client, server, namespace = "generic_with_where_clause")] pub trait GenericWhereClauseWithTypeBoundsToo where I: std::fmt::Debug, R: Copy + Clone, { #[method(name = "getHeader")] fn call(&self, input: I) -> RpcResult; } pub struct RpcServerImpl; #[async_trait] impl RpcServer for RpcServerImpl { async fn async_method(&self, _param_a: u8, _param_b: String) -> RpcResult { Ok(42u16) } fn sync_method(&self) -> RpcResult { Ok(10u16) } fn sub(&self, mut sink: SubscriptionSink) -> SubscriptionResult { let mut sink = sink.accept()?; let _ = sink.send(&"Response_A"); let _ = sink.send(&"Response_B"); Ok(()) } fn sub_with_params(&self, pending: PendingSubscription, val: u32) -> SubscriptionResult { let mut sink = pending.accept()?; let _ = sink.send(&val); let _ = sink.send(&val); Ok(()) } } #[async_trait] impl OnlyGenericCallServer for RpcServerImpl { fn call(&self, _: String) -> RpcResult { Ok("hello".to_string()) } } #[async_trait] impl OnlyGenericSubscriptionServer for RpcServerImpl { fn sub(&self, pending: PendingSubscription, _: String) -> SubscriptionResult { let mut sink = pending.accept()?; let _ = sink.send(&"hello"); Ok(()) } } } // Use generated implementations of server and client. use rpc_impl::{RpcClient, RpcServer, RpcServerImpl}; pub async fn websocket_server() -> SocketAddr { let server = WsServerBuilder::default().build("127.0.0.1:0").await.unwrap(); let addr = server.local_addr().unwrap(); server.start(RpcServerImpl.into_rpc()).unwrap(); addr } #[tokio::test] async fn proc_macros_generic_ws_client_api() { tracing_subscriber::FmtSubscriber::builder() .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .try_init() .expect("setting default subscriber failed"); let server_addr = websocket_server().await; let server_url = format!("ws://{}", server_addr); let client = WsClientBuilder::default().build(&server_url).await.unwrap(); assert_eq!(client.async_method(10, "a".into()).await.unwrap(), 42); assert_eq!(client.sync_method().await.unwrap(), 10); // Sub without params let mut sub = client.sub().await.unwrap(); let first_recv = sub.next().await.unwrap().unwrap(); assert_eq!(first_recv, "Response_A".to_string()); let second_recv = sub.next().await.unwrap().unwrap(); assert_eq!(second_recv, "Response_B".to_string()); // Sub with params let mut sub = client.sub_with_params(42).await.unwrap(); let first_recv = sub.next().await.unwrap().unwrap(); assert_eq!(first_recv, 42); let second_recv = sub.next().await.unwrap().unwrap(); assert_eq!(second_recv, 42); } #[tokio::test] async fn macro_param_parsing() { let module = RpcServerImpl.into_rpc(); let res: String = module.call("foo_params", [json!(42_u64), json!("Hello")]).await.unwrap(); assert_eq!(&res, "Called with: 42, Hello"); } #[tokio::test] async fn macro_optional_param_parsing() { let module = RpcServerImpl.into_rpc(); // Optional param omitted at tail let res: String = module.call("foo_optional_params", [42_u64, 70]).await.unwrap(); assert_eq!(&res, "Called with: 42, Some(70), None"); // Optional param using `null` let res: String = module.call("foo_optional_params", [json!(42_u64), json!(null), json!(70_u64)]).await.unwrap(); assert_eq!(&res, "Called with: 42, None, Some(70)"); // Named params using a map let (resp, _) = module .raw_json_request(r#"{"jsonrpc":"2.0","method":"foo_optional_params","params":{"a":22,"c":50},"id":0}"#) .await .unwrap(); assert_eq!(resp, r#"{"jsonrpc":"2.0","result":"Called with: 22, None, Some(50)","id":0}"#); } #[tokio::test] async fn macro_lifetimes_parsing() { let module = RpcServerImpl.into_rpc(); let res: String = module.call("foo_lifetimes", ["foo", "bar", "baz", "qux"]).await.unwrap(); assert_eq!(&res, "Called with: foo, bar, baz, Some(\"qux\")"); } #[tokio::test] async fn macro_zero_copy_cow() { let module = RpcServerImpl.into_rpc(); let (result, _) = module .raw_json_request(r#"{"jsonrpc":"2.0","method":"foo_zero_copy_cow","params":["foo", "bar"],"id":0}"#) .await .unwrap(); // std::borrow::Cow always deserialized to owned variant here assert_eq!(result, r#"{"jsonrpc":"2.0","result":"Zero copy params: false, true","id":0}"#); // serde_json will have to allocate a new string to replace `\t` with byte 0x09 (tab) let (result, _) = module .raw_json_request(r#"{"jsonrpc":"2.0","method":"foo_zero_copy_cow","params":["\tfoo", "\tbar"],"id":0}"#) .await .unwrap(); assert_eq!(result, r#"{"jsonrpc":"2.0","result":"Zero copy params: false, false","id":0}"#); } // Disabled on MacOS as GH CI timings on Mac vary wildly (~100ms) making this test fail. #[cfg(not(target_os = "macos"))] #[tokio::test] async fn multiple_blocking_calls_overlap() { use jsonrpsee::types::EmptyParams; use std::time::{Duration, Instant}; let module = RpcServerImpl.into_rpc(); let futures = std::iter::repeat_with(|| module.call::<_, u64>("foo_blocking_call", EmptyParams::new())).take(4); let now = Instant::now(); let results = futures::future::join_all(futures).await; let elapsed = now.elapsed(); for result in results { assert_eq!(result.unwrap(), 42); } // Each request takes 50ms, added 10ms margin for scheduling assert!(elapsed < Duration::from_millis(60), "Expected less than 60ms, got {:?}", elapsed); } #[tokio::test] async fn subscriptions_do_not_work_for_http_servers() { let htserver = HttpServerBuilder::default().build("127.0.0.1:0").await.unwrap(); let addr = htserver.local_addr().unwrap(); let htserver_url = format!("http://{}", addr); let _handle = htserver.start(RpcServerImpl.into_rpc()).unwrap(); let htclient = HttpClientBuilder::default().build(&htserver_url).unwrap(); assert_eq!(htclient.sync_method().await.unwrap(), 10); assert!(htclient.sub().await.is_err()); assert!(matches!(htclient.sub().await, Err(Error::HttpNotImplemented))); assert_eq!(htclient.sync_method().await.unwrap(), 10); } #[tokio::test] async fn calls_with_bad_params() { let server_addr = websocket_server().await; let server_url = format!("ws://{}", server_addr); let client = WsClientBuilder::default().build(&server_url).await.unwrap(); // Sub with faulty params as array. let err = client .subscribe::("foo_echo", rpc_params!["0x0"], "foo_unsubscribe_echo") .await .unwrap_err(); assert!( matches!(err, Error::Call(CallError::Custom (err)) if err.message().contains("invalid type: string \"0x0\", expected u32") && err.code() == ErrorCode::InvalidParams.code()) ); // Call with faulty params as array. let err = client.request::("foo_foo", rpc_params!["faulty", "ok"]).await.unwrap_err(); assert!( matches!(err, Error::Call(CallError::Custom (err)) if err.message().contains("invalid type: string \"faulty\", expected u8") && err.code() == ErrorCode::InvalidParams.code()) ); // Sub with faulty params as map. let mut map = BTreeMap::new(); map.insert("val", "0x0".into()); let params = ParamsSer::Map(map); let err = client.subscribe::("foo_echo", Some(params), "foo_unsubscribe_echo").await.unwrap_err(); assert!( matches!(err, Error::Call(CallError::Custom (err)) if err.message().contains("invalid type: string \"0x0\", expected u32") && err.code() == ErrorCode::InvalidParams.code()) ); // Call with faulty params as map. let mut map = BTreeMap::new(); map.insert("param_a", 1.into()); map.insert("param_b", 99.into()); let params = ParamsSer::Map(map); let err = client.request::("foo_foo", Some(params)).await.unwrap_err(); assert!( matches!(err, Error::Call(CallError::Custom (err)) if err.message().contains("invalid type: integer `99`, expected a string") && err.code() == ErrorCode::InvalidParams.code()) ); }