Unverified Commit 2784beef authored by Niklas Adolfsson's avatar Niklas Adolfsson
Browse files

Merge remote-tracking branch 'origin/master' into na-middleware

parents 0e7d1154 a13ae7a2
Pipeline #199750 passed with stages
in 5 minutes and 57 seconds
......@@ -38,7 +38,7 @@ Support `WebSocket` and `HTTP` transports for both client and server.
## Roadmap
See [tracking issue for next stable release (0.9)](https://github.com/paritytech/jsonrpsee/issues/670)
See [our tracking milestone](https://github.com/paritytech/jsonrpsee/milestone/2) for the upcoming stable v1.0 release.
## Users
......
......@@ -21,6 +21,7 @@ serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "1.16", features = ["time"] }
tracing = "0.1"
tracing-futures = "0.2.5"
[dev-dependencies]
jsonrpsee-test-utils = { path = "../../test-utils" }
......
......@@ -31,10 +31,12 @@ use crate::transport::HttpTransportClient;
use crate::types::{ErrorResponse, Id, NotificationSer, ParamsSer, RequestSer, Response};
use async_trait::async_trait;
use jsonrpsee_core::client::{CertificateStore, ClientT, IdKind, RequestIdManager, Subscription, SubscriptionClientT};
use jsonrpsee_core::tracing::RpcTracing;
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::error::CallError;
use rustc_hash::FxHashMap;
use serde::de::DeserializeOwned;
use tracing_futures::Instrument;
/// Http Client Builder.
#[derive(Debug)]
......@@ -44,6 +46,7 @@ pub struct HttpClientBuilder {
max_concurrent_requests: usize,
certificate_store: CertificateStore,
id_kind: IdKind,
max_log_length: u32,
}
impl HttpClientBuilder {
......@@ -77,10 +80,19 @@ impl HttpClientBuilder {
self
}
/// Max length for logging for requests and responses in number characters.
///
/// Logs bigger than this limit will be truncated.
pub fn set_max_logging_length(mut self, max: u32) -> Self {
self.max_log_length = max;
self
}
/// Build the HTTP client with target to connect to.
pub fn build(self, target: impl AsRef<str>) -> Result<HttpClient, Error> {
let transport = HttpTransportClient::new(target, self.max_request_body_size, self.certificate_store)
.map_err(|e| Error::Transport(e.into()))?;
let transport =
HttpTransportClient::new(target, self.max_request_body_size, self.certificate_store, self.max_log_length)
.map_err(|e| Error::Transport(e.into()))?;
Ok(HttpClient {
transport,
id_manager: Arc::new(RequestIdManager::new(self.max_concurrent_requests, self.id_kind)),
......@@ -97,6 +109,7 @@ impl Default for HttpClientBuilder {
max_concurrent_requests: 256,
certificate_store: CertificateStore::Native,
id_kind: IdKind::Number,
max_log_length: 4096,
}
}
}
......@@ -115,8 +128,13 @@ pub struct HttpClient {
#[async_trait]
impl ClientT for HttpClient {
async fn notification<'a>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<(), Error> {
let notif = NotificationSer::new(method, params);
let fut = self.transport.send(serde_json::to_string(&notif).map_err(Error::ParseError)?);
let trace = RpcTracing::notification(method);
let _enter = trace.span().enter();
let notif = serde_json::to_string(&NotificationSer::new(method, params)).map_err(Error::ParseError)?;
let fut = self.transport.send(notif).in_current_span();
match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(ok)) => Ok(ok),
Err(_) => Err(Error::RequestTimeout),
......@@ -132,8 +150,12 @@ impl ClientT for HttpClient {
let guard = self.id_manager.next_request_id()?;
let id = guard.inner();
let request = RequestSer::new(&id, method, params);
let trace = RpcTracing::method_call(method);
let _enter = trace.span().enter();
let raw = serde_json::to_string(&request).map_err(Error::ParseError)?;
let fut = self.transport.send_and_read_body(serde_json::to_string(&request).map_err(Error::ParseError)?);
let fut = self.transport.send_and_read_body(raw).in_current_span();
let body = match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Err(_e) => {
......@@ -165,6 +187,8 @@ impl ClientT for HttpClient {
{
let guard = self.id_manager.next_request_ids(batch.len())?;
let ids: Vec<Id> = guard.inner();
let trace = RpcTracing::batch();
let _enter = trace.span().enter();
let mut batch_request = Vec::with_capacity(batch.len());
// NOTE(niklasad1): `ID` is not necessarily monotonically increasing.
......@@ -177,7 +201,10 @@ impl ClientT for HttpClient {
request_set.insert(&ids[pos], pos);
}
let fut = self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?);
let fut = self
.transport
.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?)
.in_current_span();
let body = match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
......
......@@ -11,6 +11,7 @@ use hyper::Uri;
use jsonrpsee_core::client::CertificateStore;
use jsonrpsee_core::error::GenericTransportError;
use jsonrpsee_core::http_helpers;
use jsonrpsee_core::tracing::{rx_log_from_bytes, tx_log_from_str};
use thiserror::Error;
const CONTENT_TYPE_JSON: &str = "application/json";
......@@ -43,6 +44,10 @@ pub struct HttpTransportClient {
client: HyperClient,
/// Configurable max request body size
max_request_body_size: u32,
/// Max length for logging for requests and responses
///
/// Logs bigger than this limit will be truncated.
max_log_length: u32,
}
impl HttpTransportClient {
......@@ -51,6 +56,7 @@ impl HttpTransportClient {
target: impl AsRef<str>,
max_request_body_size: u32,
cert_store: CertificateStore,
max_log_length: u32,
) -> Result<Self, Error> {
let target: Uri = target.as_ref().parse().map_err(|e| Error::Url(format!("Invalid URL: {}", e)))?;
if target.port_u16().is_none() {
......@@ -84,11 +90,11 @@ impl HttpTransportClient {
return Err(Error::Url(err.into()));
}
};
Ok(Self { target, client, max_request_body_size })
Ok(Self { target, client, max_request_body_size, max_log_length })
}
async fn inner_send(&self, body: String) -> Result<hyper::Response<hyper::Body>, Error> {
tracing::debug!("send: {}", body);
tx_log_from_str(&body, self.max_log_length);
if body.len() > self.max_request_body_size as usize {
return Err(Error::RequestTooLarge);
......@@ -113,12 +119,16 @@ impl HttpTransportClient {
let response = self.inner_send(body).await?;
let (parts, body) = response.into_parts();
let (body, _) = http_helpers::read_body(&parts.headers, body, self.max_request_body_size).await?;
rx_log_from_bytes(&body, self.max_log_length);
Ok(body)
}
/// Send serialized message without reading the HTTP message body.
pub(crate) async fn send(&self, body: String) -> Result<(), Error> {
let _ = self.inner_send(body).await?;
Ok(())
}
}
......@@ -188,36 +198,37 @@ mod tests {
#[test]
fn invalid_http_url_rejected() {
let err = HttpTransportClient::new("ws://localhost:9933", 80, CertificateStore::Native).unwrap_err();
let err = HttpTransportClient::new("ws://localhost:9933", 80, CertificateStore::Native, 80).unwrap_err();
assert!(matches!(err, Error::Url(_)));
}
#[cfg(feature = "tls")]
#[test]
fn https_works() {
let client = HttpTransportClient::new("https://localhost:9933", 80, CertificateStore::Native).unwrap();
let client = HttpTransportClient::new("https://localhost:9933", 80, CertificateStore::Native, 80).unwrap();
assert_target(&client, "localhost", "https", "/", 9933, 80);
}
#[cfg(not(feature = "tls"))]
#[test]
fn https_fails_without_tls_feature() {
let err = HttpTransportClient::new("https://localhost:9933", 80, CertificateStore::Native).unwrap_err();
let err = HttpTransportClient::new("https://localhost:9933", 80, CertificateStore::Native, 80).unwrap_err();
assert!(matches!(err, Error::Url(_)));
}
#[test]
fn faulty_port() {
let err = HttpTransportClient::new("http://localhost:-43", 80, CertificateStore::Native).unwrap_err();
let err = HttpTransportClient::new("http://localhost:-43", 80, CertificateStore::Native, 80).unwrap_err();
assert!(matches!(err, Error::Url(_)));
let err = HttpTransportClient::new("http://localhost:-99999", 80, CertificateStore::Native).unwrap_err();
let err = HttpTransportClient::new("http://localhost:-99999", 80, CertificateStore::Native, 80).unwrap_err();
assert!(matches!(err, Error::Url(_)));
}
#[test]
fn url_with_path_works() {
let client =
HttpTransportClient::new("http://localhost:9944/my-special-path", 1337, CertificateStore::Native).unwrap();
HttpTransportClient::new("http://localhost:9944/my-special-path", 1337, CertificateStore::Native, 80)
.unwrap();
assert_target(&client, "localhost", "http", "/my-special-path", 9944, 1337);
}
......@@ -227,6 +238,7 @@ mod tests {
"http://127.0.0.1:9999/my?name1=value1&name2=value2",
u32::MAX,
CertificateStore::WebPki,
80,
)
.unwrap();
assert_target(&client, "127.0.0.1", "http", "/my?name1=value1&name2=value2", 9999, u32::MAX);
......@@ -235,14 +247,14 @@ mod tests {
#[test]
fn url_with_fragment_is_ignored() {
let client =
HttpTransportClient::new("http://127.0.0.1:9944/my.htm#ignore", 999, CertificateStore::Native).unwrap();
HttpTransportClient::new("http://127.0.0.1:9944/my.htm#ignore", 999, CertificateStore::Native, 80).unwrap();
assert_target(&client, "127.0.0.1", "http", "/my.htm", 9944, 999);
}
#[tokio::test]
async fn request_limit_works() {
let eighty_bytes_limit = 80;
let client = HttpTransportClient::new("http://localhost:9933", 80, CertificateStore::WebPki).unwrap();
let client = HttpTransportClient::new("http://localhost:9933", 80, CertificateStore::WebPki, 99).unwrap();
assert_eq!(client.max_request_body_size, eighty_bytes_limit);
let body = "a".repeat(81);
......
......@@ -191,7 +191,7 @@ impl TransportSenderT for Sender {
/// Sends out a request. Returns a `Future` that finishes when the request has been
/// successfully sent.
async fn send(&mut self, body: String) -> Result<(), Self::Error> {
tracing::debug!("send: {}", body);
tracing::trace!("send: {}", body);
self.inner.send_text(body).await?;
self.inner.flush().await?;
Ok(())
......
......@@ -16,6 +16,7 @@ thiserror = "1"
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = { version = "1", features = ["raw_value"] }
http = "0.2.7"
tracing = "0.1"
# optional deps
arrayvec = { version = "0.7.1", optional = true }
......@@ -23,7 +24,7 @@ async-channel = { version = "1.6", optional = true }
async-lock = { version = "2.4", optional = true }
futures-util = { version = "0.3.14", default-features = false, optional = true }
hyper = { version = "0.14.10", default-features = false, features = ["stream"], optional = true }
tracing = { version = "0.1", optional = true }
tracing-futures = { version = "0.2", optional = true }
rustc-hash = { version = "1", optional = true }
rand = { version = "0.8", optional = true }
soketto = { version = "0.7.1", optional = true }
......@@ -43,7 +44,6 @@ server = [
"futures-util/alloc",
"globset",
"rustc-hash/std",
"tracing",
"parking_lot",
"rand",
"tokio/rt",
......@@ -59,7 +59,7 @@ async-client = [
"tokio/macros",
"tokio/rt",
"tokio/sync",
"tracing",
"tracing-futures",
"futures-timer",
]
async-wasm-client = [
......@@ -67,8 +67,8 @@ async-wasm-client = [
"client",
"wasm-bindgen-futures",
"rustc-hash/std",
"tracing-futures",
"futures-timer/wasm-bindgen",
"tracing",
]
[dev-dependencies]
......
......@@ -84,7 +84,7 @@ pub(crate) fn process_subscription_response(
let request_id = match manager.get_request_id_by_subscription_id(&sub_id) {
Some(request_id) => request_id,
None => {
tracing::error!("Subscription ID: {:?} is not an active subscription", sub_id);
tracing::warn!("Subscription ID: {:?} is not an active subscription", sub_id);
return Err(None);
}
};
......@@ -100,7 +100,7 @@ pub(crate) fn process_subscription_response(
}
},
None => {
tracing::error!("Subscription ID: {:?} is not an active subscription", sub_id);
tracing::warn!("Subscription ID: {:?} is not an active subscription", sub_id);
Err(None)
}
}
......
......@@ -8,6 +8,8 @@ use crate::client::{
RegisterNotificationMessage, RequestMessage, Subscription, SubscriptionClientT, SubscriptionKind,
SubscriptionMessage, TransportReceiverT, TransportSenderT,
};
use crate::tracing::{rx_log_from_json, tx_log_from_str, RpcTracing};
use core::time::Duration;
use helpers::{
build_unsubscribe_message, call_with_timeout, process_batch_response, process_error_response, process_notification,
......@@ -29,6 +31,7 @@ use jsonrpsee_types::{
SubscriptionResponse,
};
use serde::de::DeserializeOwned;
use tracing_futures::Instrument;
use super::{FrontToBack, IdKind, RequestIdManager};
......@@ -69,6 +72,7 @@ pub struct ClientBuilder {
max_concurrent_requests: usize,
max_notifs_per_subscription: usize,
id_kind: IdKind,
max_log_length: u32,
ping_interval: Option<Duration>,
}
......@@ -79,6 +83,7 @@ impl Default for ClientBuilder {
max_concurrent_requests: 256,
max_notifs_per_subscription: 1024,
id_kind: IdKind::Number,
max_log_length: 4096,
ping_interval: None,
}
}
......@@ -117,6 +122,13 @@ impl ClientBuilder {
self
}
/// Set maximum length for logging calls and responses.
///
/// Logs bigger than this limit will be truncated.
pub fn set_max_logging_length(mut self, max: u32) -> Self {
self.max_log_length = max;
self
}
/// Set the interval at which pings frames are submitted (disabled by default).
///
/// Periodically submitting pings at a defined interval has mainly two benefits:
......@@ -159,6 +171,7 @@ impl ClientBuilder {
request_timeout: self.request_timeout,
error: Mutex::new(ErrorFromBack::Unread(err_rx)),
id_manager: RequestIdManager::new(self.max_concurrent_requests, self.id_kind),
max_log_length: self.max_log_length,
}
}
......@@ -182,6 +195,7 @@ impl ClientBuilder {
request_timeout: self.request_timeout,
error: Mutex::new(ErrorFromBack::Unread(err_rx)),
id_manager: RequestIdManager::new(self.max_concurrent_requests, self.id_kind),
max_log_length: self.max_log_length,
}
}
}
......@@ -198,6 +212,10 @@ pub struct Client {
request_timeout: Duration,
/// Request ID manager.
id_manager: RequestIdManager,
/// Max length for logging for requests and responses.
///
/// Entries bigger than this limit will be truncated.
max_log_length: u32,
}
impl Client {
......@@ -228,11 +246,14 @@ impl ClientT for Client {
// NOTE: we use this to guard against max number of concurrent requests.
let _req_id = self.id_manager.next_request_id()?;
let notif = NotificationSer::new(method, params);
let trace = RpcTracing::batch();
let _enter = trace.span().enter();
let raw = serde_json::to_string(&notif).map_err(Error::ParseError)?;
tracing::trace!("[frontend]: send notification: {:?}", raw);
tx_log_from_str(&raw, self.max_log_length);
let mut sender = self.to_back.clone();
let fut = sender.send(FrontToBack::Notification(raw));
let fut = sender.send(FrontToBack::Notification(raw)).in_current_span();
match future::select(fut, Delay::new(self.request_timeout)).await {
Either::Left((Ok(()), _)) => Ok(()),
......@@ -248,26 +269,31 @@ impl ClientT for Client {
let (send_back_tx, send_back_rx) = oneshot::channel();
let guard = self.id_manager.next_request_id()?;
let id = guard.inner();
let trace = RpcTracing::method_call(method);
let _enter = trace.span().enter();
let raw = serde_json::to_string(&RequestSer::new(&id, method, params)).map_err(Error::ParseError)?;
tracing::trace!("[frontend]: send request: {:?}", raw);
tx_log_from_str(&raw, self.max_log_length);
if self
.to_back
.clone()
.send(FrontToBack::Request(RequestMessage { raw, id, send_back: Some(send_back_tx) }))
.send(FrontToBack::Request(RequestMessage { raw, id: id.clone(), send_back: Some(send_back_tx) }))
.await
.is_err()
{
return Err(self.read_error_from_backend().await);
}
let res = call_with_timeout(self.request_timeout, send_back_rx).await;
let res = call_with_timeout(self.request_timeout, send_back_rx).in_current_span().await;
let json_value = match res {
Ok(Ok(v)) => v,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
};
rx_log_from_json(&Response::new(&json_value, id), self.max_log_length);
serde_json::from_value(json_value).map_err(Error::ParseError)
}
......@@ -278,6 +304,8 @@ impl ClientT for Client {
let guard = self.id_manager.next_request_ids(batch.len())?;
let batch_ids: Vec<Id> = guard.inner();
let mut batches = Vec::with_capacity(batch.len());
let log = RpcTracing::batch();
let _enter = log.span().enter();
for (idx, (method, params)) in batch.into_iter().enumerate() {
batches.push(RequestSer::new(&batch_ids[idx], method, params));
......@@ -286,7 +314,9 @@ impl ClientT for Client {
let (send_back_tx, send_back_rx) = oneshot::channel();
let raw = serde_json::to_string(&batches).map_err(Error::ParseError)?;
tracing::trace!("[frontend]: send batch request: {:?}", raw);
tx_log_from_str(&raw, self.max_log_length);
if self
.to_back
.clone()
......@@ -297,13 +327,15 @@ impl ClientT for Client {
return Err(self.read_error_from_backend().await);
}
let res = call_with_timeout(self.request_timeout, send_back_rx).await;
let res = call_with_timeout(self.request_timeout, send_back_rx).in_current_span().await;
let json_values = match res {
Ok(Ok(v)) => v,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
};
rx_log_from_json(&json_values, self.max_log_length);
let values: Result<_, _> =
json_values.into_iter().map(|val| serde_json::from_value(val).map_err(Error::ParseError)).collect();
Ok(values?)
......@@ -325,18 +357,20 @@ impl SubscriptionClientT for Client {
where
N: DeserializeOwned,
{
tracing::trace!("[frontend]: subscribe: {:?}, unsubscribe: {:?}", subscribe_method, unsubscribe_method);
if subscribe_method == unsubscribe_method {
return Err(Error::SubscriptionNameConflict(unsubscribe_method.to_owned()));
}
let guard = self.id_manager.next_request_ids(2)?;
let mut ids: Vec<Id> = guard.inner();
let trace = RpcTracing::method_call(subscribe_method);
let _enter = trace.span().enter();
let id = ids[0].clone();
let raw =
serde_json::to_string(&RequestSer::new(&ids[0], subscribe_method, params)).map_err(Error::ParseError)?;
let raw = serde_json::to_string(&RequestSer::new(&id, subscribe_method, params)).map_err(Error::ParseError)?;
tx_log_from_str(&raw, self.max_log_length);
let (send_back_tx, send_back_rx) = oneshot::channel();
if self
......@@ -355,14 +389,17 @@ impl SubscriptionClientT for Client {
return Err(self.read_error_from_backend().await);
}
let res = call_with_timeout(self.request_timeout, send_back_rx).await;
let res = call_with_timeout(self.request_timeout, send_back_rx).in_current_span().await;
let (notifs_rx, id) = match res {
let (notifs_rx, sub_id) = match res {
Ok(Ok(val)) => val,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
};
Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(id)))
rx_log_from_json(&Response::new(&sub_id, id), self.max_log_length);
Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(sub_id)))
}
/// Subscribe to a specific method.
......@@ -370,8 +407,6 @@ impl SubscriptionClientT for Client {
where
N: DeserializeOwned,
{
tracing::trace!("[frontend]: register_notification: {:?}", method);
let (send_back_tx, send_back_rx) = oneshot::channel();
if self
.to_back
......@@ -416,7 +451,6 @@ async fn handle_backend_messages<S: TransportSenderT, R: TransportReceiverT>(
) -> Result<(), Error> {
// Single response to a request.
if let Ok(single) = serde_json::from_slice::<Response<_>>(&raw) {
tracing::debug!("[backend]: recv method_call {:?}", single);
match process_single_response(manager, single, max_notifs_per_subscription) {
Ok(Some(unsub)) => {
stop_subscription(sender, manager, unsub).await;
......@@ -427,42 +461,40 @@ async fn handle_backend_messages<S: TransportSenderT, R: TransportReceiverT>(
}
// Subscription response.
else if let Ok(response) = serde_json::from_slice::<SubscriptionResponse<_>>(&raw) {
tracing::debug!("[backend]: recv subscription {:?}", response);
if let Err(Some(unsub)) = process_subscription_response(manager, response) {
let _ = stop_subscription(sender, manager, unsub).await;
}
}
// Subscription error response.
else if let Ok(response) = serde_json::from_slice::<SubscriptionError<_>>(&raw) {
tracing::debug!("[backend]: recv subscription closed {:?}", response);
let _ = process_subscription_close_response(manager, response);
}
// Incoming Notification
else if let Ok(notif) = serde_json::from_slice::<Notification<_>>(&raw) {
tracing::debug!("[backend]: recv notification {:?}", notif);
let _ = process_notification(manager, notif);
}
// Batch response.
else if let Ok(batch) = serde_json::from_slice::<Vec<Response<_>>>(&raw) {
tracing::debug!("[backend]: recv batch {:?}", batch);
if let Err(e) = process_batch_response(manager, batch) {
return Err(e);
}
}
// Error response
else if let Ok(err) = serde_json::from_slice::<ErrorResponse>(&raw) {
tracing::debug!("[backend]: recv error response {:?}", err);
if let Err(e) = process_error_response(manager, err) {
return Err(e);
}
}
// Unparsable response
else {
tracing::debug!(
"[backend]: recv unparseable message: {:?}",
serde_json::from_slice::<serde_json::Value>(&raw)
);
return Err(Error::Custom("Unparsable response".into()));
let json = serde_json::from_slice::<serde_json::Value>(&raw);
let json_str = match json {
Ok(json) => serde_json::to_string(&json).expect("valid JSON; qed"),
Err(e) => e.to_string(),
};
return Err(Error::Custom(format!("Unparseable message: {}", json_str)));
}
Ok(())
}
......@@ -507,7 +539,6 @@ async fn handle_frontend_messages<S: TransportSenderT>(
}
Some(FrontToBack::Batch(batch)) => {
tracing::trace!("[backend]: client prepares to send batch request: {:?}", batch.raw);
if let Err(send_back) = manager.insert_pending_batch(batch.ids.clone(), batch.send_back) {
tracing::warn!("[backend]: batch request: {:?} already pending", batch.ids);
let _ = send_back.send(Err(Error::InvalidRequestId));
......@@ -521,24 +552,18 @@ async fn handle_frontend_messages<S: TransportSenderT>(
}
// User called `notification` on the front-end
Some(FrontToBack::Notification(notif)) => {
tracing::trace!("[backend]: client prepares to send notification: {:?}", notif);
if let Err(e) = sender.send(notif).await {
tracing::warn!("[backend]: client notif failed: {:?}", e);
}