Unverified Commit c2642e45 authored by Niklas Adolfsson's avatar Niklas Adolfsson
Browse files

revert async accept API

parent 136f97e1
......@@ -143,10 +143,9 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::ws
module
.register_subscription(SUB_METHOD_NAME, SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, pending, _ctx| {
let x = "Hello";
tokio::spawn(async move {
let mut sink = pending.accept().await.unwrap();
let _ = sink.send(&x);
});
let sink = pending.accept().unwrap();
let _ = sink.send(&x);
})
.unwrap();
......
......@@ -392,9 +392,7 @@ impl Methods {
///
/// let mut module = RpcModule::new(());
/// module.register_subscription("hi", "hi", "goodbye", |_, pending, _| {
/// tokio::spawn(async move {
/// pending.accept().await.unwrap().send(&"one answer").unwrap();
/// });
/// pending.accept().unwrap().send(&"one answer").unwrap();
/// }).unwrap();
/// let (resp, mut stream) = module.raw_json_request(r#"{"jsonrpc":"2.0","method":"hi","id":0}"#).await.unwrap();
/// let resp = serde_json::from_str::<Response<u64>>(&resp.result).unwrap();
......@@ -464,9 +462,7 @@ impl Methods {
///
/// let mut module = RpcModule::new(());
/// module.register_subscription("hi", "hi", "goodbye", |_, pending, _| {
/// tokio::spawn(async move {
/// pending.accept().await.unwrap().send(&"one answer").unwrap();
/// });
/// pending.accept().unwrap().send(&"one answer").unwrap();
/// }).unwrap();
///
/// let mut sub = module.subscribe("hi", EmptyParams::new()).await.unwrap();
......@@ -691,13 +687,11 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
/// }
/// };
///
/// tokio::spawn(async move {
/// // Only fails in the connection is closed.
/// let mut sink = pending.accept().await.unwrap();
/// // Only fails in the connection is closed.
/// let sink = pending.accept().unwrap();
///
/// let sum = x + (*ctx);
/// let _ = sink.send(&sum);
/// });
/// let sum = x + (*ctx);
/// let _ = sink.send(&sum);
/// });
/// ```
pub fn register_subscription<F>(
......@@ -864,7 +858,7 @@ impl PendingSubscription {
/// Attempt to accept the subscription and respond the subscription method call.
///
/// Fails if the connection was closed
pub async fn accept(mut self) -> Option<SubscriptionSink> {
pub fn accept(mut self) -> Option<SubscriptionSink> {
let inner = self.0.take()?;
let InnerPendingSubscription { sink, close_notify, method, uniq_sub, subscribers, id, subscribe_call, claimed } =
......@@ -936,7 +930,7 @@ impl SubscriptionSink {
/// Returns `Ok(false)` if the sink was closed (either because the subscription was closed or the connection was terminated)
/// Return `Err(err)` if the message could not be serialized.
///
pub fn send<T: Serialize>(&mut self, result: &T) -> Result<bool, serde_json::Error> {
pub fn send<T: Serialize>(&self, result: &T) -> Result<bool, serde_json::Error> {
// only possible to trigger when the connection is dropped.
if self.is_closed() {
return Ok(false);
......@@ -965,13 +959,12 @@ impl SubscriptionSink {
/// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, pending, _| {
///
///
/// // This will return send `[Ok(1_u32), Ok(2_u32), Err(Error::SubscriptionClosed))]` to the subscriber
/// // because after the `Err(_)` the stream is terminated.
/// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]);
/// let sink = pending.accept().unwrap();
///
/// tokio::spawn(async move {
/// let mut sink = pending.accept().await.unwrap();
///
/// // jsonrpsee doesn't send an error notification unless `close` is explicitly called.
/// // If we pipe messages to the sink, we can inspect why it ended:
......@@ -989,7 +982,7 @@ impl SubscriptionSink {
/// });
/// });
/// ```
pub async fn pipe_from_try_stream<S, T, E>(&mut self, mut stream: S) -> SubscriptionClosed
pub async fn pipe_from_try_stream<S, T, E>(&self, mut stream: S) -> SubscriptionClosed
where
S: TryStream<Ok = T, Error = E> + Unpin,
T: Serialize,
......@@ -1056,14 +1049,15 @@ impl SubscriptionSink {
///
/// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, pending, _| {
/// let sink = pending.accept().unwrap();
/// let stream = futures_util::stream::iter(vec![1_usize, 2, 3]);
///
/// tokio::spawn(async move {
/// let mut sink = pending.accept().await.unwrap();
/// let stream = futures_util::stream::iter(vec![1_usize, 2, 3]);
/// sink.pipe_from_stream(stream).await;
/// });
/// });
/// ```
pub async fn pipe_from_stream<S, T>(&mut self, stream: S) -> SubscriptionClosed
pub async fn pipe_from_stream<S, T>(&self, stream: S) -> SubscriptionClosed
where
S: Stream<Item = T> + Unpin,
T: Serialize,
......
......@@ -61,11 +61,9 @@ impl RpcServer<ExampleHash, ExampleStorageKey> for RpcServerImpl {
}
fn subscribe_storage(&self, pending: PendingSubscription, _keys: Option<Vec<ExampleStorageKey>>) {
tokio::spawn(async move {
if let Some(mut sink) = pending.accept().await {
let _ = sink.send(&vec![[0; 32]]);
}
});
if let Some(sink) = pending.accept() {
let _ = sink.send(&vec![[0; 32]]);
}
}
}
......
......@@ -75,7 +75,7 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
let rx = BroadcastStream::new(tx.clone().subscribe());
tokio::spawn(async move {
let mut sink = match pending.accept().await {
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
......
......@@ -77,7 +77,7 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
let stream = IntervalStream::new(interval).map(move |_| item);
tokio::spawn(async move {
let mut sink = match pending.accept().await {
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
......@@ -108,7 +108,7 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
let stream = IntervalStream::new(interval).map(move |_| item);
tokio::spawn(async move {
let mut sink = match pending.accept().await {
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
......
......@@ -287,9 +287,10 @@ pub(crate) mod visitor;
/// // The stream API can be used to pipe items from the underlying stream
/// // as subscription responses.
/// fn sub_override_notif_method(&self, pending: PendingSubscription) {
/// let sink = pending.accept().unwrap();
/// let stream = futures_util::stream::iter(["one", "two", "three"]);
///
/// tokio::spawn(async move {
/// let mut sink = pending.accept().await.unwrap();
/// let stream = futures_util::stream::iter(["one", "two", "three"]);
/// sink.pipe_from_stream(stream).await;
/// });
/// }
......@@ -298,11 +299,9 @@ pub(crate) mod visitor;
/// // but for simplicity of the example we will only send two values and then close
/// // the subscription.
/// fn sub(&self, pending: PendingSubscription) {
/// tokio::spawn(async move {
/// let mut sink = pending.accept().await.unwrap();
/// let _ = sink.send(&"Response_A");
/// let _ = sink.send(&"Response_B");
/// });
/// let sink = pending.accept().unwrap();
/// let _ = sink.send(&"Response_A");
/// let _ = sink.send(&"Response_B");
/// }
/// }
/// }
......
......@@ -64,33 +64,27 @@ impl RpcServer for RpcServerImpl {
}
fn sub(&self, pending: PendingSubscription) {
tokio::spawn(async move {
let mut sink = match pending.accept().await {
Some(sink) => sink,
_ => return,
};
let _ = sink.send(&"Response_A");
let _ = sink.send(&"Response_B");
});
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
let _ = sink.send(&"Response_A");
let _ = sink.send(&"Response_B");
}
fn sub_with_params(&self, pending: PendingSubscription, val: u32) {
tokio::spawn(async move {
let mut sink = match pending.accept().await {
Some(sink) => sink,
_ => return,
};
let _ = sink.send(&val);
let _ = sink.send(&val);
});
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
let _ = sink.send(&val);
let _ = sink.send(&val);
}
fn sub_with_override_notif_method(&self, pending: PendingSubscription) {
tokio::spawn(async move {
if let Some(mut sink) = pending.accept().await {
let _ = sink.send(&1);
}
});
if let Some(sink) = pending.accept() {
let _ = sink.send(&1);
}
}
}
......
......@@ -29,15 +29,13 @@ impl RpcServer for RpcServerImpl {
}
fn sub(&self, pending: PendingSubscription) {
tokio::spawn(async move {
let mut sink = match pending.accept().await {
Some(sink) => sink,
_ => return,
};
let _ = sink.send(&"Response_A");
let _ = sink.send(&"Response_B");
});
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
let _ = sink.send(&"Response_A");
let _ = sink.send(&"Response_B");
}
}
......
......@@ -45,56 +45,65 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle
module
.register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, pending, _| {
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
let interval = interval(Duration::from_millis(50));
let stream = IntervalStream::new(interval).map(move |_| &"hello from subscription");
tokio::spawn(async move {
let mut sink = pending.accept().await.unwrap();
loop {
if let Ok(false) = sink.send(&"hello from subscription") {
break;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
sink.pipe_from_stream(stream).await;
});
})
.unwrap();
module
.register_subscription("subscribe_foo", "subscribe_foo", "unsubscribe_foo", |_, pending, _| {
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
let interval = interval(Duration::from_millis(100));
let stream = IntervalStream::new(interval).map(move |_| 1337_usize);
tokio::spawn(async move {
let mut sink = pending.accept().await.unwrap();
loop {
if let Ok(false) = sink.send(&1337_usize) {
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
sink.pipe_from_stream(stream).await;
});
})
.unwrap();
module
.register_subscription("subscribe_add_one", "subscribe_add_one", "unsubscribe_add_one", |params, pending, _| {
let mut count = match params.one::<usize>() {
Ok(count) => count,
let count = match params.one::<usize>() {
Ok(count) => count.wrapping_add(1),
_ => return,
};
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
let wrapping_counter = futures::stream::iter((count..).cycle());
let interval = interval(Duration::from_millis(100));
let stream = IntervalStream::new(interval).zip(wrapping_counter).map(move |(_, c)| c);
tokio::spawn(async move {
let mut sink = pending.accept().await.unwrap();
loop {
count = count.wrapping_add(1);
if let Err(_) | Ok(false) = sink.send(&count) {
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
sink.pipe_from_stream(stream).await;
});
})
.unwrap();
module
.register_subscription("subscribe_noop", "subscribe_noop", "unsubscribe_noop", |_, pending, _| {
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
tokio::spawn(async move {
let sink = pending.accept().await.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
let err = ErrorObject::owned(
SUBSCRIPTION_CLOSED_WITH_ERROR,
......@@ -108,8 +117,12 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle
module
.register_subscription("subscribe_5_ints", "n", "unsubscribe_5_ints", |_, pending, _| {
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
tokio::spawn(async move {
let mut sink = pending.accept().await.unwrap();
let interval = interval(Duration::from_millis(50));
let stream = IntervalStream::new(interval).zip(futures::stream::iter(1..=5)).map(|(_, c)| c);
......@@ -125,9 +138,12 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle
module
.register_subscription("can_reuse_subscription", "n", "u_can_reuse_subscription", |_, pending, _| {
tokio::spawn(async move {
let mut sink = pending.accept().await.unwrap();
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
tokio::spawn(async move {
let stream1 = IntervalStream::new(interval(Duration::from_millis(50)))
.zip(futures::stream::iter(1..=5))
.map(|(_, c)| c);
......@@ -156,11 +172,14 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle
move |_, pending, _| {
let err: &'static str = "error on the stream";
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
// create stream that produce an error which will cancel the subscription.
let stream = futures::stream::iter(vec![Ok(1_u32), Err(err), Ok(2), Ok(3)]);
tokio::spawn(async move {
let mut sink = pending.accept().await.unwrap();
match sink.pipe_from_try_stream(stream).await {
SubscriptionClosed::Failed(e) => {
sink.close(e);
......@@ -206,8 +225,12 @@ pub async fn websocket_server_with_sleeping_subscription(tx: futures::channel::m
module
.register_subscription("subscribe_sleep", "n", "unsubscribe_sleep", |_, pending, mut tx| {
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
tokio::spawn(async move {
let mut sink = pending.accept().await.unwrap();
let interval = interval(Duration::from_secs(60 * 60));
let stream = IntervalStream::new(interval).zip(futures::stream::iter(1..=5)).map(|(_, c)| c);
......
......@@ -425,9 +425,12 @@ async fn ws_server_should_stop_subscription_after_client_drop() {
module
.register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, pending, mut tx| {
tokio::spawn(async move {
let mut sink = pending.accept().await.unwrap();
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
tokio::spawn(async move {
let close_err = loop {
if !sink.send(&1_usize).expect("usize can be serialized; qed") {
break ErrorObject::borrowed(0, &"Subscription terminated successfully", None);
......@@ -591,12 +594,12 @@ async fn ws_server_limit_subs_per_conn_works() {
module
.register_subscription("subscribe_forever", "n", "unsubscribe_forever", |_, pending, _| {
tokio::spawn(async move {
let mut sink = match pending.accept().await {
Some(sink) => sink,
_ => return,
};
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
tokio::spawn(async move {
let interval = interval(Duration::from_millis(50));
let stream = IntervalStream::new(interval).map(move |_| 0_usize);
......@@ -650,12 +653,12 @@ async fn ws_server_unsub_methods_should_ignore_sub_limit() {
module
.register_subscription("subscribe_forever", "n", "unsubscribe_forever", |_, pending, _| {
tokio::spawn(async move {
let mut sink = match pending.accept().await {
Some(sink) => sink,
_ => return,
};
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
tokio::spawn(async move {
let interval = interval(Duration::from_millis(50));
let stream = IntervalStream::new(interval).map(move |_| 0_usize);
......
......@@ -167,25 +167,21 @@ mod rpc_impl {
}
fn sub(&self, pending: PendingSubscription) {
tokio::spawn(async move {
let mut sink = match pending.accept().await {
Some(sink) => sink,
_ => return,
};
let _ = sink.send(&"Response_A");
let _ = sink.send(&"Response_B");
});
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
let _ = sink.send(&"Response_A");
let _ = sink.send(&"Response_B");
}
fn sub_with_params(&self, pending: PendingSubscription, val: u32) {
tokio::spawn(async move {
let mut sink = match pending.accept().await {
Some(sink) => sink,
_ => return,
};
let _ = sink.send(&val);
let _ = sink.send(&val);
});
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
let _ = sink.send(&val);
let _ = sink.send(&val);
}
}
......@@ -199,13 +195,11 @@ mod rpc_impl {
#[async_trait]
impl OnlyGenericSubscriptionServer<String, String> for RpcServerImpl {
fn sub(&self, pending: PendingSubscription, _: String) {
tokio::spawn(async move {
let mut sink = match pending.accept().await {
Some(sink) => sink,
_ => return,
};
let _ = sink.send(&"hello");
});
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
let _ = sink.send(&"hello");
}
}
}
......
......@@ -67,12 +67,7 @@ fn module_manual() -> Result<RpcModule<()>, Error> {
// to get dropped. This is the equivalent of not having any resource limits (ie, sink is never used).
module
.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, pending, _| {
tokio::spawn(async move {
let mut _sink = match pending.accept().await {
Some(sink) => sink,
_ => return,
};
});
let _sink = pending.accept();
})?
.resource("SUB", 3)?;
......@@ -80,12 +75,12 @@ fn module_manual() -> Result<RpcModule<()>, Error> {
// and the subscription method gets limited.
module
.register_subscription("subscribe_hello_limit", "s_hello", "unsubscribe_hello_limit", move |_, pending, _| {
tokio::spawn(async move {
let mut sink = match pending.accept().await {
Some(sink) => sink,
_ => return,
};
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
tokio::spawn(async move {
for val in 0..10 {
sink.send(&val).unwrap();
sleep(Duration::from_secs(1)).await;
......@@ -127,21 +122,18 @@ fn module_macro() -> RpcModule<()> {
impl RpcServer for () {
fn sub_hello(&self, pending: PendingSubscription) {
tokio::spawn(async move {
let mut _sink = match pending.accept().await {
Some(sink) => sink,
_ => return,
};
});
let _sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
}
fn sub_hello_limit(&self, pending: PendingSubscription) {
tokio::spawn(async move {
let mut sink = match pending.accept().await {
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
let interval = interval(Duration::from_secs(1));
let stream = IntervalStream::new(interval).map(move |_| 1);
......
......@@ -213,9 +213,13 @@ async fn subscribing_without_server() {
module
.register_subscription("my_sub", "my_sub", "my_unsub", |_, pending, _| {
let mut stream_data = vec!['0', '1', '2'];
tokio::spawn(async move {
let mut sink = pending.accept().await.unwrap();
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
tokio::spawn(async move {
while let Some(letter) = stream_data.pop() {
tracing::debug!("This is your friendly subscription sending data.");
let _ = sink.send(&letter);
......@@ -244,12 +248,12 @@ async fn close_test_subscribing_without_server() {
let mut module = RpcModule::new(());
module
.register_subscription("my_sub", "my_sub", "my_unsub", |_, pending, _| {
tokio::spawn(async move {
let mut sink = match pending.accept().await {
Some(sink) => sink,
_ => return,
};
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};