Unverified Commit 1b5ec0ff authored by Niklas Adolfsson's avatar Niklas Adolfsson
Browse files

fix tests again

parent 6083c82a
......@@ -20,6 +20,9 @@ num_cpus = "1"
serde_json = "1"
tokio = { version = "1.16", features = ["rt-multi-thread"] }
[lib]
bench = false
[[bench]]
name = "bench"
path = "bench.rs"
......
......@@ -65,7 +65,16 @@ pub type AsyncMethod<'a> = Arc<
>;
/// Method callback for subscriptions.
pub type SubscriptionMethod<'a> = Arc<
dyn Send + Sync + Fn(Id, Params, MethodSink, ConnState, oneshot::Receiver<()>, Option<ResourceGuard>) -> BoxFuture<'a, MethodResponse>,
dyn Send
+ Sync
+ Fn(
Id,
Params,
MethodSink,
ConnState,
oneshot::Receiver<()>,
Option<ResourceGuard>,
) -> BoxFuture<'a, MethodResponse>,
>;
// Method callback to unsubscribe.
type UnsubscriptionMethod = Arc<dyn Send + Sync + Fn(Id, Params, ConnectionId, MaxResponseSize) -> MethodResponse>;
......@@ -759,42 +768,43 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
let callback = {
self.methods.verify_and_insert(
subscribe_method_name,
MethodCallback::new_subscription(Arc::new(move |id, params, method_sink, conn, message_sent, claimed| {
let uniq_sub = SubscriptionKey { conn_id: conn.conn_id, sub_id: conn.id_provider.next_id() };
// response to the subscription call.
let (subscribe_call_tx, subscribe_call_rx) = oneshot::channel();
let pending_subscription = PendingSubscription(Some(InnerPendingSubscription {
sink: method_sink.clone(),
subscribe_call: subscribe_call_tx,
close_notify: Some(conn.close_notify),
method: notif_method_name,
subscribers: subscribers.clone(),
uniq_sub: uniq_sub.clone(),
id: id.clone().into_owned(),
message_sent,
claimed,
}));
// The end-user needs to accept/reject the `pending_subscription` to make any progress.
callback(params, pending_subscription, ctx.clone());
let id = id.clone().into_owned();
let result = async move {
match subscribe_call_rx.await {
Ok(result) => result,
Err(_) => return MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError)),
}
};
Box::pin(result)
})),
MethodCallback::new_subscription(Arc::new(
move |id, params, method_sink, conn, message_sent, claimed| {
let uniq_sub = SubscriptionKey { conn_id: conn.conn_id, sub_id: conn.id_provider.next_id() };
// response to the subscription call.
let (subscribe_call_tx, subscribe_call_rx) = oneshot::channel();
let pending_subscription = PendingSubscription(Some(InnerPendingSubscription {
sink: method_sink,
subscribe_call: subscribe_call_tx,
close_notify: Some(conn.close_notify),
method: notif_method_name,
subscribers: subscribers.clone(),
uniq_sub,
id: id.clone().into_owned(),
message_sent,
claimed,
}));
// The end-user needs to accept/reject the `pending_subscription` to make any progress.
callback(params, pending_subscription, ctx.clone());
let id = id.clone().into_owned();
let result = async move {
match subscribe_call_rx.await {
Ok(result) => result,
Err(_) => MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError)),
}
};
Box::pin(result)
},
)),
)?
};
Ok(MethodResourcesBuilder { build: ResourceVec::new(), callback })
}
......@@ -899,7 +909,7 @@ impl PendingSubscription {
uniq_sub,
subscribers,
unsubscribe: rx,
_claimed: claimed
_claimed: claimed,
});
}
......
......@@ -592,9 +592,9 @@ async fn process_validated_request(
let call = CallData {
conn_id: 0,
middleware: &middleware,
methods: &&methods,
methods: &methods,
max_response_body_size,
resources: &&resources,
resources: &resources,
request_start,
};
let response = process_single_request(body, call).await;
......@@ -616,9 +616,9 @@ async fn process_validated_request(
call: CallData {
conn_id: 0,
middleware: &middleware,
methods: &&methods,
methods: &methods,
max_response_body_size,
resources: &&resources,
resources: &resources,
request_start,
},
})
......@@ -697,7 +697,7 @@ struct Call<'a, M: Middleware> {
// Batch responses must be sent back as a single message so we read the results from each
// request in the batch and read the results off of a new channel, `rx_batch`, and then send the
// complete batch response back to the client over `tx`.
async fn process_batch_request<'a, M>(b: Batch<'a, M>) -> BatchResponse
async fn process_batch_request<M>(b: Batch<'_, M>) -> BatchResponse
where
M: Middleware,
{
......@@ -742,7 +742,7 @@ where
BatchResponse::error(id, ErrorObject::from(code))
}
async fn process_single_request<'a, M: Middleware>(data: Vec<u8>, call: CallData<'a, M>) -> MethodResponse {
async fn process_single_request<M: Middleware>(data: Vec<u8>, call: CallData<'_, M>) -> MethodResponse {
if let Ok(req) = serde_json::from_slice::<Request>(&data) {
tracing::debug!("recv method call={}", req.method);
tracing::trace!("recv: req={:?}", req);
......@@ -759,7 +759,7 @@ async fn process_single_request<'a, M: Middleware>(data: Vec<u8>, call: CallData
}
}
async fn execute_call<'a, M: Middleware>(c: Call<'a, M>) -> MethodResponse {
async fn execute_call<M: Middleware>(c: Call<'_, M>) -> MethodResponse {
let Call { name, id, params, call } = c;
let CallData { resources, methods, middleware, max_response_body_size, conn_id, request_start } = call;
......@@ -768,7 +768,7 @@ async fn execute_call<'a, M: Middleware>(c: Call<'a, M>) -> MethodResponse {
let response = match methods.method_with_name(name) {
None => MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound)),
Some((name, method)) => match &method.inner() {
MethodKind::Sync(callback) => match method.claim(name, &resources) {
MethodKind::Sync(callback) => match method.claim(name, resources) {
Ok(guard) => {
let r = (callback)(id, params, max_response_body_size as usize);
drop(guard);
......@@ -779,7 +779,7 @@ async fn execute_call<'a, M: Middleware>(c: Call<'a, M>) -> MethodResponse {
MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy))
}
},
MethodKind::Async(callback) => match method.claim(name, &resources) {
MethodKind::Async(callback) => match method.claim(name, resources) {
Ok(guard) => {
let id = id.into_owned();
let params = params.into_owned();
......
......@@ -27,6 +27,7 @@
use std::net::SocketAddr;
use std::time::Duration;
use futures::StreamExt;
use jsonrpsee::core::client::{ClientT, SubscriptionClientT};
use jsonrpsee::core::Error;
use jsonrpsee::http_client::HttpClientBuilder;
......@@ -36,7 +37,8 @@ use jsonrpsee::types::error::CallError;
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ws_server::{WsServerBuilder, WsServerHandle};
use jsonrpsee::{PendingSubscription, RpcModule};
use tokio::time::sleep;
use tokio::time::{interval, sleep};
use tokio_stream::wrappers::IntervalStream;
fn module_manual() -> Result<RpcModule<()>, Error> {
let mut module = RpcModule::new(());
......@@ -65,10 +67,12 @@ fn module_manual() -> Result<RpcModule<()>, Error> {
// to get dropped. This is the equivalent of not having any resource limits (ie, sink is never used).
module
.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, pending, _| {
let mut _sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
tokio::spawn(async move {
let mut _sink = match pending.accept().await {
Some(sink) => sink,
_ => return,
};
});
})?
.resource("SUB", 3)?;
......@@ -76,12 +80,12 @@ fn module_manual() -> Result<RpcModule<()>, Error> {
// and the subscription method gets limited.
module
.register_subscription("subscribe_hello_limit", "s_hello", "unsubscribe_hello_limit", move |_, pending, _| {
let mut sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
tokio::spawn(async move {
let mut sink = match pending.accept().await {
Some(sink) => sink,
_ => return,
};
for val in 0..10 {
sink.send(&val).unwrap();
sleep(Duration::from_secs(1)).await;
......@@ -123,23 +127,25 @@ fn module_macro() -> RpcModule<()> {
impl RpcServer for () {
fn sub_hello(&self, pending: PendingSubscription) {
let mut _sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
tokio::spawn(async move {
let mut _sink = match pending.accept().await {
Some(sink) => sink,
_ => return,
};
});
}
fn sub_hello_limit(&self, pending: PendingSubscription) {
let mut sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
tokio::spawn(async move {
for val in 0..10 {
sink.send(&val).unwrap();
sleep(Duration::from_secs(1)).await;
}
let mut sink = match pending.accept().await {
Some(sink) => sink,
_ => return,
};
let interval = interval(Duration::from_secs(1));
let stream = IntervalStream::new(interval).map(move |_| 1);
sink.pipe_from_stream(stream).await;
});
}
}
......@@ -218,17 +224,16 @@ async fn run_tests_on_ws_server(server_addr: SocketAddr, server_handle: WsServer
assert_server_busy(fail_mem);
// If we issue multiple subscription requests at the same time from the same client,
// but the subscriptions immediately drop their sinks, no resources will obviously be held,
// and so there is no limit to how many can be executed.
let (pass1, pass2, pass3) = tokio::join!(
client.subscribe::<i32>("subscribe_hello", None, "unsubscribe_hello"),
// but the subscriptions drop their sinks when the subscription has been accepted or rejected.
//
// Thus, we can't assume that all subscriptions drop their resources instantly anymore.
let (pass1, pass2) = tokio::join!(
client.subscribe::<i32>("subscribe_hello", None, "unsubscribe_hello"),
client.subscribe::<i32>("subscribe_hello", None, "unsubscribe_hello"),
);
assert!(pass1.is_ok());
assert!(pass2.is_ok());
assert!(pass3.is_ok());
// 3 CPU units (manually set for subscriptions) per call, so 3th call exceeds cap
let (pass1, pass2, fail) = tokio::join!(
......
......@@ -800,7 +800,7 @@ struct Call<'a, M: Middleware> {
// Batch responses must be sent back as a single message so we read the results from each
// request in the batch and read the results off of a new channel, `rx_batch`, and then send the
// complete batch response back to the client over `tx`.
async fn process_batch_request<'a, M>(b: Batch<'a, M>) -> BatchResponse
async fn process_batch_request<M>(b: Batch<'_, M>) -> BatchResponse
where
M: Middleware,
{
......@@ -839,9 +839,9 @@ where
BatchResponse::error(id, ErrorObject::from(code))
}
async fn process_single_request<'a, M: Middleware>(
async fn process_single_request<M: Middleware>(
data: Vec<u8>,
call: CallData<'a, M>,
call: CallData<'_, M>,
) -> (MethodResponse, Option<oneshot::Sender<()>>) {
if let Ok(req) = serde_json::from_slice::<Request>(&data) {
tracing::debug!("recv method call={}", req.method);
......@@ -865,7 +865,7 @@ async fn process_single_request<'a, M: Middleware>(
///
/// Otherwise it's possible that the subscription notifications could start before that the actual subscription
/// response has been sent.
async fn execute_call<'a, M: Middleware>(c: Call<'a, M>) -> (MethodResponse, Option<oneshot::Sender<()>>) {
async fn execute_call<M: Middleware>(c: Call<'_, M>) -> (MethodResponse, Option<oneshot::Sender<()>>) {
let Call { name, id, params, call } = c;
let CallData {
resources,
......@@ -884,7 +884,7 @@ async fn execute_call<'a, M: Middleware>(c: Call<'a, M>) -> (MethodResponse, Opt
let response = match methods.method_with_name(name) {
None => (MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound)), None),
Some((name, method)) => match &method.inner() {
MethodKind::Sync(callback) => match method.claim(name, &resources) {
MethodKind::Sync(callback) => match method.claim(name, resources) {
Ok(guard) => {
let r = (callback)(id, params, max_response_body_size as usize);
drop(guard);
......@@ -895,7 +895,7 @@ async fn execute_call<'a, M: Middleware>(c: Call<'a, M>) -> (MethodResponse, Opt
(MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy)), None)
}
},
MethodKind::Async(callback) => match method.claim(name, &resources) {
MethodKind::Async(callback) => match method.claim(name, resources) {
Ok(guard) => {
let id = id.into_owned();
let params = params.into_owned();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment