Unverified Commit 41b8a2c9 authored by Alexandru Vasile's avatar Alexandru Vasile Committed by GitHub
Browse files

Optimize serialization for client parameters (#864)



* core: Fix doc typo

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* types: Implement generic `ParamBuilder` for RPC parameters

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* types: Add specialized RPC parameter builder for arrays and maps

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* types: Implement parameter builder for batch requests

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* types: Implement `rpc_params` in the `types` crate

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* core: Adjust `ClientT` for generic efficient parameters

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* proc-macro: Render clients using the parameter builders

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Adjust testing to the `ToRpcParams` interface

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* core: Move `rpc_params` to core and simplify testing

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* core: Rename server's trait to `ToRpcServerParams`

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* bench: Adjust benches to the `ToRpcParams` interface

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Fix clippy

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* types: Rename batch builder to `BatchRequestBuilder`

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* examples: Re-enable proc-macro example

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* types: Fix doc tests and add panic documentation

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* core: Fix documentation link

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* client: Use BatchRequestBuilder as parameter for batch requests

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Update core/src/server/rpc_module.rs

Co-authored-by: Niklas Adolfsson's avatarNiklas Adolfsson <niklasadolfsson1@gmail.com>

* Update core/src/server/rpc_module.rs

Co-authored-by: Niklas Adolfsson's avatarNiklas Adolfsson <niklasadolfsson1@gmail.com>

* types: Add specialized constructors for internal `ParamsBuilder`

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* types: Implement `EmptyParams` for client's parameters

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* tests: Fix macos disabled test

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* types: Improve comment

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Fix clippy

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* benches: Rename functions

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* types: Rename param types to `ArrayParams` and `ObjectParams`

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Move paramters to core crate

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* core: Return `core::Error` from `ToRpcParams` trait

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Fix doc link

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Fix `ArrayParamsBuilder` doc links

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Remove `ToRpcServerParams` trait

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* core: Fix `ToRpcParams` docs

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Remove `ParamsSer` and extend benchmarking

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* core: Optimise `rpc_params` to avoid allocation on error

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* params: zero allocation for empty params

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* examples: Add copyright back

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* traits: Remove empty doc line

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Update core/src/traits.rs

Co-authored-by: James Wilson's avatarJames Wilson <james@jsdw.me>

* Update core/src/traits.rs

Co-authored-by: James Wilson's avatarJames Wilson <james@jsdw.me>

* examples: Restore `proc_macro` example to origin/master

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* core: Remove empty case for `rpc_params` macro

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Niklas Adolfsson's avatarNiklas Adolfsson <niklasadolfsson1@gmail.com>
Co-authored-by: James Wilson's avatarJames Wilson <james@jsdw.me>
parent 5a2f6f11
Pipeline #213153 passed with stages
in 5 minutes and 28 seconds
......@@ -6,8 +6,10 @@ use futures_util::future::{join_all, FutureExt};
use futures_util::stream::FuturesUnordered;
use helpers::{http_client, ws_client, SUB_METHOD_NAME, UNSUB_METHOD_NAME};
use jsonrpsee::core::client::{ClientT, SubscriptionClientT};
use jsonrpsee::core::params::{ArrayParams, BatchRequestBuilder, ObjectParams};
use jsonrpsee::core::traits::ToRpcParams;
use jsonrpsee::http_client::HeaderMap;
use jsonrpsee::types::{Id, ParamsSer, RequestSer};
use jsonrpsee::types::{Id, RequestSer};
use pprof::criterion::{Output, PProfProfiler};
use tokio::runtime::Runtime as TokioRuntime;
......@@ -63,22 +65,46 @@ fn v2_serialize(req: RequestSer<'_>) -> String {
}
pub fn jsonrpsee_types_v2(crit: &mut Criterion) {
crit.bench_function("jsonrpsee_types_v2_array_ref", |b| {
// Construct the serialized array request using the `RawValue` directly.
crit.bench_function("jsonrpsee_types_array_params_baseline", |b| {
b.iter(|| {
let params = &[1_u64.into(), 2_u32.into()];
let params = ParamsSer::ArrayRef(params);
let params = serde_json::value::RawValue::from_string("[1, 2]".to_string()).unwrap();
let request = RequestSer::new(&Id::Number(0), "say_hello", Some(params));
v2_serialize(request);
})
});
// Construct the serialized request using the `ArrayParams`.
crit.bench_function("jsonrpsee_types_array_params", |b| {
b.iter(|| {
let mut builder = ArrayParams::new();
builder.insert(1u64).unwrap();
builder.insert(2u32).unwrap();
let params = builder.to_rpc_params().expect("Valid params");
let request = RequestSer::new(&Id::Number(0), "say_hello", params);
v2_serialize(request);
})
});
crit.bench_function("jsonrpsee_types_v2_vec", |b| {
// Construct the serialized object request using the `RawValue` directly.
crit.bench_function("jsonrpsee_types_object_params_baseline", |b| {
b.iter(|| {
let params = ParamsSer::Array(vec![1_u64.into(), 2_u32.into()]);
let params = serde_json::value::RawValue::from_string(r#"{"key": 1}"#.to_string()).unwrap();
let request = RequestSer::new(&Id::Number(0), "say_hello", Some(params));
v2_serialize(request);
})
});
// Construct the serialized request using the `ObjectParams`.
crit.bench_function("jsonrpsee_types_object_params", |b| {
b.iter(|| {
let mut builder = ObjectParams::new();
builder.insert("key", 1u32).unwrap();
let params = builder.to_rpc_params().expect("Valid params");
let request = RequestSer::new(&Id::Number(0), "say_hello", params);
v2_serialize(request);
})
});
}
trait RequestBencher {
......@@ -129,7 +155,7 @@ fn round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl ClientT>
let bench_name = format!("{}/{}", name, method);
crit.bench_function(&request.group_name(&bench_name), |b| {
b.to_async(rt).iter(|| async {
black_box(client.request::<String>(method, None).await.unwrap());
black_box(client.request::<String, ArrayParams>(method, ArrayParams::new()).await.unwrap());
})
});
}
......@@ -139,7 +165,12 @@ fn sub_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Subs
let mut group = crit.benchmark_group(name);
group.bench_function("subscribe", |b| {
b.to_async(rt).iter_with_large_drop(|| async {
black_box(client.subscribe::<String>(SUB_METHOD_NAME, None, UNSUB_METHOD_NAME).await.unwrap());
black_box(
client
.subscribe::<String, ArrayParams>(SUB_METHOD_NAME, ArrayParams::new(), UNSUB_METHOD_NAME)
.await
.unwrap(),
);
})
});
group.bench_function("subscribe_response", |b| {
......@@ -149,7 +180,10 @@ fn sub_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Subs
// runtime context and simply calling `block_on` here will cause the code to panic.
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
client.subscribe::<String>(SUB_METHOD_NAME, None, UNSUB_METHOD_NAME).await.unwrap()
client
.subscribe::<String, ArrayParams>(SUB_METHOD_NAME, ArrayParams::new(), UNSUB_METHOD_NAME)
.await
.unwrap()
})
})
},
......@@ -166,7 +200,10 @@ fn sub_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Subs
b.iter_with_setup(
|| {
rt.block_on(async {
client.subscribe::<String>(SUB_METHOD_NAME, None, UNSUB_METHOD_NAME).await.unwrap()
client
.subscribe::<String, ArrayParams>(SUB_METHOD_NAME, ArrayParams::new(), UNSUB_METHOD_NAME)
.await
.unwrap()
})
},
|sub| {
......@@ -191,7 +228,10 @@ fn batch_round_trip(
let bench_name = format!("{}/{}", name, method);
let mut group = crit.benchmark_group(request.group_name(&bench_name));
for batch_size in [2, 5, 10, 50, 100usize].iter() {
let batch = vec![(method, None); *batch_size];
let mut batch = BatchRequestBuilder::new();
for _ in 0..*batch_size {
batch.insert(method, ArrayParams::new()).unwrap();
}
group.throughput(Throughput::Elements(*batch_size as u64));
group.bench_with_input(BenchmarkId::from_parameter(batch_size), batch_size, |b, _| {
b.to_async(rt).iter(|| async { client.batch_request::<String>(batch.clone()).await.unwrap() })
......@@ -227,7 +267,7 @@ fn ws_concurrent_conn_calls(rt: &TokioRuntime, crit: &mut Criterion, url: &str,
let futs = FuturesUnordered::new();
for _ in 0..10 {
futs.push(client.request::<String>(methods[0], None));
futs.push(client.request::<String, ArrayParams>(methods[0], ArrayParams::new()));
}
join_all(futs).await;
......@@ -267,13 +307,17 @@ fn ws_concurrent_conn_subs(rt: &TokioRuntime, crit: &mut Criterion, url: &str, n
let futs = FuturesUnordered::new();
for _ in 0..10 {
let fut = client.subscribe::<String>(SUB_METHOD_NAME, None, UNSUB_METHOD_NAME).then(
|sub| async move {
let fut = client
.subscribe::<String, ArrayParams>(
SUB_METHOD_NAME,
ArrayParams::new(),
UNSUB_METHOD_NAME,
)
.then(|sub| async move {
let mut s = sub.unwrap();
s.next().await.unwrap().unwrap()
},
);
});
futs.push(Box::pin(fut));
}
......@@ -301,7 +345,7 @@ fn http_concurrent_conn_calls(rt: &TokioRuntime, crit: &mut Criterion, url: &str
|clients| async {
let tasks = clients.map(|client| {
rt.spawn(async move {
client.request::<String>(method, None).await.unwrap();
client.request::<String, ArrayParams>(method, ArrayParams::new()).await.unwrap();
})
});
join_all(tasks).await;
......@@ -333,7 +377,7 @@ fn http_custom_headers_round_trip(
crit.bench_function(&request.group_name(&bench_name), |b| {
b.to_async(rt).iter(|| async {
black_box(client.request::<String>(method_name, None).await.unwrap());
black_box(client.request::<String, ArrayParams>(method_name, ArrayParams::new()).await.unwrap());
})
});
}
......
......@@ -28,11 +28,13 @@ use std::sync::Arc;
use std::time::Duration;
use crate::transport::HttpTransportClient;
use crate::types::{ErrorResponse, Id, NotificationSer, ParamsSer, RequestSer, Response};
use crate::types::{ErrorResponse, Id, NotificationSer, RequestSer, Response};
use async_trait::async_trait;
use hyper::http::HeaderMap;
use jsonrpsee_core::client::{CertificateStore, ClientT, IdKind, RequestIdManager, Subscription, SubscriptionClientT};
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::tracing::RpcTracing;
use jsonrpsee_core::traits::ToRpcParams;
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::error::CallError;
use rustc_hash::FxHashMap;
......@@ -166,9 +168,13 @@ pub struct HttpClient {
#[async_trait]
impl ClientT for HttpClient {
async fn notification<'a>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<(), Error> {
async fn notification<Params>(&self, method: &str, params: Params) -> Result<(), Error>
where
Params: ToRpcParams + Send,
{
let trace = RpcTracing::notification(method);
async {
let params = params.to_rpc_params()?;
let notif = serde_json::to_string(&NotificationSer::new(method, params)).map_err(Error::ParseError)?;
let fut = self.transport.send(notif);
......@@ -184,12 +190,15 @@ impl ClientT for HttpClient {
}
/// Perform a request towards the server.
async fn request<'a, R>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<R, Error>
async fn request<R, Params>(&self, method: &str, params: Params) -> Result<R, Error>
where
R: DeserializeOwned,
Params: ToRpcParams + Send,
{
let guard = self.id_manager.next_request_id()?;
let id = guard.inner();
let params = params.to_rpc_params()?;
let request = RequestSer::new(&id, method, params);
let trace = RpcTracing::method_call(method);
......@@ -225,10 +234,11 @@ impl ClientT for HttpClient {
.await
}
async fn batch_request<'a, R>(&self, batch: Vec<(&'a str, Option<ParamsSer<'a>>)>) -> Result<Vec<R>, Error>
async fn batch_request<'a, R>(&self, batch: BatchRequestBuilder<'a>) -> Result<Vec<R>, Error>
where
R: DeserializeOwned + Default + Clone,
{
let batch = batch.build();
let guard = self.id_manager.next_request_ids(batch.len())?;
let ids: Vec<Id> = guard.inner();
let trace = RpcTracing::batch();
......@@ -279,13 +289,14 @@ impl ClientT for HttpClient {
#[async_trait]
impl SubscriptionClientT for HttpClient {
/// Send a subscription request to the server. Not implemented for HTTP; will always return [`Error::HttpNotImplemented`].
async fn subscribe<'a, N>(
async fn subscribe<'a, N, Params>(
&self,
_subscribe_method: &'a str,
_params: Option<ParamsSer<'a>>,
_params: Params,
_unsubscribe_method: &'a str,
) -> Result<Subscription<N>, Error>
where
Params: ToRpcParams + Send,
N: DeserializeOwned,
{
Err(Error::HttpNotImplemented)
......
......@@ -25,9 +25,10 @@
// DEALINGS IN THE SOFTWARE.
use crate::types::error::{ErrorCode, ErrorObject};
use crate::types::ParamsSer;
use crate::HttpClientBuilder;
use jsonrpsee_core::client::{ClientT, IdKind};
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::rpc_params;
use jsonrpsee_core::Error;
use jsonrpsee_test_utils::helpers::*;
......@@ -52,10 +53,8 @@ async fn method_call_with_wrong_id_kind() {
http_server_with_hardcoded_response(ok_response(exp.into(), Id::Num(0))).with_default_timeout().await.unwrap();
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().id_format(IdKind::String).build(&uri).unwrap();
assert!(matches!(
client.request::<String>("o", None).with_default_timeout().await.unwrap(),
Err(Error::InvalidRequestId)
));
let res: Result<String, Error> = client.request("o", rpc_params![]).with_default_timeout().await.unwrap();
assert!(matches!(res, Err(Error::InvalidRequestId)));
}
#[tokio::test]
......@@ -67,7 +66,7 @@ async fn method_call_with_id_str() {
.unwrap();
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().id_format(IdKind::String).build(&uri).unwrap();
let response: String = client.request::<String>("o", None).with_default_timeout().await.unwrap().unwrap();
let response: String = client.request("o", rpc_params![]).with_default_timeout().await.unwrap().unwrap();
assert_eq!(&response, exp);
}
......@@ -77,7 +76,7 @@ async fn notification_works() {
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().build(&uri).unwrap();
client
.notification("i_dont_care_about_the_response_because_the_server_should_not_respond", None)
.notification("i_dont_care_about_the_response_because_the_server_should_not_respond", rpc_params![])
.with_default_timeout()
.await
.unwrap()
......@@ -137,8 +136,10 @@ async fn subscription_response_to_request() {
#[tokio::test]
async fn batch_request_works() {
let batch_request =
vec![("say_hello", rpc_params![]), ("say_goodbye", rpc_params![0_u64, 1, 2]), ("get_swag", None)];
let mut batch_request = BatchRequestBuilder::new();
batch_request.insert("say_hello", rpc_params![]).unwrap();
batch_request.insert("say_goodbye", rpc_params![0_u64, 1, 2]).unwrap();
batch_request.insert("get_swag", rpc_params![]).unwrap();
let server_response = r#"[{"jsonrpc":"2.0","result":"hello","id":0}, {"jsonrpc":"2.0","result":"goodbye","id":1}, {"jsonrpc":"2.0","result":"here's your swag","id":2}]"#.to_string();
let response =
run_batch_request_with_response(batch_request, server_response).with_default_timeout().await.unwrap().unwrap();
......@@ -147,16 +148,18 @@ async fn batch_request_works() {
#[tokio::test]
async fn batch_request_out_of_order_response() {
let batch_request =
vec![("say_hello", rpc_params! {}), ("say_goodbye", rpc_params![0_u64, 1, 2]), ("get_swag", None)];
let mut batch_request = BatchRequestBuilder::new();
batch_request.insert("say_hello", rpc_params![]).unwrap();
batch_request.insert("say_goodbye", rpc_params![0_u64, 1, 2]).unwrap();
batch_request.insert("get_swag", rpc_params![]).unwrap();
let server_response = r#"[{"jsonrpc":"2.0","result":"here's your swag","id":2}, {"jsonrpc":"2.0","result":"hello","id":0}, {"jsonrpc":"2.0","result":"goodbye","id":1}]"#.to_string();
let response =
run_batch_request_with_response(batch_request, server_response).with_default_timeout().await.unwrap().unwrap();
assert_eq!(response, vec!["hello".to_string(), "goodbye".to_string(), "here's your swag".to_string()]);
}
async fn run_batch_request_with_response<'a>(
batch: Vec<(&'a str, Option<ParamsSer<'a>>)>,
async fn run_batch_request_with_response(
batch: BatchRequestBuilder<'_>,
response: String,
) -> Result<Vec<String>, Error> {
let server_addr = http_server_with_hardcoded_response(response).with_default_timeout().await.unwrap();
......@@ -169,7 +172,7 @@ async fn run_request_with_response(response: String) -> Result<String, Error> {
let server_addr = http_server_with_hardcoded_response(response).with_default_timeout().await.unwrap();
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().build(&uri).unwrap();
client.request("say_hello", None).with_default_timeout().await.unwrap()
client.request("say_hello", rpc_params![]).with_default_timeout().await.unwrap()
}
fn assert_jsonrpc_error_response(err: Error, exp: ErrorObjectOwned) {
......
......@@ -26,12 +26,12 @@
#![cfg(test)]
use crate::types::error::{ErrorCode, ErrorObject};
use crate::types::ParamsSer;
use crate::WsClientBuilder;
use jsonrpsee_core::client::{ClientT, SubscriptionClientT};
use jsonrpsee_core::client::{IdKind, Subscription};
use jsonrpsee_core::rpc_params;
use jsonrpsee_core::Error;
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::{rpc_params, Error};
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::mocks::{Id, WebSocketTestServer};
use jsonrpsee_test_utils::TimeoutFutureExt;
......@@ -62,7 +62,7 @@ async fn method_call_with_wrong_id_kind() {
let client =
WsClientBuilder::default().id_format(IdKind::String).build(&uri).with_default_timeout().await.unwrap().unwrap();
let err = client.request::<String>("o", None).with_default_timeout().await.unwrap();
let err: Result<String, Error> = client.request("o", rpc_params![]).with_default_timeout().await.unwrap();
assert!(matches!(err, Err(Error::RestartNeeded(e)) if e == "Invalid request ID"));
}
......@@ -79,7 +79,7 @@ async fn method_call_with_id_str() {
let uri = format!("ws://{}", server.local_addr());
let client =
WsClientBuilder::default().id_format(IdKind::String).build(&uri).with_default_timeout().await.unwrap().unwrap();
let response: String = client.request::<String>("o", None).with_default_timeout().await.unwrap().unwrap();
let response: String = client.request("o", rpc_params![]).with_default_timeout().await.unwrap().unwrap();
assert_eq!(&response, exp);
}
......@@ -92,7 +92,7 @@ async fn notif_works() {
.unwrap();
let uri = to_ws_uri_string(server.local_addr());
let client = WsClientBuilder::default().build(&uri).with_default_timeout().await.unwrap().unwrap();
assert!(client.notification("notif", None).with_default_timeout().await.unwrap().is_ok());
assert!(client.notification("notif", rpc_params![]).with_default_timeout().await.unwrap().is_ok());
}
#[tokio::test]
......@@ -153,7 +153,7 @@ async fn subscription_works() {
let client = WsClientBuilder::default().build(&uri).with_default_timeout().await.unwrap().unwrap();
{
let mut sub: Subscription<String> = client
.subscribe("subscribe_hello", None, "unsubscribe_hello")
.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello")
.with_default_timeout()
.await
.unwrap()
......@@ -226,7 +226,10 @@ async fn notification_without_polling_doesnt_make_client_unuseable() {
#[tokio::test]
async fn batch_request_works() {
let batch_request = vec![("say_hello", None), ("say_goodbye", rpc_params![0_u64, 1, 2]), ("get_swag", None)];
let mut batch_request = BatchRequestBuilder::new();
batch_request.insert("say_hello", rpc_params![]).unwrap();
batch_request.insert("say_goodbye", rpc_params![0_u64, 1, 2]).unwrap();
batch_request.insert("get_swag", rpc_params![]).unwrap();
let server_response = r#"[{"jsonrpc":"2.0","result":"hello","id":0}, {"jsonrpc":"2.0","result":"goodbye","id":1}, {"jsonrpc":"2.0","result":"here's your swag","id":2}]"#.to_string();
let response =
run_batch_request_with_response(batch_request, server_response).with_default_timeout().await.unwrap().unwrap();
......@@ -235,7 +238,10 @@ async fn batch_request_works() {
#[tokio::test]
async fn batch_request_out_of_order_response() {
let batch_request = vec![("say_hello", None), ("say_goodbye", rpc_params![0_u64, 1, 2]), ("get_swag", None)];
let mut batch_request = BatchRequestBuilder::new();
batch_request.insert("say_hello", rpc_params![]).unwrap();
batch_request.insert("say_goodbye", rpc_params![0_u64, 1, 2]).unwrap();
batch_request.insert("get_swag", rpc_params![]).unwrap();
let server_response = r#"[{"jsonrpc":"2.0","result":"here's your swag","id":2}, {"jsonrpc":"2.0","result":"hello","id":0}, {"jsonrpc":"2.0","result":"goodbye","id":1}]"#.to_string();
let response =
run_batch_request_with_response(batch_request, server_response).with_default_timeout().await.unwrap().unwrap();
......@@ -260,15 +266,16 @@ async fn is_connected_works() {
let client = WsClientBuilder::default().build(&uri).with_default_timeout().await.unwrap().unwrap();
assert!(client.is_connected());
client.request::<String>("say_hello", None).with_default_timeout().await.unwrap().unwrap_err();
let res: Result<String, Error> = client.request("say_hello", rpc_params![]).with_default_timeout().await.unwrap();
res.unwrap_err();
// give the background thread some time to terminate.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
assert!(!client.is_connected())
}
async fn run_batch_request_with_response<'a>(
batch: Vec<(&'a str, Option<ParamsSer<'a>>)>,
async fn run_batch_request_with_response(
batch: BatchRequestBuilder<'_>,
response: String,
) -> Result<Vec<String>, Error> {
let server = WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), response)
......@@ -287,7 +294,7 @@ async fn run_request_with_response(response: String) -> Result<String, Error> {
.unwrap();
let uri = format!("ws://{}", server.local_addr());
let client = WsClientBuilder::default().build(&uri).with_default_timeout().await.unwrap().unwrap();
client.request("say_hello", None).with_default_timeout().await.unwrap()
client.request("say_hello", rpc_params![]).with_default_timeout().await.unwrap()
}
fn assert_error_response(err: Error, exp: ErrorObjectOwned) {
......@@ -326,6 +333,6 @@ async fn redirections() {
// It's connected
assert!(client.is_connected());
// It works
let response = client.request::<String>("anything", None).with_default_timeout().await.unwrap();
assert_eq!(response.unwrap(), String::from(expected));
let response: String = client.request("anything", rpc_params![]).with_default_timeout().await.unwrap().unwrap();
assert_eq!(response, String::from(expected));
}
......@@ -34,10 +34,10 @@ use futures_util::future::{self, Either};
use jsonrpsee_types::error::CallError;
use jsonrpsee_types::response::SubscriptionError;
use jsonrpsee_types::{
ErrorResponse, Id, Notification, ParamsSer, RequestSer, Response, SubscriptionId, SubscriptionResponse,
};
use jsonrpsee_types::{ErrorResponse, Id, Notification, RequestSer, Response, SubscriptionId, SubscriptionResponse};
use serde_json::Value as JsonValue;
use crate::params::ArrayParams;
use crate::traits::ToRpcParams;
/// Attempts to process a batch response.
///
......@@ -222,10 +222,12 @@ pub(crate) fn build_unsubscribe_message(
sub_id: SubscriptionId<'static>,
) -> Option<RequestMessage> {
let (unsub_req_id, _, unsub, sub_id) = manager.remove_subscription(sub_req_id, sub_id)?;
let sub_id_slice: &[JsonValue] = &[sub_id.into()];
// TODO: https://github.com/paritytech/jsonrpsee/issues/275
let params = ParamsSer::ArrayRef(sub_id_slice);
let raw = serde_json::to_string(&RequestSer::new(&unsub_req_id, &unsub, Some(params))).ok()?;
let mut params = ArrayParams::new();
params.insert(sub_id).ok()?;
let params = params.to_rpc_params().ok()?;
let raw = serde_json::to_string(&RequestSer::new(&unsub_req_id, &unsub, params)).ok()?;
Some(RequestMessage { raw, id: unsub_req_id, send_back: None })
}
......
......@@ -181,7 +181,7 @@ impl RequestManager {
}
}
/// Inserts a handler for incoming notifications
/// Inserts a handler for incoming notifications.
pub(crate) fn insert_notification_handler(
&mut self,
method: &str,
......@@ -195,7 +195,7 @@ impl RequestManager {
}
}
/// Removes a notification handler
/// Removes a notification handler.
pub(crate) fn remove_notification_handler(&mut self, method: String) -> Result<(), Error> {
if self.notification_handlers.remove(&method).is_some() {
Ok(())
......@@ -224,7 +224,7 @@ impl RequestManager {
}
}
/// Tries to complete a pending batch request
/// Tries to complete a pending batch request.
///
/// Returns `Some` if the subscription was completed otherwise `None`.
pub(crate) fn complete_pending_batch(&mut self, batch: Vec<RequestId>) -> Option<BatchState> {
......@@ -237,7 +237,7 @@ impl RequestManager {
}
}
/// Tries to complete a pending call..
/// Tries to complete a pending call.
///
/// Returns `Some` if the call was completed otherwise `None`.
pub(crate) fn complete_pending_call(&mut self, request_id: RequestId) -> Option<PendingCallOneshot> {
......
......@@ -27,11 +27,13 @@ use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use futures_util::FutureExt;
use jsonrpsee_types::{
response::SubscriptionError, ErrorResponse, Id, Notification, NotificationSer, ParamsSer, RequestSer, Response,
response::SubscriptionError, ErrorResponse, Id, Notification, NotificationSer, RequestSer, Response,
SubscriptionResponse,
};
use serde::de::DeserializeOwned;
use tracing_futures::Instrument;
use crate::params::BatchRequestBuilder;
use crate::traits::ToRpcParams;
use super::{FrontToBack, IdKind, RequestIdManager};
......@@ -174,7 +176,7 @@ impl ClientBuilder {
ping_interval,
on_close_tx,
)
.await;
.await;
});
Client {
to_back,
......@@ -190,9 +192,9 @@ impl ClientBuilder {
#[cfg(all(feature = "async-wasm-client", target_arch = "wasm32"))]
#[cfg_attr(docsrs, doc(cfg(feature = "async-wasm-client")))]
pub fn build_with_wasm<S, R>(self, sender: S, receiver: R) -> Client
where
S: TransportSenderT,
R: TransportReceiverT,
where
S: TransportSenderT,
R: TransportReceiverT,
{
let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests);
let (err_tx, err_rx) = oneshot::channel();
......@@ -272,10 +274,14 @@ impl Drop for Client {
}
#[async_trait]
impl ClientT for Client {
async fn notification<'a>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<(), Error> {
impl ClientT for Client
{
async fn notification<Params>(&self, method: &str, params: Params) -> Result<(), Error>
where
Params: ToRpcParams + Send {
// NOTE: we use this to guard against max number of concurrent requests.
let _req_id = self.id_manager.next_request_id()?;
let params = params.to_rpc_params()?;
let notif = NotificationSer::new(method, params);
let trace = RpcTracing::batch();
......@@ -292,13 +298,14 @@ impl ClientT for Client {
Either::Right((_, _)) => Err(Error::RequestTimeout),
}
}
.instrument(trace.into_span())
.await
.instrument(trace.into_span())
.await
}
async fn request<'a, R>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<R, Error>
async fn request<R