Newer
Older
use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
pub(crate) const SYNC_FAST_CALL: &str = "fast_call";
pub(crate) const ASYNC_FAST_CALL: &str = "fast_async";
pub(crate) const SYNC_MEM_CALL: &str = "memory_intense";
pub(crate) const ASYNC_MEM_CALL: &str = "memory_intense_async";
pub(crate) const SYNC_SLOW_CALL: &str = "slow_call";
pub(crate) const ASYNC_SLOW_CALL: &str = "slow_call_async";
pub(crate) const SUB_METHOD_NAME: &str = "sub";
pub(crate) const UNSUB_METHOD_NAME: &str = "unsub";
pub(crate) const SYNC_METHODS: [&str; 3] = [SYNC_FAST_CALL, SYNC_MEM_CALL, SYNC_SLOW_CALL];
pub(crate) const ASYNC_METHODS: [&str; 3] = [SYNC_FAST_CALL, SYNC_MEM_CALL, SYNC_SLOW_CALL];
Niklas Adolfsson
committed
/// Run jsonrpc HTTP server for benchmarks.
#[cfg(feature = "jsonrpc-crate")]
pub async fn http_server(handle: tokio::runtime::Handle) -> (String, jsonrpc_http_server::Server) {
use jsonrpc_http_server::jsonrpc_core::*;
use jsonrpc_http_server::*;
let mut io = IoHandler::new();
io.add_sync_method(SYNC_FAST_CALL, |_| Ok(Value::String("lo".to_string())));
io.add_method(ASYNC_FAST_CALL, |_| async { Ok(Value::String("lo".to_string())) });
io.add_sync_method(SYNC_MEM_CALL, |_| Ok(Value::String("A".repeat(1 * 1024 * 1024))));
io.add_method(ASYNC_MEM_CALL, |_| async { Ok(Value::String("A".repeat(1 * 1024 * 1024))) });
io.add_sync_method(SYNC_SLOW_CALL, |_| {
std::thread::sleep(std::time::Duration::from_millis(1));
Ok(Value::String("slow call".to_string()))
});
io.add_method(ASYNC_SLOW_CALL, |_| async {
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
Ok(Value::String("slow call async".to_string()))
});
Niklas Adolfsson
committed
let server = ServerBuilder::new(io)
.max_request_body_size(usize::MAX)
.event_loop_executor(handle)
.start_http(&"127.0.0.1:0".parse().unwrap())
.expect("Server must start with no issues");
let addr = *server.address();
(format!("http://{}", addr), server)
}
/// Run jsonrpc WebSocket server for benchmarks.
#[cfg(feature = "jsonrpc-crate")]
pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpc_ws_server::Server) {
use std::sync::atomic::{AtomicU64, Ordering};
Niklas Adolfsson
committed
use jsonrpc_pubsub::{PubSubHandler, Session, Subscriber, SubscriptionId};
use jsonrpc_ws_server::jsonrpc_core::*;
use jsonrpc_ws_server::*;
const ID: AtomicU64 = AtomicU64::new(0);
let handle2 = handle.clone();
let mut io = PubSubHandler::new(MetaIoHandler::default());
io.add_sync_method(SYNC_FAST_CALL, |_| Ok(Value::String("lo".to_string())));
io.add_method(ASYNC_FAST_CALL, |_| async { Ok(Value::String("lo".to_string())) });
io.add_sync_method(SYNC_MEM_CALL, |_| Ok(Value::String("A".repeat(1 * 1024 * 1024))));
io.add_method(ASYNC_MEM_CALL, |_| async { Ok(Value::String("A".repeat(1 * 1024 * 1024))) });
io.add_sync_method(SYNC_SLOW_CALL, |_| {
std::thread::sleep(std::time::Duration::from_millis(1));
Ok(Value::String("slow call".to_string()))
});
io.add_method(ASYNC_SLOW_CALL, |_| async {
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
Ok(Value::String("slow call async".to_string()))
});
Niklas Adolfsson
committed
io.add_subscription(
SUB_METHOD_NAME,
(SUB_METHOD_NAME, move |_params: Params, _, subscriber: Subscriber| {
handle2.spawn(async move {
let id = ID.fetch_add(1, Ordering::Relaxed);
let sink = subscriber.assign_id(SubscriptionId::Number(id)).unwrap();
// NOTE(niklasad1): the way jsonrpc works this is the only way to get this working
// -> jsonrpc responds to the request in background so not possible to know when the request
// has been answered, so this benchmark is bad.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let mut map = serde_json::Map::new();
map.insert("subscription".into(), id.into());
map.insert("result".into(), "hello".into());
let _ = sink.notify(Params::Map(map));
});
}),
(UNSUB_METHOD_NAME, |_id: SubscriptionId, _| futures::future::ok(Value::Bool(true))),
);
let server = ServerBuilder::with_meta_extractor(io, |context: &RequestContext| {
std::sync::Arc::new(Session::new(context.sender().clone()))
})
.event_loop_executor(handle)
.max_connections(10 * 1024)
.max_payload(100 * 1024 * 1024)
.max_in_buffer_capacity(100 * 1024 * 1024)
.max_out_buffer_capacity(100 * 1024 * 1024)
Niklas Adolfsson
committed
.start(&"127.0.0.1:0".parse().unwrap())
.expect("Server must start with no issues");
let addr = *server.addr();
(format!("ws://{}", addr), server)
}
/// Run jsonrpsee HTTP server for benchmarks.
Niklas Adolfsson
committed
#[cfg(not(feature = "jsonrpc-crate"))]
pub async fn http_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::http_server::HttpServerHandle) {
use jsonrpsee::http_server::HttpServerBuilder;
Niklas Adolfsson
committed
let server = HttpServerBuilder::default()
.max_request_body_size(u32::MAX)
.max_response_body_size(u32::MAX)
Niklas Adolfsson
committed
.custom_tokio_runtime(handle)
Niklas Adolfsson
committed
.build("127.0.0.1:0")
Niklas Adolfsson
committed
.await
Niklas Adolfsson
committed
.unwrap();
Niklas Adolfsson
committed
let module = gen_rpc_module();
Niklas Adolfsson
committed
let addr = server.local_addr().unwrap();
let handle = server.start(module).unwrap();
(format!("http://{}", addr), handle)
}
/// Run jsonrpsee WebSocket server for benchmarks.
Niklas Adolfsson
committed
#[cfg(not(feature = "jsonrpc-crate"))]
pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::ws_server::WsServerHandle) {
use jsonrpsee::ws_server::WsServerBuilder;
Niklas Adolfsson
committed
let server = WsServerBuilder::default()
.max_request_body_size(u32::MAX)
.max_response_body_size(u32::MAX)
.max_connections(10 * 1024)
Niklas Adolfsson
committed
.custom_tokio_runtime(handle)
.build("127.0.0.1:0")
.await
.unwrap();
let mut module = gen_rpc_module();
Niklas Adolfsson
committed
module
.register_subscription(SUB_METHOD_NAME, SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| {
Niklas Adolfsson
committed
let x = "Hello";
tokio::spawn(async move { sink.send(&x) });
Niklas Adolfsson
committed
})
.unwrap();
Niklas Adolfsson
committed
let addr = format!("ws://{}", server.local_addr().unwrap());
let handle = server.start(module).unwrap();
(addr, handle)
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
#[cfg(not(feature = "jsonrpc-crate"))]
fn gen_rpc_module() -> jsonrpsee::RpcModule<()> {
let mut module = jsonrpsee::RpcModule::new(());
module.register_method(SYNC_FAST_CALL, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_FAST_CALL, |_, _| async { Ok("lo") }).unwrap();
module.register_method(SYNC_MEM_CALL, |_, _| Ok("A".repeat(1 * 1024 * 1024))).unwrap();
module.register_async_method(ASYNC_MEM_CALL, |_, _| async move { Ok("A".repeat(1 * 1024 * 1024)) }).unwrap();
module
.register_method(SYNC_SLOW_CALL, |_, _| {
std::thread::sleep(std::time::Duration::from_millis(1));
Ok("slow call")
})
.unwrap();
module
.register_async_method(ASYNC_SLOW_CALL, |_, _| async move {
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
Ok("slow call async")
})
.unwrap();
module
}
pub(crate) fn http_client(url: &str) -> HttpClient {
HttpClientBuilder::default()
.max_request_body_size(u32::MAX)
.max_concurrent_requests(1024 * 1024)
.build(url)
.unwrap()
}
pub(crate) async fn ws_client(url: &str) -> WsClient {
WsClientBuilder::default()
.max_request_body_size(u32::MAX)
.max_concurrent_requests(1024 * 1024)
.build(url)
.await
.unwrap()