Unverified Commit b8bd715d authored by Niklas Adolfsson's avatar Niklas Adolfsson Committed by GitHub
Browse files

[client] batch requests (#216)



* feat(http client): batch requests

* fix(http batch request): request in any order.

* Update src/types/jsonrpc/id.rs

* tests(batch out-of-order): make it less confusing.

* fix nit: `cloned` -> `copied`

* fix bad merge

* [client]: batch request generic response

* wait with impl

* [ws client]: add template for batch requests

* remove jsonvalue

* naive impl

* fix make it work,

The implemenation is quite inefficient because the responses might be unordered.
Currently, a "digest" of a BTreeSet is stored to sort the requestIDs
and don't have to try all combinations of requestIDs in the response.

* fix nits

* error handling

* fix nits

* more nits

* use error msg pattern

* add batch requests to client trait

Keep it simple by require `Vec<(Method, Params)>` could be improved.

* address review comments: use swap_remove

Replace read values with `Vec::swap_remove` instead inserting dummy value.
The reason is that `Vec::remove` is O(n)

* address review grumbles: batch trait bound Default

* don't be clever

* [http client]: faster lookup for batch request.

* fix: distguish request_id and batch_id

* thanks clippy

* fix: clarify bad response with a separate error

* fix tests

* add issue link to todo

* Update types/src/error.rs

Co-authored-by: David's avatarDavid <dvdplm@gmail.com>

Co-authored-by: David's avatarDavid <dvdplm@gmail.com>
parent 7abcb1e9
......@@ -21,6 +21,7 @@ serde_json = "1.0"
thiserror = "1.0"
unicase = "2.6"
url = "2.2"
fnv = "1"
[features]
default = ["tokio1"]
......
use crate::transport::HttpTransportClient;
use async_trait::async_trait;
use fnv::FnvHashMap;
use jsonrpc::DeserializeOwned;
use jsonrpsee_types::{error::Error, http::HttpConfig, jsonrpc, traits::Client};
use jsonrpsee_types::{
error::{Error, Mismatch},
http::HttpConfig,
jsonrpc,
traits::Client,
};
use std::convert::TryInto;
use std::sync::atomic::{AtomicU64, Ordering};
......@@ -37,7 +42,6 @@ impl Client for HttpClient {
method: method.into(),
params: params.into(),
}));
self.transport.send_notification(request).await.map_err(|e| Error::TransportError(Box::new(e)))
}
......@@ -68,15 +72,75 @@ impl Client for HttpClient {
jsonrpc::Id::Num(n) if n == &id => response.try_into().map_err(Error::Request),
_ => Err(Error::InvalidRequestId),
},
// Server should not send batch response to a single request.
jsonrpc::Response::Batch(_rps) => {
Err(Error::Custom("Server replied with batch response to a single request".to_string()))
}
// Server should not reply to a Notification.
jsonrpc::Response::Notif(_notif) => {
Err(Error::Custom(format!("Server replied with notification response to request ID: {}", id)))
}
jsonrpc::Response::Batch(_rps) => Err(Error::InvalidResponse(Mismatch {
expected: "Single response".into(),
got: "Batch Response".into(),
})),
jsonrpc::Response::Notif(_notif) => Err(Error::InvalidResponse(Mismatch {
expected: "Single response".into(),
got: "Notification Response".into(),
})),
}?;
jsonrpc::from_value(json_value).map_err(Error::ParseError)
}
async fn batch_request<T, M, P>(&self, batch: Vec<(M, P)>) -> Result<Vec<T>, Error>
where
T: DeserializeOwned + Default + Clone,
M: Into<String> + Send,
P: Into<jsonrpc::Params> + Send,
{
let mut calls = Vec::with_capacity(batch.len());
// NOTE(niklasad1): `ID` is not necessarily monotonically increasing.
let mut ordered_requests = Vec::with_capacity(batch.len());
let mut request_set = FnvHashMap::with_capacity_and_hasher(batch.len(), Default::default());
for (pos, (method, params)) in batch.into_iter().enumerate() {
let id = self.request_id.fetch_add(1, Ordering::SeqCst);
calls.push(jsonrpc::Call::MethodCall(jsonrpc::MethodCall {
jsonrpc: jsonrpc::Version::V2,
method: method.into(),
params: params.into(),
id: jsonrpc::Id::Num(id),
}));
ordered_requests.push(id);
request_set.insert(id, pos);
}
let batch_request = jsonrpc::Request::Batch(calls);
let response = self
.transport
.send_request_and_wait_for_response(batch_request)
.await
.map_err(|e| Error::TransportError(Box::new(e)))?;
match response {
jsonrpc::Response::Single(_) => Err(Error::InvalidResponse(Mismatch {
expected: "Batch response".into(),
got: "Single Response".into(),
})),
jsonrpc::Response::Notif(_notif) => Err(Error::InvalidResponse(Mismatch {
expected: "Batch response".into(),
got: "Notification response".into(),
})),
jsonrpc::Response::Batch(rps) => {
// NOTE: `T::default` is placeholder and will be replaced in loop below.
let mut responses = vec![T::default(); ordered_requests.len()];
for rp in rps {
let id = match rp.id().as_number() {
Some(n) => *n,
_ => return Err(Error::InvalidRequestId),
};
let pos = match request_set.get(&id) {
Some(pos) => *pos,
None => return Err(Error::InvalidRequestId),
};
let json_val: jsonrpc::JsonValue = rp.try_into().map_err(Error::Request)?;
let response = jsonrpc::from_value(json_val).map_err(Error::ParseError)?;
responses[pos] = response;
}
Ok(responses)
}
}
}
}
......@@ -66,7 +66,38 @@ async fn internal_error_works() {
async fn subscription_response_to_request() {
let req = r#"{"jsonrpc":"2.0","method":"subscribe_hello","params":{"subscription":"3px4FrtxSYQ1zBKW154NoVnrDhrq764yQNCXEgZyM6Mu","result":"hello my friend"}}"#.to_string();
let err = run_request_with_response(req).await.unwrap_err();
assert!(matches!(err, Error::Custom(_)));
assert!(matches!(err, Error::InvalidResponse(_)));
}
#[tokio::test]
async fn batch_request_works() {
let batch_request = vec![
("say_hello".to_string(), Params::None),
("say_goodbye".to_string(), Params::Array(vec![0.into(), 1.into(), 2.into()])),
("get_swag".to_string(), Params::None),
];
let server_response = r#"[{"jsonrpc":"2.0","result":"hello","id":0}, {"jsonrpc":"2.0","result":"goodbye","id":1}, {"jsonrpc":"2.0","result":"here's your swag","id":2}]"#.to_string();
let response = run_batch_request_with_response(batch_request, server_response).await.unwrap();
assert_eq!(response, vec!["hello".to_string(), "goodbye".to_string(), "here's your swag".to_string()]);
}
#[tokio::test]
async fn batch_request_out_of_order_response() {
let batch_request = vec![
("say_hello".to_string(), Params::None),
("say_goodbye".to_string(), Params::Array(vec![0.into(), 1.into(), 2.into()])),
("get_swag".to_string(), Params::None),
];
let server_response = r#"[{"jsonrpc":"2.0","result":"here's your swag","id":2}, {"jsonrpc":"2.0","result":"hello","id":0}, {"jsonrpc":"2.0","result":"goodbye","id":1}]"#.to_string();
let response = run_batch_request_with_response(batch_request, server_response).await.unwrap();
assert_eq!(response, vec!["hello".to_string(), "goodbye".to_string(), "here's your swag".to_string()]);
}
async fn run_batch_request_with_response(batch: Vec<(String, Params)>, response: String) -> Result<Vec<String>, Error> {
let server_addr = http_server_with_hardcoded_response(response).await;
let uri = format!("http://{}", server_addr);
let client = HttpClient::new(&uri, HttpConfig::default()).unwrap();
client.batch_request(batch).await
}
async fn run_request_with_response(response: String) -> Result<JsonValue, Error> {
......
......@@ -25,6 +25,15 @@ pub struct NotificationMessage {
pub params: Params,
}
/// Batch request message.
#[derive(Debug)]
pub struct BatchMessage {
/// Requests in the batch
pub requests: Vec<(String, Params)>,
/// One-shot channel over which we send back the result of this request.
pub send_back: oneshot::Sender<Result<Vec<JsonValue>, Error>>,
}
/// Request message.
#[derive(Debug)]
pub struct RequestMessage {
......@@ -54,6 +63,8 @@ pub struct SubscriptionMessage {
/// Message that the Client can send to the background task.
#[derive(Debug)]
pub enum FrontToBack {
/// Send a batch request to the server.
Batch(BatchMessage),
/// Send a notification to the server.
Notification(NotificationMessage),
/// Send a request to the server.
......
......@@ -30,6 +30,9 @@ pub enum Error {
/// Frontend/backend channel error.
#[error("Frontend/backend channel error: {0}")]
Internal(#[source] futures::channel::mpsc::SendError),
/// Invalid response,
#[error("Invalid response: {0}")]
InvalidResponse(Mismatch<String>),
/// The background task has been terminated.
#[error("The background task been terminated because: {0}; restart required")]
RestartNeeded(String),
......@@ -49,7 +52,7 @@ pub enum Error {
#[error("Method: {0} was already registered")]
MethodAlreadyRegistered(String),
/// Subscribe and unsubscribe method names are the same.
#[error("Cannot use the same method name for subcribe and unsubscribe, used: {0}")]
#[error("Cannot use the same method name for subscribe and unsubscribe, used: {0}")]
SubscriptionNameConflict(String),
/// Websocket request timeout
#[error("Websocket request timeout")]
......
......@@ -19,6 +19,18 @@ pub trait Client {
T: DeserializeOwned,
M: Into<String> + Send,
P: Into<Params> + Send;
/// Send a [batch request](https://www.jsonrpc.org/specification#batch).
///
/// The response to batch are returned in the same order as it was inserted in the batch.
///
/// Returns `Ok` if all requests in the batch were answered successfully.
/// Returns `Error` if any of the requests in batch fails.
async fn batch_request<T, M, P>(&self, batch: Vec<(M, P)>) -> Result<Vec<T>, Error>
where
T: DeserializeOwned + Default + Clone,
M: Into<String> + Send,
P: Into<Params> + Send;
}
/// [JSON-RPC](https://www.jsonrpc.org/specification) client interface that can make requests, notifications and subscriptions.
......
......@@ -36,7 +36,7 @@ use futures::{
};
use jsonrpc::DeserializeOwned;
use jsonrpsee_types::{
client::{FrontToBack, NotificationMessage, RequestMessage, Subscription, SubscriptionMessage},
client::{BatchMessage, FrontToBack, NotificationMessage, RequestMessage, Subscription, SubscriptionMessage},
error::Error,
jsonrpc::{self, JsonValue, SubscriptionId},
traits::{Client, SubscriptionClient},
......@@ -243,6 +243,36 @@ impl Client for WsClient {
};
jsonrpc::from_value(json_value).map_err(Error::ParseError)
}
async fn batch_request<T, M, P>(&self, batch: Vec<(M, P)>) -> Result<Vec<T>, Error>
where
T: DeserializeOwned + Default + Clone,
M: Into<String> + Send,
P: Into<jsonrpc::Params> + Send,
{
let (send_back_tx, send_back_rx) = oneshot::channel();
let requests: Vec<(String, jsonrpc::Params)> = batch.into_iter().map(|(r, p)| (r.into(), p.into())).collect();
log::trace!("[frontend]: send batch request: {:?}", requests);
if self
.to_back
.clone()
.send(FrontToBack::Batch(BatchMessage { requests, send_back: send_back_tx }))
.await
.is_err()
{
return Err(self.read_error_from_backend().await);
}
let json_values = match send_back_rx.await {
Ok(Ok(v)) => v,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
};
let values: Result<_, _> =
json_values.into_iter().map(|val| jsonrpc::from_value(val).map_err(Error::ParseError)).collect();
Ok(values?)
}
}
#[async_trait]
......@@ -328,6 +358,13 @@ async fn background_task(
return;
}
Either::Left((Some(FrontToBack::Batch(batch)), _)) => {
log::trace!("[backend]: client prepares to send batch request: {:?}", batch);
if let Err(e) = sender.start_batch_request(batch, &mut manager).await {
log::warn!("[backend]: client batch request failed: {:?}", e);
}
}
// User called `notification` on the front-end
Either::Left((Some(FrontToBack::Notification(notif)), _)) => {
log::trace!("[backend]: client prepares to send notification: {:?}", notif);
......@@ -374,8 +411,50 @@ async fn background_task(
}
}
}
Either::Right((Some(Ok(jsonrpc::Response::Batch(_responses))), _)) => {
log::warn!("Ignore batch response not supported, #103");
Either::Right((Some(Ok(jsonrpc::Response::Batch(batch))), _)) => {
let mut digest = Vec::with_capacity(batch.len());
let mut ordered_responses = vec![JsonValue::Null; batch.len()];
let mut rps_unordered: Vec<_> = Vec::with_capacity(batch.len());
for rp in batch {
let id = match rp.id().as_number().copied() {
Some(id) => id,
None => {
let _ = front_error.send(Error::InvalidRequestId);
return;
}
};
let rp: Result<JsonValue, Error> = rp.try_into().map_err(Error::Request);
let rp = match rp {
Ok(rp) => rp,
Err(err) => {
let _ = front_error.send(err);
return;
}
};
digest.push(id);
rps_unordered.push((id, rp));
}
digest.sort_unstable();
let batch_state = match manager.complete_pending_batch(digest) {
Some(state) => state,
None => {
log::warn!("Received unknown batch response");
continue;
}
};
for (id, rp) in rps_unordered {
let pos = batch_state
.order
.get(&id)
.copied()
.expect("All request IDs valid checked by RequestManager above; qed");
ordered_responses[pos] = rp;
}
manager.reclaim_request_id(batch_state.request_id);
let _ = batch_state.send_back.send(Ok(ordered_responses));
}
Either::Right((Some(Ok(jsonrpc::Response::Notif(notif))), _)) => {
let sub_id = notif.params.subscription;
......
......@@ -8,9 +8,9 @@ use crate::{
transport::{self, WsConnectError, WsHandshakeError, WsTransportClientBuilder},
};
use core::convert::TryInto;
use jsonrpsee_types::client::{NotificationMessage, RequestMessage, SubscriptionMessage};
use jsonrpsee_types::client::{BatchMessage, NotificationMessage, RequestMessage, SubscriptionMessage};
use jsonrpsee_types::error::Error;
use jsonrpsee_types::jsonrpc;
use jsonrpsee_types::jsonrpc::{self, Request};
/// Creates a new JSONRPC WebSocket connection, represented as a Sender and Receiver pair.
pub async fn websocket_connection(config: WsConfig<'_>) -> Result<(Sender, Receiver), WsHandshakeError> {
......@@ -31,6 +31,45 @@ impl Sender {
Self { transport }
}
/// Send a batch request.
pub async fn start_batch_request(
&mut self,
batch: BatchMessage,
request_manager: &mut RequestManager,
) -> Result<(), Error> {
let req_id = request_manager.next_request_id()?;
let mut calls = Vec::with_capacity(batch.requests.len());
let mut ids = Vec::with_capacity(batch.requests.len());
for (method, params) in batch.requests {
let batch_id = request_manager.next_batch_id();
ids.push(batch_id);
calls.push(jsonrpc::Call::MethodCall(jsonrpc::MethodCall {
jsonrpc: jsonrpc::Version::V2,
method,
params,
id: jsonrpc::Id::Num(batch_id),
}));
}
if let Err(send_back) = request_manager.insert_pending_batch(ids, batch.send_back, req_id) {
request_manager.reclaim_request_id(req_id);
let _ = send_back.send(Err(Error::InvalidRequestId));
return Err(Error::InvalidRequestId);
};
let res =
self.transport.send_request(Request::Batch(calls)).await.map_err(|e| Error::TransportError(Box::new(e)));
match res {
Ok(_) => Ok(()),
Err(e) => {
request_manager.reclaim_request_id(req_id);
Err(e)
}
}
}
/// Sends a request to the server but it doesn’t wait for a response.
/// Instead, you have keep the request ID and use the Receiver to get the response.
///
......@@ -52,7 +91,7 @@ impl Sender {
jsonrpc: jsonrpc::Version::V2,
method: request.method,
params: request.params,
id: jsonrpc::Id::Num(id as u64),
id: jsonrpc::Id::Num(id),
}));
match self.transport.send_request(req).await {
Ok(_) => {
......
......@@ -38,14 +38,33 @@ pub enum RequestStatus {
}
type PendingCallOneshot = Option<oneshot::Sender<Result<JsonValue, Error>>>;
type PendingBatchOneshot = oneshot::Sender<Result<Vec<JsonValue>, Error>>;
type PendingSubscriptionOneshot = oneshot::Sender<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>;
type SubscriptionSink = mpsc::Sender<JsonValue>;
type UnsubscribeMethod = String;
/// Unique ID that are generated by the RequestManager.
// TODO: new type for this https://github.com/paritytech/jsonrpsee/issues/249
type RequestId = u64;
/// Wrapping counter for requests within a batch request.
// TODO: new type for this https://github.com/paritytech/jsonrpsee/issues/249
type BatchId = u64;
#[derive(Debug)]
/// Batch state.
pub struct BatchState {
/// Order that the request was performed in.
pub order: FnvHashMap<BatchId, usize>,
/// Request ID fetch from the `RequestManager`
pub request_id: RequestId,
/// Oneshot send back.
pub send_back: PendingBatchOneshot,
}
#[derive(Debug)]
/// Manages and monitors JSONRPC v2 method calls and subscriptions.
pub struct RequestManager {
/// Batch ID.
batch_id: BatchId,
/// Vacant requestIDs.
free_slots: VecDeque<RequestId>,
/// List of requests that are waiting for a response from the server.
......@@ -53,18 +72,29 @@ pub struct RequestManager {
requests: FnvHashMap<RequestId, Kind>,
/// Reverse lookup, to find a request ID in constant time by `subscription ID` instead of looking through all requests.
subscriptions: HashMap<SubscriptionId, RequestId>,
/// Pending batch requests
batches: FnvHashMap<Vec<BatchId>, BatchState>,
}
impl RequestManager {
/// Create a new `RequestManager` with specified capacity.
pub fn new(slot_capacity: usize) -> Self {
Self {
batch_id: 0,
free_slots: (0..slot_capacity as u64).collect(),
requests: FnvHashMap::default(),
subscriptions: HashMap::new(),
batches: HashMap::default(),
}
}
/// Get next batch ID.
pub fn next_batch_id(&mut self) -> BatchId {
let id = self.batch_id;
self.batch_id = self.batch_id.wrapping_add(1);
id
}
/// Mark a used RequestID as free again.
pub fn reclaim_request_id(&mut self, request_id: RequestId) {
self.free_slots.push_back(request_id);
......@@ -91,6 +121,27 @@ impl RequestManager {
}
}
/// Tries to insert a new batch request
///
/// Returns `Ok` if the pending request was successfully inserted otherwise `Err`.
pub fn insert_pending_batch(
&mut self,
mut batch: Vec<BatchId>,
send_back: PendingBatchOneshot,
request_id: RequestId,
) -> Result<(), PendingBatchOneshot> {
let mut order = FnvHashMap::with_capacity_and_hasher(batch.len(), Default::default());
for (idx, batch_id) in batch.iter().enumerate() {
order.insert(*batch_id, idx);
}
batch.sort_unstable();
if let Entry::Vacant(v) = self.batches.entry(batch) {
v.insert(BatchState { order, request_id, send_back });
Ok(())
} else {
Err(send_back)
}
}
/// Tries to insert a new pending subscription.
///
/// Returns `Ok` if the pending request was successfully inserted otherwise `Err`.
......@@ -149,6 +200,19 @@ impl RequestManager {
}
}
/// Tries to complete a pending batch request
///
/// Returns `Some` if the subscription was completed otherwise `None`.
pub fn complete_pending_batch(&mut self, batch: Vec<BatchId>) -> Option<BatchState> {
match self.batches.entry(batch) {
Entry::Occupied(request) => {
let (_digest, state) = request.remove_entry();
Some(state)
}
_ => None,
}
}
/// Tries to complete a pending call..
///
/// Returns `Some` if the call was completed otherwise `None`.
......
......@@ -5,7 +5,7 @@ use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::types::{Id, WebSocketTestServer};
use jsonrpsee_types::{
error::Error,
jsonrpc,
jsonrpc::{self, Params},
traits::{Client, SubscriptionClient},
};
......@@ -127,6 +127,31 @@ async fn response_with_wrong_id() {
assert!(matches!(err, Err(Error::RestartNeeded(e)) if e.to_string().contains("Invalid request ID")));
}
#[tokio::test]
async fn batch_request_works() {
let _ = env_logger::try_init();
let batch_request = vec![
("say_hello".to_string(), Params::None),
("say_goodbye".to_string(), Params::Array(vec![0.into(), 1.into(), 2.into()])),
("get_swag".to_string(), Params::None),
];
let server_response = r#"[{"jsonrpc":"2.0","result":"hello","id":0}, {"jsonrpc":"2.0","result":"goodbye","id":1}, {"jsonrpc":"2.0","result":"here's your swag","id":2}]"#.to_string();
let response = run_batch_request_with_response(batch_request, server_response).await.unwrap();
assert_eq!(response, vec!["hello".to_string(), "goodbye".to_string(), "here's your swag".to_string()]);
}
#[tokio::test]
async fn batch_request_out_of_order_response() {
let batch_request = vec![
("say_hello".to_string(), Params::None),
("say_goodbye".to_string(), Params::Array(vec![0.into(), 1.into(), 2.into()])),
("get_swag".to_string(), Params::None),
];
let server_response = r#"[{"jsonrpc":"2.0","result":"here's your swag","id":2}, {"jsonrpc":"2.0","result":"hello","id":0}, {"jsonrpc":"2.0","result":"goodbye","id":1}]"#.to_string();
let response = run_batch_request_with_response(batch_request, server_response).await.unwrap();
assert_eq!(response, vec!["hello".to_string(), "goodbye".to_string(), "here's your swag".to_string()]);
}
#[tokio::test]
async fn is_connected_works() {
let server = WebSocketTestServer::with_hardcoded_response(
......@@ -140,3 +165,10 @@ async fn is_connected_works() {
client.request::<String, _, _>("say_hello", jsonrpc::Params::None).await.unwrap_err();
assert!(!client.is_connected())
}
async fn run_batch_request_with_response(batch: Vec<(String, Params)>, response: String) -> Result<Vec<String>, Error> {
let server = WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), response).await;
let uri = to_ws_uri_string(server.local_addr());
let client = WsClient::new(WsConfig::with_url(&uri)).await.unwrap();
client.batch_request(batch).await
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment