Unverified Commit 1efd5aea authored by Niklas Adolfsson's avatar Niklas Adolfsson Committed by GitHub
Browse files

fix(tracing): use tracing instrument macro (#846)

* tracing: use instrument macro

* fix merge nit

* cargo fmt

* tracing span in TRACE only

* Update core/src/client/async_client/mod.rs

* get rid of tracing-futures

* less noise for subscription spans

* nits: replace spaces with tabs
parent 94795309
Pipeline #215329 passed with stages
in 4 minutes and 53 seconds
......@@ -21,7 +21,6 @@ serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "1.16", features = ["time"] }
tracing = "0.1.34"
tracing-futures = "0.2.5"
[dev-dependencies]
jsonrpsee-test-utils = { path = "../../test-utils" }
......
......@@ -33,13 +33,12 @@ use async_trait::async_trait;
use hyper::http::HeaderMap;
use jsonrpsee_core::client::{CertificateStore, ClientT, IdKind, RequestIdManager, Subscription, SubscriptionClientT};
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::tracing::RpcTracing;
use jsonrpsee_core::traits::ToRpcParams;
use jsonrpsee_core::{Error, JsonRawValue, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::error::CallError;
use rustc_hash::FxHashMap;
use serde::de::DeserializeOwned;
use tracing_futures::Instrument;
use tracing::instrument;
/// Http Client Builder.
///
......@@ -168,28 +167,26 @@ pub struct HttpClient {
#[async_trait]
impl ClientT for HttpClient {
#[instrument(name = "notification", skip(self, params), level = "trace")]
async fn notification<Params>(&self, method: &str, params: Params) -> Result<(), Error>
where
Params: ToRpcParams + Send,
{
let trace = RpcTracing::notification(method);
async {
let params = params.to_rpc_params()?;
let notif = serde_json::to_string(&NotificationSer::new(method, params)).map_err(Error::ParseError)?;
let params = params.to_rpc_params()?;
let notif = serde_json::to_string(&NotificationSer::new(method, params)).map_err(Error::ParseError)?;
let fut = self.transport.send(notif);
let fut = self.transport.send(notif);
match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(ok)) => Ok(ok),
Err(_) => Err(Error::RequestTimeout),
Ok(Err(e)) => Err(Error::Transport(e.into())),
}
match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(ok)) => Ok(ok),
Err(_) => Err(Error::RequestTimeout),
Ok(Err(e)) => Err(Error::Transport(e.into())),
}
.instrument(trace.into_span())
.await
}
/// Perform a request towards the server.
#[instrument(name = "method_call", skip(self, params), level = "trace")]
async fn request<R, Params>(&self, method: &str, params: Params) -> Result<R, Error>
where
R: DeserializeOwned,
......@@ -200,44 +197,39 @@ impl ClientT for HttpClient {
let params = params.to_rpc_params()?;
let request = RequestSer::new(&id, method, params);
let trace = RpcTracing::method_call(method);
async {
let raw = serde_json::to_string(&request).map_err(Error::ParseError)?;
let fut = self.transport.send_and_read_body(raw);
let body = match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Err(_e) => {
return Err(Error::RequestTimeout);
}
Ok(Err(e)) => {
return Err(Error::Transport(e.into()));
}
};
let raw = serde_json::to_string(&request).map_err(Error::ParseError)?;
// NOTE: it's decoded first to `JsonRawValue` and then to `R` below to get
// a better error message if `R` couldn't be decoded.
let response: Response<&JsonRawValue> = match serde_json::from_slice(&body) {
Ok(response) => response,
Err(_) => {
let err: ErrorResponse = serde_json::from_slice(&body).map_err(Error::ParseError)?;
return Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned())));
}
};
let fut = self.transport.send_and_read_body(raw);
let body = match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Err(_e) => {
return Err(Error::RequestTimeout);
}
Ok(Err(e)) => {
return Err(Error::Transport(e.into()));
}
};
// NOTE: it's decoded first to `JsonRawValue` and then to `R` below to get
// a better error message if `R` couldn't be decoded.
let response: Response<&JsonRawValue> = match serde_json::from_slice(&body) {
Ok(response) => response,
Err(_) => {
let err: ErrorResponse = serde_json::from_slice(&body).map_err(Error::ParseError)?;
return Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned())));
}
};
let result = serde_json::from_str(response.result.get()).map_err(Error::ParseError)?;
let result = serde_json::from_str(response.result.get()).map_err(Error::ParseError)?;
if response.id == id {
Ok(result)
} else {
Err(Error::InvalidRequestId)
}
if response.id == id {
Ok(result)
} else {
Err(Error::InvalidRequestId)
}
.instrument(trace.into_span())
.await
}
#[instrument(name = "batch", skip(self, batch), level = "trace")]
async fn batch_request<'a, R>(&self, batch: BatchRequestBuilder<'a>) -> Result<Vec<R>, Error>
where
R: DeserializeOwned + Default + Clone,
......@@ -245,57 +237,53 @@ impl ClientT for HttpClient {
let batch = batch.build();
let guard = self.id_manager.next_request_ids(batch.len())?;
let ids: Vec<Id> = guard.inner();
let trace = RpcTracing::batch();
async {
let mut batch_request = Vec::with_capacity(batch.len());
// NOTE(niklasad1): `ID` is not necessarily monotonically increasing.
let mut ordered_requests = Vec::with_capacity(batch.len());
let mut request_set = FxHashMap::with_capacity_and_hasher(batch.len(), Default::default());
for (pos, (method, params)) in batch.into_iter().enumerate() {
batch_request.push(RequestSer::new(&ids[pos], method, params));
ordered_requests.push(&ids[pos]);
request_set.insert(&ids[pos], pos);
}
let fut =
self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?);
let mut batch_request = Vec::with_capacity(batch.len());
// NOTE(niklasad1): `ID` is not necessarily monotonically increasing.
let mut ordered_requests = Vec::with_capacity(batch.len());
let mut request_set = FxHashMap::with_capacity_and_hasher(batch.len(), Default::default());
let body = match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Err(_e) => return Err(Error::RequestTimeout),
Ok(Err(e)) => return Err(Error::Transport(e.into())),
};
for (pos, (method, params)) in batch.into_iter().enumerate() {
batch_request.push(RequestSer::new(&ids[pos], method, params));
ordered_requests.push(&ids[pos]);
request_set.insert(&ids[pos], pos);
}
// NOTE: it's decoded first to `JsonRawValue` and then to `R` below to get
// a better error message if `R` couldn't be decoded.
let rps: Vec<Response<&JsonRawValue>> =
serde_json::from_slice(&body).map_err(|_| match serde_json::from_slice::<ErrorResponse>(&body) {
Ok(e) => Error::Call(CallError::Custom(e.error_object().clone().into_owned())),
Err(e) => Error::ParseError(e),
})?;
// NOTE: `R::default` is placeholder and will be replaced in loop below.
let mut responses = vec![R::default(); ordered_requests.len()];
for rp in rps {
let pos = match request_set.get(&rp.id) {
Some(pos) => *pos,
None => return Err(Error::InvalidRequestId),
};
let result = serde_json::from_str(rp.result.get()).map_err(Error::ParseError)?;
responses[pos] = result;
}
Ok(responses)
let fut = self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?);
let body = match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Err(_e) => return Err(Error::RequestTimeout),
Ok(Err(e)) => return Err(Error::Transport(e.into())),
};
// NOTE: it's decoded first to `JsonRawValue` and then to `R` below to get
// a better error message if `R` couldn't be decoded.
let rps: Vec<Response<&JsonRawValue>> =
serde_json::from_slice(&body).map_err(|_| match serde_json::from_slice::<ErrorResponse>(&body) {
Ok(e) => Error::Call(CallError::Custom(e.error_object().clone().into_owned())),
Err(e) => Error::ParseError(e),
})?;
// NOTE: `R::default` is placeholder and will be replaced in loop below.
let mut responses = vec![R::default(); ordered_requests.len()];
for rp in rps {
let pos = match request_set.get(&rp.id) {
Some(pos) => *pos,
None => return Err(Error::InvalidRequestId),
};
let result = serde_json::from_str(rp.result.get()).map_err(Error::ParseError)?;
responses[pos] = result;
}
.instrument(trace.into_span())
.await
Ok(responses)
}
}
#[async_trait]
impl SubscriptionClientT for HttpClient {
/// Send a subscription request to the server. Not implemented for HTTP; will always return [`Error::HttpNotImplemented`].
#[instrument(name = "subscription", fields(method = _subscribe_method), skip(self, _params, _subscribe_method, _unsubscribe_method), level = "trace")]
async fn subscribe<'a, N, Params>(
&self,
_subscribe_method: &'a str,
......@@ -310,6 +298,7 @@ impl SubscriptionClientT for HttpClient {
}
/// Subscribe to a specific method. Not implemented for HTTP; will always return [`Error::HttpNotImplemented`].
#[instrument(name = "subscribe_method", fields(method = _method), skip(self, _method), level = "trace")]
async fn subscribe_to_method<'a, N>(&self, _method: &'a str) -> Result<Subscription<N>, Error>
where
N: DeserializeOwned,
......
......@@ -23,7 +23,6 @@ async-channel = { version = "1.6", optional = true }
async-lock = { version = "2.4", optional = true }
futures-util = { version = "0.3.14", default-features = false, optional = true }
hyper = { version = "0.14.10", default-features = false, features = ["stream"], optional = true }
tracing-futures = { version = "0.2", optional = true }
rustc-hash = { version = "1", optional = true }
rand = { version = "0.8", optional = true }
soketto = { version = "0.7.1", optional = true }
......@@ -55,7 +54,6 @@ async-client = [
"tokio/macros",
"tokio/rt",
"tokio/sync",
"tracing-futures",
"futures-timer",
]
async-wasm-client = [
......@@ -63,7 +61,6 @@ async-wasm-client = [
"client",
"wasm-bindgen-futures",
"rustc-hash/std",
"tracing-futures",
"futures-timer/wasm-bindgen",
]
......
......@@ -8,7 +8,7 @@ use crate::client::{
RegisterNotificationMessage, RequestMessage, Subscription, SubscriptionClientT, SubscriptionKind,
SubscriptionMessage, TransportReceiverT, TransportSenderT,
};
use crate::tracing::{rx_log_from_json, tx_log_from_str, RpcTracing};
use crate::tracing::{rx_log_from_json, tx_log_from_str};
use core::time::Duration;
use helpers::{
......@@ -18,6 +18,8 @@ use helpers::{
use manager::RequestManager;
use crate::error::Error;
use crate::params::BatchRequestBuilder;
use crate::traits::ToRpcParams;
use async_lock::Mutex;
use async_trait::async_trait;
use futures_channel::{mpsc, oneshot};
......@@ -31,9 +33,7 @@ use jsonrpsee_types::{
SubscriptionResponse,
};
use serde::de::DeserializeOwned;
use tracing_futures::Instrument;
use crate::params::BatchRequestBuilder;
use crate::traits::ToRpcParams;
use tracing::instrument;
use super::{FrontToBack, IdKind, RequestIdManager};
......@@ -176,7 +176,7 @@ impl ClientBuilder {
ping_interval,
on_close_tx,
)
.await;
.await;
});
Client {
to_back,
......@@ -192,9 +192,9 @@ impl ClientBuilder {
#[cfg(all(feature = "async-wasm-client", target_arch = "wasm32"))]
#[cfg_attr(docsrs, doc(cfg(feature = "async-wasm-client")))]
pub fn build_with_wasm<S, R>(self, sender: S, receiver: R) -> Client
where
S: TransportSenderT,
R: TransportReceiverT,
where
S: TransportSenderT,
R: TransportReceiverT,
{
let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests);
let (err_tx, err_rx) = oneshot::channel();
......@@ -274,117 +274,104 @@ impl Drop for Client {
}
#[async_trait]
impl ClientT for Client
{
impl ClientT for Client {
#[instrument(name = "notification", skip(self, params), level = "trace")]
async fn notification<Params>(&self, method: &str, params: Params) -> Result<(), Error>
where
Params: ToRpcParams + Send {
Params: ToRpcParams + Send,
{
// NOTE: we use this to guard against max number of concurrent requests.
let _req_id = self.id_manager.next_request_id()?;
let params = params.to_rpc_params()?;
let notif = NotificationSer::new(method, params);
let trace = RpcTracing::batch();
async {
let raw = serde_json::to_string(&notif).map_err(Error::ParseError)?;
tx_log_from_str(&raw, self.max_log_length);
let raw = serde_json::to_string(&notif).map_err(Error::ParseError)?;
tx_log_from_str(&raw, self.max_log_length);
let mut sender = self.to_back.clone();
let fut = sender.send(FrontToBack::Notification(raw));
let mut sender = self.to_back.clone();
let fut = sender.send(FrontToBack::Notification(raw));
match future::select(fut, Delay::new(self.request_timeout)).await {
Either::Left((Ok(()), _)) => Ok(()),
Either::Left((Err(_), _)) => Err(self.read_error_from_backend().await),
Either::Right((_, _)) => Err(Error::RequestTimeout),
}
match future::select(fut, Delay::new(self.request_timeout)).await {
Either::Left((Ok(()), _)) => Ok(()),
Either::Left((Err(_), _)) => Err(self.read_error_from_backend().await),
Either::Right((_, _)) => Err(Error::RequestTimeout),
}
.instrument(trace.into_span())
.await
}
#[instrument(name = "method_call", skip(self, params), level = "trace")]
async fn request<R, Params>(&self, method: &str, params: Params) -> Result<R, Error>
where
R: DeserializeOwned,
Params: ToRpcParams + Send
Params: ToRpcParams + Send,
{
let (send_back_tx, send_back_rx) = oneshot::channel();
let guard = self.id_manager.next_request_id()?;
let id = guard.inner();
let trace = RpcTracing::method_call(method);
async {
let params = params.to_rpc_params()?;
let raw = serde_json::to_string(&RequestSer::new(&id, method, params)).map_err(Error::ParseError)?;
tx_log_from_str(&raw, self.max_log_length);
let params = params.to_rpc_params()?;
let raw = serde_json::to_string(&RequestSer::new(&id, method, params)).map_err(Error::ParseError)?;
tx_log_from_str(&raw, self.max_log_length);
if self
.to_back
.clone()
.send(FrontToBack::Request(RequestMessage { raw, id: id.clone(), send_back: Some(send_back_tx) }))
.await
.is_err()
{
return Err(self.read_error_from_backend().await);
}
if self
.to_back
.clone()
.send(FrontToBack::Request(RequestMessage { raw, id: id.clone(), send_back: Some(send_back_tx) }))
.await
.is_err()
{
return Err(self.read_error_from_backend().await);
}
let res = call_with_timeout(self.request_timeout, send_back_rx).await;
let json_value = match res {
Ok(Ok(v)) => v,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
};
let json_value = match call_with_timeout(self.request_timeout, send_back_rx).await {
Ok(Ok(v)) => v,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
};
rx_log_from_json(&Response::new(&json_value, id), self.max_log_length);
rx_log_from_json(&Response::new(&json_value, id), self.max_log_length);
serde_json::from_value(json_value).map_err(Error::ParseError)
}
.instrument(trace.into_span())
.await
serde_json::from_value(json_value).map_err(Error::ParseError)
}
#[instrument(name = "batch", skip(self, batch), level = "trace")]
async fn batch_request<'a, R>(&self, batch: BatchRequestBuilder<'a>) -> Result<Vec<R>, Error>
where
R: DeserializeOwned + Default + Clone
R: DeserializeOwned + Default + Clone,
{
let trace = RpcTracing::batch();
async {
let batch = batch.build();
let guard = self.id_manager.next_request_ids(batch.len())?;
let batch_ids: Vec<Id> = guard.inner();
let mut batches = Vec::with_capacity(batch.len());
for (idx, (method, params)) in batch.into_iter().enumerate() {
batches.push(RequestSer::new(&batch_ids[idx], method, params));
}
let batch = batch.build();
let guard = self.id_manager.next_request_ids(batch.len())?;
let batch_ids: Vec<Id> = guard.inner();
let mut batches = Vec::with_capacity(batch.len());
for (idx, (method, params)) in batch.into_iter().enumerate() {
batches.push(RequestSer::new(&batch_ids[idx], method, params));
}
let (send_back_tx, send_back_rx) = oneshot::channel();
let (send_back_tx, send_back_rx) = oneshot::channel();
let raw = serde_json::to_string(&batches).map_err(Error::ParseError)?;
let raw = serde_json::to_string(&batches).map_err(Error::ParseError)?;
tx_log_from_str(&raw, self.max_log_length);
tx_log_from_str(&raw, self.max_log_length);
if self
.to_back
.clone()
.send(FrontToBack::Batch(BatchMessage { raw, ids: batch_ids, send_back: send_back_tx }))
.await
.is_err()
{
return Err(self.read_error_from_backend().await);
}
if self
.to_back
.clone()
.send(FrontToBack::Batch(BatchMessage { raw, ids: batch_ids, send_back: send_back_tx }))
.await
.is_err()
{
return Err(self.read_error_from_backend().await);
}
let res = call_with_timeout(self.request_timeout, send_back_rx).await;
let json_values = match res {
Ok(Ok(v)) => v,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
};
let res = call_with_timeout(self.request_timeout, send_back_rx).await;
let json_values = match res {
Ok(Ok(v)) => v,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
};
rx_log_from_json(&json_values, self.max_log_length);
rx_log_from_json(&json_values, self.max_log_length);
json_values.into_iter().map(|val| serde_json::from_value(val).map_err(Error::ParseError)).collect()
}
.instrument(trace.into_span())
.await
json_values.into_iter().map(|val| serde_json::from_value(val).map_err(Error::ParseError)).collect()
}
}
......@@ -394,6 +381,7 @@ impl SubscriptionClientT for Client {
///
/// The `subscribe_method` and `params` are used to ask for the subscription towards the
/// server. The `unsubscribe_method` is used to close the subscription.
#[instrument(name = "subscription", fields(method = subscribe_method), skip(self, params, subscribe_method, unsubscribe_method), level = "trace")]
async fn subscribe<'a, Notif, Params>(
&self,
subscribe_method: &'a str,
......@@ -402,7 +390,7 @@ impl SubscriptionClientT for Client {
) -> Result<Subscription<Notif>, Error>
where
Params: ToRpcParams + Send,
Notif: DeserializeOwned
Notif: DeserializeOwned,
{
if subscribe_method == unsubscribe_method {
return Err(Error::SubscriptionNameConflict(unsubscribe_method.to_owned()));
......@@ -410,51 +398,43 @@ impl SubscriptionClientT for Client {
let guard = self.id_manager.next_request_ids(2)?;
let mut ids: Vec<Id> = guard.inner();
let trace = RpcTracing::method_call(subscribe_method);
let params = params.to_rpc_params()?;
async {
let id = ids[0].clone();
let raw =
serde_json::to_string(&RequestSer::new(&id, subscribe_method, params)).map_err(Error::ParseError)?;
tx_log_from_str(&raw, self.max_log_length);
let (send_back_tx, send_back_rx) = oneshot::channel();
if self
.to_back
.clone()
.send(FrontToBack::Subscribe(SubscriptionMessage {
raw,
subscribe_id: ids.swap_remove(0),
unsubscribe_id: ids.swap_remove(0),
unsubscribe_method: unsubscribe_method.to_owned(),
send_back: send_back_tx,
}))
.await
.is_err()
{
return Err(self.read_error_from_backend().await);
}
let id = ids[0].clone();
let raw = serde_json::to_string(&RequestSer::new(&id, subscribe_method, params)).map_err(Error::ParseError)?;
let res = call_with_timeout(self.request_timeout, send_back_rx).await;
tx_log_from_str(&raw, self.max_log_length);
let (notifs_rx, sub_id) = match res {
Ok(Ok(val)) => val,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
};
let (send_back_tx, send_back_rx) = oneshot::channel();
if self
.to_back
.clone()
.send(FrontToBack::Subscribe(SubscriptionMessage {
raw,
subscribe_id: ids.swap_remove(0),
unsubscribe_id: ids.swap_remove(0),
unsubscribe_method: unsubscribe_method.to_owned(),
send_back: send_back_tx,
}))
.await
.is_err()
{
return Err(self.read_error_from_backend().await);
}
rx_log_from_json(&Response::new(&sub_id, id), self.max_log_length);
let (notifs_rx, sub_id) = match call_with_timeout(self.request_timeout, send_back_rx).await {
Ok(Ok(val)) => val,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
};
Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(sub_id)))
}
.instrument(trace.into_span())
.await
rx_log_from_json(&Response::new(&sub_id, id), self.max_log_length);
Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(sub_id)))
}
/// Subscribe to a specific method.