Unverified Commit 31153ac7 authored by Niklas Adolfsson's avatar Niklas Adolfsson Committed by GitHub
Browse files

[client] use types v2 (less alloc) (#269)

* rewrite me

* v2

* PoC works without batch request

* remove `PartialEq` bounds

* add naive benches types

* misc

* remove useless lifetime

* [ws client]: move request ID generation to client

* make tests compile again

* [client transport]: kill leaky abstractions.

* [http client transport]: minor changes in the API.

* [ws client]: fix batch requests.

* fix nits

* [ws client]: generate two request IDs for subscrib

* fix tests

* remove unused types + less alloc for params.

* fix nits

* more tweaks.

* remove unused code

* fix more nits

* remove unused legacy types

* reorg types_v2 mod

* port macros to new types

* fix tests again; more jsonvalue

* [proc macros]: bring back impl Into for params.

* fix build

* [proc macros]: make it work for external crates.

* [types]: remove weird From<Option<T>> to impl.

* cleanup again

* [examples]: remove unused async-std dep

* Update types/src/v2/mod.rs

* [types]: remove unsed dep smallvec

* rewrite me

* [types]: error code impl ser/deser

Manual implementation of serialize/deserialize to get rid of duplicated message string

* [types v2]: re-org with explicit mods

* fix faulty test

* add missed files

* [ws client]: req_manager reserve unsubscribe slot.

* simplify test code

* add tracking issue for TODO

* remove unused deps
parent de7b58a8
......@@ -6,6 +6,7 @@ members = [
"http-server",
"test-utils",
"jsonrpsee",
"tests",
"types",
"utils",
"ws-client",
......
......@@ -12,6 +12,7 @@ criterion = "0.3"
futures-channel = "0.3"
jsonrpsee = { path = "../jsonrpsee", features = ["full"] }
num_cpus = "1"
serde_json = "1"
tokio = { version = "1", features = ["full"] }
[[bench]]
......
use criterion::*;
use jsonrpsee::{
http_client::{jsonrpc::Params, Client, HttpClientBuilder},
http_client::{
traits::Client,
v2::params::{Id, JsonRpcParams},
v2::request::JsonRpcCallSer,
HttpClientBuilder,
},
ws_client::WsClientBuilder,
};
use std::sync::Arc;
......@@ -8,9 +13,32 @@ use tokio::runtime::Runtime as TokioRuntime;
mod helpers;
criterion_group!(benches, http_requests, websocket_requests);
criterion_group!(benches, http_requests, websocket_requests, jsonrpsee_types_v2);
criterion_main!(benches);
fn v2_serialize<'a>(req: JsonRpcCallSer<'a>) -> String {
serde_json::to_string(&req).unwrap()
}
pub fn jsonrpsee_types_v2(crit: &mut Criterion) {
crit.bench_function("jsonrpsee_types_v2_array_ref", |b| {
b.iter(|| {
let params = &[1_u64.into(), 2_u32.into()];
let params = JsonRpcParams::ArrayRef(params);
let request = JsonRpcCallSer::new(Id::Number(0), "say_hello", params);
v2_serialize(request);
})
});
crit.bench_function("jsonrpsee_types_v2_vec", |b| {
b.iter(|| {
let params = JsonRpcParams::Array(vec![1_u64.into(), 2_u32.into()]);
let request = JsonRpcCallSer::new(Id::Number(0), "say_hello", params);
v2_serialize(request);
})
});
}
pub fn http_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::http_server());
......@@ -22,7 +50,8 @@ pub fn http_requests(crit: &mut Criterion) {
pub fn websocket_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::ws_server());
let client = Arc::new(rt.block_on(WsClientBuilder::default().build(&url)).unwrap());
let client =
Arc::new(rt.block_on(WsClientBuilder::default().max_concurrent_requests(1024 * 1024).build(&url)).unwrap());
run_round_trip(&rt, crit, client.clone(), "ws_round_trip");
run_concurrent_round_trip(&rt, crit, client.clone(), "ws_concurrent_round_trip");
}
......@@ -31,7 +60,7 @@ fn run_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Clie
crit.bench_function(name, |b| {
b.iter(|| {
rt.block_on(async {
black_box(client.request::<String, _, _>("say_hello", Params::None).await.unwrap());
black_box(client.request::<String>("say_hello", JsonRpcParams::NoParams).await.unwrap());
})
})
});
......@@ -51,7 +80,8 @@ fn run_concurrent_round_trip<C: 'static + Client + Send + Sync>(
for _ in 0..num_concurrent_tasks {
let client_rc = client.clone();
let task = rt.spawn(async move {
let _ = black_box(client_rc.request::<String, _, _>("say_hello", Params::None)).await;
let _ =
black_box(client_rc.request::<String>("say_hello", JsonRpcParams::NoParams).await.unwrap());
});
tasks.push(task);
}
......
......@@ -24,3 +24,7 @@ path = "ws.rs"
[[example]]
name = "ws_subscription"
path = "ws_subscription.rs"
[[example]]
name = "proc_macro"
path = "proc_macro.rs"
......@@ -25,7 +25,7 @@
// DEALINGS IN THE SOFTWARE.
use jsonrpsee::{
http_client::{jsonrpc::Params, Client, HttpClientBuilder},
http_client::{traits::Client, HttpClientBuilder, JsonValue},
http_server::HttpServerBuilder,
};
use std::net::SocketAddr;
......@@ -37,8 +37,10 @@ async fn main() -> anyhow::Result<()> {
let server_addr = run_server().await?;
let url = format!("http://{}", server_addr);
let params: &[JsonValue] = &[1_u64.into(), 2.into(), 3.into()];
let client = HttpClientBuilder::default().build(url)?;
let response: Result<String, _> = client.request("say_hello", Params::None).await;
let response: Result<String, _> = client.request("say_hello", params.into()).await;
println!("r: {:?}", response);
Ok(())
......
......@@ -24,41 +24,34 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::jsonrpc::{self, wrapped::Params};
use core::fmt;
use jsonrpsee::{http_client::HttpClientBuilder, http_server::HttpServerBuilder};
use std::net::SocketAddr;
/// Notification received on a server.
///
/// Wraps around a `jsonrpc::Notification`.
#[derive(PartialEq)]
pub struct Notification(jsonrpc::Notification);
impl From<jsonrpc::Notification> for Notification {
fn from(notif: jsonrpc::Notification) -> Notification {
Notification(notif)
jsonrpsee::proc_macros::rpc_client_api! {
RpcApi {
#[rpc(method = "state_getPairs", positional_params)]
fn storage_pairs(prefix: usize, hash: Option<String>) -> Vec<u8>;
}
}
impl From<Notification> for jsonrpc::Notification {
fn from(notif: Notification) -> jsonrpc::Notification {
notif.0
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();
impl Notification {
/// Returns the method of this notification.
pub fn method(&self) -> &str {
&self.0.method
}
let server_addr = run_server().await?;
let url = format!("http://{}", server_addr);
/// Returns the parameters of the notification.
pub fn params(&self) -> Params {
Params::from(&self.0.params)
}
let client = HttpClientBuilder::default().build(url)?;
let response = RpcApi::storage_pairs(&client, 0_usize, Some("aaa".to_string())).await?;
println!("r: {:?}", response);
Ok(())
}
impl fmt::Debug for Notification {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Notification").field("method", &self.method()).field("params", &self.params()).finish()
}
async fn run_server() -> anyhow::Result<SocketAddr> {
let mut server = HttpServerBuilder::default().build("127.0.0.1:0".parse()?)?;
server.register_method("state_getPairs", |_| Ok(vec![1, 2, 3]))?;
let addr = server.local_addr();
tokio::spawn(async move { server.start().await });
addr
}
......@@ -25,7 +25,7 @@
// DEALINGS IN THE SOFTWARE.
use jsonrpsee::{
ws_client::{jsonrpc::Params, Client, WsClientBuilder},
ws_client::{traits::Client, v2::params::JsonRpcParams, WsClientBuilder},
ws_server::WsServer,
};
use std::net::SocketAddr;
......@@ -37,7 +37,7 @@ async fn main() -> anyhow::Result<()> {
let url = format!("ws://{}", addr);
let client = WsClientBuilder::default().build(&url).await?;
let response: String = client.request("say_hello", Params::None).await?;
let response: String = client.request("say_hello", JsonRpcParams::NoParams).await?;
println!("r: {:?}", response);
Ok(())
......
......@@ -25,7 +25,7 @@
// DEALINGS IN THE SOFTWARE.
use jsonrpsee::{
ws_client::{jsonrpc::Params, SubscriptionClient, WsClientBuilder, WsSubscription},
ws_client::{traits::SubscriptionClient, v2::params::JsonRpcParams, Subscription, WsClientBuilder},
ws_server::WsServer,
};
use std::net::SocketAddr;
......@@ -39,8 +39,8 @@ async fn main() -> anyhow::Result<()> {
let url = format!("ws://{}", addr);
let client = WsClientBuilder::default().build(&url).await?;
let mut subscribe_hello: WsSubscription<String> =
client.subscribe("subscribe_hello", Params::None, "unsubscribe_hello").await?;
let mut subscribe_hello: Subscription<String> =
client.subscribe("subscribe_hello", JsonRpcParams::NoParams, "unsubscribe_hello").await?;
let mut i = 0;
while i <= NUM_SUBSCRIPTION_RESPONSES {
......
......@@ -18,7 +18,6 @@ log = "0.4"
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
unicase = "2.6"
url = "2.2"
fnv = "1"
......
use crate::traits::Client;
use crate::transport::HttpTransportClient;
use crate::v2::request::{JsonRpcCallSer, JsonRpcNotificationSer};
use crate::v2::{
error::JsonRpcErrorAlloc,
params::{Id, JsonRpcParams},
response::JsonRpcResponse,
};
use crate::{Error, JsonRawValue};
use async_trait::async_trait;
use fnv::FnvHashMap;
use jsonrpc::DeserializeOwned;
use jsonrpsee_types::{
error::{Error, Mismatch},
jsonrpc,
traits::Client,
};
use std::convert::TryInto;
use serde::de::DeserializeOwned;
use std::sync::atomic::{AtomicU64, Ordering};
/// Http Client Builder.
......@@ -48,115 +50,96 @@ pub struct HttpClient {
#[async_trait]
impl Client for HttpClient {
async fn notification<M, P>(&self, method: M, params: P) -> Result<(), Error>
where
M: Into<String> + Send,
P: Into<jsonrpc::Params> + Send,
{
let request = jsonrpc::Request::Single(jsonrpc::Call::Notification(jsonrpc::Notification {
jsonrpc: jsonrpc::Version::V2,
method: method.into(),
params: params.into(),
}));
self.transport.send_notification(request).await.map_err(|e| Error::TransportError(Box::new(e)))
async fn notification<'a>(&self, method: &'a str, params: JsonRpcParams<'a>) -> Result<(), Error> {
let notif = JsonRpcNotificationSer::new(method, params);
self.transport
.send(serde_json::to_string(&notif).map_err(Error::ParseError)?)
.await
.map_err(|e| Error::TransportError(Box::new(e)))
}
/// Perform a request towards the server.
async fn request<T, M, P>(&self, method: M, params: P) -> Result<T, Error>
async fn request<'a, R>(&self, method: &'a str, params: JsonRpcParams<'a>) -> Result<R, Error>
where
T: DeserializeOwned,
M: Into<String> + Send,
P: Into<jsonrpc::Params> + Send,
R: DeserializeOwned,
{
// NOTE: `fetch_add` wraps on overflow which is intended.
let id = self.request_id.fetch_add(1, Ordering::Relaxed);
let request = jsonrpc::Request::Single(jsonrpc::Call::MethodCall(jsonrpc::MethodCall {
jsonrpc: jsonrpc::Version::V2,
method: method.into(),
params: params.into(),
id: jsonrpc::Id::Num(id),
}));
let response = self
let request = JsonRpcCallSer::new(Id::Number(id), method, params);
let body = self
.transport
.send_request_and_wait_for_response(request)
.send_and_read_body(serde_json::to_string(&request).map_err(Error::ParseError)?)
.await
.map_err(|e| Error::TransportError(Box::new(e)))?;
let json_value = match response {
jsonrpc::Response::Single(response) => match response.id() {
jsonrpc::Id::Num(n) if n == &id => response.try_into().map_err(Error::Request),
_ => Err(Error::InvalidRequestId),
},
jsonrpc::Response::Batch(_rps) => Err(Error::InvalidResponse(Mismatch {
expected: "Single response".into(),
got: "Batch Response".into(),
})),
jsonrpc::Response::Notif(_notif) => Err(Error::InvalidResponse(Mismatch {
expected: "Single response".into(),
got: "Notification Response".into(),
})),
}?;
jsonrpc::from_value(json_value).map_err(Error::ParseError)
let response: JsonRpcResponse<_> = match serde_json::from_slice(&body) {
Ok(response) => response,
Err(_) => {
let err: JsonRpcErrorAlloc = serde_json::from_slice(&body).map_err(Error::ParseError)?;
return Err(Error::Request(err));
}
};
let response_id = parse_request_id(response.id)?;
if response_id == id {
Ok(response.result)
} else {
Err(Error::InvalidRequestId)
}
}
async fn batch_request<T, M, P>(&self, batch: Vec<(M, P)>) -> Result<Vec<T>, Error>
async fn batch_request<'a, R>(&self, batch: Vec<(&'a str, JsonRpcParams<'a>)>) -> Result<Vec<R>, Error>
where
T: DeserializeOwned + Default + Clone,
M: Into<String> + Send,
P: Into<jsonrpc::Params> + Send,
R: DeserializeOwned + Default + Clone,
{
let mut calls = Vec::with_capacity(batch.len());
let mut batch_request = Vec::with_capacity(batch.len());
// NOTE(niklasad1): `ID` is not necessarily monotonically increasing.
let mut ordered_requests = Vec::with_capacity(batch.len());
let mut request_set = FnvHashMap::with_capacity_and_hasher(batch.len(), Default::default());
for (pos, (method, params)) in batch.into_iter().enumerate() {
let id = self.request_id.fetch_add(1, Ordering::SeqCst);
calls.push(jsonrpc::Call::MethodCall(jsonrpc::MethodCall {
jsonrpc: jsonrpc::Version::V2,
method: method.into(),
params: params.into(),
id: jsonrpc::Id::Num(id),
}));
batch_request.push(JsonRpcCallSer::new(Id::Number(id), method, params));
ordered_requests.push(id);
request_set.insert(id, pos);
}
let batch_request = jsonrpc::Request::Batch(calls);
let response = self
let body = self
.transport
.send_request_and_wait_for_response(batch_request)
.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?)
.await
.map_err(|e| Error::TransportError(Box::new(e)))?;
match response {
jsonrpc::Response::Single(_) => Err(Error::InvalidResponse(Mismatch {
expected: "Batch response".into(),
got: "Single Response".into(),
})),
jsonrpc::Response::Notif(_notif) => Err(Error::InvalidResponse(Mismatch {
expected: "Batch response".into(),
got: "Notification response".into(),
})),
jsonrpc::Response::Batch(rps) => {
// NOTE: `T::default` is placeholder and will be replaced in loop below.
let mut responses = vec![T::default(); ordered_requests.len()];
for rp in rps {
let id = match rp.id().as_number() {
Some(n) => *n,
_ => return Err(Error::InvalidRequestId),
};
let pos = match request_set.get(&id) {
Some(pos) => *pos,
None => return Err(Error::InvalidRequestId),
};
let json_val: jsonrpc::JsonValue = rp.try_into().map_err(Error::Request)?;
let response = jsonrpc::from_value(json_val).map_err(Error::ParseError)?;
responses[pos] = response;
}
Ok(responses)
let rps: Vec<JsonRpcResponse<_>> = match serde_json::from_slice(&body) {
Ok(response) => response,
Err(_) => {
let err: JsonRpcErrorAlloc = serde_json::from_slice(&body).map_err(Error::ParseError)?;
return Err(Error::Request(err));
}
};
// NOTE: `R::default` is placeholder and will be replaced in loop below.
let mut responses = vec![R::default(); ordered_requests.len()];
for rp in rps {
let response_id = parse_request_id(rp.id)?;
let pos = match request_set.get(&response_id) {
Some(pos) => *pos,
None => return Err(Error::InvalidRequestId),
};
responses[pos] = rp.result
}
Ok(responses)
}
}
fn parse_request_id(raw: Option<&JsonRawValue>) -> Result<u64, Error> {
match raw {
None => Err(Error::InvalidRequestId),
Some(id) => {
let id = serde_json::from_str(id.get()).map_err(Error::ParseError)?;
Ok(id)
}
}
}
......@@ -43,4 +43,4 @@ mod transport;
mod tests;
pub use client::{HttpClient, HttpClientBuilder};
pub use jsonrpsee_types::{error::Error, jsonrpc, traits::Client};
pub use jsonrpsee_types::*;
use crate::client::HttpClientBuilder;
use jsonrpsee_types::{
error::Error,
jsonrpc::{self, ErrorCode, JsonValue, Params},
traits::Client,
};
use crate::v2::{error::ErrorCode, params::JsonRpcParams};
use crate::{traits::Client, Error, HttpClientBuilder, JsonValue};
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::types::Id;
......@@ -20,7 +15,7 @@ async fn notification_works() {
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().build(&uri).unwrap();
client
.notification("i_dont_care_about_the_response_because_the_server_should_not_respond", Params::None)
.notification("i_dont_care_about_the_response_because_the_server_should_not_respond", JsonRpcParams::NoParams)
.await
.unwrap();
}
......@@ -34,46 +29,46 @@ async fn response_with_wrong_id() {
#[tokio::test]
async fn response_method_not_found() {
let err = run_request_with_response(method_not_found(Id::Num(0))).await.unwrap_err();
assert_jsonrpc_error_response(err, ErrorCode::MethodNotFound, METHOD_NOT_FOUND.into());
assert_jsonrpc_error_response(err, ErrorCode::MethodNotFound);
}
#[tokio::test]
async fn response_parse_error() {
let err = run_request_with_response(parse_error(Id::Num(0))).await.unwrap_err();
assert_jsonrpc_error_response(err, ErrorCode::ParseError, PARSE_ERROR.into());
assert_jsonrpc_error_response(err, ErrorCode::ParseError);
}
#[tokio::test]
async fn invalid_request_works() {
let err = run_request_with_response(invalid_request(Id::Num(0_u64))).await.unwrap_err();
assert_jsonrpc_error_response(err, ErrorCode::InvalidRequest, INVALID_REQUEST.into());
assert_jsonrpc_error_response(err, ErrorCode::InvalidRequest);
}
#[tokio::test]
async fn invalid_params_works() {
let err = run_request_with_response(invalid_params(Id::Num(0_u64))).await.unwrap_err();
assert_jsonrpc_error_response(err, ErrorCode::InvalidParams, INVALID_PARAMS.into());
assert_jsonrpc_error_response(err, ErrorCode::InvalidParams);
}
#[tokio::test]
async fn internal_error_works() {
let err = run_request_with_response(internal_error(Id::Num(0_u64))).await.unwrap_err();
assert_jsonrpc_error_response(err, ErrorCode::InternalError, INTERNAL_ERROR.into());
assert_jsonrpc_error_response(err, ErrorCode::InternalError);
}
#[tokio::test]
async fn subscription_response_to_request() {
let req = r#"{"jsonrpc":"2.0","method":"subscribe_hello","params":{"subscription":"3px4FrtxSYQ1zBKW154NoVnrDhrq764yQNCXEgZyM6Mu","result":"hello my friend"}}"#.to_string();
let err = run_request_with_response(req).await.unwrap_err();
assert!(matches!(err, Error::InvalidResponse(_)));
assert!(matches!(err, Error::ParseError(_)));
}
#[tokio::test]
async fn batch_request_works() {
let batch_request = vec![
("say_hello".to_string(), Params::None),
("say_goodbye".to_string(), Params::Array(vec![0.into(), 1.into(), 2.into()])),
("get_swag".to_string(), Params::None),
("say_hello", JsonRpcParams::NoParams),
("say_goodbye", JsonRpcParams::Array(vec![0_u64.into(), 1.into(), 2.into()])),
("get_swag", JsonRpcParams::NoParams),
];
let server_response = r#"[{"jsonrpc":"2.0","result":"hello","id":0}, {"jsonrpc":"2.0","result":"goodbye","id":1}, {"jsonrpc":"2.0","result":"here's your swag","id":2}]"#.to_string();
let response = run_batch_request_with_response(batch_request, server_response).await.unwrap();
......@@ -83,16 +78,19 @@ async fn batch_request_works() {
#[tokio::test]
async fn batch_request_out_of_order_response() {
let batch_request = vec![
("say_hello".to_string(), Params::None),
("say_goodbye".to_string(), Params::Array(vec![0.into(), 1.into(), 2.into()])),
("get_swag".to_string(), Params::None),
("say_hello", JsonRpcParams::NoParams),
("say_goodbye", JsonRpcParams::Array(vec![0_u64.into(), 1.into(), 2.into()])),
("get_swag", JsonRpcParams::NoParams),
];
let server_response = r#"[{"jsonrpc":"2.0","result":"here's your swag","id":2}, {"jsonrpc":"2.0","result":"hello","id":0}, {"jsonrpc":"2.0","result":"goodbye","id":1}]"#.to_string();
let response = run_batch_request_with_response(batch_request, server_response).await.unwrap();
assert_eq!(response, vec!["hello".to_string(), "goodbye".to_string(), "here's your swag".to_string()]);
}
async fn run_batch_request_with_response(batch: Vec<(String, Params)>, response: String) -> Result<Vec<String>, Error> {
async fn run_batch_request_with_response<'a>(
batch: Vec<(&'a str, JsonRpcParams<'a>)>,
response: String,
) -> Result<Vec<String>, Error> {
let server_addr = http_server_with_hardcoded_response(response).await;
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().build(&uri).unwrap();
......@@ -103,15 +101,12 @@ async fn run_request_with_response(response: String) -> Result<JsonValue, Error>
let server_addr = http_server_with_hardcoded_response(response).await;
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().build(&uri).unwrap();
client.request("say_hello", Params::None).await
client.request("say_hello", JsonRpcParams::NoParams).await
}
fn assert_jsonrpc_error_response(response: Error, code: ErrorCode, message: String) {
let expected = jsonrpc::Error { code, message, data: None };
match response {
Error::Request(err) => {
assert_eq!(err, expected);
}
e @ _ => panic!("Expected error: \"{}\", got: {:?}", expected, e),
fn assert_jsonrpc_error_response(error: Error, code: ErrorCode) {
match &error {
Error::Request(e) => assert_eq!(e.error, code),
e => panic!("Expected error: \"{}\", got: {:?}", error, e),
};
}
......@@ -6,9 +6,9 @@
// that we need to be guaranteed that hyper doesn't re-use an existing connection if we ever reset
// the JSON-RPC request id to a value that might have already been used.
use crate::error::GenericTransportError;
use hyper::client::{Client, HttpConnector};
use hyper_rustls::HttpsConnector;
use jsonrpsee_types::{error::GenericTransportError, jsonrpc};
use jsonrpsee_utils::hyper_helpers;
use thiserror::Error;
......@@ -41,10 +41,8 @@ impl HttpTransportClient {
}
}
/// Send request.
async fn send_request(&self, request: jsonrpc::Request) -> Result<hyper::Response<hyper::Body>, Error> {
let body = jsonrpc::to_vec(&request).map_err(Error::Serialization)?;
log::debug!("send: {}", request);