Unverified Commit 38f3a9f7 authored by Alexandru Vasile's avatar Alexandru Vasile
Browse files

Fix tests and examples


Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>
parent c968cac6
Pipeline #200475 failed with stages
in 1 minute and 46 seconds
......@@ -30,7 +30,7 @@ use jsonrpsee::core::{async_trait, client::Subscription, Error};
use jsonrpsee::proc_macros::rpc;
use jsonrpsee::types::SubscriptionResult;
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ws_server::{PendingSubscription, WsServerBuilder, WsServerHandle};
use jsonrpsee::ws_server::{SubscriptionSink, WsServerBuilder, WsServerHandle};
type ExampleHash = [u8; 32];
type ExampleStorageKey = Vec<u8>;
......@@ -64,10 +64,9 @@ impl RpcServer<ExampleHash, ExampleStorageKey> for RpcServerImpl {
// Note that the server's subscription method must return `SubscriptionResult`.
fn subscribe_storage(
&self,
pending: PendingSubscription,
mut sink: SubscriptionSink,
_keys: Option<Vec<ExampleStorageKey>>,
) -> SubscriptionResult {
let mut sink = pending.accept()?;
let _ = sink.send(&vec![[0; 32]]);
Ok(())
}
......
......@@ -71,19 +71,19 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
std::thread::spawn(move || produce_items(tx2));
module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, pending, _| {
module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, mut sink, _| {
let rx = BroadcastStream::new(tx.clone().subscribe());
tokio::spawn(async move {
pending
.pipe_from_try_stream(rx)
.await
.on_success(|sink| {
match sink.pipe_from_try_stream(rx).await {
SubscriptionClosed::Success => {
sink.close(SubscriptionClosed::Success);
})
.on_failure(|sink, err| {
}
SubscriptionClosed::RemotePeerAborted => (),
SubscriptionClosed::Failed(err) => {
sink.close(err);
});
}
};
});
Ok(())
})?;
......
......@@ -80,11 +80,6 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
}
_ => (),
};
//
// sink.pipe_from_stream(stream).await.on_failure(|sink, err| {
// // Send close notification when subscription stream failed.
// sink.close(err);
// });
});
Ok(())
})
......
......@@ -7,7 +7,7 @@ use jsonrpsee::types::SubscriptionResult;
use jsonrpsee::proc_macros::rpc;
use jsonrpsee::rpc_params;
use jsonrpsee::ws_client::*;
use jsonrpsee::ws_server::{PendingSubscription, WsServerBuilder};
use jsonrpsee::ws_server::{SubscriptionSink, WsServerBuilder};
#[rpc(client, server, namespace = "foo")]
pub trait Rpc {
......@@ -64,24 +64,20 @@ impl RpcServer for RpcServerImpl {
Ok(10u16)
}
fn sub(&self, pending: PendingSubscription) -> SubscriptionResult {
let mut sink = pending.accept()?;
fn sub(&self, mut sink: SubscriptionSink) -> SubscriptionResult {
let _ = sink.send(&"Response_A");
let _ = sink.send(&"Response_B");
Ok(())
}
fn sub_with_params(&self, pending: PendingSubscription, val: u32) -> SubscriptionResult {
let mut sink = pending.accept()?;
fn sub_with_params(&self, mut sink: SubscriptionSink, val: u32) -> SubscriptionResult {
let _ = sink.send(&val);
let _ = sink.send(&val);
Ok(())
}
fn sub_with_override_notif_method(&self, pending: PendingSubscription) -> SubscriptionResult {
if let Ok(mut sink) = pending.accept() {
let _ = sink.send(&1);
}
fn sub_with_override_notif_method(&self, mut sink: SubscriptionSink) -> SubscriptionResult {
let _ = sink.send(&1);
Ok(())
}
}
......
......@@ -3,7 +3,7 @@ use std::net::SocketAddr;
use jsonrpsee::core::{async_trait, RpcResult};
use jsonrpsee::types::SubscriptionResult;
use jsonrpsee::proc_macros::rpc;
use jsonrpsee::ws_server::{PendingSubscription, WsServerBuilder};
use jsonrpsee::ws_server::{SubscriptionSink, WsServerBuilder};
#[rpc(server)]
pub trait Rpc {
......@@ -29,8 +29,8 @@ impl RpcServer for RpcServerImpl {
Ok(10u16)
}
fn sub(&self, pending: PendingSubscription) -> SubscriptionResult {
let mut sink = pending.accept()?;
fn sub(&self, mut sink: SubscriptionSink) -> SubscriptionResult {
sink.accept()?;
let _ = sink.send(&"Response_A");
let _ = sink.send(&"Response_B");
......
......@@ -30,7 +30,6 @@ use std::time::Duration;
use futures::{SinkExt, StreamExt};
use jsonrpsee::core::error::SubscriptionClosed;
use jsonrpsee::core::server::access_control::{AccessControl, AccessControlBuilder};
use jsonrpsee::core::server::rpc_module::PipeFromStreamResult;
use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle};
use jsonrpsee::types::error::{ErrorObject, SUBSCRIPTION_CLOSED_WITH_ERROR};
use jsonrpsee::ws_server::{WsServerBuilder, WsServerHandle};
......@@ -45,8 +44,9 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle
module.register_method("say_hello", |_, _| Ok("hello")).unwrap();
module
.register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, pending, _| {
let mut sink = pending.accept().unwrap();
.register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, mut sink, _| {
// Explicit call to accept.
sink.accept().unwrap();
std::thread::spawn(move || loop {
if let Ok(false) = sink.send(&"hello from subscription") {
break;
......@@ -58,9 +58,9 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle
.unwrap();
module
.register_subscription("subscribe_foo", "subscribe_foo", "unsubscribe_foo", |_, pending, _| {
let mut sink = pending.accept().unwrap();
.register_subscription("subscribe_foo", "subscribe_foo", "unsubscribe_foo", |_, mut sink, _| {
std::thread::spawn(move || loop {
// Implicit call to accept for the first send.
if let Ok(false) = sink.send(&1337_usize) {
break;
}
......@@ -71,28 +71,31 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle
.unwrap();
module
.register_subscription("subscribe_add_one", "subscribe_add_one", "unsubscribe_add_one", |params, pending, _| {
let mut count = match params.one::<usize>() {
Ok(count) => count,
_ => return Ok(()),
};
let mut sink = pending.accept().unwrap();
.register_subscription(
"subscribe_add_one",
"subscribe_add_one",
"unsubscribe_add_one",
|params, mut sink, _| {
let mut count = match params.one::<usize>() {
Ok(count) => count,
_ => return Ok(()),
};
std::thread::spawn(move || loop {
count = count.wrapping_add(1);
if let Err(_) | Ok(false) = sink.send(&count) {
break;
}
std::thread::sleep(Duration::from_millis(100));
});
Ok(())
})
std::thread::spawn(move || loop {
count = count.wrapping_add(1);
if let Err(_) | Ok(false) = sink.send(&count) {
break;
}
std::thread::sleep(Duration::from_millis(100));
});
Ok(())
},
)
.unwrap();
module
.register_subscription("subscribe_noop", "subscribe_noop", "unsubscribe_noop", |_, pending, _| {
let sink = pending.accept().unwrap();
.register_subscription("subscribe_noop", "subscribe_noop", "unsubscribe_noop", |_, mut sink, _| {
sink.accept().unwrap();
std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(1));
let err = ErrorObject::owned(
......@@ -107,37 +110,39 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle
.unwrap();
module
.register_subscription("subscribe_5_ints", "n", "unsubscribe_5_ints", |_, pending, _| {
.register_subscription("subscribe_5_ints", "n", "unsubscribe_5_ints", |_, mut sink, _| {
tokio::spawn(async move {
let interval = interval(Duration::from_millis(50));
let stream = IntervalStream::new(interval).zip(futures::stream::iter(1..=5)).map(|(_, c)| c);
match pending.pipe_from_stream(stream).await.on_success(|sink| {
sink.close(SubscriptionClosed::Success);
}) {
PipeFromStreamResult::Success(None) => (),
match sink.pipe_from_stream(stream).await {
SubscriptionClosed::Success => {
sink.close(SubscriptionClosed::Success);
}
_ => unreachable!(),
};
}
});
Ok(())
})
.unwrap();
module
.register_subscription("can_reuse_subscription", "n", "u_can_reuse_subscription", |_, pending, _| {
.register_subscription("can_reuse_subscription", "n", "u_can_reuse_subscription", |_, mut sink, _| {
tokio::spawn(async move {
let stream1 = IntervalStream::new(interval(Duration::from_millis(50)))
.zip(futures::stream::iter(1..=5))
.map(|(_, c)| c);
// TODO(lexnv): Merge streams or `pipe_from_stream` to take `&mut self`.
// let stream2 = IntervalStream::new(interval(Duration::from_millis(50)))
// .zip(futures::stream::iter(6..=10))
// .map(|(_, c)| c);
match pending.pipe_from_stream(stream1).await.on_success(|sink| {
sink.close(SubscriptionClosed::Success);
}) {
PipeFromStreamResult::Success(None) => (),
let stream2 = IntervalStream::new(interval(Duration::from_millis(50)))
.zip(futures::stream::iter(6..=10))
.map(|(_, c)| c);
let result = sink.pipe_from_stream(stream1).await;
assert!(matches!(result, SubscriptionClosed::Success));
match sink.pipe_from_stream(stream2).await {
SubscriptionClosed::Success => {
sink.close(SubscriptionClosed::Success);
}
_ => unreachable!(),
}
});
......@@ -150,16 +155,16 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle
"subscribe_with_err_on_stream",
"n",
"unsubscribe_with_err_on_stream",
move |_, pending, _| {
move |_, mut sink, _| {
let err: &'static str = "error on the stream";
// create stream that produce an error which will cancel the subscription.
// Create stream that produce an error which will cancel the subscription.
let stream = futures::stream::iter(vec![Ok(1_u32), Err(err), Ok(2), Ok(3)]);
tokio::spawn(async move {
match pending.pipe_from_try_stream(stream).await.on_failure(|sink, e| {
sink.close(e);
}) {
PipeFromStreamResult::Failure(None) => (),
match sink.pipe_from_try_stream(stream).await {
SubscriptionClosed::Failed(e) => {
sink.close(e);
}
_ => unreachable!(),
}
});
......@@ -201,13 +206,12 @@ pub async fn websocket_server_with_sleeping_subscription(tx: futures::channel::m
let mut module = RpcModule::new(tx);
module
.register_subscription("subscribe_sleep", "n", "unsubscribe_sleep", |_, pending, mut tx| {
.register_subscription("subscribe_sleep", "n", "unsubscribe_sleep", |_, mut sink, mut tx| {
tokio::spawn(async move {
let interval = interval(Duration::from_secs(60 * 60));
let stream = IntervalStream::new(interval).zip(futures::stream::iter(1..=5)).map(|(_, c)| c);
pending.pipe_from_stream(stream).await;
sink.pipe_from_stream(stream).await;
let send_back = std::sync::Arc::make_mut(&mut tx);
send_back.send(()).await.unwrap();
});
......
......@@ -34,7 +34,6 @@ use futures::{channel::mpsc, StreamExt, TryStreamExt};
use helpers::{http_server, http_server_with_access_control, websocket_server, websocket_server_with_subscription};
use jsonrpsee::core::client::{ClientT, IdKind, Subscription, SubscriptionClientT};
use jsonrpsee::core::error::SubscriptionClosed;
use jsonrpsee::core::server::rpc_module::PipeFromStreamResult;
use jsonrpsee::core::{Error, JsonValue};
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::http_server::AccessControlBuilder;
......@@ -425,8 +424,8 @@ async fn ws_server_should_stop_subscription_after_client_drop() {
let mut module = RpcModule::new(tx);
module
.register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, pending, mut tx| {
let mut sink = pending.accept().unwrap();
.register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, mut sink, mut tx| {
sink.accept().unwrap();
tokio::spawn(async move {
let close_err = loop {
if !sink.send(&1_usize).expect("usize can be serialized; qed") {
......@@ -547,7 +546,6 @@ async fn ws_server_pipe_from_stream_should_cancel_tasks_immediately() {
assert_eq!(rx_len, 10);
}
// TODO(lexnv): pipe from stream cannot be reused without having `SubscriptionSink::pipe_from_stream` public.
#[tokio::test]
async fn ws_server_pipe_from_stream_can_be_reused() {
init_logger();
......@@ -592,17 +590,17 @@ async fn ws_server_limit_subs_per_conn_works() {
let mut module = RpcModule::new(());
module
.register_subscription("subscribe_forever", "n", "unsubscribe_forever", |_, pending, _| {
.register_subscription("subscribe_forever", "n", "unsubscribe_forever", |_, mut sink, _| {
tokio::spawn(async move {
let interval = interval(Duration::from_millis(50));
let stream = IntervalStream::new(interval).map(move |_| 0_usize);
match pending.pipe_from_stream(stream).await.on_success(|sink| {
sink.close(SubscriptionClosed::Success);
}) {
PipeFromStreamResult::Success(None) => (),
match sink.pipe_from_stream(stream).await {
SubscriptionClosed::Success => {
sink.close(SubscriptionClosed::Success);
}
_ => unreachable!(),
}
};
});
Ok(())
})
......@@ -647,13 +645,15 @@ async fn ws_server_unsub_methods_should_ignore_sub_limit() {
let mut module = RpcModule::new(());
module
.register_subscription("subscribe_forever", "n", "unsubscribe_forever", |_, pending, _| {
.register_subscription("subscribe_forever", "n", "unsubscribe_forever", |_, mut sink, _| {
tokio::spawn(async move {
let interval = interval(Duration::from_millis(50));
let stream = IntervalStream::new(interval).map(move |_| 0_usize);
match pending.pipe_from_stream(stream).await {
PipeFromStreamResult::RemotePeerAborted => (),
match sink.pipe_from_stream(stream).await {
SubscriptionClosed::RemotePeerAborted => {
sink.close(SubscriptionClosed::RemotePeerAborted);
}
_ => unreachable!(),
};
});
......
......@@ -168,14 +168,12 @@ mod rpc_impl {
}
fn sub(&self, mut sink: SubscriptionSink) -> SubscriptionResult {
let mut sink = sink.accept()?;
let _ = sink.send(&"Response_A");
let _ = sink.send(&"Response_B");
Ok(())
}
fn sub_with_params(&self, pending: PendingSubscription, val: u32) -> SubscriptionResult {
let mut sink = pending.accept()?;
fn sub_with_params(&self, mut sink: SubscriptionSink, val: u32) -> SubscriptionResult {
let _ = sink.send(&val);
let _ = sink.send(&val);
Ok(())
......@@ -191,8 +189,7 @@ mod rpc_impl {
#[async_trait]
impl OnlyGenericSubscriptionServer<String, String> for RpcServerImpl {
fn sub(&self, pending: PendingSubscription, _: String) -> SubscriptionResult {
let mut sink = pending.accept()?;
fn sub(&self, mut sink: SubscriptionSink, _: String) -> SubscriptionResult {
let _ = sink.send(&"hello");
Ok(())
}
......
......@@ -203,8 +203,8 @@ async fn calling_method_without_server_using_proc_macro() {
async fn subscribing_without_server() {
let mut module = RpcModule::new(());
module
.register_subscription("my_sub", "my_sub", "my_unsub", |_, pending, _| {
let mut sink = pending.accept()?;
.register_subscription("my_sub", "my_sub", "my_unsub", |_, mut sink, _| {
sink.accept()?;
let mut stream_data = vec!['0', '1', '2'];
std::thread::spawn(move || {
......@@ -239,8 +239,8 @@ async fn close_test_subscribing_without_server() {
let mut module = RpcModule::new(());
module
.register_subscription("my_sub", "my_sub", "my_unsub", |_, pending, _| {
let mut sink = pending.accept()?;
.register_subscription("my_sub", "my_sub", "my_unsub", |_, mut sink, _| {
sink.accept()?;
std::thread::spawn(move || {
// make sure to only send one item
......@@ -287,17 +287,17 @@ async fn close_test_subscribing_without_server() {
async fn subscribing_without_server_bad_params() {
let mut module = RpcModule::new(());
module
.register_subscription("my_sub", "my_sub", "my_unsub", |params, pending, _| {
.register_subscription("my_sub", "my_sub", "my_unsub", |params, mut sink, _| {
let p = match params.one::<String>() {
Ok(p) => p,
Err(e) => {
let err: Error = e.into();
let _ = pending.reject(err);
let _ = sink.reject(err);
return Ok(());
}
};
let mut sink = pending.accept()?;
sink.accept()?;
sink.send(&p).unwrap();
Ok(())
})
......@@ -314,12 +314,12 @@ async fn subscribing_without_server_bad_params() {
async fn subscribe_unsubscribe_without_server() {
let mut module = RpcModule::new(());
module
.register_subscription("my_sub", "my_sub", "my_unsub", |_, pending, _| {
.register_subscription("my_sub", "my_sub", "my_unsub", |_, mut sink, _| {
let interval = interval(Duration::from_millis(200));
let stream = IntervalStream::new(interval).map(move |_| 1);
tokio::spawn(async move {
pending.pipe_from_stream(stream).await;
sink.pipe_from_stream(stream).await;
});
Ok(())
})
......
......@@ -120,8 +120,8 @@ async fn server_with_handles() -> (SocketAddr, ServerHandle) {
})
.unwrap();
module
.register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, pending, _| {
let sink = pending.accept()?;
.register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, mut sink, _| {
sink.accept()?;
std::thread::spawn(move || loop {
let _ = &sink;
std::thread::sleep(std::time::Duration::from_secs(30));
......@@ -678,8 +678,8 @@ async fn custom_subscription_id_works() {
let addr = server.local_addr().unwrap();
let mut module = RpcModule::new(());
module
.register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, pending, _| {
let sink = pending.accept()?;
.register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, mut sink, _| {
sink.accept()?;
std::thread::spawn(move || loop {
let _ = &sink;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment