Unverified Commit 60546591 authored by Niklas Adolfsson, committed by GitHub

fix(benches): relax `default measurement time`, split `benches` and run `fast call` only on batches (#885)

* fix(benches): relax default and split benches

* decrease slow and memory call overhead

* revert `memory and slow call` + batch on slow_call

* Update benches/helpers.rs

* add asserts for fast_call
parent 354c185a
Pipeline #217554 passed with stages in 5 minutes and 37 seconds
@@ -54,5 +54,5 @@ Run benchmarks with tokio-console support.
Some of the benchmarks are quite expensive to run and don't run with enough samples with the default values
provided by criterion. Currently the default values are very conservative; they can be modified by the following environment variables (a small sketch of the override pattern follows this list):
- "SLOW_MEASUREMENT_TIME" - sets the measurement time for slow benchmarks (default is 250 seconds)
- "MEASUREMENT_TIME" - sets the measurement time for fast benchmarks (default is 50 seconds)
- "SLOW_MEASUREMENT_TIME" - sets the measurement time for slow benchmarks (default is 60 seconds)
- "MEASUREMENT_TIME" - sets the measurement time for fast benchmarks (default is 10 seconds)
@@ -17,37 +17,47 @@ use tokio::runtime::Runtime as TokioRuntime;
mod helpers;
fn measurement_time_slow() -> Duration {
std::env::var("SLOW_MEASUREMENT_TIME").map_or(Duration::from_secs(250), |val| {
std::env::var("SLOW_MEASUREMENT_TIME").map_or(Duration::from_secs(60), |val| {
Duration::from_secs(val.parse().expect("SLOW_MEASUREMENT_TIME must be an integer"))
})
}
fn measurement_time() -> Duration {
std::env::var("MEASUREMENT_TIME").map_or(Duration::from_secs(50), |val| {
fn measurement_mid() -> Duration {
std::env::var("MEASUREMENT_TIME").map_or(Duration::from_secs(10), |val| {
Duration::from_secs(val.parse().expect("MEASUREMENT_TIME must be an integer"))
})
}
criterion_group!(
name = types_benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))).measurement_time(measurement_time());
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = jsonrpsee_types_v2
);
criterion_group!(
name = sync_benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))).measurement_time(measurement_time());
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = SyncBencher::http_benches, SyncBencher::websocket_benches
);
criterion_group!(
name = sync_slow_benches;
name = sync_benches_mid;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))).measurement_time(measurement_mid());
targets = SyncBencher::http_benches_mid, SyncBencher::websocket_benches_mid
);
criterion_group!(
name = sync_benches_slow;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))).measurement_time(measurement_time_slow());
targets = SyncBencher::http_benches_slow, SyncBencher::websocket_benches_slow
);
criterion_group!(
name = async_benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))).measurement_time(measurement_time());
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = AsyncBencher::http_benches, AsyncBencher::websocket_benches
);
criterion_group!(
name = async_benches_mid;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))).measurement_time(measurement_mid());
targets = AsyncBencher::http_benches_mid, AsyncBencher::websocket_benches_mid
);
criterion_group!(
name = async_slow_benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))).measurement_time(measurement_time_slow());
@@ -55,10 +65,19 @@ criterion_group!(
);
criterion_group!(
name = subscriptions;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))).measurement_time(measurement_time_slow());
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = AsyncBencher::subscriptions
);
criterion_main!(types_benches, sync_benches, sync_slow_benches, async_benches, async_slow_benches, subscriptions);
criterion_main!(
types_benches,
sync_benches,
sync_benches_mid,
sync_benches_slow,
async_benches,
async_benches_mid,
async_slow_benches,
subscriptions
);
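(Context, not part of the diff: the groups above use criterion's `criterion_group!`/`criterion_main!` macros, where each group carries its own `Criterion` config; a self-contained sketch with placeholder names looks roughly like this.)

```rust
use std::time::Duration;

use criterion::{criterion_group, criterion_main, Criterion};

// Placeholder benchmark target.
fn bench_noop(c: &mut Criterion) {
    c.bench_function("noop", |b| b.iter(|| 1 + 1));
}

criterion_group!(
    // A group owns its `Criterion` config, which is how the fast, mid and slow
    // benchmarks above get different measurement times.
    name = fast_benches;
    config = Criterion::default().measurement_time(Duration::from_secs(10));
    targets = bench_noop
);
criterion_main!(fast_benches);
```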
#[derive(Debug, Clone, Copy)]
enum RequestType {
@@ -140,33 +159,60 @@ trait RequestBencher {
let rt = TokioRuntime::new().unwrap();
let (url, _server) = rt.block_on(helpers::http_server(rt.handle().clone()));
let client = Arc::new(http_client(&url, HeaderMap::new()));
round_trip(&rt, crit, client.clone(), "http_round_trip", Self::REQUEST_TYPE);
http_custom_headers_round_trip(&rt, crit, &url, "http_custom_headers_round_trip", Self::REQUEST_TYPE);
http_concurrent_conn_calls(&rt, crit, &url, "http_concurrent_conn_calls", Self::REQUEST_TYPE, &[2, 4, 8]);
round_trip(&rt, crit, client.clone(), "http_round_trip", Self::REQUEST_TYPE);
batch_round_trip(&rt, crit, client, "http_batch_requests", Self::REQUEST_TYPE);
}
fn http_benches_mid(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let (url, _server) = rt.block_on(helpers::http_server(rt.handle().clone()));
http_concurrent_conn_calls(&rt, crit, &url, "http_concurrent_conn_calls", Self::REQUEST_TYPE, &[16, 32, 64]);
}
fn http_benches_slow(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let (url, _server) = rt.block_on(helpers::http_server(rt.handle().clone()));
let client = Arc::new(http_client(&url, HeaderMap::new()));
http_concurrent_conn_calls(&rt, crit, &url, "http_concurrent_conn_calls", Self::REQUEST_TYPE);
batch_round_trip(&rt, crit, client, "http_batch_requests", Self::REQUEST_TYPE);
http_concurrent_conn_calls(
&rt,
crit,
&url,
"http_concurrent_conn_calls",
Self::REQUEST_TYPE,
&[128, 256, 512, 1024],
);
}
fn websocket_benches(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let (url, _server) = rt.block_on(helpers::ws_server(rt.handle().clone()));
let client = Arc::new(rt.block_on(ws_client(&url)));
round_trip(&rt, crit, client.clone(), "ws_round_trip", Self::REQUEST_TYPE);
ws_custom_headers_handshake(&rt, crit, &url, "ws_custom_headers_handshake", Self::REQUEST_TYPE);
ws_concurrent_conn_calls(&rt, crit, &url, "ws_concurrent_conn_calls", Self::REQUEST_TYPE, &[2, 4, 8]);
round_trip(&rt, crit, client.clone(), "ws_round_trip", Self::REQUEST_TYPE);
batch_round_trip(&rt, crit, client, "ws_batch_requests", Self::REQUEST_TYPE);
}
fn websocket_benches_mid(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let (url, _server) = rt.block_on(helpers::ws_server(rt.handle().clone()));
ws_concurrent_conn_calls(&rt, crit, &url, "ws_concurrent_conn_calls", Self::REQUEST_TYPE, &[16, 32, 64]);
ws_concurrent_conn_subs(&rt, crit, &url, "ws_concurrent_conn_subs", Self::REQUEST_TYPE, &[16, 32, 64]);
}
fn websocket_benches_slow(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let (url, _server) = rt.block_on(helpers::ws_server(rt.handle().clone()));
let client = Arc::new(rt.block_on(ws_client(&url)));
ws_concurrent_conn_calls(&rt, crit, &url, "ws_concurrent_conn_calls", Self::REQUEST_TYPE);
ws_concurrent_conn_subs(&rt, crit, &url, "ws_concurrent_conn_subs", Self::REQUEST_TYPE);
batch_round_trip(&rt, crit, client, "ws_batch_requests", Self::REQUEST_TYPE);
ws_concurrent_conn_calls(
&rt,
crit,
&url,
"ws_concurrent_conn_calls",
Self::REQUEST_TYPE,
&[128, 256, 512, 1024],
);
ws_concurrent_conn_subs(&rt, crit, &url, "ws_concurrent_conn_subs", Self::REQUEST_TYPE, &[128, 256, 512, 1024]);
}
fn subscriptions(crit: &mut Criterion) {
@@ -254,7 +300,7 @@ fn sub_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl Subs
});
}
/// Benchmark http_batch_requests over batch sizes of 2, 5, 10, 50 and 100 RPCs in each batch.
/// Benchmark batch_requests over batch sizes of 2, 5, 10, 50 and 100 RPCs in each batch.
fn batch_round_trip(
rt: &TokioRuntime,
crit: &mut Criterion,
@@ -262,13 +308,15 @@ fn batch_round_trip(
name: &str,
request: RequestType,
) {
for method in request.methods() {
let bench_name = format!("{}/{}", name, method);
let fast_call = request.methods()[0];
assert!(fast_call.starts_with("fast_call"));
let bench_name = format!("{}/{}", name, fast_call);
let mut group = crit.benchmark_group(request.group_name(&bench_name));
for batch_size in [2, 5, 10, 50, 100usize].iter() {
let mut batch = BatchRequestBuilder::new();
for _ in 0..*batch_size {
batch.insert(method, ArrayParams::new()).unwrap();
batch.insert(fast_call, ArrayParams::new()).unwrap();
}
group.throughput(Throughput::Elements(*batch_size as u64));
group.bench_with_input(BenchmarkId::from_parameter(batch_size), batch_size, |b, _| {
@@ -276,13 +324,22 @@
});
}
group.finish();
}
}
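(Illustration only, with made-up inputs: the `benchmark_group` / `Throughput::Elements` / `bench_with_input` shape that `batch_round_trip` uses above, reduced to a standalone criterion example.)

```rust
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};

// Sketch: benchmark one operation at several sizes and report throughput in
// elements per second, mirroring how the batch sizes are swept above.
fn bench_sum(c: &mut Criterion) {
    let mut group = c.benchmark_group("sum_batches");
    for size in [2usize, 5, 10, 50, 100] {
        group.throughput(Throughput::Elements(size as u64));
        group.bench_with_input(BenchmarkId::from_parameter(size), &size, |b, &s| {
            b.iter(|| (0..s as u64).sum::<u64>())
        });
    }
    group.finish();
}

criterion_group!(batch_shape, bench_sum);
criterion_main!(batch_shape);
```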
fn ws_concurrent_conn_calls(rt: &TokioRuntime, crit: &mut Criterion, url: &str, name: &str, request: RequestType) {
let methods = request.methods();
let mut group = crit.benchmark_group(request.group_name(name));
for conns in [2, 4, 8, 16, 32, 64, 128, 256, 512, 1024] {
fn ws_concurrent_conn_calls(
rt: &TokioRuntime,
crit: &mut Criterion,
url: &str,
name: &str,
request: RequestType,
concurrent_conns: &[usize],
) {
let fast_call = request.methods()[0];
assert!(fast_call.starts_with("fast_call"));
let bench_name = format!("{}/{}", name, fast_call);
let mut group = crit.benchmark_group(request.group_name(&bench_name));
for conns in concurrent_conns.iter() {
group.bench_function(format!("{}", conns), |b| {
b.to_async(rt).iter_with_setup(
|| {
@@ -291,7 +348,7 @@ fn ws_concurrent_conn_calls(rt: &TokioRuntime, crit: &mut Criterion, url: &str,
// runtime context and simply calling `block_on` here will cause the code to panic.
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
for _ in 0..conns {
for _ in 0..*conns {
clients.push(ws_client(url).await);
}
})
@@ -305,7 +362,7 @@ fn ws_concurrent_conn_calls(rt: &TokioRuntime, crit: &mut Criterion, url: &str,
let futs = FuturesUnordered::new();
for _ in 0..10 {
futs.push(client.request::<String, ArrayParams>(methods[0], ArrayParams::new()));
futs.push(client.request::<String, ArrayParams>(fast_call, ArrayParams::new()));
}
join_all(futs).await;
@@ -320,9 +377,16 @@ fn ws_concurrent_conn_calls(rt: &TokioRuntime, crit: &mut Criterion, url: &str,
}
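(Aside, not part of the commit: the `block_in_place` + `Handle::block_on` setup pattern referenced in the nested-runtime comment above, reduced to a standalone sketch with a stand-in for the client connections.)

```rust
use tokio::runtime::Handle;
use tokio::task;

// Run async setup from a synchronous closure that already executes inside a
// multi-threaded Tokio runtime. Calling `block_on` directly from the async
// context would panic, so the thread is first moved out of the async worker
// pool with `block_in_place`.
fn connect_many(n: usize) -> Vec<String> {
    task::block_in_place(|| {
        Handle::current().block_on(async {
            let mut clients = Vec::with_capacity(n);
            for i in 0..n {
                // stand-in for `ws_client(url).await`
                clients.push(format!("client-{i}"));
            }
            clients
        })
    })
}

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
    assert_eq!(connect_many(8).len(), 8);
}
```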
// As this is so slow, only fast calls are executed in this benchmark.
fn ws_concurrent_conn_subs(rt: &TokioRuntime, crit: &mut Criterion, url: &str, name: &str, request: RequestType) {
fn ws_concurrent_conn_subs(
rt: &TokioRuntime,
crit: &mut Criterion,
url: &str,
name: &str,
request: RequestType,
concurrent_conns: &[usize],
) {
let mut group = crit.benchmark_group(request.group_name(name));
for conns in [2, 4, 8, 16, 32, 64, 128, 256, 512, 1024] {
for conns in concurrent_conns.iter() {
group.bench_function(format!("{}", conns), |b| {
b.to_async(rt).iter_with_setup(
|| {
@@ -331,7 +395,7 @@ fn ws_concurrent_conn_subs(rt: &TokioRuntime, crit: &mut Criterion, url: &str, n
// runtime context and simply calling `block_on` here will cause the code to panic.
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
for _ in 0..conns {
for _ in 0..*conns {
clients.push(ws_client(url).await);
}
})
@@ -372,18 +436,27 @@ fn ws_concurrent_conn_subs(rt: &TokioRuntime, crit: &mut Criterion, url: &str, n
}
// As this is so slow, only fast calls are executed in this benchmark.
fn http_concurrent_conn_calls(rt: &TokioRuntime, crit: &mut Criterion, url: &str, name: &str, request: RequestType) {
let method = request.methods()[0];
let bench_name = format!("{}/{}", name, method);
fn http_concurrent_conn_calls(
rt: &TokioRuntime,
crit: &mut Criterion,
url: &str,
name: &str,
request: RequestType,
concurrent_conns: &[usize],
) {
let fast_call = request.methods()[0];
assert!(fast_call.starts_with("fast_call"));
let bench_name = format!("{}/{}", name, fast_call);
let mut group = crit.benchmark_group(request.group_name(&bench_name));
for conns in [2, 4, 8, 16, 32, 64, 128, 256, 512, 1024] {
for conns in concurrent_conns.iter() {
group.bench_function(format!("{}", conns), |b| {
b.to_async(rt).iter_with_setup(
|| (0..conns).map(|_| http_client(url, HeaderMap::new())),
|| (0..*conns).map(|_| http_client(url, HeaderMap::new())),
|clients| async {
let tasks = clients.map(|client| {
rt.spawn(async move {
client.request::<String, ArrayParams>(method, ArrayParams::new()).await.unwrap();
client.request::<String, ArrayParams>(fast_call, ArrayParams::new()).await.unwrap();
})
});
join_all(tasks).await;
@@ -402,7 +475,8 @@ fn http_custom_headers_round_trip(
name: &str,
request: RequestType,
) {
let method_name = request.methods()[0];
let fast_call = request.methods()[0];
assert!(fast_call.starts_with("fast_call"));
for header_size in [0, KIB, 5 * KIB, 25 * KIB, 100 * KIB] {
let mut headers = HeaderMap::new();
@@ -415,7 +489,7 @@
crit.bench_function(&request.group_name(&bench_name), |b| {
b.to_async(rt).iter(|| async {
black_box(client.request::<String, ArrayParams>(method_name, ArrayParams::new()).await.unwrap());
black_box(client.request::<String, ArrayParams>(fast_call, ArrayParams::new()).await.unwrap());
})
});
}
benches/helpers.rs
use std::time::Duration;
use jsonrpsee::client_transport::ws::{Uri, WsTransportClientBuilder};
use jsonrpsee::http_client::{HeaderMap, HttpClient, HttpClientBuilder};
use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
pub(crate) const SYNC_FAST_CALL: &str = "fast_call";
pub(crate) const ASYNC_FAST_CALL: &str = "fast_async";
pub(crate) const ASYNC_FAST_CALL: &str = "fast_call_async";
pub(crate) const SYNC_MEM_CALL: &str = "memory_intense";
pub(crate) const ASYNC_MEM_CALL: &str = "memory_intense_async";
pub(crate) const SYNC_SLOW_CALL: &str = "slow_call";
@@ -16,6 +18,8 @@ pub(crate) const ASYNC_METHODS: [&str; 3] = [SYNC_FAST_CALL, SYNC_MEM_CALL, SYNC
// 1 KiB = 1024 bytes
pub(crate) const KIB: usize = 1024;
pub(crate) const MIB: usize = 1024 * KIB;
pub(crate) const SLOW_CALL: Duration = Duration::from_millis(1);
/// Run jsonrpc HTTP server for benchmarks.
#[cfg(feature = "jsonrpc-crate")]
@@ -26,14 +30,14 @@ pub async fn http_server(handle: tokio::runtime::Handle) -> (String, jsonrpc_htt
let mut io = IoHandler::new();
io.add_sync_method(SYNC_FAST_CALL, |_| Ok(Value::String("lo".to_string())));
io.add_method(ASYNC_FAST_CALL, |_| async { Ok(Value::String("lo".to_string())) });
io.add_sync_method(SYNC_MEM_CALL, |_| Ok(Value::String("A".repeat(1 * 1024 * 1024))));
io.add_method(ASYNC_MEM_CALL, |_| async { Ok(Value::String("A".repeat(1 * 1024 * 1024))) });
io.add_sync_method(SYNC_MEM_CALL, |_| Ok(Value::String("A".repeat(MIB))));
io.add_method(ASYNC_MEM_CALL, |_| async { Ok(Value::String("A".repeat(MIB))) });
io.add_sync_method(SYNC_SLOW_CALL, |_| {
std::thread::sleep(std::time::Duration::from_millis(1));
std::thread::sleep(SLOW_CALL);
Ok(Value::String("slow call".to_string()))
});
io.add_method(ASYNC_SLOW_CALL, |_| async {
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
tokio::time::sleep(SLOW_CALL).await;
Ok(Value::String("slow call async".to_string()))
});
@@ -63,14 +67,14 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpc_ws_se
let mut io = PubSubHandler::new(MetaIoHandler::default());
io.add_sync_method(SYNC_FAST_CALL, |_| Ok(Value::String("lo".to_string())));
io.add_method(ASYNC_FAST_CALL, |_| async { Ok(Value::String("lo".to_string())) });
io.add_sync_method(SYNC_MEM_CALL, |_| Ok(Value::String("A".repeat(1 * 1024 * 1024))));
io.add_method(ASYNC_MEM_CALL, |_| async { Ok(Value::String("A".repeat(1 * 1024 * 1024))) });
io.add_sync_method(SYNC_MEM_CALL, |_| Ok(Value::String("A".repeat(MIB))));
io.add_method(ASYNC_MEM_CALL, |_| async { Ok(Value::String("A".repeat(MIB))) });
io.add_sync_method(SYNC_SLOW_CALL, |_| {
std::thread::sleep(std::time::Duration::from_millis(1));
std::thread::sleep(SLOW_CALL);
Ok(Value::String("slow call".to_string()))
});
io.add_method(ASYNC_SLOW_CALL, |_| async {
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
tokio::time::sleep(SLOW_CALL).await;
Ok(Value::String("slow call async".to_string()))
});
@@ -98,9 +102,9 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpc_ws_se
})
.event_loop_executor(handle)
.max_connections(10 * 1024)
.max_payload(100 * 1024 * 1024)
.max_in_buffer_capacity(100 * 1024 * 1024)
.max_out_buffer_capacity(100 * 1024 * 1024)
.max_payload(100 * MIB)
.max_in_buffer_capacity(100 * MIB)
.max_out_buffer_capacity(100 * MIB)
.start(&"127.0.0.1:0".parse().unwrap())
.expect("Server must start with no issues");
@@ -165,20 +169,20 @@ fn gen_rpc_module() -> jsonrpsee::RpcModule<()> {
module.register_method(SYNC_FAST_CALL, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_FAST_CALL, |_, _| async { Ok("lo") }).unwrap();
module.register_method(SYNC_MEM_CALL, |_, _| Ok("A".repeat(1024 * 1024))).unwrap();
module.register_method(SYNC_MEM_CALL, |_, _| Ok("A".repeat(MIB))).unwrap();
module.register_async_method(ASYNC_MEM_CALL, |_, _| async move { Ok("A".repeat(1024 * 1024)) }).unwrap();
module.register_async_method(ASYNC_MEM_CALL, |_, _| async move { Ok("A".repeat(MIB)) }).unwrap();
module
.register_method(SYNC_SLOW_CALL, |_, _| {
std::thread::sleep(std::time::Duration::from_millis(1));
std::thread::sleep(SLOW_CALL);
Ok("slow call")
})
.unwrap();
module
.register_async_method(ASYNC_SLOW_CALL, |_, _| async move {
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
tokio::time::sleep(SLOW_CALL).await;
Ok("slow call async")
})
.unwrap();