Commit 4427f2a6 authored by Niklas Adolfsson's avatar Niklas Adolfsson
Browse files

Merge remote-tracking branch 'origin/master' into poc_middleware_layer_v4

parents de1276d3 8622facb
......@@ -42,11 +42,11 @@ jobs:
command: fmt
args: --all -- --check
- name: Cargo clippy
uses: actions-rs/cargo@v1.0.3
- name: Check clippy
uses: actions-rs/clippy-check@v1
with:
command: clippy
args: --all-targets
token: ${{ secrets.GITHUB_TOKEN }}
args: --all-features
check-docs:
name: Check rustdoc
......
use std::sync::Arc;
use crate::helpers::{ws_handshake, KIB};
use criterion::*;
use futures_util::future::{join_all, FutureExt};
use futures_util::stream::FuturesUnordered;
use helpers::{http_client, ws_client, SUB_METHOD_NAME, UNSUB_METHOD_NAME};
use jsonrpsee::core::client::{ClientT, SubscriptionClientT};
use jsonrpsee::http_client::HeaderMap;
use jsonrpsee::types::{Id, ParamsSer, RequestSer};
use pprof::criterion::{Output, PProfProfiler};
use tokio::runtime::Runtime as TokioRuntime;
......@@ -85,10 +87,11 @@ trait RequestBencher {
fn http_benches(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let (url, _server) = rt.block_on(helpers::http_server(rt.handle().clone()));
let client = Arc::new(http_client(&url));
let client = Arc::new(http_client(&url, HeaderMap::new()));
round_trip(&rt, crit, client.clone(), "http_round_trip", Self::REQUEST_TYPE);
http_concurrent_conn_calls(&rt, crit, &url, "http_concurrent_conn_calls", Self::REQUEST_TYPE);
batch_round_trip(&rt, crit, client, "http_batch_requests", Self::REQUEST_TYPE);
http_custom_headers_round_trip(&rt, crit, &url, "http_custom_headers_round_trip", Self::REQUEST_TYPE);
}
fn websocket_benches(crit: &mut Criterion) {
......@@ -99,6 +102,7 @@ trait RequestBencher {
ws_concurrent_conn_calls(&rt, crit, &url, "ws_concurrent_conn_calls", Self::REQUEST_TYPE);
ws_concurrent_conn_subs(&rt, crit, &url, "ws_concurrent_conn_subs", Self::REQUEST_TYPE);
batch_round_trip(&rt, crit, client, "ws_batch_requests", Self::REQUEST_TYPE);
ws_custom_headers_handshake(&rt, crit, &url, "ws_custom_headers_handshake", Self::REQUEST_TYPE);
}
fn subscriptions(crit: &mut Criterion) {
......@@ -293,7 +297,7 @@ fn http_concurrent_conn_calls(rt: &TokioRuntime, crit: &mut Criterion, url: &str
for conns in [2, 4, 8, 16, 32, 64, 128, 256, 512, 1024] {
group.bench_function(format!("{}", conns), |b| {
b.to_async(rt).iter_with_setup(
|| (0..conns).map(|_| http_client(url)),
|| (0..conns).map(|_| http_client(url, HeaderMap::new())),
|clients| async {
let tasks = clients.map(|client| {
rt.spawn(async move {
......@@ -307,3 +311,48 @@ fn http_concurrent_conn_calls(rt: &TokioRuntime, crit: &mut Criterion, url: &str
}
group.finish();
}
/// Bench `round_trip` with different header sizes.
fn http_custom_headers_round_trip(
rt: &TokioRuntime,
crit: &mut Criterion,
url: &str,
name: &str,
request: RequestType,
) {
let method_name = request.methods()[0];
for header_size in [0, KIB, 5 * KIB, 25 * KIB, 100 * KIB] {
let mut headers = HeaderMap::new();
if header_size != 0 {
headers.insert("key", "A".repeat(header_size).parse().unwrap());
}
let client = Arc::new(http_client(url, headers));
let bench_name = format!("{}/{}kb", name, header_size / KIB);
crit.bench_function(&request.group_name(&bench_name), |b| {
b.to_async(rt).iter(|| async {
black_box(client.request::<String>(method_name, None).await.unwrap());
})
});
}
}
/// Bench WS handshake with different header sizes.
fn ws_custom_headers_handshake(rt: &TokioRuntime, crit: &mut Criterion, url: &str, name: &str, request: RequestType) {
let mut group = crit.benchmark_group(request.group_name(name));
for header_size in [0, KIB, 2 * KIB, 4 * KIB] {
group.bench_function(format!("{}kb", header_size / KIB), |b| {
b.to_async(rt).iter(|| async move {
let mut headers = HeaderMap::new();
if header_size != 0 {
headers.insert("key", "A".repeat(header_size).parse().unwrap());
}
ws_handshake(url, headers).await;
})
});
}
group.finish();
}
use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
use jsonrpsee::client_transport::ws::{Uri, WsTransportClientBuilder};
use jsonrpsee::http_client::{HeaderMap, HttpClient, HttpClientBuilder};
use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
pub(crate) const SYNC_FAST_CALL: &str = "fast_call";
......@@ -13,6 +14,9 @@ pub(crate) const UNSUB_METHOD_NAME: &str = "unsub";
pub(crate) const SYNC_METHODS: [&str; 3] = [SYNC_FAST_CALL, SYNC_MEM_CALL, SYNC_SLOW_CALL];
pub(crate) const ASYNC_METHODS: [&str; 3] = [SYNC_FAST_CALL, SYNC_MEM_CALL, SYNC_SLOW_CALL];
// 1 KiB = 1024 bytes
pub(crate) const KIB: usize = 1024;
/// Run jsonrpc HTTP server for benchmarks.
#[cfg(feature = "jsonrpc-crate")]
pub async fn http_server(handle: tokio::runtime::Handle) -> (String, jsonrpc_http_server::Server) {
......@@ -160,9 +164,9 @@ fn gen_rpc_module() -> jsonrpsee::RpcModule<()> {
module.register_method(SYNC_FAST_CALL, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_FAST_CALL, |_, _| async { Ok("lo") }).unwrap();
module.register_method(SYNC_MEM_CALL, |_, _| Ok("A".repeat(1 * 1024 * 1024))).unwrap();
module.register_method(SYNC_MEM_CALL, |_, _| Ok("A".repeat(1024 * 1024))).unwrap();
module.register_async_method(ASYNC_MEM_CALL, |_, _| async move { Ok("A".repeat(1 * 1024 * 1024)) }).unwrap();
module.register_async_method(ASYNC_MEM_CALL, |_, _| async move { Ok("A".repeat(1024 * 1024)) }).unwrap();
module
.register_method(SYNC_SLOW_CALL, |_, _| {
......@@ -181,10 +185,11 @@ fn gen_rpc_module() -> jsonrpsee::RpcModule<()> {
module
}
pub(crate) fn http_client(url: &str) -> HttpClient {
pub(crate) fn http_client(url: &str, headers: HeaderMap) -> HttpClient {
HttpClientBuilder::default()
.max_request_body_size(u32::MAX)
.max_concurrent_requests(1024 * 1024)
.set_headers(headers)
.build(url)
.unwrap()
}
......@@ -197,3 +202,8 @@ pub(crate) async fn ws_client(url: &str) -> WsClient {
.await
.unwrap()
}
pub(crate) async fn ws_handshake(url: &str, headers: HeaderMap) {
let uri: Uri = url.parse().unwrap();
WsTransportClientBuilder::default().max_request_body_size(u32::MAX).set_headers(headers).build(uri).await.unwrap();
}
......@@ -118,7 +118,9 @@ impl HttpTransportClient {
}
let mut req = hyper::Request::post(&self.target);
req.headers_mut().map(|headers| *headers = self.headers.clone());
if let Some(headers) = req.headers_mut() {
*headers = self.headers.clone();
}
let req = req.body(From::from(body)).expect("URI and request headers are valid; qed");
let response = self.client.request(req).await.map_err(|e| Error::Http(Box::new(e)))?;
......
......@@ -40,6 +40,7 @@ use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
/// Stream to represent either a unencrypted or encrypted socket stream.
#[pin_project(project = EitherStreamProj)]
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum EitherStream {
/// Unencrypted socket stream.
Plain(#[pin] TcpStream),
......
......@@ -337,9 +337,7 @@ impl ClientT for Client {
rx_log_from_json(&json_values, self.max_log_length);
let values: Result<_, _> =
json_values.into_iter().map(|val| serde_json::from_value(val).map_err(Error::ParseError)).collect();
Ok(values?)
json_values.into_iter().map(|val| serde_json::from_value(val).map_err(Error::ParseError)).collect()
}.instrument(trace.into_span()).await
}
}
......
......@@ -146,9 +146,9 @@ impl Error {
}
}
impl Into<ErrorObjectOwned> for Error {
fn into(self) -> ErrorObjectOwned {
match self {
impl From<Error> for ErrorObjectOwned {
fn from(err: Error) -> Self {
match err {
Error::Call(CallError::Custom(err)) => err,
Error::Call(CallError::InvalidParams(e)) => {
ErrorObject::owned(INVALID_PARAMS_CODE, e.to_string(), None::<()>)
......@@ -156,7 +156,7 @@ impl Into<ErrorObjectOwned> for Error {
Error::Call(CallError::Failed(e)) => {
ErrorObject::owned(CALL_EXECUTION_FAILED_CODE, e.to_string(), None::<()>)
}
_ => ErrorObject::owned(UNKNOWN_ERROR_CODE, self.to_string(), None::<()>),
_ => ErrorObject::owned(UNKNOWN_ERROR_CODE, err.to_string(), None::<()>),
}
}
}
......@@ -173,18 +173,18 @@ pub enum SubscriptionClosed {
Failed(ErrorObject<'static>),
}
impl Into<ErrorObjectOwned> for SubscriptionClosed {
fn into(self) -> ErrorObjectOwned {
match self {
Self::RemotePeerAborted => {
impl From<SubscriptionClosed> for ErrorObjectOwned {
fn from(err: SubscriptionClosed) -> Self {
match err {
SubscriptionClosed::RemotePeerAborted => {
ErrorObject::owned(SUBSCRIPTION_CLOSED, "Subscription was closed by the remote peer", None::<()>)
}
Self::Success => ErrorObject::owned(
SubscriptionClosed::Success => ErrorObject::owned(
SUBSCRIPTION_CLOSED,
"Subscription was completed by the server successfully",
None::<()>,
),
Self::Failed(err) => err,
SubscriptionClosed::Failed(err) => err,
}
}
}
......
......@@ -147,7 +147,8 @@ where
type Instant = (A::Instant, B::Instant);
fn on_connect(&self, remote_addr: std::net::SocketAddr, headers: &Headers) {
(self.0.on_connect(remote_addr, headers), self.1.on_connect(remote_addr, headers));
self.0.on_connect(remote_addr, headers);
self.1.on_connect(remote_addr, headers);
}
fn on_request(&self) -> Self::Instant {
......@@ -170,7 +171,8 @@ where
}
fn on_disconnect(&self, remote_addr: std::net::SocketAddr) {
(self.0.on_disconnect(remote_addr), self.1.on_disconnect(remote_addr));
self.0.on_disconnect(remote_addr);
self.1.on_disconnect(remote_addr);
}
}
......
......@@ -128,6 +128,7 @@ impl ops::Deref for Origin {
/// Origins allowed to access
#[derive(Debug, Clone, PartialEq, Eq)]
#[allow(clippy::large_enum_variant)]
pub enum AllowOrigin {
/// Specific origin.
Origin(Origin),
......
......@@ -186,22 +186,22 @@ mod tests {
#[test]
fn should_reject_if_header_not_on_the_list() {
assert!((AllowHosts::Only(vec![].into())).verify("parity.io").is_err());
assert!((AllowHosts::Only(vec![])).verify("parity.io").is_err());
}
#[test]
fn should_accept_if_on_the_list() {
assert!((AllowHosts::Only(vec!["parity.io".into()].into())).verify("parity.io").is_ok());
assert!((AllowHosts::Only(vec!["parity.io".into()])).verify("parity.io").is_ok());
}
#[test]
fn should_accept_if_on_the_list_with_port() {
assert!((AllowHosts::Only(vec!["parity.io:443".into()].into())).verify("parity.io:443").is_ok());
assert!((AllowHosts::Only(vec!["parity.io".into()].into())).verify("parity.io:443").is_err());
assert!((AllowHosts::Only(vec!["parity.io:443".into()])).verify("parity.io:443").is_ok());
assert!((AllowHosts::Only(vec!["parity.io".into()])).verify("parity.io:443").is_err());
}
#[test]
fn should_support_wildcards() {
assert!((AllowHosts::Only(vec!["*.web3.site:*".into()].into())).verify("parity.web3.site:8180").is_ok());
assert!((AllowHosts::Only(vec!["*.web3.site:*".into()])).verify("parity.web3.site:8180").is_ok());
}
}
......@@ -1038,7 +1038,7 @@ impl SubscriptionSink {
fn is_active_subscription(&self) -> bool {
match self.unsubscribe.as_ref() {
Some(unsubscribe) => !unsubscribe.has_changed().is_err(),
Some(unsubscribe) => unsubscribe.has_changed().is_ok(),
_ => false,
}
}
......
......@@ -136,7 +136,7 @@ async fn main() -> anyhow::Result<()> {
println!("response: {:?}", response);
let _response: Result<String, _> = client.request("unknown_method", None).await;
let _ = client.request::<String>("say_hello", None).await?;
let _ = client.request::<()>("thready", rpc_params![4]).await?;
client.request::<()>("thready", rpc_params![4]).await?;
Ok(())
}
......
......@@ -74,12 +74,9 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
let stream = IntervalStream::new(interval).map(move |_| item);
tokio::spawn(async move {
match sink.pipe_from_stream(stream).await {
SubscriptionClosed::Failed(err) => {
sink.close(err);
}
_ => (),
};
if let SubscriptionClosed::Failed(err) = sink.pipe_from_stream(stream).await {
sink.close(err);
}
});
Ok(())
})
......@@ -94,12 +91,9 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
let stream = IntervalStream::new(interval).map(move |_| item);
tokio::spawn(async move {
match sink.pipe_from_stream(stream).await {
SubscriptionClosed::Failed(err) => {
sink.close(err);
}
_ => (),
};
if let SubscriptionClosed::Failed(err) = sink.pipe_from_stream(stream).await {
sink.close(err);
}
});
Ok(())
......
......@@ -67,20 +67,20 @@ async fn server() -> (SocketAddr, ServerHandle) {
module.register_method("notif", |_, _| Ok("")).unwrap();
module
.register_method("should_err", |_, ctx| {
let _ = ctx.err().map_err(CallError::Failed)?;
ctx.err().map_err(CallError::Failed)?;
Ok("err")
})
.unwrap();
module
.register_method("should_ok", |_, ctx| {
let _ = ctx.ok().map_err(CallError::Failed)?;
ctx.ok().map_err(CallError::Failed)?;
Ok("ok")
})
.unwrap();
module
.register_async_method("should_ok_async", |_p, ctx| async move {
let _ = ctx.ok().map_err(CallError::Failed)?;
ctx.ok().map_err(CallError::Failed)?;
Ok("ok")
})
.unwrap();
......
......@@ -682,7 +682,7 @@ async fn ws_server_unsub_methods_should_ignore_sub_limit() {
// This should not hit any limits, and unsubscription should have worked:
assert!(res.is_ok(), "Unsubscription method was successfully called");
assert_eq!(res.unwrap(), true, "Unsubscription was successful");
assert!(res.unwrap(), "Unsubscription was successful");
}
#[tokio::test]
......@@ -711,9 +711,7 @@ async fn http_unsupported_methods_dont_work() {
for verb in [Method::GET, Method::PUT, Method::PATCH, Method::DELETE] {
assert!(req_is_client_error(verb).await);
}
for verb in [Method::POST] {
assert!(!req_is_client_error(verb).await);
}
assert!(!req_is_client_error(Method::POST).await);
}
#[tokio::test]
......
......@@ -217,14 +217,16 @@ async fn http_server_logger() {
assert!(client.request::<String>("unknown_method", None).await.is_err());
let inner = counter.inner.lock().unwrap();
assert_eq!(inner.requests, (5, 5));
assert_eq!(inner.calls["say_hello"], (3, vec![0, 2, 3]));
assert_eq!(inner.calls["unknown_method"], (2, vec![]));
{
let inner = counter.inner.lock().unwrap();
assert_eq!(inner.requests, (5, 5));
assert_eq!(inner.calls["say_hello"], (3, vec![0, 2, 3]));
assert_eq!(inner.calls["unknown_method"], (2, vec![]));
}
server_handle.stop().unwrap().await.unwrap();
// HTTP server doesn't track connections
let inner = counter.inner.lock().unwrap();
assert_eq!(inner.connections, (0, 0));
}
......@@ -74,8 +74,7 @@ fn flatten_rpc_modules() {
#[test]
fn rpc_context_modules_can_register_subscriptions() {
let cx = ();
let mut cxmodule = RpcModule::new(cx);
let mut cxmodule = RpcModule::new(());
cxmodule.register_subscription("hi", "hi", "goodbye", |_, _, _| Ok(())).unwrap();
assert!(cxmodule.method("hi").is_some());
......
......@@ -283,7 +283,7 @@ async fn handshake<L: Logger>(socket: tokio::net::TcpStream, mode: HandshakeResp
batch_requests_supported: cfg.batch_requests_supported,
bounded_subscriptions: BoundedSubscriptions::new(cfg.max_subscriptions_per_connection),
stop_server: stop_monitor.clone(),
logger: logger,
logger,
id_provider,
ping_interval: cfg.ping_interval,
remote_addr,
......@@ -464,7 +464,7 @@ async fn background_task<L: Logger>(input: BackgroundTask<'_, L>) -> Result<(),
bounded_subscriptions,
sink: &sink,
id_provider: &*id_provider,
logger: logger,
logger,
request_start,
};
......@@ -511,7 +511,7 @@ async fn background_task<L: Logger>(input: BackgroundTask<'_, L>) -> Result<(),
bounded_subscriptions,
sink: &sink,
id_provider: &*id_provider,
logger: logger,
logger,
request_start,
},
})
......
......@@ -148,21 +148,21 @@ async fn server_with_context() -> SocketAddr {
rpc_module
.register_method("should_err", |_p, ctx| {
let _ = ctx.err().map_err(CallError::Failed)?;
ctx.err().map_err(CallError::Failed)?;
Ok("err")
})
.unwrap();
rpc_module
.register_method("should_ok", |_p, ctx| {
let _ = ctx.ok().map_err(CallError::Failed)?;
ctx.ok().map_err(CallError::Failed)?;
Ok("ok")
})
.unwrap();
rpc_module
.register_async_method("should_ok_async", |_p, ctx| async move {
let _ = ctx.ok().map_err(CallError::Failed)?;
ctx.ok().map_err(CallError::Failed)?;
// Call some async function inside.
Ok(futures_util::future::ready("ok!").await)
})
......@@ -170,7 +170,7 @@ async fn server_with_context() -> SocketAddr {
rpc_module
.register_async_method("err_async", |_p, ctx| async move {
let _ = ctx.ok().map_err(CallError::Failed)?;
ctx.ok().map_err(CallError::Failed)?;
// Async work that returns an error
futures_util::future::err::<(), _>(anyhow!("nah").into()).await
})
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment