Unverified Commit 292bd88a authored by Niklas Adolfsson's avatar Niklas Adolfsson Committed by GitHub
Browse files

extract async client abstraction. (#580)



* extract ws client to async client

* bring back jsonrpsee-ws-client crate

* new crate core client

* add missing file

* jsonrpsee crate: add core client

* rexport core client

* more re-exports

* downgrade trait bounds

* update version

* fix nits

* send close reason

* Update types/src/traits.rs

* move 'TEN_MB_CONST' to core

* Update client/core-client/Cargo.toml

Co-authored-by: David's avatarDavid <dvdplm@gmail.com>

* Update client/core-client/src/lib.rs

Co-authored-by: David's avatarDavid <dvdplm@gmail.com>

* cargo fmt

* Update client/ws-client/src/lib.rs

Co-authored-by: David's avatarDavid <dvdplm@gmail.com>

* Update client/ws-client/src/lib.rs

Co-authored-by: David's avatarDavid <dvdplm@gmail.com>

* move `async-client` core

The `async-client` is hidden behind a new feature flag `async-client`
because it brings in additional dependecies such as tokio rt.

* fix docs

* add example how to use "core client"

* fix build

* Update http-server/Cargo.toml

* Update client/transport/Cargo.toml

Co-authored-by: David's avatarDavid <dvdplm@gmail.com>
parent e159c449
......@@ -2,15 +2,16 @@
members = [
"examples",
"benches",
"http-client",
"http-server",
"test-utils",
"jsonrpsee",
"tests",
"types",
"core",
"ws-client",
"ws-server",
"client/ws-client",
"client/http-client",
"client/transport",
"proc-macros",
]
resolver = "2"
[package]
name = "jsonrpsee-benchmarks"
version = "0.6.0"
version = "0.6.1"
authors = ["Parity Technologies <admin@parity.io>"]
description = "Benchmarks for jsonrpsee"
edition = "2018"
......
......@@ -3,7 +3,7 @@ use std::sync::Arc;
use criterion::*;
use futures_util::future::join_all;
use helpers::{SUB_METHOD_NAME, UNSUB_METHOD_NAME};
use jsonrpsee::core::client::{Client, SubscriptionClient};
use jsonrpsee::core::client::{ClientT, SubscriptionClientT};
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::types::{Id, ParamsSer, RequestSer};
use jsonrpsee::ws_client::WsClientBuilder;
......@@ -132,7 +132,13 @@ impl RequestBencher for AsyncBencher {
const REQUEST_TYPE: RequestType = RequestType::Async;
}
fn run_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Client>, name: &str, request: RequestType) {
fn run_round_trip(
rt: &TokioRuntime,
crit: &mut Criterion,
client: Arc<impl ClientT>,
name: &str,
request: RequestType,
) {
crit.bench_function(&request.group_name(name), |b| {
b.to_async(rt).iter(|| async {
black_box(client.request::<String>(request.method_name(), None).await.unwrap());
......@@ -140,7 +146,7 @@ fn run_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Clie
});
}
fn run_sub_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl SubscriptionClient>, name: &str) {
fn run_sub_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl SubscriptionClientT>, name: &str) {
let mut group = crit.benchmark_group(name);
group.bench_function("subscribe", |b| {
b.to_async(rt).iter_with_large_drop(|| async {
......@@ -188,7 +194,7 @@ fn run_sub_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl
fn run_round_trip_with_batch(
rt: &TokioRuntime,
crit: &mut Criterion,
client: Arc<impl Client>,
client: Arc<impl ClientT>,
name: &str,
request: RequestType,
) {
......@@ -203,7 +209,7 @@ fn run_round_trip_with_batch(
group.finish();
}
fn run_concurrent_round_trip<C: 'static + Client + Send + Sync>(
fn run_concurrent_round_trip<C: 'static + ClientT + Send + Sync>(
rt: &TokioRuntime,
crit: &mut Criterion,
client: Arc<C>,
......
[package]
name = "jsonrpsee-http-client"
version = "0.6.0"
version = "0.6.1"
authors = ["Parity Technologies <admin@parity.io>", "Pierre Krieger <pierre.krieger1708@gmail.com>"]
description = "HTTP client for JSON-RPC"
edition = "2018"
......@@ -14,8 +14,8 @@ async-trait = "0.1"
rustc-hash = "1"
hyper = { version = "0.14.10", features = ["client", "http1", "http2", "tcp"] }
hyper-rustls = { version = "0.23", optional = true }
jsonrpsee-types = { path = "../types", version = "0.6.0" }
jsonrpsee-core = { path = "../core", version = "0.6.0", features = ["client", "http-helpers"] }
jsonrpsee-types = { path = "../../types", version = "0.6.1" }
jsonrpsee-core = { path = "../../core", version = "0.6.1", features = ["client", "http-helpers"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
......@@ -23,7 +23,7 @@ tokio = { version = "1.8", features = ["time"] }
tracing = "0.1"
[dev-dependencies]
jsonrpsee-test-utils = { path = "../test-utils" }
jsonrpsee-test-utils = { path = "../../test-utils" }
tokio = { version = "1.8", features = ["net", "rt-multi-thread", "macros"] }
[features]
......
......@@ -28,10 +28,10 @@ use std::sync::Arc;
use std::time::Duration;
use crate::transport::HttpTransportClient;
use crate::types::{ErrorResponse, Id, NotificationSer, ParamsSer, RequestSer, Response, TEN_MB_SIZE_BYTES};
use crate::types::{ErrorResponse, Id, NotificationSer, ParamsSer, RequestSer, Response};
use async_trait::async_trait;
use jsonrpsee_core::client::{CertificateStore, Client, RequestIdManager, Subscription, SubscriptionClient};
use jsonrpsee_core::Error;
use jsonrpsee_core::client::{CertificateStore, ClientT, RequestIdManager, Subscription, SubscriptionClientT};
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
use rustc_hash::FxHashMap;
use serde::de::DeserializeOwned;
......@@ -104,7 +104,7 @@ pub struct HttpClient {
}
#[async_trait]
impl Client for HttpClient {
impl ClientT for HttpClient {
async fn notification<'a>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<(), Error> {
let notif = NotificationSer::new(method, params);
let fut = self.transport.send(serde_json::to_string(&notif).map_err(Error::ParseError)?);
......@@ -196,7 +196,7 @@ impl Client for HttpClient {
}
#[async_trait]
impl SubscriptionClient for HttpClient {
impl SubscriptionClientT for HttpClient {
/// Send a subscription request to the server. Not implemented for HTTP; will always return [`Error::HttpNotImplemented`].
async fn subscribe<'a, N>(
&self,
......
......@@ -27,7 +27,7 @@
use crate::types::error::{ErrorCode, ErrorObject, ErrorResponse};
use crate::types::ParamsSer;
use crate::HttpClientBuilder;
use jsonrpsee_core::client::Client;
use jsonrpsee_core::client::ClientT;
use jsonrpsee_core::rpc_params;
use jsonrpsee_core::Error;
use jsonrpsee_test_utils::helpers::*;
......
[package]
name = "jsonrpsee-client-transport"
version = "0.6.1"
authors = ["Parity Technologies <admin@parity.io>", "Pierre Krieger <pierre.krieger1708@gmail.com>"]
description = "WebSocket client for JSON-RPC"
edition = "2018"
license = "MIT"
repository = "https://github.com/paritytech/jsonrpsee"
homepage = "https://github.com/paritytech/jsonrpsee"
documentation = "https://docs.rs/jsonrpsee-ws-client"
[dependencies]
jsonrpsee-types = { path = "../../types", version = "0.6.1", optional = true }
jsonrpsee-core = { path = "../../core", version = "0.6.1", features = ["client"] }
tracing = { version = "0.1", optional = true }
thiserror = { version = "1", optional = true }
futures = { version = "0.3.14", default-features = false, features = ["std"], optional = true }
http = { version = "0.2", optional = true }
tokio-util = { version = "0.6", features = ["compat"], optional = true }
tokio = { version = "1", features = ["net", "time", "macros"], optional = true }
pin-project = { version = "1", optional = true }
rustls-native-certs = { version = "0.6", optional = true }
webpki-roots = { version = "0.22", optional = true }
tokio-rustls = { version = "0.23", optional = true }
# ws
soketto = { version = "0.7.1", optional = true }
[features]
tls = ["tokio-rustls", "webpki-roots", "rustls-native-certs"]
ws = [
"futures",
"http",
"tokio",
"tokio-util",
"soketto",
"pin-project",
"jsonrpsee-types",
"thiserror",
"tracing"
]
......@@ -26,27 +26,9 @@
#![warn(missing_debug_implementations, missing_docs, unreachable_pub)]
//! # jsonrpsee-ws-client
//! # jsonrpsee-client-transports
//!
//! `jsonrpsee-ws-client` is a [JSON RPC](https://www.jsonrpc.org/specification) WebSocket client library that's is built for `async/await`.
//!
//! ## Runtime support
//!
//! This library uses `tokio` as the runtime and does not support other kinds of runtimes.
/// WebSocket Client.
pub mod client;
/// Helpers.
pub mod helpers;
/// Request manager.
pub mod manager;
/// Stream.
pub mod stream;
/// WebSocket transport.
pub mod transport;
#[cfg(test)]
mod tests;
pub use client::{WsClient, WsClientBuilder};
pub use jsonrpsee_types as types;
/// Websocket transport
#[cfg(feature = "ws")]
pub mod ws;
......@@ -24,21 +24,26 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
mod stream;
use std::convert::{TryFrom, TryInto};
use std::io;
use std::net::{SocketAddr, ToSocketAddrs};
use std::time::Duration;
use crate::stream::EitherStream;
use beef::Cow;
use futures::io::{BufReader, BufWriter};
use http::Uri;
use jsonrpsee_core::client::CertificateStore;
use jsonrpsee_core::client::{CertificateStore, TransportReceiverT, TransportSenderT};
use jsonrpsee_core::TEN_MB_SIZE_BYTES;
use jsonrpsee_core::{async_trait, Cow};
use soketto::connection;
use soketto::handshake::client::{Client as WsHandshakeClient, Header, ServerResponse};
use soketto::handshake::client::{Client as WsHandshakeClient, ServerResponse};
use stream::EitherStream;
use thiserror::Error;
use tokio::net::TcpStream;
pub use http::{uri::InvalidUri, Uri};
pub use soketto::handshake::client::Header;
/// Sending end of WebSocket transport.
#[derive(Debug)]
pub struct Sender {
......@@ -56,10 +61,8 @@ pub struct Receiver {
pub struct WsTransportClientBuilder<'a> {
/// What certificate store to use
pub certificate_store: CertificateStore,
/// Remote WebSocket target.
pub target: Target,
/// Timeout for the connection.
pub timeout: Duration,
pub connection_timeout: Duration,
/// Custom headers to pass during the HTTP handshake. If `None`, no
/// custom header is passed.
pub headers: Vec<Header<'a>>,
......@@ -69,6 +72,53 @@ pub struct WsTransportClientBuilder<'a> {
pub max_redirections: usize,
}
impl<'a> Default for WsTransportClientBuilder<'a> {
fn default() -> Self {
Self {
certificate_store: CertificateStore::Native,
max_request_body_size: TEN_MB_SIZE_BYTES,
connection_timeout: Duration::from_secs(10),
headers: Vec::new(),
max_redirections: 5,
}
}
}
impl<'a> WsTransportClientBuilder<'a> {
/// Set whether to use system certificates (default is native).
pub fn certificate_store(mut self, certificate_store: CertificateStore) -> Self {
self.certificate_store = certificate_store;
self
}
/// Set max request body size (default is 10 MB).
pub fn max_request_body_size(mut self, size: u32) -> Self {
self.max_request_body_size = size;
self
}
/// Set connection timeout for the handshake (default is 10 seconds).
pub fn connection_timeout(mut self, timeout: Duration) -> Self {
self.connection_timeout = timeout;
self
}
/// Set a custom header passed to the server during the handshake (default is none).
///
/// The caller is responsible for checking that the headers do not conflict or are duplicated.
pub fn add_header(mut self, name: &'a str, value: &'a str) -> Self {
self.headers.push(Header { name, value: value.as_bytes() });
self
}
/// Set the max number of redirections to perform until a connection is regarded as failed.
/// (default is 5).
pub fn max_redirections(mut self, redirect: usize) -> Self {
self.max_redirections = redirect;
self
}
}
/// Stream mode, either plain TCP or TLS.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Mode {
......@@ -131,16 +181,15 @@ pub enum WsError {
/// Error in the WebSocket connection.
#[error("WebSocket connection error: {}", 0)]
Connection(#[source] soketto::connection::Error),
/// Failed to parse the message in JSON.
#[error("Failed to parse message in JSON: {}", 0)]
ParseError(#[source] serde_json::error::Error),
}
impl Sender {
#[async_trait]
impl TransportSenderT for Sender {
type Error = WsError;
/// Sends out a request. Returns a `Future` that finishes when the request has been
/// successfully sent.
pub async fn send(&mut self, body: String) -> Result<(), WsError> {
async fn send(&mut self, body: String) -> Result<(), WsError> {
tracing::debug!("send: {}", body);
self.inner.send_text(body).await?;
self.inner.flush().await?;
......@@ -148,33 +197,37 @@ impl Sender {
}
/// Send a close message and close the connection.
pub async fn close(&mut self) -> Result<(), WsError> {
async fn close(&mut self) -> Result<(), WsError> {
self.inner.close().await.map_err(Into::into)
}
}
impl Receiver {
#[async_trait]
impl TransportReceiverT for Receiver {
type Error = WsError;
/// Returns a `Future` resolving when the server sent us something back.
pub async fn next_response(&mut self) -> Result<Vec<u8>, WsError> {
async fn receive(&mut self) -> Result<String, WsError> {
let mut message = Vec::new();
self.inner.receive_data(&mut message).await?;
Ok(message)
let s = String::from_utf8(message).expect("Found invalid UTF-8");
Ok(s)
}
}
impl<'a> WsTransportClientBuilder<'a> {
/// Try to establish the connection.
pub async fn build(self) -> Result<(Sender, Receiver), WsHandshakeError> {
self.try_connect().await
pub async fn build(self, uri: Uri) -> Result<(Sender, Receiver), WsHandshakeError> {
let target: Target = uri.try_into()?;
self.try_connect(target).await
}
async fn try_connect(self) -> Result<(Sender, Receiver), WsHandshakeError> {
let mut target = self.target;
async fn try_connect(self, mut target: Target) -> Result<(Sender, Receiver), WsHandshakeError> {
let mut err = None;
// Only build TLS connector if `wss` in URL.
#[cfg(feature = "tls")]
let mut connector = match target.mode {
let mut connector = match target._mode {
Mode::Tls => Some(build_tls_config(&self.certificate_store)?),
Mode::Plain => None,
};
......@@ -186,7 +239,7 @@ impl<'a> WsTransportClientBuilder<'a> {
let sockaddrs = std::mem::take(&mut target.sockaddrs);
for sockaddr in &sockaddrs {
#[cfg(feature = "tls")]
let tcp_stream = match connect(*sockaddr, self.timeout, &target.host, connector.as_ref()).await {
let tcp_stream = match connect(*sockaddr, self.connection_timeout, &target.host, connector.as_ref()).await {
Ok(stream) => stream,
Err(e) => {
tracing::debug!("Failed to connect to sockaddr: {:?}", sockaddr);
......@@ -196,7 +249,7 @@ impl<'a> WsTransportClientBuilder<'a> {
};
#[cfg(not(feature = "tls"))]
let tcp_stream = match connect(*sockaddr, self.timeout).await {
let tcp_stream = match connect(*sockaddr, self.connection_timeout).await {
Ok(stream) => stream,
Err(e) => {
tracing::debug!("Failed to connect to sockaddr: {:?}", sockaddr);
......@@ -238,7 +291,7 @@ impl<'a> WsTransportClientBuilder<'a> {
// Only build TLS connector if `wss` in redirection URL.
#[cfg(feature = "tls")]
match target.mode {
match target._mode {
Mode::Tls if connector.is_none() => {
connector = Some(build_tls_config(&self.certificate_store)?);
}
......@@ -369,7 +422,7 @@ pub struct Target {
/// The Host request header specifies the host and port number of the server to which the request is being sent.
host_header: String,
/// WebSocket stream mode, see [`Mode`] for further documentation.
mode: Mode,
_mode: Mode,
/// The path and query parts from an URL.
path_and_query: String,
}
......@@ -378,7 +431,7 @@ impl TryFrom<Uri> for Target {
type Error = WsHandshakeError;
fn try_from(uri: Uri) -> Result<Self, Self::Error> {
let mode = match uri.scheme_str() {
let _mode = match uri.scheme_str() {
Some("ws") => Mode::Plain,
#[cfg(feature = "tls")]
Some("wss") => Mode::Tls,
......@@ -398,7 +451,13 @@ impl TryFrom<Uri> for Target {
let parts = uri.into_parts();
let path_and_query = parts.path_and_query.ok_or_else(|| WsHandshakeError::Url("No path in URL".into()))?;
let sockaddrs = host_header.to_socket_addrs().map_err(WsHandshakeError::ResolutionFailed)?;
Ok(Self { sockaddrs: sockaddrs.collect(), host, host_header, mode, path_and_query: path_and_query.to_string() })
Ok(Self {
sockaddrs: sockaddrs.collect(),
host,
host_header,
_mode,
path_and_query: path_and_query.to_string(),
})
}
}
......@@ -451,7 +510,7 @@ mod tests {
fn assert_ws_target(target: Target, host: &str, host_header: &str, mode: Mode, path_and_query: &str) {
assert_eq!(&target.host, host);
assert_eq!(&target.host_header, host_header);
assert_eq!(target.mode, mode);
assert_eq!(target._mode, mode);
assert_eq!(&target.path_and_query, path_and_query);
}
......
......@@ -40,7 +40,7 @@ use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
/// Stream to represent either a unencrypted or encrypted socket stream.
#[pin_project(project = EitherStreamProj)]
#[derive(Debug)]
pub enum EitherStream {
pub(crate) enum EitherStream {
/// Unencrypted socket stream.
Plain(#[pin] TcpStream),
/// Encrypted socket stream.
......
[package]
name = "jsonrpsee-ws-client"
version = "0.6.0"
version = "0.6.1"
authors = ["Parity Technologies <admin@parity.io>", "Pierre Krieger <pierre.krieger1708@gmail.com>"]
description = "WebSocket client for JSON-RPC"
edition = "2018"
......@@ -10,30 +10,12 @@ homepage = "https://github.com/paritytech/jsonrpsee"
documentation = "https://docs.rs/jsonrpsee-ws-client"
[dependencies]
async-trait = "0.1"
beef = "0.5.1"
rustc-hash = "1"
futures = { version = "0.3.14", default-features = false, features = ["std"] }
http = "0.2"
jsonrpsee-types = { path = "../types", version = "0.6.0" }
jsonrpsee-core = { path = "../core", features = ["client"] }
pin-project = "1"
rustls-native-certs = "0.6.0"
serde = "1"
serde_json = "1"
soketto = "0.7.1"
thiserror = "1"
tokio = { version = "1.8", features = ["net", "time", "rt-multi-thread", "macros"] }
tokio-rustls = { version = "0.23", optional = true }
tokio-util = { version = "0.6", features = ["compat"] }
tracing = "0.1"
webpki-roots = "0.22.0"
jsonrpsee-types = { path = "../../types", version = "0.6.1" }
jsonrpsee-client-transport = { path = "../transport", version = "0.6.1", features = ["ws"] }
jsonrpsee-core = { path = "../../core", version = "0.6.1", features = ["async-client"] }
[dev-dependencies]
env_logger = "0.9.0"
jsonrpsee-test-utils = { path = "../test-utils" }
tokio = { version = "1.8", features = ["macros"] }
[features]
default = ["tls"]
tls = ["tokio-rustls"]
env_logger = "0.9"
jsonrpsee-test-utils = { path = "../../test-utils" }
tokio = { version = "1", features = ["macros"] }
serde_json = "1"
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
#![warn(missing_debug_implementations, missing_docs, unreachable_pub)]
//! # jsonrpsee-ws-client
//!
//! `jsonrpsee-ws-client` is a [JSON RPC](https://www.jsonrpc.org/specification) WebSocket client library that's is built for `async/await`.
//!
//! ## Async runtime support
//!
//! This library uses `tokio` as the runtime and does not support other runtimes.
#[cfg(test)]
mod tests;
pub use jsonrpsee_types as types;
use std::time::Duration;
use jsonrpsee_client_transport::ws::{Header, InvalidUri, Uri, WsTransportClientBuilder};
use jsonrpsee_core::client::{CertificateStore, Client, ClientBuilder};
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
/// Builder for [`Client`].
///
/// # Examples
///
/// ```no_run
///
/// use jsonrpsee_ws_client::WsClientBuilder;
///
/// #[tokio::main]
/// async fn main() {
/// // build client
/// let client = WsClientBuilder::default()
/// .add_header("Any-Header-You-Like", "42")
/// .build("wss://localhost:443")
/// .await
/// .unwrap();
///
/// // use client....
/// }
///
/// ```
#[derive(Clone, Debug)]
pub struct WsClientBuilder<'a> {
certificate_store: CertificateStore,
max_request_body_size: u32,
request_timeout: Duration,
connection_timeout: Duration,
headers: Vec<Header<'a>>,
max_concurrent_requests: usize,
max_notifs_per_subscription: usize,
max_redirections: usize,
}
impl<'a> Default for WsClientBuilder<'a> {
fn default() -> Self {
Self {
certificate_store: CertificateStore::Native,
max_request_body_size: TEN_MB_SIZE_BYTES,
request_timeout: Duration::from_secs(60),
connection_timeout: Duration::from_secs(10),
headers: Vec::new(),
max_concurrent_requests: 256,
max_notifs_per_subscription: 1024,