Unverified Commit 8cf401c3 authored by Alexandru Vasile's avatar Alexandru Vasile
Browse files

Adjust tests to `SubscriptionSink::pipe_from_stream` private interface


Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>
parent 0b1c9272
Pipeline #200249 failed with stages
in 5 minutes and 33 seconds
......@@ -289,10 +289,9 @@ pub(crate) mod visitor;
/// // The stream API can be used to pipe items from the underlying stream
/// // as subscription responses.
/// fn sub_override_notif_method(&self, pending: PendingSubscription) -> SubscriptionResult {
/// let mut sink = pending.accept().unwrap();
/// tokio::spawn(async move {
/// let stream = futures_util::stream::iter(["one", "two", "three"]);
/// sink.pipe_from_stream(stream).await;
/// pending.pipe_from_stream(stream).await;
/// });
/// Ok(())
/// }
......
......@@ -30,6 +30,7 @@ use std::time::Duration;
use futures::{SinkExt, StreamExt};
use jsonrpsee::core::error::SubscriptionClosed;
use jsonrpsee::core::server::access_control::{AccessControl, AccessControlBuilder};
use jsonrpsee::core::server::rpc_module::PipeFromStreamResult;
use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle};
use jsonrpsee::types::error::{ErrorObject, SUBSCRIPTION_CLOSED_WITH_ERROR};
use jsonrpsee::ws_server::{WsServerBuilder, WsServerHandle};
......@@ -107,18 +108,16 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle
module
.register_subscription("subscribe_5_ints", "n", "unsubscribe_5_ints", |_, pending, _| {
let mut sink = pending.accept().unwrap();
tokio::spawn(async move {
let interval = interval(Duration::from_millis(50));
let stream = IntervalStream::new(interval).zip(futures::stream::iter(1..=5)).map(|(_, c)| c);
match sink.pipe_from_stream(stream).await {
SubscriptionClosed::Success => {
sink.close(SubscriptionClosed::Success);
}
match pending.pipe_from_stream(stream).await.on_success(|sink| {
sink.close(SubscriptionClosed::Success);
}) {
PipeFromStreamResult::Success(None) => (),
_ => unreachable!(),
}
};
});
Ok(())
})
......@@ -126,23 +125,19 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle
module
.register_subscription("can_reuse_subscription", "n", "u_can_reuse_subscription", |_, pending, _| {
let mut sink = pending.accept().unwrap();
tokio::spawn(async move {
let stream1 = IntervalStream::new(interval(Duration::from_millis(50)))
.zip(futures::stream::iter(1..=5))
.map(|(_, c)| c);
let stream2 = IntervalStream::new(interval(Duration::from_millis(50)))
.zip(futures::stream::iter(6..=10))
.map(|(_, c)| c);
let result = sink.pipe_from_stream(stream1).await;
assert!(matches!(result, SubscriptionClosed::Success));
match sink.pipe_from_stream(stream2).await {
SubscriptionClosed::Success => {
sink.close(SubscriptionClosed::Success);
}
// TODO(lexnv): Merge streams or `pipe_from_stream` to take `&mut self`.
// let stream2 = IntervalStream::new(interval(Duration::from_millis(50)))
// .zip(futures::stream::iter(6..=10))
// .map(|(_, c)| c);
match pending.pipe_from_stream(stream1).await.on_success(|sink| {
sink.close(SubscriptionClosed::Success);
}) {
PipeFromStreamResult::Success(None) => (),
_ => unreachable!(),
}
});
......@@ -156,17 +151,15 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle
"n",
"unsubscribe_with_err_on_stream",
move |_, pending, _| {
let mut sink = pending.accept().unwrap();
let err: &'static str = "error on the stream";
// create stream that produce an error which will cancel the subscription.
let stream = futures::stream::iter(vec![Ok(1_u32), Err(err), Ok(2), Ok(3)]);
tokio::spawn(async move {
match sink.pipe_from_try_stream(stream).await {
SubscriptionClosed::Failed(e) => {
sink.close(e);
}
match pending.pipe_from_try_stream(stream).await.on_failure(|sink, e| {
sink.close(e);
}) {
PipeFromStreamResult::Failure(None) => (),
_ => unreachable!(),
}
});
......@@ -209,13 +202,12 @@ pub async fn websocket_server_with_sleeping_subscription(tx: futures::channel::m
module
.register_subscription("subscribe_sleep", "n", "unsubscribe_sleep", |_, pending, mut tx| {
let mut sink = pending.accept().unwrap();
tokio::spawn(async move {
let interval = interval(Duration::from_secs(60 * 60));
let stream = IntervalStream::new(interval).zip(futures::stream::iter(1..=5)).map(|(_, c)| c);
sink.pipe_from_stream(stream).await;
pending.pipe_from_stream(stream).await;
let send_back = std::sync::Arc::make_mut(&mut tx);
send_back.send(()).await.unwrap();
});
......
......@@ -34,6 +34,7 @@ use futures::{channel::mpsc, StreamExt, TryStreamExt};
use helpers::{http_server, http_server_with_access_control, websocket_server, websocket_server_with_subscription};
use jsonrpsee::core::client::{ClientT, IdKind, Subscription, SubscriptionClientT};
use jsonrpsee::core::error::SubscriptionClosed;
use jsonrpsee::core::server::rpc_module::PipeFromStreamResult;
use jsonrpsee::core::{Error, JsonValue};
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::http_server::AccessControlBuilder;
......@@ -546,6 +547,7 @@ async fn ws_server_pipe_from_stream_should_cancel_tasks_immediately() {
assert_eq!(rx_len, 10);
}
// TODO(lexnv): pipe from stream cannot be reused without having `SubscriptionSink::pipe_from_stream` public.
#[tokio::test]
async fn ws_server_pipe_from_stream_can_be_reused() {
init_logger();
......@@ -591,18 +593,16 @@ async fn ws_server_limit_subs_per_conn_works() {
module
.register_subscription("subscribe_forever", "n", "unsubscribe_forever", |_, pending, _| {
let mut sink = pending.accept()?;
tokio::spawn(async move {
let interval = interval(Duration::from_millis(50));
let stream = IntervalStream::new(interval).map(move |_| 0_usize);
match sink.pipe_from_stream(stream).await {
SubscriptionClosed::Success => {
sink.close(SubscriptionClosed::Success);
}
match pending.pipe_from_stream(stream).await.on_success(|sink| {
sink.close(SubscriptionClosed::Success);
}) {
PipeFromStreamResult::Success(None) => (),
_ => unreachable!(),
};
}
});
Ok(())
})
......@@ -648,16 +648,12 @@ async fn ws_server_unsub_methods_should_ignore_sub_limit() {
module
.register_subscription("subscribe_forever", "n", "unsubscribe_forever", |_, pending, _| {
let mut sink = pending.accept()?;
tokio::spawn(async move {
let interval = interval(Duration::from_millis(50));
let stream = IntervalStream::new(interval).map(move |_| 0_usize);
match sink.pipe_from_stream(stream).await {
SubscriptionClosed::RemotePeerAborted => {
sink.close(SubscriptionClosed::RemotePeerAborted);
}
match pending.pipe_from_stream(stream).await {
PipeFromStreamResult::RemotePeerAborted => (),
_ => unreachable!(),
};
});
......
......@@ -315,13 +315,11 @@ async fn subscribe_unsubscribe_without_server() {
let mut module = RpcModule::new(());
module
.register_subscription("my_sub", "my_sub", "my_unsub", |_, pending, _| {
let mut sink = pending.accept()?;
let interval = interval(Duration::from_millis(200));
let stream = IntervalStream::new(interval).map(move |_| 1);
tokio::spawn(async move {
sink.pipe_from_stream(stream).await;
pending.pipe_from_stream(stream).await;
});
Ok(())
})
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment