Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
parity
Mirrored projects
jsonrpsee
Commits
32811d3c
Unverified
Commit
32811d3c
authored
Aug 13, 2021
by
Niklas Adolfsson
Committed by
GitHub
Aug 13, 2021
Browse files
[clients]: remove tokio 0.2 runtime support (#432)
Co-authored-by:
David Palm
<
dvdplm@gmail.com
>
parent
fba533f8
Changes
20
Hide whitespace changes
Inline
Side-by-side
.github/workflows/ci.yml
View file @
32811d3c
...
...
@@ -68,12 +68,6 @@ jobs:
command
:
check
args
:
--all-targets
-
name
:
Cargo check HTTP client with tokio02
uses
:
actions-rs/cargo@v1.0.3
with
:
command
:
check
args
:
--manifest-path http-client/Cargo.toml --no-default-features --features tokio02
-
name
:
Cargo check HTTP client
uses
:
actions-rs/cargo@v1.0.3
with
:
...
...
@@ -92,12 +86,6 @@ jobs:
command
:
check
args
:
--manifest-path ws-client/Cargo.toml
-
name
:
Cargo check WS client with tokio02
uses
:
actions-rs/cargo@v1.0.3
with
:
command
:
check
args
:
--manifest-path ws-client/Cargo.toml --no-default-features --features tokio02
-
name
:
Cargo check WS server
uses
:
actions-rs/cargo@v1.0.3
with
:
...
...
http-client/Cargo.toml
View file @
32811d3c
...
...
@@ -12,26 +12,18 @@ documentation = "https://docs.rs/jsonrpsee-http-client"
[dependencies]
async-trait
=
"0.1"
futures
=
{
version
=
"0.3.14"
,
default-features
=
false
,
features
=
["std"]
}
hyper13-rustls
=
{
package
=
"hyper-rustls"
,
version
=
"0.21"
,
optional
=
true
}
hyper14-rustls
=
{
package
=
"hyper-rustls"
,
version
=
"0.22"
,
optional
=
true
}
hyper14
=
{
package
=
"hyper"
,
version
=
"0.14.10"
,
features
=
[
"client"
,
"http1"
,
"http2"
,
"tcp"
],
optional
=
true
}
hyper13
=
{
package
=
"hyper"
,
version
=
"0.13"
,
optional
=
true
}
hyper-rustls
=
"0.22"
hyper
=
{
version
=
"0.14.10"
,
features
=
[
"client"
,
"http1"
,
"http2"
,
"tcp"
]
}
jsonrpsee-types
=
{
path
=
"../types"
,
version
=
"0.3.0"
}
jsonrpsee-utils
=
{
path
=
"../utils"
,
version
=
"0.3.0"
,
optional
=
true
}
jsonrpsee-utils
=
{
path
=
"../utils"
,
version
=
"0.3.0"
,
features
=
["http-helpers"]
}
log
=
"0.4"
serde
=
{
version
=
"1.0"
,
default-features
=
false
,
features
=
["derive"]
}
serde_json
=
"1.0"
tokioV1
=
{
package
=
"tokio"
,
version
=
"1"
,
features
=
["time"]
,
optional
=
true
}
tokioV02
=
{
package
=
"tokio"
,
version
=
"0.2"
,
features
=
["time"]
,
optional
=
true
}
tokio
=
{
version
=
"1"
,
features
=
["time"]
}
thiserror
=
"1.0"
url
=
"2.2"
fnv
=
"1"
[features]
default
=
["tokio1"]
tokio1
=
[
"hyper14"
,
"hyper14-rustls"
,
"jsonrpsee-utils/hyper_14"
,
"tokioV1"
]
tokio02
=
[
"hyper13"
,
"hyper13-rustls"
,
"jsonrpsee-utils/hyper_13"
,
"tokioV02"
]
[dev-dependencies]
jsonrpsee-test-utils
=
{
path
=
"../test-utils"
}
tokio
V1
=
{
package
=
"tokio"
,
version
=
"1"
,
features
=
[
"net"
,
"rt-multi-thread"
,
"macros"
]
}
tokio
=
{
package
=
"tokio"
,
version
=
"1"
,
features
=
[
"net"
,
"rt-multi-thread"
,
"macros"
]
}
http-client/src/client.rs
View file @
32811d3c
...
...
@@ -91,7 +91,7 @@ impl Client for HttpClient {
async
fn
notification
<
'a
>
(
&
self
,
method
:
&
'a
str
,
params
:
JsonRpcParams
<
'a
>
)
->
Result
<
(),
Error
>
{
let
notif
=
JsonRpcNotificationSer
::
new
(
method
,
params
);
let
fut
=
self
.transport
.send
(
serde_json
::
to_string
(
&
notif
)
.map_err
(
Error
::
ParseError
)
?
);
match
crate
::
tokio
::
timeout
(
self
.request_timeout
,
fut
)
.await
{
match
tokio
::
time
::
timeout
(
self
.request_timeout
,
fut
)
.await
{
Ok
(
Ok
(
ok
))
=>
Ok
(
ok
),
Err
(
_
)
=>
Err
(
Error
::
RequestTimeout
),
Ok
(
Err
(
e
))
=>
Err
(
Error
::
Transport
(
Box
::
new
(
e
))),
...
...
@@ -108,7 +108,7 @@ impl Client for HttpClient {
let
request
=
JsonRpcCallSer
::
new
(
Id
::
Number
(
id
),
method
,
params
);
let
fut
=
self
.transport
.send_and_read_body
(
serde_json
::
to_string
(
&
request
)
.map_err
(
Error
::
ParseError
)
?
);
let
body
=
match
crate
::
tokio
::
timeout
(
self
.request_timeout
,
fut
)
.await
{
let
body
=
match
tokio
::
time
::
timeout
(
self
.request_timeout
,
fut
)
.await
{
Ok
(
Ok
(
body
))
=>
body
,
Err
(
_e
)
=>
return
Err
(
Error
::
RequestTimeout
),
Ok
(
Err
(
e
))
=>
return
Err
(
Error
::
Transport
(
Box
::
new
(
e
))),
...
...
@@ -149,7 +149,7 @@ impl Client for HttpClient {
let
fut
=
self
.transport
.send_and_read_body
(
serde_json
::
to_string
(
&
batch_request
)
.map_err
(
Error
::
ParseError
)
?
);
let
body
=
match
crate
::
tokio
::
timeout
(
self
.request_timeout
,
fut
)
.await
{
let
body
=
match
tokio
::
time
::
timeout
(
self
.request_timeout
,
fut
)
.await
{
Ok
(
Ok
(
body
))
=>
body
,
Err
(
_e
)
=>
return
Err
(
Error
::
RequestTimeout
),
Ok
(
Err
(
e
))
=>
return
Err
(
Error
::
Transport
(
Box
::
new
(
e
))),
...
...
http-client/src/lib.rs
View file @
32811d3c
...
...
@@ -33,51 +33,10 @@
//! It is tightly-coupled to [`tokio`](https://docs.rs/tokio) because [`hyper`](https://docs.rs/hyper) is used as transport client,
//! which is not compatible with other async runtimes such as
//! [`async-std`](https://docs.rs/async-std/), [`smol`](https://docs.rs/smol) and similar.
//!
//! It supports both [`tokio 1.0`](https://docs.rs/tokio/1.2.0/tokio/) and [`tokio 0.2`](https://docs.rs/tokio/0.2.25/tokio/index.html)
//! via [Optional features](#optional-features).
//!
//! # Optional Features
//!
//! `jsonrpsee-http-client` uses the following [feature flags]:
//!
//! - `tokio1`: Enable to use the library with [`tokio 1.0`](https://docs.rs/tokio/1.2.0/tokio/) (mutually exclusive with `tokio02`)
//! - `tokio0.2`: Enable to use the library with [`tokio 0.2`](https://docs.rs/tokio/0.2.25/tokio/index.html) (mutually exclusive with `tokio1`)
//!
//! [feature flags]: https://doc.rust-lang.org/cargo/reference/manifest.html#the-features-section
#[cfg(all(feature
=
"tokio1"
,
feature
=
"tokio02"
))]
compile_error!
(
"feature `tokio1` and `tokio02` are mutably exclusive"
);
#[cfg(not(any(feature
=
"tokio1"
,
feature
=
"tokio02"
)))]
compile_error!
(
"feature `tokio1` or `tokio02` must be enabled for this crate"
);
#[cfg(all(feature
=
"tokio1"
,
not(feature
=
"tokio02"
)))]
extern
crate
hyper14
as
hyper
;
#[cfg(all(feature
=
"tokio1"
,
not(feature
=
"tokio02"
)))]
extern
crate
hyper14_rustls
as
hyper_rustls
;
#[cfg(all(feature
=
"tokio02"
,
not(feature
=
"tokio1"
)))]
extern
crate
hyper13
as
hyper
;
#[cfg(all(feature
=
"tokio02"
,
not(feature
=
"tokio1"
)))]
extern
crate
hyper13_rustls
as
hyper_rustls
;
mod
client
;
mod
transport
;
#[cfg(all(feature
=
"tokio1"
,
not(feature
=
"tokio02"
)))]
mod
tokio
{
pub
(
crate
)
use
tokioV1
::
time
::
timeout
;
#[cfg(test)]
pub
(
crate
)
use
tokioV1
::{
runtime
,
test
};
}
#[cfg(all(feature
=
"tokio02"
,
not(feature
=
"tokio1"
)))]
mod
tokio
{
pub
(
crate
)
use
tokioV02
::
time
::
timeout
;
pub
(
crate
)
use
tokioV02
::
time
::
Elapsed
;
}
#[cfg(test)]
mod
tests
;
...
...
http-client/src/tests.rs
View file @
32811d3c
...
...
@@ -32,7 +32,7 @@ use crate::types::{
},
Error
,
JsonValue
,
};
use
crate
::
{
tokio
,
HttpClientBuilder
}
;
use
crate
::
HttpClientBuilder
;
use
jsonrpsee_test_utils
::
helpers
::
*
;
use
jsonrpsee_test_utils
::
types
::
Id
;
use
jsonrpsee_test_utils
::
TimeoutFutureExt
;
...
...
http-client/src/transport.rs
View file @
32811d3c
...
...
@@ -9,7 +9,7 @@
use
crate
::
types
::
error
::
GenericTransportError
;
use
hyper
::
client
::{
Client
,
HttpConnector
};
use
hyper_rustls
::
HttpsConnector
;
use
jsonrpsee_utils
::
h
yper
_helpers
;
use
jsonrpsee_utils
::
h
ttp
_helpers
;
use
thiserror
::
Error
;
const
CONTENT_TYPE_JSON
:
&
str
=
"application/json"
;
...
...
@@ -30,10 +30,7 @@ impl HttpTransportClient {
pub
(
crate
)
fn
new
(
target
:
impl
AsRef
<
str
>
,
max_request_body_size
:
u32
)
->
Result
<
Self
,
Error
>
{
let
target
=
url
::
Url
::
parse
(
target
.as_ref
())
.map_err
(|
e
|
Error
::
Url
(
format!
(
"Invalid URL: {}"
,
e
)))
?
;
if
target
.scheme
()
==
"http"
||
target
.scheme
()
==
"https"
{
#[cfg(feature
=
"tokio1"
)]
let
connector
=
HttpsConnector
::
with_native_roots
();
#[cfg(feature
=
"tokio02"
)]
let
connector
=
HttpsConnector
::
new
();
let
client
=
Client
::
builder
()
.build
::
<
_
,
hyper
::
Body
>
(
connector
);
Ok
(
HttpTransportClient
{
target
,
client
,
max_request_body_size
})
}
else
{
...
...
@@ -66,7 +63,7 @@ impl HttpTransportClient {
pub
(
crate
)
async
fn
send_and_read_body
(
&
self
,
body
:
String
)
->
Result
<
Vec
<
u8
>
,
Error
>
{
let
response
=
self
.inner_send
(
body
)
.await
?
;
let
(
parts
,
body
)
=
response
.into_parts
();
let
body
=
h
yper
_helpers
::
read_response_to_body
(
&
parts
.headers
,
body
,
self
.max_request_body_size
)
.await
?
;
let
body
=
h
ttp
_helpers
::
read_response_to_body
(
&
parts
.headers
,
body
,
self
.max_request_body_size
)
.await
?
;
Ok
(
body
)
}
...
...
@@ -115,7 +112,6 @@ where
#[cfg(test)]
mod
tests
{
use
super
::{
Error
,
HttpTransportClient
};
use
crate
::
tokio
;
#[test]
fn
invalid_http_url_rejected
()
{
...
...
http-server/Cargo.toml
View file @
32811d3c
...
...
@@ -15,7 +15,7 @@ hyper = { version = "0.14.10", features = ["server", "http1", "http2", "tcp"] }
futures-channel
=
"0.3.14"
futures-util
=
{
version
=
"0.3.14"
,
default-features
=
false
}
jsonrpsee-types
=
{
path
=
"../types"
,
version
=
"0.3.0"
}
jsonrpsee-utils
=
{
path
=
"../utils"
,
version
=
"0.3.0"
,
features
=
[
"server"
,
"h
yper_14
"
]
}
jsonrpsee-utils
=
{
path
=
"../utils"
,
version
=
"0.3.0"
,
features
=
[
"server"
,
"h
ttp-helpers
"
]
}
globset
=
"0.4"
lazy_static
=
"1.4"
log
=
"0.4"
...
...
http-server/src/access_control/mod.rs
View file @
32811d3c
...
...
@@ -34,7 +34,7 @@ pub(crate) use cors::{AccessControlAllowHeaders, AccessControlAllowOrigin};
use
hosts
::{
AllowHosts
,
Host
};
use
hyper
::
header
;
use
jsonrpsee_utils
::
h
yper
_helpers
;
use
jsonrpsee_utils
::
h
ttp
_helpers
;
/// Define access on control on HTTP layer.
#[derive(Clone,
Debug)]
...
...
@@ -49,14 +49,14 @@ pub struct AccessControl {
impl
AccessControl
{
/// Validate incoming request by http HOST
pub
fn
deny_host
(
&
self
,
request
:
&
hyper
::
Request
<
hyper
::
Body
>
)
->
bool
{
!
hosts
::
is_host_valid
(
h
yper
_helpers
::
read_header_value
(
request
.headers
(),
"host"
),
&
self
.allow_hosts
)
!
hosts
::
is_host_valid
(
h
ttp
_helpers
::
read_header_value
(
request
.headers
(),
"host"
),
&
self
.allow_hosts
)
}
/// Validate incoming request by CORS origin
pub
fn
deny_cors_origin
(
&
self
,
request
:
&
hyper
::
Request
<
hyper
::
Body
>
)
->
bool
{
let
header
=
cors
::
get_cors_allow_origin
(
h
yper
_helpers
::
read_header_value
(
request
.headers
(),
"origin"
),
h
yper
_helpers
::
read_header_value
(
request
.headers
(),
"host"
),
h
ttp
_helpers
::
read_header_value
(
request
.headers
(),
"origin"
),
h
ttp
_helpers
::
read_header_value
(
request
.headers
(),
"host"
),
&
self
.cors_allow_origin
,
)
.map
(|
origin
|
{
...
...
@@ -75,7 +75,7 @@ impl AccessControl {
/// Validate incoming request by CORS header
pub
fn
deny_cors_header
(
&
self
,
request
:
&
hyper
::
Request
<
hyper
::
Body
>
)
->
bool
{
let
headers
=
request
.headers
()
.keys
()
.map
(|
name
|
name
.as_str
());
let
requested_headers
=
h
yper
_helpers
::
read_header_values
(
request
.headers
(),
"access-control-request-headers"
)
let
requested_headers
=
h
ttp
_helpers
::
read_header_values
(
request
.headers
(),
"access-control-request-headers"
)
.filter_map
(|
val
|
val
.to_str
()
.ok
())
.flat_map
(|
val
|
val
.split
(
", "
))
.flat_map
(|
val
|
val
.split
(
','
));
...
...
http-server/src/server.rs
View file @
32811d3c
...
...
@@ -41,7 +41,7 @@ use jsonrpsee_types::{
},
TEN_MB_SIZE_BYTES
,
};
use
jsonrpsee_utils
::
h
yper
_helpers
::
read_response_to_body
;
use
jsonrpsee_utils
::
h
ttp
_helpers
::
read_response_to_body
;
use
jsonrpsee_utils
::
server
::{
helpers
::{
collect_batch_response
,
prepare_error
,
send_error
},
rpc_module
::
Methods
,
...
...
utils/Cargo.toml
View file @
32811d3c
...
...
@@ -11,8 +11,7 @@ beef = { version = "0.5.1", features = ["impl_serde"] }
thiserror
=
{
version
=
"1"
,
optional
=
true
}
futures-channel
=
{
version
=
"0.3.14"
,
default-features
=
false
,
optional
=
true
}
futures-util
=
{
version
=
"0.3.14"
,
default-features
=
false
,
optional
=
true
}
hyper13
=
{
package
=
"hyper"
,
version
=
"0.13"
,
default-features
=
false
,
features
=
["stream"]
,
optional
=
true
}
hyper14
=
{
package
=
"hyper"
,
version
=
"0.14.10"
,
default-features
=
false
,
features
=
["stream"]
,
optional
=
true
}
hyper
=
{
version
=
"0.14.10"
,
default-features
=
false
,
features
=
["stream"]
,
optional
=
true
}
jsonrpsee-types
=
{
path
=
"../types"
,
version
=
"0.3.0"
,
optional
=
true
}
log
=
{
version
=
"0.4"
,
optional
=
true
}
rustc-hash
=
{
version
=
"1"
,
optional
=
true
}
...
...
@@ -23,8 +22,7 @@ parking_lot = { version = "0.11", optional = true }
[features]
default
=
[]
hyper_13
=
[
"hyper13"
,
"futures-util"
,
"jsonrpsee-types"
]
hyper_14
=
[
"hyper14"
,
"futures-util"
,
"jsonrpsee-types"
]
http-helpers
=
[
"hyper"
,
"futures-util"
,
"jsonrpsee-types"
]
server
=
[
"thiserror"
,
"futures-channel"
,
...
...
utils/src/h
yper
_helpers.rs
→
utils/src/h
ttp
_helpers.rs
View file @
32811d3c
File moved
utils/src/lib.rs
View file @
32811d3c
...
...
@@ -2,18 +2,9 @@
#![warn(missing_docs,
missing_debug_implementations,
unreachable_pub)]
#[cfg(all(feature
=
"hyper13"
,
feature
=
"hyper14"
))]
compile_error!
(
"feature `hyper13` and `hyper14` are mutably exclusive"
);
#[cfg(all(feature
=
"hyper13"
,
not(feature
=
"hyper14"
)))]
extern
crate
hyper13
as
hyper
;
#[cfg(all(feature
=
"hyper14"
,
not(feature
=
"hyper13"
)))]
extern
crate
hyper14
as
hyper
;
/// Shared hyper helpers.
#[cfg(
any(
feature
=
"h
yper13"
,
feature
=
"hyper14"
)
)]
pub
mod
h
yper
_helpers
;
#[cfg(feature
=
"h
ttp-helpers"
)]
pub
mod
h
ttp
_helpers
;
/// Shared code for JSON-RPC servers.
#[cfg(feature
=
"server"
)]
...
...
ws-client/Cargo.toml
View file @
32811d3c
...
...
@@ -10,15 +10,9 @@ homepage = "https://github.com/paritytech/jsonrpsee"
documentation
=
"https://docs.rs/jsonrpsee-ws-client"
[dependencies]
# Tokio v1 deps
tokioV1
=
{
package
=
"tokio"
,
version
=
"1"
,
features
=
[
"net"
,
"time"
,
"rt-multi-thread"
,
"macros"
],
optional
=
true
}
tokioV1-rustls
=
{
package
=
"tokio-rustls"
,
version
=
"0.22"
,
optional
=
true
}
tokioV1-util
=
{
package
=
"tokio-util"
,
version
=
"0.6"
,
features
=
["compat"]
,
optional
=
true
}
# Tokio v0.2 deps
tokioV02
=
{
package
=
"tokio"
,
version
=
"0.2"
,
features
=
[
"net"
,
"time"
,
"rt-threaded"
,
"sync"
,
"macros"
],
optional
=
true
}
tokioV02-rustls
=
{
package
=
"tokio-rustls"
,
version
=
"0.15"
,
optional
=
true
}
tokioV02-util
=
{
package
=
"tokio-util"
,
version
=
"0.3"
,
features
=
["compat"]
,
optional
=
true
}
tokio
=
{
version
=
"1"
,
features
=
[
"net"
,
"time"
,
"rt-multi-thread"
,
"macros"
]
}
tokio-rustls
=
"0.22"
tokio-util
=
{
version
=
"0.6"
,
features
=
["compat"]
}
async-trait
=
"0.1"
fnv
=
"1"
...
...
@@ -37,8 +31,3 @@ rustls-native-certs = "0.5.0"
[dev-dependencies]
env_logger
=
"0.9"
jsonrpsee-test-utils
=
{
path
=
"../test-utils"
}
[features]
default
=
["tokio1"]
tokio1
=
[
"tokioV1"
,
"tokioV1-rustls"
,
"tokioV1-util"
]
tokio02
=
[
"tokioV02"
,
"tokioV02-rustls"
,
"tokioV02-util"
]
ws-client/src/client.rs
View file @
32811d3c
...
...
@@ -24,7 +24,6 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use
crate
::
tokio
::
Mutex
;
use
crate
::
transport
::{
Receiver
as
WsReceiver
,
Sender
as
WsSender
,
Target
,
WsTransportClientBuilder
};
use
crate
::
types
::{
traits
::{
Client
,
SubscriptionClient
},
...
...
@@ -52,6 +51,7 @@ use futures::{
prelude
::
*
,
sink
::
SinkExt
,
};
use
tokio
::
sync
::
Mutex
;
use
jsonrpsee_types
::
v2
::
params
::
JsonRpcSubscriptionParams
;
use
jsonrpsee_types
::
SubscriptionKind
;
...
...
@@ -276,7 +276,7 @@ impl<'a> WsClientBuilder<'a> {
let
(
sender
,
receiver
)
=
builder
.build
()
.await
.map_err
(|
e
|
Error
::
Transport
(
Box
::
new
(
e
)))
?
;
crate
::
tokio
::
spawn
(
async
move
{
tokio
::
spawn
(
async
move
{
background_task
(
sender
,
receiver
,
from_front
,
err_tx
,
max_capacity_per_subscription
)
.await
;
});
Ok
(
WsClient
{
...
...
@@ -319,9 +319,9 @@ impl Client for WsClient {
let
mut
sender
=
self
.to_back
.clone
();
let
fut
=
sender
.send
(
FrontToBack
::
Notification
(
raw
));
let
timeout
=
crate
::
tokio
::
sleep
(
self
.request_timeout
);
let
timeout
=
tokio
::
time
::
sleep
(
self
.request_timeout
);
let
res
=
crate
::
tokio
::
select!
{
let
res
=
tokio
::
select!
{
x
=
fut
=>
x
,
_
=
timeout
=>
return
Err
(
Error
::
RequestTimeout
)
};
...
...
ws-client/src/helpers.rs
View file @
32811d3c
...
...
@@ -198,8 +198,8 @@ pub async fn call_with_timeout<T>(
timeout
:
Duration
,
rx
:
oneshot
::
Receiver
<
Result
<
T
,
Error
>>
,
)
->
Result
<
Result
<
T
,
Error
>
,
oneshot
::
Canceled
>
{
let
timeout
=
crate
::
tokio
::
sleep
(
timeout
);
crate
::
tokio
::
select!
{
let
timeout
=
tokio
::
time
::
sleep
(
timeout
);
tokio
::
select!
{
res
=
rx
=>
res
,
_
=
timeout
=>
Ok
(
Err
(
Error
::
RequestTimeout
))
}
...
...
ws-client/src/lib.rs
View file @
32811d3c
...
...
@@ -7,7 +7,6 @@
//! ## Runtime support
//!
//! This library uses `tokio` as the runtime and does not support other kinds of runtimes.
//! Tokio versions v1 and v0.2 are supported behind `tokioV1` and `tokioV02` feature flags correspondingly.
/// WebSocket Client.
pub
mod
client
;
...
...
@@ -20,9 +19,6 @@ pub mod stream;
/// WebSocket transport.
pub
mod
transport
;
/// Compatibility layer to support both `tokio` 0.2 and 1.x versions.
mod
tokio
;
#[cfg(test)]
mod
tests
;
...
...
ws-client/src/stream.rs
View file @
32811d3c
...
...
@@ -26,13 +26,13 @@
//! Convenience wrapper for a stream (AsyncRead + AsyncWrite) which can either be plain TCP or TLS.
use
crate
::
tokio
::{
TokioAsyncReadCompatExt
,
TokioAsyncWriteCompatExt
};
use
futures
::{
io
::{
IoSlice
,
IoSliceMut
},
prelude
::
*
,
};
use
pin_project
::
pin_project
;
use
std
::{
io
::
Error
as
IoError
,
pin
::
Pin
,
task
::
Context
,
task
::
Poll
};
use
tokio_util
::
compat
::{
TokioAsyncReadCompatExt
,
TokioAsyncWriteCompatExt
};
/// Stream to represent either a unencrypted or encrypted socket stream.
#[pin_project(project
=
EitherStreamProj)]
...
...
ws-client/src/tests.rs
View file @
32811d3c
...
...
@@ -39,8 +39,6 @@ use jsonrpsee_test_utils::types::{Id, WebSocketTestServer};
use
jsonrpsee_test_utils
::
TimeoutFutureExt
;
use
serde_json
::
Value
as
JsonValue
;
use
crate
::
tokio
;
#[tokio::test]
async
fn
method_call_works
()
{
let
result
=
run_request_with_response
(
ok_response
(
"hello"
.into
(),
Id
::
Num
(
0
)))
...
...
ws-client/src/tokio.rs
deleted
100644 → 0
View file @
fba533f8
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Compatibility layer for supporting both tokio v0.2 and v1.
// Check that either v1 or v0.2 feature is enabled.
#[cfg(not(any(feature
=
"tokio1"
,
feature
=
"tokio02"
)))]
compile_error!
(
"Either `tokio1` or `tokio02` feature must be enabled"
);
// Also check that only *one* of them is enabled.
#[cfg(all(feature
=
"tokio1"
,
feature
=
"tokio02"
))]
compile_error!
(
"feature `tokio1` and `tokio02` are mutually exclusive"
);
pub
(
crate
)
use
tokio_impl
::
*
;
#[cfg(feature
=
"tokio1"
)]
mod
tokio_impl
{
// Required for `tokio::test` to work correctly.
#[cfg(test)]
pub
(
crate
)
use
tokioV1
::{
runtime
,
test
};
pub
(
crate
)
use
tokioV1
::{
net
::
TcpStream
,
spawn
,
sync
::
Mutex
};
pub
(
crate
)
use
tokioV1_rustls
::{
client
::
TlsStream
,
webpki
::{
DNSNameRef
,
InvalidDNSNameError
},
TlsConnector
,
};
pub
(
crate
)
use
tokioV1_util
::
compat
::{
TokioAsyncReadCompatExt
,
TokioAsyncWriteCompatExt
};
pub
(
crate
)
use
tokioV1
::
time
::
sleep
;
pub
(
crate
)
use
tokioV1
::
select
;
}
// Note that we check for `not(feature = "tokio1")` here, but not above.
// This is required so that in case of both features enabled, `tokio_impl`
// will only be defined once. This way, the only error user will get is
// the compile error about features being mutually exclusive, which will
// provide better DevEx.
#[cfg(all(feature
=
"tokio02"
,
not(feature
=
"tokio1"
)))]
mod
tokio_impl
{
// Required for `tokio::test` to work correctly.
#[cfg(test)]
pub
(
crate
)
use
tokioV02
::{
runtime
,
test
};
pub
(
crate
)
use
tokioV02
::{
net
::
TcpStream
,
spawn
,
sync
::
Mutex
};
pub
(
crate
)
use
tokioV02_rustls
::{
client
::
TlsStream
,
webpki
::{
DNSNameRef
,
InvalidDNSNameError
},
TlsConnector
,
};
pub
(
crate
)
use
tokioV02_util
::
compat
::{
Tokio02AsyncReadCompatExt
as
TokioAsyncReadCompatExt
,
Tokio02AsyncWriteCompatExt
as
TokioAsyncWriteCompatExt
,
};
// In 0.2 `tokio::time::sleep` had different name.
pub
(
crate
)
use
tokioV02
::
time
::
delay_for
as
sleep
;
pub
(
crate
)
use
tokioV02
::
select
;
}
ws-client/src/transport.rs
View file @
32811d3c
...
...
@@ -24,13 +24,18 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use
crate
::
tokio
::{
TcpStream
,
TlsStream
};
use
futures
::
io
::{
BufReader
,
BufWriter
};
use
futures
::
prelude
::
*
;
use
soketto
::
connection
;
use
soketto
::
handshake
::
client
::{
Client
as
WsRawClient
,
ServerResponse
};
use
std
::{
borrow
::
Cow
,
io
,
net
::
SocketAddr
,
sync
::
Arc
,
time
::
Duration
};
use
thiserror
::
Error
;
use
tokio
::
net
::
TcpStream
;
use
tokio_rustls
::{
client
::
TlsStream
,
webpki
::{
DNSNameRef
,
InvalidDNSNameError
},
TlsConnector
,
};
type
TlsOrPlain
=
crate
::
stream
::
EitherStream
<
TcpStream
,
TlsStream
<
TcpStream
>>
;
...
...
@@ -105,7 +110,7 @@ pub enum WsHandshakeError {
/// Invalid DNS name error for TLS
#[error(
"Invalid DNS name: {}"
,
0
)]
InvalidDnsName
(
#[source]
crate
::
tokio
::
InvalidDNSNameError
),
InvalidDnsName
(
#[source]
InvalidDNSNameError
),
/// RawServer rejected our handshake.
#[error(
"Connection rejected with status code: {}"
,
status_code)]
...
...
@@ -192,12 +197,12 @@ impl<'a> WsTransportClientBuilder<'a> {
async
fn
try_connect
(
&
self
,
sockaddr
:
SocketAddr
,
tls_connector
:
&
Option
<
crate
::
tokio
::
TlsConnector
>
,
tls_connector
:
&
Option
<
TlsConnector
>
,
)
->
Result
<
(
Sender
,
Receiver
),
WsHandshakeError
>
{
// Try establish the TCP connection.
let
tcp_stream
=
{
let
socket
=
TcpStream
::
connect
(
sockaddr
);
let
timeout
=
crate
::
tokio
::
sleep
(
self
.timeout
);
let
timeout
=
tokio
::
time
::
sleep
(
self
.timeout
);
futures
::
pin_mut!
(
socket
,
timeout
);
match
future
::
select
(
socket
,
timeout
)
.await
{
future
::
Either
::
Left
((
socket
,
_
))
=>
{
...
...
@@ -208,7 +213,7 @@ impl<'a> WsTransportClientBuilder<'a> {
match
tls_connector
{
None
=>
TlsOrPlain
::
Plain
(
socket
),
Some
(
connector
)
=>
{
let
dns_name
=
crate
::
tokio
::
DNSNameRef
::
try_from_ascii_str
(
&
self
.target.host
)
?
;
let
dns_name
=
DNSNameRef
::
try_from_ascii_str
(
&
self
.target.host
)
?
;
let
tls_stream
=
connector
.connect
(
dns_name
,
socket
)
.await
?
;
TlsOrPlain
::
Tls
(
tls_stream
)
}
...
...
@@ -251,8 +256,8 @@ impl From<io::Error> for WsHandshakeError {
}
}
impl
From
<
crate
::
tokio
::
InvalidDNSNameError
>
for
WsHandshakeError
{
fn
from
(
err
:
crate
::
tokio
::
InvalidDNSNameError
)
->
WsHandshakeError
{
impl
From
<
InvalidDNSNameError
>
for
WsHandshakeError
{
fn
from
(
err
:
InvalidDNSNameError
)
->
WsHandshakeError
{
WsHandshakeError
::
InvalidDnsName
(
err
)
}
}
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment