Unverified Commit 75d5fc28 authored by Alexandru Vasile's avatar Alexandru Vasile
Browse files

Merge remote-tracking branch 'origin/master' into poc_middleware_layer_v4


Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>
parents bd0e6871 7f198385
Pipeline #206752 passed with stages
in 4 minutes and 51 seconds
......@@ -21,7 +21,7 @@ jobs:
override: true
- name: Rust Cache
uses: Swatinem/rust-cache@v1.4.0
uses: Swatinem/rust-cache@v2.0.0
- name: Run benchmark
run: cargo bench -p jsonrpsee-benchmarks -- --output-format bencher | tee output.txt
......
......@@ -34,7 +34,7 @@ jobs:
components: clippy, rustfmt
- name: Rust Cache
uses: Swatinem/rust-cache@v1.4.0
uses: Swatinem/rust-cache@v2.0.0
- name: Cargo fmt
uses: actions-rs/cargo@v1.0.3
......@@ -86,7 +86,7 @@ jobs:
version: 0.5
- name: Rust Cache
uses: Swatinem/rust-cache@v1.4.0
uses: Swatinem/rust-cache@v2.0.0
- name: Cargo check all targets and features
run: cargo hack check --workspace --each-feature --all-targets
......@@ -106,7 +106,7 @@ jobs:
override: true
- name: Rust Cache
uses: Swatinem/rust-cache@v1.4.0
uses: Swatinem/rust-cache@v2.0.0
- name: Cargo test
uses: actions-rs/cargo@v1.0.3
......@@ -129,7 +129,7 @@ jobs:
override: true
- name: Rust Cache
uses: Swatinem/rust-cache@v1.4.0
uses: Swatinem/rust-cache@v2.0.0
- name: Cargo test
uses: actions-rs/cargo@v1.0.3
......@@ -152,7 +152,7 @@ jobs:
override: true
- name: Rust Cache
uses: Swatinem/rust-cache@v1.4.0
uses: Swatinem/rust-cache@v2.0.0
- name: Cargo test
uses: actions-rs/cargo@v1.0.3
......
......@@ -4,6 +4,13 @@ The format is based on [Keep a Changelog].
[Keep a Changelog]: http://keepachangelog.com/en/1.0.0/
## [v0.15.1] - 2022-07-29
This release fixes some incorrect tracing spans.
### [Fixed]
- [Bug Fix] - Incorrect trace caused by use of Span::enter in asynchronous code [#835](https://github.com/paritytech/jsonrpsee/pull/835)
## [v0.15.0] - 2022-07-20
v0.15.0 is a breaking release. The main changes are:
......
......@@ -32,7 +32,7 @@ Support `WebSocket` and `HTTP` transports for both client and server.
- [WebSocket](./examples/examples/ws.rs)
- [WebSocket pubsub](./examples/examples/ws_pubsub_broadcast.rs)
- [API generation with proc macro](./examples/examples/proc_macro.rs)
- [Middleware](./examples/examples/multi_middleware.rs)
- [Logger](./examples/examples/multi_logger.rs)
- [CORS server](./examples/examples/cors_server.rs)
- [Core client](./examples/examples/core_client.rs)
......
[package]
name = "jsonrpsee-benchmarks"
version = "0.15.0"
version = "0.15.1"
authors = ["Parity Technologies <admin@parity.io>"]
description = "Benchmarks for jsonrpsee"
edition = "2021"
......
[package]
name = "jsonrpsee-http-client"
version = "0.15.0"
version = "0.15.1"
authors = ["Parity Technologies <admin@parity.io>", "Pierre Krieger <pierre.krieger1708@gmail.com>"]
description = "HTTP client for JSON-RPC"
edition = "2021"
......@@ -14,8 +14,8 @@ async-trait = "0.1"
rustc-hash = "1"
hyper = { version = "0.14.10", features = ["client", "http1", "http2", "tcp"] }
hyper-rustls = { version = "0.23", optional = true }
jsonrpsee-types = { path = "../../types", version = "0.15.0" }
jsonrpsee-core = { path = "../../core", version = "0.15.0", features = ["client", "http-helpers"] }
jsonrpsee-types = { path = "../../types", version = "0.15.1" }
jsonrpsee-core = { path = "../../core", version = "0.15.1", features = ["client", "http-helpers"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
......
......@@ -168,17 +168,19 @@ pub struct HttpClient {
impl ClientT for HttpClient {
async fn notification<'a>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<(), Error> {
let trace = RpcTracing::notification(method);
let _enter = trace.span().enter();
async {
let notif = serde_json::to_string(&NotificationSer::new(method, params)).map_err(Error::ParseError)?;
let notif = serde_json::to_string(&NotificationSer::new(method, params)).map_err(Error::ParseError)?;
let fut = self.transport.send(notif);
let fut = self.transport.send(notif).in_current_span();
match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(ok)) => Ok(ok),
Err(_) => Err(Error::RequestTimeout),
Ok(Err(e)) => Err(Error::Transport(e.into())),
match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(ok)) => Ok(ok),
Err(_) => Err(Error::RequestTimeout),
Ok(Err(e)) => Err(Error::Transport(e.into())),
}
}
.instrument(trace.into_span())
.await
}
/// Perform a request towards the server.
......@@ -190,34 +192,37 @@ impl ClientT for HttpClient {
let id = guard.inner();
let request = RequestSer::new(&id, method, params);
let trace = RpcTracing::method_call(method);
let _enter = trace.span().enter();
let raw = serde_json::to_string(&request).map_err(Error::ParseError)?;
async {
let raw = serde_json::to_string(&request).map_err(Error::ParseError)?;
let fut = self.transport.send_and_read_body(raw).in_current_span();
let body = match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Err(_e) => {
return Err(Error::RequestTimeout);
}
Ok(Err(e)) => {
return Err(Error::Transport(e.into()));
}
};
let fut = self.transport.send_and_read_body(raw);
let body = match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Err(_e) => {
return Err(Error::RequestTimeout);
}
Ok(Err(e)) => {
return Err(Error::Transport(e.into()));
}
};
let response: Response<_> = match serde_json::from_slice(&body) {
Ok(response) => response,
Err(_) => {
let err: ErrorResponse = serde_json::from_slice(&body).map_err(Error::ParseError)?;
return Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned())));
}
};
let response: Response<_> = match serde_json::from_slice(&body) {
Ok(response) => response,
Err(_) => {
let err: ErrorResponse = serde_json::from_slice(&body).map_err(Error::ParseError)?;
return Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned())));
}
};
if response.id == id {
Ok(response.result)
} else {
Err(Error::InvalidRequestId)
if response.id == id {
Ok(response.result)
} else {
Err(Error::InvalidRequestId)
}
}
.instrument(trace.into_span())
.await
}
async fn batch_request<'a, R>(&self, batch: Vec<(&'a str, Option<ParamsSer<'a>>)>) -> Result<Vec<R>, Error>
......@@ -227,46 +232,47 @@ impl ClientT for HttpClient {
let guard = self.id_manager.next_request_ids(batch.len())?;
let ids: Vec<Id> = guard.inner();
let trace = RpcTracing::batch();
let _enter = trace.span().enter();
let mut batch_request = Vec::with_capacity(batch.len());
// NOTE(niklasad1): `ID` is not necessarily monotonically increasing.
let mut ordered_requests = Vec::with_capacity(batch.len());
let mut request_set = FxHashMap::with_capacity_and_hasher(batch.len(), Default::default());
async {
let mut batch_request = Vec::with_capacity(batch.len());
// NOTE(niklasad1): `ID` is not necessarily monotonically increasing.
let mut ordered_requests = Vec::with_capacity(batch.len());
let mut request_set = FxHashMap::with_capacity_and_hasher(batch.len(), Default::default());
for (pos, (method, params)) in batch.into_iter().enumerate() {
batch_request.push(RequestSer::new(&ids[pos], method, params));
ordered_requests.push(&ids[pos]);
request_set.insert(&ids[pos], pos);
}
for (pos, (method, params)) in batch.into_iter().enumerate() {
batch_request.push(RequestSer::new(&ids[pos], method, params));
ordered_requests.push(&ids[pos]);
request_set.insert(&ids[pos], pos);
}
let fut = self
.transport
.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?)
.in_current_span();
let fut =
self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?);
let body = match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Err(_e) => return Err(Error::RequestTimeout),
Ok(Err(e)) => return Err(Error::Transport(e.into())),
};
let body = match tokio::time::timeout(self.request_timeout, fut).await {
Ok(Ok(body)) => body,
Err(_e) => return Err(Error::RequestTimeout),
Ok(Err(e)) => return Err(Error::Transport(e.into())),
};
let rps: Vec<Response<_>> =
serde_json::from_slice(&body).map_err(|_| match serde_json::from_slice::<ErrorResponse>(&body) {
Ok(e) => Error::Call(CallError::Custom(e.error_object().clone().into_owned())),
Err(e) => Error::ParseError(e),
})?;
let rps: Vec<Response<_>> =
serde_json::from_slice(&body).map_err(|_| match serde_json::from_slice::<ErrorResponse>(&body) {
Ok(e) => Error::Call(CallError::Custom(e.error_object().clone().into_owned())),
Err(e) => Error::ParseError(e),
})?;
// NOTE: `R::default` is placeholder and will be replaced in loop below.
let mut responses = vec![R::default(); ordered_requests.len()];
for rp in rps {
let pos = match request_set.get(&rp.id) {
Some(pos) => *pos,
None => return Err(Error::InvalidRequestId),
};
responses[pos] = rp.result
// NOTE: `R::default` is placeholder and will be replaced in loop below.
let mut responses = vec![R::default(); ordered_requests.len()];
for rp in rps {
let pos = match request_set.get(&rp.id) {
Some(pos) => *pos,
None => return Err(Error::InvalidRequestId),
};
responses[pos] = rp.result
}
Ok(responses)
}
Ok(responses)
.instrument(trace.into_span())
.await
}
}
......
[package]
name = "jsonrpsee-client-transport"
version = "0.15.0"
version = "0.15.1"
authors = ["Parity Technologies <admin@parity.io>", "Pierre Krieger <pierre.krieger1708@gmail.com>"]
description = "WebSocket client for JSON-RPC"
edition = "2021"
......@@ -10,8 +10,8 @@ homepage = "https://github.com/paritytech/jsonrpsee"
documentation = "https://docs.rs/jsonrpsee-ws-client"
[dependencies]
jsonrpsee-types = { path = "../../types", version = "0.15.0", optional = true }
jsonrpsee-core = { path = "../../core", version = "0.15.0", features = ["client"] }
jsonrpsee-types = { path = "../../types", version = "0.15.1", optional = true }
jsonrpsee-core = { path = "../../core", version = "0.15.1", features = ["client"] }
tracing = "0.1.34"
# optional
......
[package]
name = "jsonrpsee-wasm-client"
version = "0.15.0"
version = "0.15.1"
authors = ["Parity Technologies <admin@parity.io>", "Pierre Krieger <pierre.krieger1708@gmail.com>"]
description = "WASM client for JSON-RPC"
edition = "2021"
......@@ -10,9 +10,9 @@ homepage = "https://github.com/paritytech/jsonrpsee"
documentation = "https://docs.rs/jsonrpsee-ws-client"
[dependencies]
jsonrpsee-types = { path = "../../types", version = "0.15.0" }
jsonrpsee-client-transport = { path = "../transport", version = "0.15.0", features = ["web"] }
jsonrpsee-core = { path = "../../core", version = "0.15.0", features = ["async-wasm-client"] }
jsonrpsee-types = { path = "../../types", version = "0.15.1" }
jsonrpsee-client-transport = { path = "../transport", version = "0.15.1", features = ["web"] }
jsonrpsee-core = { path = "../../core", version = "0.15.1", features = ["async-wasm-client"] }
[dev-dependencies]
tracing-subscriber = { version = "0.3.3", features = ["env-filter"] }
......
[package]
name = "jsonrpsee-ws-client"
version = "0.15.0"
version = "0.15.1"
authors = ["Parity Technologies <admin@parity.io>", "Pierre Krieger <pierre.krieger1708@gmail.com>"]
description = "WebSocket client for JSON-RPC"
edition = "2021"
......@@ -10,9 +10,9 @@ homepage = "https://github.com/paritytech/jsonrpsee"
documentation = "https://docs.rs/jsonrpsee-ws-client"
[dependencies]
jsonrpsee-types = { path = "../../types", version = "0.15.0" }
jsonrpsee-client-transport = { path = "../transport", version = "0.15.0", features = ["ws"] }
jsonrpsee-core = { path = "../../core", version = "0.15.0", features = ["async-client"] }
jsonrpsee-types = { path = "../../types", version = "0.15.1" }
jsonrpsee-client-transport = { path = "../transport", version = "0.15.1", features = ["ws"] }
jsonrpsee-core = { path = "../../core", version = "0.15.1", features = ["async-client"] }
http = "0.2.0"
[dev-dependencies]
......
[package]
name = "jsonrpsee-core"
version = "0.15.0"
version = "0.15.1"
authors = ["Parity Technologies <admin@parity.io>"]
description = "Utilities for jsonrpsee"
edition = "2021"
......@@ -11,7 +11,7 @@ anyhow = "1"
async-trait = "0.1"
beef = { version = "0.5.1", features = ["impl_serde"] }
futures-channel = "0.3.14"
jsonrpsee-types = { path = "../types", version = "0.15.0" }
jsonrpsee-types = { path = "../types", version = "0.15.1" }
thiserror = "1"
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = { version = "1", features = ["raw_value"] }
......@@ -51,6 +51,7 @@ server = [
"lazy_static",
"unicase",
"http",
"hyper",
]
client = ["futures-util/sink", "futures-channel/sink", "futures-channel/std"]
async-client = [
......
......@@ -243,102 +243,104 @@ impl Drop for Client {
#[async_trait]
impl ClientT for Client {
async fn notification<'a>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<(), Error> {
// NOTE: we use this to guard against max number of concurrent requests.
let _req_id = self.id_manager.next_request_id()?;
let notif = NotificationSer::new(method, params);
let trace = RpcTracing::batch();
let _enter = trace.span().enter();
let raw = serde_json::to_string(&notif).map_err(Error::ParseError)?;
tx_log_from_str(&raw, self.max_log_length);
let mut sender = self.to_back.clone();
let fut = sender.send(FrontToBack::Notification(raw)).in_current_span();
match future::select(fut, Delay::new(self.request_timeout)).await {
Either::Left((Ok(()), _)) => Ok(()),
Either::Left((Err(_), _)) => Err(self.read_error_from_backend().await),
Either::Right((_, _)) => Err(Error::RequestTimeout),
}
}
// NOTE: we use this to guard against max number of concurrent requests.
let _req_id = self.id_manager.next_request_id()?;
let notif = NotificationSer::new(method, params);
let trace = RpcTracing::batch();
async {
let raw = serde_json::to_string(&notif).map_err(Error::ParseError)?;
tx_log_from_str(&raw, self.max_log_length);
let mut sender = self.to_back.clone();
let fut = sender.send(FrontToBack::Notification(raw));
match future::select(fut, Delay::new(self.request_timeout)).await {
Either::Left((Ok(()), _)) => Ok(()),
Either::Left((Err(_), _)) => Err(self.read_error_from_backend().await),
Either::Right((_, _)) => Err(Error::RequestTimeout),
}
}.instrument(trace.into_span()).await
}
async fn request<'a, R>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<R, Error>
where
R: DeserializeOwned,
{
let (send_back_tx, send_back_rx) = oneshot::channel();
let guard = self.id_manager.next_request_id()?;
let id = guard.inner();
let trace = RpcTracing::method_call(method);
let _enter = trace.span().enter();
let raw = serde_json::to_string(&RequestSer::new(&id, method, params)).map_err(Error::ParseError)?;
tx_log_from_str(&raw, self.max_log_length);
if self
.to_back
.clone()
.send(FrontToBack::Request(RequestMessage { raw, id: id.clone(), send_back: Some(send_back_tx) }))
.await
.is_err()
{
return Err(self.read_error_from_backend().await);
}
let res = call_with_timeout(self.request_timeout, send_back_rx).in_current_span().await;
let json_value = match res {
Ok(Ok(v)) => v,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
};
rx_log_from_json(&Response::new(&json_value, id), self.max_log_length);
serde_json::from_value(json_value).map_err(Error::ParseError)
}
{
let (send_back_tx, send_back_rx) = oneshot::channel();
let guard = self.id_manager.next_request_id()?;
let id = guard.inner();
let trace = RpcTracing::method_call(method);
async {
let raw = serde_json::to_string(&RequestSer::new(&id, method, params)).map_err(Error::ParseError)?;
tx_log_from_str(&raw, self.max_log_length);
if self
.to_back
.clone()
.send(FrontToBack::Request(RequestMessage { raw, id: id.clone(), send_back: Some(send_back_tx) }))
.await
.is_err()
{
return Err(self.read_error_from_backend().await);
}
let res = call_with_timeout(self.request_timeout, send_back_rx).await;
let json_value = match res {
Ok(Ok(v)) => v,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
};
rx_log_from_json(&Response::new(&json_value, id), self.max_log_length);
serde_json::from_value(json_value).map_err(Error::ParseError)
}.instrument(trace.into_span()).await
}
async fn batch_request<'a, R>(&self, batch: Vec<(&'a str, Option<ParamsSer<'a>>)>) -> Result<Vec<R>, Error>
where
R: DeserializeOwned + Default + Clone,
{
let guard = self.id_manager.next_request_ids(batch.len())?;
let batch_ids: Vec<Id> = guard.inner();
let mut batches = Vec::with_capacity(batch.len());
let log = RpcTracing::batch();
let _enter = log.span().enter();
for (idx, (method, params)) in batch.into_iter().enumerate() {
batches.push(RequestSer::new(&batch_ids[idx], method, params));
}
let (send_back_tx, send_back_rx) = oneshot::channel();
let raw = serde_json::to_string(&batches).map_err(Error::ParseError)?;
tx_log_from_str(&raw, self.max_log_length);
if self
.to_back
.clone()
.send(FrontToBack::Batch(BatchMessage { raw, ids: batch_ids, send_back: send_back_tx }))
.await
.is_err()
{
return Err(self.read_error_from_backend().await);
}
let res = call_with_timeout(self.request_timeout, send_back_rx).in_current_span().await;
let json_values = match res {
Ok(Ok(v)) => v,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
};
rx_log_from_json(&json_values, self.max_log_length);
let values: Result<_, _> =
json_values.into_iter().map(|val| serde_json::from_value(val).map_err(Error::ParseError)).collect();
Ok(values?)
{
let trace = RpcTracing::batch();
async {
let guard = self.id_manager.next_request_ids(batch.len())?;
let batch_ids: Vec<Id> = guard.inner();
let mut batches = Vec::with_capacity(batch.len());
for (idx, (method, params)) in batch.into_iter().enumerate() {
batches.push(RequestSer::new(&batch_ids[idx], method, params));
}
let (send_back_tx, send_back_rx) = oneshot::channel();
let raw = serde_json::to_string(&batches).map_err(Error::ParseError)?;
tx_log_from_str(&raw, self.max_log_length);
if self
.to_back
.clone()
.send(FrontToBack::Batch(BatchMessage { raw, ids: batch_ids, send_back: send_back_tx }))
.await
.is_err()
{
return Err(self.read_error_from_backend().await);
}
let res = call_with_timeout(self.request_timeout, send_back_rx).await;
let json_values = match res {
Ok(Ok(v)) => v,
Ok(Err(err)) => return Err(err),
Err(_) => return Err(self.read_error_from_backend().await),
};
rx_log_from_json(&json_values, self.max_log_length);
let values: Result<_, _> =
json_values.into_iter().map(|val| serde_json::from_value(val).map_err(Error::ParseError)).collect();
Ok(values?)
}.instrument(trace.into_span()).await
}
}
......@@ -356,51 +358,52 @@ impl SubscriptionClientT for Client {
) -> Result<Subscription<N>, Error>
where
N: DeserializeOwned,
{
if subscribe_method == unsubscribe_method {
return Err(Error::SubscriptionNameConflict(unsubscribe_method.to_owned()));
}
let guard = self.id_manager.next_request_ids(2)?;
let mut ids: Vec<Id> = guard.inner();
let trace = RpcTracing::method_call(subscribe_method);
let _enter = trace.span().enter();
let id = ids[0].clone();
let raw = serde_json::to_string<