Unverified Commit 360a7f31 authored by David's avatar David Committed by GitHub
Browse files

Timeouts for all requests (#406)



* [clients]: use request timeout by-default

* add timeout for notif

* more feature flag mess

* rexport tokio types

* Update ws-client/src/client.rs

Co-authored-by: default avatarMaciej Hirsz <1096222+maciejhirsz@users.noreply.github.com>

* Impose a timeout on all requests

Variant of #367

This PR takes a more opinionated stance than #367, where timeouts are optional. In this PR I suggest we make a all requests use a timeout and only let users choose the length.

* fmt

* Address review grumbles

* fmt

* Use tokio::select! for cleaner code

Co-authored-by: Niklas Adolfsson's avatarNiklas Adolfsson <niklasadolfsson1@gmail.com>
Co-authored-by: default avatarMaciej Hirsz <1096222+maciejhirsz@users.noreply.github.com>
Co-authored-by: default avatarMaciej Hirsz <hello@maciej.codes>
parent ddb50806
......@@ -11,6 +11,7 @@ documentation = "https://docs.rs/jsonrpsee-http-client"
[dependencies]
async-trait = "0.1"
futures = { version = "0.3.14", default-features = false, features = ["std"] }
hyper13-rustls = { package = "hyper-rustls", version = "0.21", optional = true }
hyper14-rustls = { package = "hyper-rustls", version = "0.22", optional = true }
hyper14 = { package = "hyper", version = "0.14", features = ["client", "http1", "http2", "tcp"], optional = true }
......@@ -20,15 +21,17 @@ jsonrpsee-utils = { path = "../utils", version = "0.2.0", optional = true }
log = "0.4"
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = "1.0"
tokioV1 = { package = "tokio", version = "1", features = ["time"], optional = true }
tokioV02 = { package = "tokio", version = "0.2", features = ["time"], optional = true }
thiserror = "1.0"
url = "2.2"
fnv = "1"
[features]
default = ["tokio1"]
tokio1 = ["hyper14", "hyper14-rustls", "jsonrpsee-utils/hyper_14"]
tokio02 = ["hyper13", "hyper13-rustls", "jsonrpsee-utils/hyper_13"]
tokio1 = ["hyper14", "hyper14-rustls", "jsonrpsee-utils/hyper_14", "tokioV1" ]
tokio02 = ["hyper13", "hyper13-rustls", "jsonrpsee-utils/hyper_13", "tokioV02" ]
[dev-dependencies]
jsonrpsee-test-utils = { path = "../test-utils" }
tokio = { version = "1.0", features = ["net", "rt-multi-thread", "macros"] }
tokioV1 = { package = "tokio", version = "1", features = ["net", "rt-multi-thread", "macros"] }
\ No newline at end of file
......@@ -11,11 +11,13 @@ use async_trait::async_trait;
use fnv::FnvHashMap;
use serde::de::DeserializeOwned;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
/// Http Client Builder.
#[derive(Debug)]
pub struct HttpClientBuilder {
max_request_body_size: u32,
request_timeout: Duration,
}
impl HttpClientBuilder {
......@@ -25,17 +27,23 @@ impl HttpClientBuilder {
self
}
/// Set request timeout (default is 60 seconds).
pub fn request_timeout(mut self, timeout: Duration) -> Self {
self.request_timeout = timeout;
self
}
/// Build the HTTP client with target to connect to.
pub fn build(self, target: impl AsRef<str>) -> Result<HttpClient, Error> {
let transport =
HttpTransportClient::new(target, self.max_request_body_size).map_err(|e| Error::Transport(Box::new(e)))?;
Ok(HttpClient { transport, request_id: AtomicU64::new(0) })
Ok(HttpClient { transport, request_id: AtomicU64::new(0), request_timeout: self.request_timeout })
}
}
impl Default for HttpClientBuilder {
fn default() -> Self {
Self { max_request_body_size: TEN_MB_SIZE_BYTES }
Self { max_request_body_size: TEN_MB_SIZE_BYTES, request_timeout: Duration::from_secs(60) }
}
}
......@@ -46,16 +54,20 @@ pub struct HttpClient {
transport: HttpTransportClient,
/// Request ID that wraps around when overflowing.
request_id: AtomicU64,
/// Request timeout. Defaults to 60sec.
request_timeout: Duration,
}
#[async_trait]
impl Client for HttpClient {
async fn notification<'a>(&self, method: &'a str, params: JsonRpcParams<'a>) -> Result<(), Error> {
let notif = JsonRpcNotificationSer::new(method, params);
self.transport
.send(serde_json::to_string(&notif).map_err(Error::ParseError)?)
.await
.map_err(|e| Error::Transport(Box::new(e)))
let fut = self.transport.send(serde_json::to_string(&notif).map_err(Error::ParseError)?);
match crate::tokio::timeout(self.request_timeout, fut).await {
Ok(Ok(ok)) => Ok(ok),
Err(_) => Err(Error::RequestTimeout),
Ok(Err(e)) => Err(Error::Transport(Box::new(e))),
}
}
/// Perform a request towards the server.
......@@ -67,11 +79,12 @@ impl Client for HttpClient {
let id = self.request_id.fetch_add(1, Ordering::SeqCst);
let request = JsonRpcCallSer::new(Id::Number(id), method, params);
let body = self
.transport
.send_and_read_body(serde_json::to_string(&request).map_err(Error::ParseError)?)
.await
.map_err(|e| Error::Transport(Box::new(e)))?;
let fut = self.transport.send_and_read_body(serde_json::to_string(&request).map_err(Error::ParseError)?);
let body = match crate::tokio::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Err(_e) => return Err(Error::RequestTimeout),
Ok(Err(e)) => return Err(Error::Transport(Box::new(e))),
};
let response: JsonRpcResponse<_> = match serde_json::from_slice(&body) {
Ok(response) => response,
......@@ -106,11 +119,13 @@ impl Client for HttpClient {
request_set.insert(id, pos);
}
let body = self
.transport
.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?)
.await
.map_err(|e| Error::Transport(Box::new(e)))?;
let fut = self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?);
let body = match crate::tokio::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Err(_e) => return Err(Error::RequestTimeout),
Ok(Err(e)) => return Err(Error::Transport(Box::new(e))),
};
let rps: Vec<JsonRpcResponse<_>> = match serde_json::from_slice(&body) {
Ok(response) => response,
......
......@@ -39,6 +39,19 @@ extern crate hyper13_rustls as hyper_rustls;
mod client;
mod transport;
#[cfg(all(feature = "tokio1", not(feature = "tokio02")))]
mod tokio {
pub(crate) use tokioV1::time::timeout;
#[cfg(test)]
pub(crate) use tokioV1::{runtime, test};
}
#[cfg(all(feature = "tokio02", not(feature = "tokio1")))]
mod tokio {
pub(crate) use tokioV02::time::timeout;
pub(crate) use tokioV02::time::Elapsed;
}
#[cfg(test)]
mod tests;
......
......@@ -2,7 +2,7 @@ use crate::v2::{
error::{JsonRpcError, JsonRpcErrorCode, JsonRpcErrorObject},
params::JsonRpcParams,
};
use crate::{traits::Client, Error, HttpClientBuilder, JsonValue};
use crate::{tokio, traits::Client, Error, HttpClientBuilder, JsonValue};
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::types::Id;
use jsonrpsee_test_utils::TimeoutFutureExt;
......
......@@ -115,6 +115,7 @@ where
#[cfg(test)]
mod tests {
use super::{Error, HttpTransportClient};
use crate::tokio;
#[test]
fn invalid_http_url_rejected() {
......
......@@ -11,14 +11,14 @@ documentation = "https://docs.rs/jsonrpsee-ws-client"
[dependencies]
# Tokio v1 deps
tokioV1 = { package="tokio", version = "1", features = ["net", "time", "rt-multi-thread"], optional=true }
tokioV1-rustls = { package="tokio-rustls", version = "0.22", optional=true }
tokioV1-util = { package="tokio-util", version = "0.6", features = ["compat"], optional=true }
tokioV1 = { package="tokio", version = "1", features = ["net", "time", "rt-multi-thread", "macros"], optional = true }
tokioV1-rustls = { package="tokio-rustls", version = "0.22", optional = true }
tokioV1-util = { package="tokio-util", version = "0.6", features = ["compat"], optional = true }
# Tokio v0.2 deps
tokioV02 = { package="tokio", version = "0.2", features = ["net", "time", "rt-threaded", "sync"], optional=true }
tokioV02-rustls = { package="tokio-rustls", version = "0.15", optional=true }
tokioV02-util = { package="tokio-util", version = "0.3", features = ["compat"], optional=true }
tokioV02 = { package="tokio", version = "0.2", features = ["net", "time", "rt-threaded", "sync", "macros"], optional = true }
tokioV02-rustls = { package="tokio-rustls", version = "0.15", optional = true }
tokioV02-util = { package="tokio-util", version = "0.3", features = ["compat"], optional = true }
async-trait = "0.1"
fnv = "1"
......
......@@ -32,6 +32,10 @@ use crate::v2::params::{Id, JsonRpcParams};
use crate::v2::request::{JsonRpcCallSer, JsonRpcNotification, JsonRpcNotificationSer};
use crate::v2::response::JsonRpcResponse;
use crate::TEN_MB_SIZE_BYTES;
use crate::{
helpers::call_with_timeout, manager::RequestManager, BatchMessage, Error, FrontToBack, RegisterNotificationMessage,
RequestMessage, Subscription, SubscriptionMessage,
};
use crate::{
helpers::{
build_unsubscribe_message, process_batch_response, process_error_response, process_notification,
......@@ -39,10 +43,6 @@ use crate::{
},
transport::CertificateStore,
};
use crate::{
manager::RequestManager, BatchMessage, Error, FrontToBack, RegisterNotificationMessage, RequestMessage,
Subscription, SubscriptionMessage,
};
use async_trait::async_trait;
use futures::{
channel::{mpsc, oneshot},
......@@ -102,8 +102,8 @@ pub struct WsClient {
/// If the background thread terminates the error is sent to this channel.
// NOTE(niklasad1): This is a Mutex to circumvent that the async fns takes immutable references.
error: Mutex<ErrorFromBack>,
/// Request timeout
request_timeout: Option<Duration>,
/// Request timeout. Defaults to 60sec.
request_timeout: Duration,
/// Request ID manager.
id_guard: RequestIdGuard,
}
......@@ -174,7 +174,7 @@ impl RequestIdGuard {
pub struct WsClientBuilder<'a> {
certificate_store: CertificateStore,
max_request_body_size: u32,
request_timeout: Option<Duration>,
request_timeout: Duration,
connection_timeout: Duration,
origin_header: Option<Cow<'a, str>>,
max_concurrent_requests: usize,
......@@ -186,7 +186,7 @@ impl<'a> Default for WsClientBuilder<'a> {
Self {
certificate_store: CertificateStore::Native,
max_request_body_size: TEN_MB_SIZE_BYTES,
request_timeout: None,
request_timeout: Duration::from_secs(60),
connection_timeout: Duration::from_secs(10),
origin_header: None,
max_concurrent_requests: 256,
......@@ -208,9 +208,9 @@ impl<'a> WsClientBuilder<'a> {
self
}
/// Set request timeout.
/// Set request timeout (default is 60 seconds).
pub fn request_timeout(mut self, timeout: Duration) -> Self {
self.request_timeout = Some(timeout);
self.request_timeout = timeout;
self
}
......@@ -313,7 +313,17 @@ impl Client for WsClient {
Error::ParseError(e)
})?;
log::trace!("[frontend]: send notification: {:?}", raw);
let res = self.to_back.clone().send(FrontToBack::Notification(raw)).await;
let mut sender = self.to_back.clone();
let fut = sender.send(FrontToBack::Notification(raw));
let timeout = crate::tokio::sleep(self.request_timeout);
let res = crate::tokio::select! {
x = fut => x,
_ = timeout => return Err(Error::RequestTimeout)
};
self.id_guard.reclaim_request_id();
match res {
Ok(()) => Ok(()),
......@@ -344,19 +354,10 @@ impl Client for WsClient {
return Err(self.read_error_from_backend().await);
}
let send_back_rx_out = if let Some(duration) = self.request_timeout {
let timeout = crate::tokio::sleep(duration);
futures::pin_mut!(send_back_rx, timeout);
match future::select(send_back_rx, timeout).await {
future::Either::Left((send_back_rx_out, _)) => send_back_rx_out,
future::Either::Right((_, _)) => Ok(Err(Error::RequestTimeout)),
}
} else {
send_back_rx.await
};
let res = call_with_timeout(self.request_timeout, send_back_rx).await;
self.id_guard.reclaim_request_id();
let json_value = match send_back_rx_out {
let json_value = match res {
Ok(Ok(v)) => v,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
......@@ -393,7 +394,8 @@ impl Client for WsClient {
return Err(self.read_error_from_backend().await);
}
let res = send_back_rx.await;
let res = call_with_timeout(self.request_timeout, send_back_rx).await;
self.id_guard.reclaim_request_id();
let json_values = match res {
Ok(Ok(v)) => v,
......@@ -453,7 +455,8 @@ impl SubscriptionClient for WsClient {
return Err(self.read_error_from_backend().await);
}
let res = send_back_rx.await;
let res = call_with_timeout(self.request_timeout, send_back_rx).await;
self.id_guard.reclaim_request_id();
let (notifs_rx, id) = match res {
Ok(Ok(val)) => val,
......@@ -484,7 +487,8 @@ impl SubscriptionClient for WsClient {
return Err(self.read_error_from_backend().await);
}
let res = send_back_rx.await;
let res = call_with_timeout(self.request_timeout, send_back_rx).await;
let (notifs_rx, method) = match res {
Ok(Ok(val)) => val,
Ok(Err(err)) => return Err(err),
......
use crate::manager::{RequestManager, RequestStatus};
use crate::transport::Sender as WsSender;
use futures::channel::mpsc;
use futures::channel::{mpsc, oneshot};
use jsonrpsee_types::v2::params::{Id, JsonRpcParams, JsonRpcSubscriptionParams, SubscriptionId};
use jsonrpsee_types::v2::request::{JsonRpcCallSer, JsonRpcNotification};
use jsonrpsee_types::v2::response::JsonRpcResponse;
use jsonrpsee_types::{v2::error::JsonRpcError, Error, RequestMessage};
use serde_json::Value as JsonValue;
use std::time::Duration;
/// Attempts to process a batch response.
///
......@@ -188,3 +189,15 @@ pub fn process_error_response(manager: &mut RequestManager, err: JsonRpcError) -
_ => Err(Error::InvalidRequestId),
}
}
/// Wait for a stream to complete within the given timeout.
pub async fn call_with_timeout<T>(
timeout: Duration,
rx: oneshot::Receiver<Result<T, Error>>,
) -> Result<Result<T, Error>, oneshot::Canceled> {
let timeout = crate::tokio::sleep(timeout);
crate::tokio::select! {
res = rx => res,
_ = timeout => Ok(Err(Error::RequestTimeout))
}
}
......@@ -25,6 +25,8 @@ mod tokio_impl {
pub(crate) use tokioV1_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
pub(crate) use tokioV1::time::sleep;
pub(crate) use tokioV1::select;
}
// Note that we check for `not(feature = "tokio1")` here, but not above.
......@@ -50,4 +52,6 @@ mod tokio_impl {
// In 0.2 `tokio::time::sleep` had different name.
pub(crate) use tokioV02::time::delay_for as sleep;
pub(crate) use tokioV02::select;
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment