Unverified Commit 20e6e5de authored by Niklas Adolfsson's avatar Niklas Adolfsson Committed by GitHub
Browse files

feat: WASM client via web-sys transport (#648)



* feat: untested web-sys transport

* rewrite me

* make it work

* add hacks and works :)

* add subscription test too

* revert StdError change; still works

* cleanup

* remove hacks

* more wasm tests outside workspace

* kill mutually exclusive features

* merge nits

* remove unsafe hack

* fix nit

* core: fix features and deps

* ci: add WASM test

* test again

* work work

* comeon

* work work

* revert unintentional change

* Update core/Cargo.toml

Co-authored-by: David's avatarDavid <dvdplm@gmail.com>

* Update core/src/client/async_client/mod.rs

Co-authored-by: David's avatarDavid <dvdplm@gmail.com>

* revert needless change: std hashmap + fxhashmap works

* cleanup

* extract try_connect_until fn

* remove todo

* fix bad merge

* add wasm client wrapper crate

* fix nits

* use gloo-net dependency

* fix build

* grumbles CI: rename to `wasm_tests`

* fix bad merge

* fix grumbles

* fix nit

* comeon CI

Co-authored-by: David's avatarDavid <dvdplm@gmail.com>
parent 9fa817d9
Pipeline #189393 passed with stages
in 5 minutes and 3 seconds
......@@ -12,6 +12,11 @@ on:
branches:
- master
env:
CARGO_TERM_COLOR: always
# Download a RPC server to run wasm tests against.
SUBSTRATE_URL: https://releases.parity.io/substrate/x86_64-debian:stretch/latest/substrate/substrate
jobs:
check-style:
name: Check style
......@@ -149,3 +154,27 @@ jobs:
- name: Cargo nextest
run: cargo nextest run --workspace
wasm_tests:
name: Test wasm
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- name: Install
run: curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
- name: Download Substrate
run: |
curl $SUBSTRATE_URL --output substrate --location
chmod +x substrate
mkdir -p ~/.local/bin
mv substrate ~/.local/bin
- name: Run WASM tests
run: |
substrate --dev --tmp > /dev/null 2>&1 &
wasm-pack test --headless --firefox
wasm-pack test --headless --chrome
pkill substrate
working-directory: wasm-tests
......@@ -12,6 +12,8 @@ members = [
"client/ws-client",
"client/http-client",
"client/transport",
"client/wasm-client",
"proc-macros",
"wasm-tests",
]
resolver = "2"
......@@ -12,9 +12,13 @@ documentation = "https://docs.rs/jsonrpsee-ws-client"
[dependencies]
jsonrpsee-types = { path = "../../types", version = "0.10.1", optional = true }
jsonrpsee-core = { path = "../../core", version = "0.10.1", features = ["client"] }
tracing = { version = "0.1", optional = true }
tracing = "0.1"
# optional
thiserror = { version = "1", optional = true }
futures = { version = "0.3.14", default-features = false, features = ["std"], optional = true }
anyhow = { version = "1", optional = true }
futures-channel = { version = "0.3.14", default-features = false, optional = true }
futures-util = { version = "0.3.14", default-features = false, features = ["alloc"], optional = true }
http = { version = "0.2", optional = true }
tokio-util = { version = "0.7", features = ["compat"], optional = true }
tokio = { version = "1", features = ["net", "time", "macros"], optional = true }
......@@ -22,14 +26,20 @@ pin-project = { version = "1", optional = true }
rustls-native-certs = { version = "0.6", optional = true }
webpki-roots = { version = "0.22", optional = true }
tokio-rustls = { version = "0.23", optional = true }
futures-timer = { version = "3", optional = true }
# ws
soketto = { version = "0.7.1", optional = true }
# web-sys
wasm-bindgen = { version = "0.2.69", optional = true }
wasm-bindgen-futures = { version = "0.4.19", optional = true }
gloo-net = { version = "0.1.0", default-features = false, features = ["json", "websocket"], optional = true }
[features]
tls = ["tokio-rustls", "webpki-roots", "rustls-native-certs"]
ws = [
"futures",
"futures-util",
"http",
"tokio",
"tokio-util",
......@@ -37,5 +47,14 @@ ws = [
"pin-project",
"jsonrpsee-types",
"thiserror",
"tracing"
]
web = [
"wasm-bindgen",
"wasm-bindgen-futures",
"gloo-net",
"futures-channel",
"futures-timer",
"futures-util",
"anyhow",
"thiserror",
]
......@@ -32,3 +32,7 @@
/// Websocket transport
#[cfg(feature = "ws")]
pub mod ws;
/// Websocket transport via web-sys.
#[cfg(all(feature = "web", target_arch = "wasm32"))]
pub mod web;
use core::fmt;
use futures_channel::mpsc;
use futures_util::sink::SinkExt;
use futures_util::stream::{SplitSink, SplitStream, StreamExt};
use gloo_net::websocket::{futures::WebSocket, Message, WebSocketError};
use jsonrpsee_core::async_trait;
use jsonrpsee_core::client::{TransportReceiverT, TransportSenderT};
/// Web-sys transport error that can occur.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Internal send error
#[error("Could not send message: {0}")]
SendError(#[from] mpsc::SendError),
/// Sender went away
#[error("Sender went away couldn't receive the message")]
SenderDisconnected,
/// Error that occurred in `JS context`.
#[error("JS Error: {0:?}")]
Js(String),
/// WebSocket error
#[error("WebSocket Error: {0:?}")]
WebSocket(WebSocketError),
}
/// Sender.
pub struct Sender(SplitSink<WebSocket, Message>);
impl fmt::Debug for Sender {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Sender").finish()
}
}
/// Receiver.
pub struct Receiver(SplitStream<WebSocket>);
impl fmt::Debug for Receiver {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Receiver").finish()
}
}
#[async_trait(?Send)]
impl TransportSenderT for Sender {
type Error = Error;
async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
tracing::trace!("tx: {:?}", msg);
self.0.send(Message::Text(msg)).await.map_err(|e| Error::WebSocket(e))?;
Ok(())
}
async fn close(&mut self) -> Result<(), Error> {
Ok(())
}
}
#[async_trait(?Send)]
impl TransportReceiverT for Receiver {
type Error = Error;
async fn receive(&mut self) -> Result<String, Self::Error> {
match self.0.next().await {
Some(Ok(msg)) => {
tracing::trace!("rx: {:?}", msg);
let txt = match msg {
Message::Bytes(bytes) => String::from_utf8(bytes).expect("WebSocket message is valid utf8; qed"),
Message::Text(txt) => txt,
};
Ok(txt)
}
Some(Err(err)) => Err(Error::WebSocket(err)),
None => Err(Error::SenderDisconnected),
}
}
}
/// Create a transport sender & receiver pair.
pub async fn connect(url: impl AsRef<str>) -> Result<(Sender, Receiver), Error> {
let websocket = WebSocket::open(url.as_ref()).map_err(|e| Error::Js(e.to_string()))?;
let (write, read) = websocket.split();
Ok((Sender(write), Receiver(read)))
}
......@@ -30,7 +30,7 @@ use std::io;
use std::net::{SocketAddr, ToSocketAddrs};
use std::time::Duration;
use futures::io::{BufReader, BufWriter};
use futures_util::io::{BufReader, BufWriter};
use jsonrpsee_core::client::{CertificateStore, TransportReceiverT, TransportSenderT};
use jsonrpsee_core::TEN_MB_SIZE_BYTES;
use jsonrpsee_core::{async_trait, Cow};
......@@ -188,7 +188,7 @@ impl TransportSenderT for Sender {
/// Sends out a request. Returns a `Future` that finishes when the request has been
/// successfully sent.
async fn send(&mut self, body: String) -> Result<(), WsError> {
async fn send(&mut self, body: String) -> Result<(), Self::Error> {
tracing::debug!("send: {}", body);
self.inner.send_text(body).await?;
self.inner.flush().await?;
......@@ -206,7 +206,7 @@ impl TransportReceiverT for Receiver {
type Error = WsError;
/// Returns a `Future` resolving when the server sent us something back.
async fn receive(&mut self) -> Result<String, WsError> {
async fn receive(&mut self) -> Result<String, Self::Error> {
let mut message = Vec::new();
self.inner.receive_data(&mut message).await?;
let s = String::from_utf8(message).expect("Found invalid UTF-8");
......
......@@ -31,8 +31,8 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use futures::io::{IoSlice, IoSliceMut};
use futures::prelude::*;
use futures_util::io::{IoSlice, IoSliceMut};
use futures_util::*;
use pin_project::pin_project;
use tokio::net::TcpStream;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
......@@ -53,13 +53,13 @@ impl AsyncRead for EitherStream {
match self.project() {
EitherStreamProj::Plain(s) => {
let compat = s.compat();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncRead::poll_read(compat, cx, buf)
}
#[cfg(feature = "tls")]
EitherStreamProj::Tls(t) => {
let compat = t.compat();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncRead::poll_read(compat, cx, buf)
}
}
......@@ -73,13 +73,13 @@ impl AsyncRead for EitherStream {
match self.project() {
EitherStreamProj::Plain(s) => {
let compat = s.compat();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncRead::poll_read_vectored(compat, cx, bufs)
}
#[cfg(feature = "tls")]
EitherStreamProj::Tls(t) => {
let compat = t.compat();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncRead::poll_read_vectored(compat, cx, bufs)
}
}
......@@ -91,13 +91,13 @@ impl AsyncWrite for EitherStream {
match self.project() {
EitherStreamProj::Plain(s) => {
let compat = s.compat_write();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncWrite::poll_write(compat, cx, buf)
}
#[cfg(feature = "tls")]
EitherStreamProj::Tls(t) => {
let compat = t.compat_write();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncWrite::poll_write(compat, cx, buf)
}
}
......@@ -107,13 +107,13 @@ impl AsyncWrite for EitherStream {
match self.project() {
EitherStreamProj::Plain(s) => {
let compat = s.compat_write();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncWrite::poll_write_vectored(compat, cx, bufs)
}
#[cfg(feature = "tls")]
EitherStreamProj::Tls(t) => {
let compat = t.compat_write();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncWrite::poll_write_vectored(compat, cx, bufs)
}
}
......@@ -123,13 +123,13 @@ impl AsyncWrite for EitherStream {
match self.project() {
EitherStreamProj::Plain(s) => {
let compat = s.compat_write();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncWrite::poll_flush(compat, cx)
}
#[cfg(feature = "tls")]
EitherStreamProj::Tls(t) => {
let compat = t.compat_write();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncWrite::poll_flush(compat, cx)
}
}
......@@ -139,13 +139,13 @@ impl AsyncWrite for EitherStream {
match self.project() {
EitherStreamProj::Plain(s) => {
let compat = s.compat_write();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncWrite::poll_close(compat, cx)
}
#[cfg(feature = "tls")]
EitherStreamProj::Tls(t) => {
let compat = t.compat_write();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncWrite::poll_close(compat, cx)
}
}
......
[package]
name = "jsonrpsee-wasm-client"
version = "0.10.1"
authors = ["Parity Technologies <admin@parity.io>", "Pierre Krieger <pierre.krieger1708@gmail.com>"]
description = "WASM client for JSON-RPC"
edition = "2021"
license = "MIT"
repository = "https://github.com/paritytech/jsonrpsee"
homepage = "https://github.com/paritytech/jsonrpsee"
documentation = "https://docs.rs/jsonrpsee-ws-client"
[dependencies]
jsonrpsee-types = { path = "../../types", version = "0.10.1" }
jsonrpsee-client-transport = { path = "../transport", version = "0.10.1", features = ["web"] }
jsonrpsee-core = { path = "../../core", version = "0.10.1", features = ["async-wasm-client"] }
futures-channel = "0.3"
[dev-dependencies]
env_logger = "0.9"
jsonrpsee-test-utils = { path = "../../test-utils" }
tokio = { version = "1", features = ["macros"] }
serde_json = "1"
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
#![warn(missing_debug_implementations, missing_docs, unreachable_pub)]
#![cfg(target_arch = "wasm32")]
//! # jsonrpsee-wasm-client
pub use jsonrpsee_core::client::Client;
pub use jsonrpsee_types as types;
use std::time::Duration;
use jsonrpsee_client_transport::web;
use jsonrpsee_core::client::{ClientBuilder, IdKind};
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
/// Builder for [`Client`].
///
/// # Examples
///
/// ```no_run
///
/// use jsonrpsee_wasm_client::WasmClientBuilder;
///
/// #[tokio::main]
/// async fn main() {
/// // build client
/// let client = WasmClientBuilder::default()
/// .build("wss://localhost:443")
/// .await
/// .unwrap();
///
/// // use client....
/// }
///
/// ```
#[derive(Clone, Debug)]
pub struct WasmClientBuilder {
max_request_body_size: u32,
request_timeout: Duration,
max_concurrent_requests: usize,
max_notifs_per_subscription: usize,
id_kind: IdKind,
}
impl Default for WasmClientBuilder {
fn default() -> Self {
Self {
max_request_body_size: TEN_MB_SIZE_BYTES,
request_timeout: Duration::from_secs(60),
max_concurrent_requests: 256,
max_notifs_per_subscription: 1024,
id_kind: IdKind::Number,
}
}
}
impl WasmClientBuilder {
/// Max request body size.
pub fn max_request_body_size(mut self, size: u32) -> Self {
self.max_request_body_size = size;
self
}
/// See documentation [`ClientBuilder::request_timeout`] (default is 60 seconds).
pub fn request_timeout(mut self, timeout: Duration) -> Self {
self.request_timeout = timeout;
self
}
/// See documentation [`ClientBuilder::max_concurrent_requests`] (default is 256).
pub fn max_concurrent_requests(mut self, max: usize) -> Self {
self.max_concurrent_requests = max;
self
}
/// See documentation [`ClientBuilder::max_notifs_per_subscription`] (default is 1024).
pub fn max_notifs_per_subscription(mut self, max: usize) -> Self {
self.max_notifs_per_subscription = max;
self
}
/// See documentation for [`ClientBuilder::id_format`] (default is Number).
pub fn id_format(mut self, kind: IdKind) -> Self {
self.id_kind = kind;
self
}
/// Build the client with specified URL to connect to.
pub async fn build(self, url: impl AsRef<str>) -> Result<Client, Error> {
let (sender, receiver) = web::connect(url).await.map_err(|e| Error::Transport(e.into()))?;
let builder = ClientBuilder::default();
Ok(builder.build_with_wasm(sender, receiver))
}
}
......@@ -174,6 +174,6 @@ impl<'a> WsClientBuilder<'a> {
.request_timeout(self.request_timeout)
.max_concurrent_requests(self.max_concurrent_requests)
.id_format(self.id_kind)
.build(sender, receiver))
.build_with_tokio(sender, receiver))
}
}
......@@ -8,44 +8,61 @@ license = "MIT"
[dependencies]
anyhow = "1"
arrayvec = "0.7.1"
async-trait = "0.1"
beef = { version = "0.5.1", features = ["impl_serde"] }
futures-channel = "0.3.14"
jsonrpsee-types = { path = "../types", version = "0.10.1" }
thiserror = "1"
futures-channel = { version = "0.3.14", default-features = false }
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = { version = "1", features = ["raw_value"] }
# optional deps
arrayvec = { version = "0.7.1", optional = true }
async-channel = { version = "1.6", optional = true }
async-lock = { version = "2.4", optional = true }
futures-util = { version = "0.3.14", default-features = false, optional = true }
hyper = { version = "0.14.10", default-features = false, features = ["stream"] }
jsonrpsee-types = { path = "../types", version = "0.10.1"}
hyper = { version = "0.14.10", default-features = false, features = ["stream"], optional = true }
tracing = { version = "0.1", optional = true }
rustc-hash = { version = "1", optional = true }
rand = { version = "0.8", optional = true }
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = { version = "1", features = ["raw_value"] }
soketto = "0.7.1"
soketto = { version = "0.7.1", optional = true }
parking_lot = { version = "0.12", optional = true }
tokio = { version = "1.8", optional = true }
wasm-bindgen-futures = { version = "0.4.19", optional = true }
futures-timer = { version = "3", optional = true }
[features]
default = []
http-helpers = ["futures-util"]
http-helpers = ["hyper", "futures-util"]
server = [
"futures-util",
"rustc-hash",
"arrayvec",
"async-channel",
"futures-util/alloc",
"rustc-hash/std",
"tracing",
"parking_lot",
"rand",
"tokio/rt",
"tokio/sync",
]
client = ["futures-util"]
client = ["futures-util/sink", "futures-channel/sink", "futures-channel/std"]
async-client = [
"async-lock",
"client",
"rustc-hash",
"tokio/macros",
"tokio/rt",
"tokio/sync",
"tokio/time",
"tracing"
"tracing",
"futures-timer",
]
async-wasm-client = [
"async-lock",
"client",
"wasm-bindgen-futures",
"rustc-hash/std",
"futures-timer/wasm-bindgen",
"tracing",
]
[dev-dependencies]
......
......@@ -24,13 +24,14 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use std::time::Duration;
use crate::client::async_client::manager::{RequestManager, RequestStatus};
use crate::client::{RequestMessage, TransportSenderT};
use crate::Error;
use futures_channel::{mpsc, oneshot};
use futures_channel::mpsc;
use futures_timer::Delay;
use futures_util::future::{self, Either};
use jsonrpsee_types::error::CallError;
use jsonrpsee_types::response::SubscriptionError;
use jsonrpsee_types::{
......@@ -253,12 +254,11 @@ pub(crate) fn process_error_response(manager: &mut RequestManager, err: ErrorRes
/// Wait for a stream to complete within the given timeout.
pub(crate) async fn call_with_timeout<T>(
timeout: Duration,
rx: oneshot::Receiver<Result<T, Error>>,
) -> Result<Result<T, Error>, oneshot::Canceled> {
let timeout = tokio::time::sleep(timeout);
tokio::select! {
res = rx => res,
_ = timeout => Ok(Err(Error::RequestTimeout))
timeout: std::time::Duration,
rx: futures_channel::oneshot::Receiver<Result<T, Error>>,
) -> Result<Result<T, Error>, futures_channel::oneshot::Canceled> {
match future::select(rx, Delay::new(timeout)).await {
Either::Left((res, _)) => res,
Either::Right((_, _)) => Ok(Err(Error::RequestTimeout)),
}
}