Unverified Commit 3c416ac5 authored by Patrick Kuo's avatar Patrick Kuo Committed by GitHub
Browse files

[Bug Fix] - Incorrect trace caused by use of `Span::enter` in asynchronous code (#835)

* add id to tracing span

* Revert "add id to tracing span"

This reverts commit 5e01e6be.

* Avoid using Span::enter() in async functions, following tracing's doc instruction https://docs.rs/tracing/latest/tracing/struct.Span.html#in-asynchronous-code

* * fixed all Span::enter()
* clean up

* fix fmt

* changed RpcTracing::span -> into_span instead of cloning the span
parent e7dc80d0
Pipeline #205302 passed with stages
in 4 minutes and 43 seconds
......@@ -168,17 +168,19 @@ pub struct HttpClient {
impl ClientT for HttpClient {
async fn notification<'a>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<(), Error> {
let trace = RpcTracing::notification(method);
let _enter = trace.span().enter();
async {
let notif = serde_json::to_string(&NotificationSer::new(method, params)).map_err(Error::ParseError)?;
let notif = serde_json::to_string(&NotificationSer::new(method, params)).map_err(Error::ParseError)?;
let fut = self.transport.send(notif);
let fut = self.transport.send(notif).in_current_span();
match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(ok)) => Ok(ok),
Err(_) => Err(Error::RequestTimeout),
Ok(Err(e)) => Err(Error::Transport(e.into())),
match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(ok)) => Ok(ok),
Err(_) => Err(Error::RequestTimeout),
Ok(Err(e)) => Err(Error::Transport(e.into())),
}
}
.instrument(trace.into_span())
.await
}
/// Perform a request towards the server.
......@@ -190,34 +192,37 @@ impl ClientT for HttpClient {
let id = guard.inner();
let request = RequestSer::new(&id, method, params);
let trace = RpcTracing::method_call(method);
let _enter = trace.span().enter();
let raw = serde_json::to_string(&request).map_err(Error::ParseError)?;
async {
let raw = serde_json::to_string(&request).map_err(Error::ParseError)?;
let fut = self.transport.send_and_read_body(raw).in_current_span();
let body = match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Err(_e) => {
return Err(Error::RequestTimeout);
}
Ok(Err(e)) => {
return Err(Error::Transport(e.into()));
}
};
let fut = self.transport.send_and_read_body(raw);
let body = match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Err(_e) => {
return Err(Error::RequestTimeout);
}
Ok(Err(e)) => {
return Err(Error::Transport(e.into()));
}
};
let response: Response<_> = match serde_json::from_slice(&body) {
Ok(response) => response,
Err(_) => {
let err: ErrorResponse = serde_json::from_slice(&body).map_err(Error::ParseError)?;
return Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned())));
}
};
let response: Response<_> = match serde_json::from_slice(&body) {
Ok(response) => response,
Err(_) => {
let err: ErrorResponse = serde_json::from_slice(&body).map_err(Error::ParseError)?;
return Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned())));
}
};
if response.id == id {
Ok(response.result)
} else {
Err(Error::InvalidRequestId)
if response.id == id {
Ok(response.result)
} else {
Err(Error::InvalidRequestId)
}
}
.instrument(trace.into_span())
.await
}
async fn batch_request<'a, R>(&self, batch: Vec<(&'a str, Option<ParamsSer<'a>>)>) -> Result<Vec<R>, Error>
......@@ -227,46 +232,47 @@ impl ClientT for HttpClient {
let guard = self.id_manager.next_request_ids(batch.len())?;
let ids: Vec<Id> = guard.inner();
let trace = RpcTracing::batch();
let _enter = trace.span().enter();
let mut batch_request = Vec::with_capacity(batch.len());
// NOTE(niklasad1): `ID` is not necessarily monotonically increasing.
let mut ordered_requests = Vec::with_capacity(batch.len());
let mut request_set = FxHashMap::with_capacity_and_hasher(batch.len(), Default::default());
async {
let mut batch_request = Vec::with_capacity(batch.len());
// NOTE(niklasad1): `ID` is not necessarily monotonically increasing.
let mut ordered_requests = Vec::with_capacity(batch.len());
let mut request_set = FxHashMap::with_capacity_and_hasher(batch.len(), Default::default());
for (pos, (method, params)) in batch.into_iter().enumerate() {
batch_request.push(RequestSer::new(&ids[pos], method, params));
ordered_requests.push(&ids[pos]);
request_set.insert(&ids[pos], pos);
}
for (pos, (method, params)) in batch.into_iter().enumerate() {
batch_request.push(RequestSer::new(&ids[pos], method, params));
ordered_requests.push(&ids[pos]);
request_set.insert(&ids[pos], pos);
}
let fut = self
.transport
.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?)
.in_current_span();
let fut =
self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?);
let body = match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Err(_e) => return Err(Error::RequestTimeout),
Ok(Err(e)) => return Err(Error::Transport(e.into())),
};
let body = match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Err(_e) => return Err(Error::RequestTimeout),
Ok(Err(e)) => return Err(Error::Transport(e.into())),
};
let rps: Vec<Response<_>> =
serde_json::from_slice(&body).map_err(|_| match serde_json::from_slice::<ErrorResponse>(&body) {
Ok(e) => Error::Call(CallError::Custom(e.error_object().clone().into_owned())),
Err(e) => Error::ParseError(e),
})?;
let rps: Vec<Response<_>> =
serde_json::from_slice(&body).map_err(|_| match serde_json::from_slice::<ErrorResponse>(&body) {
Ok(e) => Error::Call(CallError::Custom(e.error_object().clone().into_owned())),
Err(e) => Error::ParseError(e),
})?;
// NOTE: `R::default` is placeholder and will be replaced in loop below.
let mut responses = vec![R::default(); ordered_requests.len()];
for rp in rps {
let pos = match request_set.get(&rp.id) {
Some(pos) => *pos,
None => return Err(Error::InvalidRequestId),
};
responses[pos] = rp.result
// NOTE: `R::default` is placeholder and will be replaced in loop below.
let mut responses = vec![R::default(); ordered_requests.len()];
for rp in rps {
let pos = match request_set.get(&rp.id) {
Some(pos) => *pos,
None => return Err(Error::InvalidRequestId),
};
responses[pos] = rp.result
}
Ok(responses)
}
Ok(responses)
.instrument(trace.into_span())
.await
}
}
......
......@@ -243,102 +243,104 @@ impl Drop for Client {
#[async_trait]
impl ClientT for Client {
async fn notification<'a>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<(), Error> {
// NOTE: we use this to guard against max number of concurrent requests.
let _req_id = self.id_manager.next_request_id()?;
let notif = NotificationSer::new(method, params);
let trace = RpcTracing::batch();
let _enter = trace.span().enter();
let raw = serde_json::to_string(&notif).map_err(Error::ParseError)?;
tx_log_from_str(&raw, self.max_log_length);
let mut sender = self.to_back.clone();
let fut = sender.send(FrontToBack::Notification(raw)).in_current_span();
match future::select(fut, Delay::new(self.request_timeout)).await {
Either::Left((Ok(()), _)) => Ok(()),
Either::Left((Err(_), _)) => Err(self.read_error_from_backend().await),
Either::Right((_, _)) => Err(Error::RequestTimeout),
}
}
// NOTE: we use this to guard against max number of concurrent requests.
let _req_id = self.id_manager.next_request_id()?;
let notif = NotificationSer::new(method, params);
let trace = RpcTracing::batch();
async {
let raw = serde_json::to_string(&notif).map_err(Error::ParseError)?;
tx_log_from_str(&raw, self.max_log_length);
let mut sender = self.to_back.clone();
let fut = sender.send(FrontToBack::Notification(raw));
match future::select(fut, Delay::new(self.request_timeout)).await {
Either::Left((Ok(()), _)) => Ok(()),
Either::Left((Err(_), _)) => Err(self.read_error_from_backend().await),
Either::Right((_, _)) => Err(Error::RequestTimeout),
}
}.instrument(trace.into_span()).await
}
async fn request<'a, R>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<R, Error>
where
R: DeserializeOwned,
{
let (send_back_tx, send_back_rx) = oneshot::channel();
let guard = self.id_manager.next_request_id()?;
let id = guard.inner();
let trace = RpcTracing::method_call(method);
let _enter = trace.span().enter();
let raw = serde_json::to_string(&RequestSer::new(&id, method, params)).map_err(Error::ParseError)?;
tx_log_from_str(&raw, self.max_log_length);
if self
.to_back
.clone()
.send(FrontToBack::Request(RequestMessage { raw, id: id.clone(), send_back: Some(send_back_tx) }))
.await
.is_err()
{
return Err(self.read_error_from_backend().await);
}
let res = call_with_timeout(self.request_timeout, send_back_rx).in_current_span().await;
let json_value = match res {
Ok(Ok(v)) => v,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
};
rx_log_from_json(&Response::new(&json_value, id), self.max_log_length);
serde_json::from_value(json_value).map_err(Error::ParseError)
}
{
let (send_back_tx, send_back_rx) = oneshot::channel();
let guard = self.id_manager.next_request_id()?;
let id = guard.inner();
let trace = RpcTracing::method_call(method);
async {
let raw = serde_json::to_string(&RequestSer::new(&id, method, params)).map_err(Error::ParseError)?;
tx_log_from_str(&raw, self.max_log_length);
if self
.to_back
.clone()
.send(FrontToBack::Request(RequestMessage { raw, id: id.clone(), send_back: Some(send_back_tx) }))
.await
.is_err()
{
return Err(self.read_error_from_backend().await);
}
let res = call_with_timeout(self.request_timeout, send_back_rx).await;
let json_value = match res {
Ok(Ok(v)) => v,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
};
rx_log_from_json(&Response::new(&json_value, id), self.max_log_length);
serde_json::from_value(json_value).map_err(Error::ParseError)
}.instrument(trace.into_span()).await
}
async fn batch_request<'a, R>(&self, batch: Vec<(&'a str, Option<ParamsSer<'a>>)>) -> Result<Vec<R>, Error>
where
R: DeserializeOwned + Default + Clone,
{
let guard = self.id_manager.next_request_ids(batch.len())?;
let batch_ids: Vec<Id> = guard.inner();
let mut batches = Vec::with_capacity(batch.len());
let log = RpcTracing::batch();
let _enter = log.span().enter();
for (idx, (method, params)) in batch.into_iter().enumerate() {
batches.push(RequestSer::new(&batch_ids[idx], method, params));
}
let (send_back_tx, send_back_rx) = oneshot::channel();
let raw = serde_json::to_string(&batches).map_err(Error::ParseError)?;
tx_log_from_str(&raw, self.max_log_length);
if self
.to_back
.clone()
.send(FrontToBack::Batch(BatchMessage { raw, ids: batch_ids, send_back: send_back_tx }))
.await
.is_err()
{
return Err(self.read_error_from_backend().await);
}
let res = call_with_timeout(self.request_timeout, send_back_rx).in_current_span().await;
let json_values = match res {
Ok(Ok(v)) => v,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
};
rx_log_from_json(&json_values, self.max_log_length);
let values: Result<_, _> =
json_values.into_iter().map(|val| serde_json::from_value(val).map_err(Error::ParseError)).collect();
Ok(values?)
{
let trace = RpcTracing::batch();
async {
let guard = self.id_manager.next_request_ids(batch.len())?;
let batch_ids: Vec<Id> = guard.inner();
let mut batches = Vec::with_capacity(batch.len());
for (idx, (method, params)) in batch.into_iter().enumerate() {
batches.push(RequestSer::new(&batch_ids[idx], method, params));
}
let (send_back_tx, send_back_rx) = oneshot::channel();
let raw = serde_json::to_string(&batches).map_err(Error::ParseError)?;
tx_log_from_str(&raw, self.max_log_length);
if self
.to_back
.clone()
.send(FrontToBack::Batch(BatchMessage { raw, ids: batch_ids, send_back: send_back_tx }))
.await
.is_err()
{
return Err(self.read_error_from_backend().await);
}
let res = call_with_timeout(self.request_timeout, send_back_rx).await;
let json_values = match res {
Ok(Ok(v)) => v,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
};
rx_log_from_json(&json_values, self.max_log_length);
let values: Result<_, _> =
json_values.into_iter().map(|val| serde_json::from_value(val).map_err(Error::ParseError)).collect();
Ok(values?)
}.instrument(trace.into_span()).await
}
}
......@@ -356,51 +358,52 @@ impl SubscriptionClientT for Client {
) -> Result<Subscription<N>, Error>
where
N: DeserializeOwned,
{
if subscribe_method == unsubscribe_method {
return Err(Error::SubscriptionNameConflict(unsubscribe_method.to_owned()));
}
let guard = self.id_manager.next_request_ids(2)?;
let mut ids: Vec<Id> = guard.inner();
let trace = RpcTracing::method_call(subscribe_method);
let _enter = trace.span().enter();
let id = ids[0].clone();
let raw = serde_json::to_string(&RequestSer::new(&id, subscribe_method, params)).map_err(Error::ParseError)?;
tx_log_from_str(&raw, self.max_log_length);
let (send_back_tx, send_back_rx) = oneshot::channel();
if self
.to_back
.clone()
.send(FrontToBack::Subscribe(SubscriptionMessage {
raw,
subscribe_id: ids.swap_remove(0),
unsubscribe_id: ids.swap_remove(0),
unsubscribe_method: unsubscribe_method.to_owned(),
send_back: send_back_tx,
}))
.await
.is_err()
{
return Err(self.read_error_from_backend().await);
}
let res = call_with_timeout(self.request_timeout, send_back_rx).in_current_span().await;
let (notifs_rx, sub_id) = match res {
Ok(Ok(val)) => val,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
};
rx_log_from_json(&Response::new(&sub_id, id), self.max_log_length);
Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(sub_id)))
}
{
if subscribe_method == unsubscribe_method {
return Err(Error::SubscriptionNameConflict(unsubscribe_method.to_owned()));
}
let guard = self.id_manager.next_request_ids(2)?;
let mut ids: Vec<Id> = guard.inner();
let trace = RpcTracing::method_call(subscribe_method);
async {
let id = ids[0].clone();
let raw = serde_json::to_string(&RequestSer::new(&id, subscribe_method, params)).map_err(Error::ParseError)?;
tx_log_from_str(&raw, self.max_log_length);
let (send_back_tx, send_back_rx) = oneshot::channel();
if self
.to_back
.clone()
.send(FrontToBack::Subscribe(SubscriptionMessage {
raw,
subscribe_id: ids.swap_remove(0),
unsubscribe_id: ids.swap_remove(0),
unsubscribe_method: unsubscribe_method.to_owned(),
send_back: send_back_tx,
}))
.await
.is_err()
{
return Err(self.read_error_from_backend().await);
}
let res = call_with_timeout(self.request_timeout, send_back_rx).await;
let (notifs_rx, sub_id) = match res {
Ok(Ok(val)) => val,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
};
rx_log_from_json(&Response::new(&sub_id, id), self.max_log_length);
Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(sub_id)))
}.instrument(trace.into_span()).await
}
/// Subscribe to a specific method.
async fn subscribe_to_method<'a, N>(&self, method: &'a str) -> Result<Subscription<N>, Error>
......
......@@ -28,8 +28,8 @@ impl RpcTracing {
}
/// Get the inner span.
pub fn span(&self) -> &tracing::Span {
&self.0
pub fn into_span(self) -> tracing::Span {
self.0
}
}
......
......@@ -686,43 +686,43 @@ async fn process_health_request<M: Middleware>(
max_log_length: u32,
) -> Result<hyper::Response<hyper::Body>, HyperError> {
let trace = RpcTracing::method_call(&health_api.method);
let _enter = trace.span().enter();
tx_log_from_str("HTTP health API", max_log_length);
async {
tx_log_from_str("HTTP health API", max_log_length);
let response = match methods.method_with_name(&health_api.method) {
None => MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::MethodNotFound)),
Some((_name, method_callback)) => match method_callback.inner() {
MethodKind::Sync(callback) => {
(callback)(Id::Number(0), Params::new(None), max_response_body_size as usize)
}
MethodKind::Async(callback) => {
(callback)(Id::Number(0), Params::new(None), 0, max_response_body_size as usize, None).await
}
MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => {
MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InternalError))
}
},
};
let response = match methods.method_with_name(&health_api.method) {
None => MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::MethodNotFound)),
Some((_name, method_callback)) => match method_callback.inner() {
MethodKind::Sync(callback) => (callback)(Id::Number(0), Params::new(None), max_response_body_size as usize),
MethodKind::Async(callback) => {
(callback)(Id::Number(0), Params::new(None), 0, max_response_body_size as usize, None)
.in_current_span()
.await
}
rx_log_from_str(&response.result, max_log_length);
middleware.on_result(&health_api.method, response.success, request_start);
middleware.on_response(&response.result, request_start);
MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => {
MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InternalError))
if response.success {
#[derive(serde::Deserialize)]
struct RpcPayload<'a> {
#[serde(borrow)]
result: &'a serde_json::value::RawValue,
}
},
};
rx_log_from_str(&response.result, max_log_length);
middleware.on_result(&health_api.method, response.success, request_start);
middleware.on_response(&response.result, request_start);
if response.success {
#[derive(serde::Deserialize)]
struct RpcPayload<'a> {
#[serde(borrow)]
result: &'a serde_json::value::RawValue,
let payload: RpcPayload = serde_json::from_str(&response.result)
.expect("valid JSON-RPC response must have a result field and be valid JSON; qed");
Ok(response::ok_response(payload.result.to_string()))
} else {
Ok(response::internal_error())
}
let payload: RpcPayload = serde_json::from_str(&response.result)
.expect("valid JSON-RPC response must have a result field and be valid JSON; qed");
Ok(response::ok_response(payload.result.to_string()))
} else {
Ok(response::internal_error())
}
.instrument(trace.into_span())
.await
}
#[derive(Debug, Clone)]
......@@ -766,26 +766,25 @@ where
let batch_stream = futures_util::stream::iter(batch);
let trace = RpcTracing::batch();
let _enter = trace.span().enter();
let batch_response = batch_stream
.try_fold(
BatchResponseBuilder::new_with_limit(max_response_size as usize),
|batch_response, (req, call)| async move {
let params = Params::new(req.params.map(|params| params.get()));
let response = execute_call(Call { name: &req.method, params, id: req.id, call }).await;
batch_response.append(&response)
},
)
.in_current_span()