Unverified Commit f30d9d01 authored by Alexandru Vasile's avatar Alexandru Vasile
Browse files

Fix doc tests


Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>
parent 24fcf3ef
...@@ -382,8 +382,8 @@ impl Methods { ...@@ -382,8 +382,8 @@ impl Methods {
/// use futures_util::StreamExt; /// use futures_util::StreamExt;
/// ///
/// let mut module = RpcModule::new(()); /// let mut module = RpcModule::new(());
/// module.register_subscription("hi", "hi", "goodbye", |_, pending, _| { /// module.register_subscription("hi", "hi", "goodbye", |_, mut sink, _| {
/// pending.accept().unwrap().send(&"one answer").unwrap(); /// sink.send(&"one answer").unwrap();
/// Ok(()) /// Ok(())
/// }).unwrap(); /// }).unwrap();
/// let (resp, mut stream) = module.raw_json_request(r#"{"jsonrpc":"2.0","method":"hi","id":0}"#).await.unwrap(); /// let (resp, mut stream) = module.raw_json_request(r#"{"jsonrpc":"2.0","method":"hi","id":0}"#).await.unwrap();
...@@ -444,8 +444,8 @@ impl Methods { ...@@ -444,8 +444,8 @@ impl Methods {
/// use jsonrpsee::{RpcModule, types::EmptyParams}; /// use jsonrpsee::{RpcModule, types::EmptyParams};
/// ///
/// let mut module = RpcModule::new(()); /// let mut module = RpcModule::new(());
/// module.register_subscription("hi", "hi", "goodbye", |_, pending, _| { /// module.register_subscription("hi", "hi", "goodbye", |_, mut sink, _| {
/// pending.accept().unwrap().send(&"one answer").unwrap(); /// sink.send(&"one answer").unwrap();
/// Ok(()) /// Ok(())
/// }).unwrap(); /// }).unwrap();
/// ///
...@@ -655,18 +655,16 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> { ...@@ -655,18 +655,16 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
/// use jsonrpsee_core::Error; /// use jsonrpsee_core::Error;
/// ///
/// let mut ctx = RpcModule::new(99_usize); /// let mut ctx = RpcModule::new(99_usize);
/// ctx.register_subscription("sub", "notif_name", "unsub", |params, pending, ctx| { /// ctx.register_subscription("sub", "notif_name", "unsub", |params, mut sink, ctx| {
/// let x = match params.one::<usize>() { /// let x = match params.one::<usize>() {
/// Ok(x) => x, /// Ok(x) => x,
/// Err(e) => { /// Err(e) => {
/// let err: Error = e.into(); /// let err: Error = e.into();
/// pending.reject(err); /// sink.reject(err);
/// return Ok(()); /// return Ok(());
/// } /// }
/// }; /// };
/// /// // Sink is accepted on the first `send` call.
/// let mut sink = pending.accept()?;
///
/// std::thread::spawn(move || { /// std::thread::spawn(move || {
/// let sum = x + (*ctx); /// let sum = x + (*ctx);
/// let _ = sink.send(&sum); /// let _ = sink.send(&sum);
...@@ -971,23 +969,24 @@ impl SubscriptionSink { ...@@ -971,23 +969,24 @@ impl SubscriptionSink {
/// use anyhow::anyhow; /// use anyhow::anyhow;
/// ///
/// let mut m = RpcModule::new(()); /// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, pending, _| { /// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| {
/// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]); /// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]);
/// // This will return send `[Ok(1_u32), Ok(2_u32), Err(Error::SubscriptionClosed))]` to the subscriber /// // This will return send `[Ok(1_u32), Ok(2_u32), Err(Error::SubscriptionClosed))]` to the subscriber
/// // because after the `Err(_)` the stream is terminated. /// // because after the `Err(_)` the stream is terminated.
/// tokio::spawn(async move { /// tokio::spawn(async move {
/// // jsonrpsee doesn't send an error notification unless `close` is explicitly called. /// // jsonrpsee doesn't send an error notification unless `close` is explicitly called.
/// // If we pipe messages to the sink, we can inspect why it ended: /// // If we pipe messages to the sink, we can inspect why it ended:
/// pending /// match sink.pipe_from_try_stream(stream).await {
/// .pipe_from_try_stream(stream) /// SubscriptionClosed::Success => {
/// .await /// let err_obj: ErrorObjectOwned = SubscriptionClosed::Success.into();
/// .on_success(|sink| { /// sink.close(err_obj);
/// let err_obj: ErrorObjectOwned = SubscriptionClosed::Success.into(); /// }
/// sink.close(err_obj); /// // we don't want to send close reason when the client is unsubscribed or disconnected.
/// }) /// SubscriptionClosed::RemotePeerAborted => (),
/// .on_failure(|sink, err| { /// SubscriptionClosed::Failed(e) => {
/// sink.close(err); /// sink.close(e);
/// }) /// }
/// }
/// }); /// });
/// Ok(()) /// Ok(())
/// }); /// });
...@@ -1073,8 +1072,7 @@ impl SubscriptionSink { ...@@ -1073,8 +1072,7 @@ impl SubscriptionSink {
/// use jsonrpsee_core::server::rpc_module::RpcModule; /// use jsonrpsee_core::server::rpc_module::RpcModule;
/// ///
/// let mut m = RpcModule::new(()); /// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, pending, _| { /// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| {
/// let mut sink = pending.accept().unwrap();
/// let stream = futures_util::stream::iter(vec![1_usize, 2, 3]); /// let stream = futures_util::stream::iter(vec![1_usize, 2, 3]);
/// tokio::spawn(async move { sink.pipe_from_stream(stream).await; }); /// tokio::spawn(async move { sink.pipe_from_stream(stream).await; });
/// Ok(()) /// Ok(())
......
...@@ -29,6 +29,7 @@ use std::time::Duration; ...@@ -29,6 +29,7 @@ use std::time::Duration;
use futures::StreamExt; use futures::StreamExt;
use jsonrpsee::core::client::SubscriptionClientT; use jsonrpsee::core::client::SubscriptionClientT;
use jsonrpsee::core::error::SubscriptionClosed;
use jsonrpsee::rpc_params; use jsonrpsee::rpc_params;
use jsonrpsee::ws_client::WsClientBuilder; use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ws_server::{RpcModule, WsServerBuilder}; use jsonrpsee::ws_server::{RpcModule, WsServerBuilder};
...@@ -65,7 +66,7 @@ async fn run_server() -> anyhow::Result<SocketAddr> { ...@@ -65,7 +66,7 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::default().build("127.0.0.1:0").await?; let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(()); let mut module = RpcModule::new(());
module module
.register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, pending, _| { .register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, mut sink, _| {
let idx = params.one()?; let idx = params.one()?;
let item = LETTERS.chars().nth(idx); let item = LETTERS.chars().nth(idx);
...@@ -73,16 +74,23 @@ async fn run_server() -> anyhow::Result<SocketAddr> { ...@@ -73,16 +74,23 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
let stream = IntervalStream::new(interval).map(move |_| item); let stream = IntervalStream::new(interval).map(move |_| item);
tokio::spawn(async move { tokio::spawn(async move {
pending.pipe_from_stream(stream).await.on_failure(|sink, err| { match sink.pipe_from_stream(stream).await {
// Send close notification when subscription stream failed. SubscriptionClosed::Failed(err) => {
sink.close(err); sink.close(err);
}); }
_ => (),
};
//
// sink.pipe_from_stream(stream).await.on_failure(|sink, err| {
// // Send close notification when subscription stream failed.
// sink.close(err);
// });
}); });
Ok(()) Ok(())
}) })
.unwrap(); .unwrap();
module module
.register_subscription("sub_params_two", "params_two", "unsub_params_two", |params, pending, _| { .register_subscription("sub_params_two", "params_two", "unsub_params_two", |params, mut sink, _| {
let (one, two) = params.parse::<(usize, usize)>()?; let (one, two) = params.parse::<(usize, usize)>()?;
let item = &LETTERS[one..two]; let item = &LETTERS[one..two];
...@@ -91,9 +99,12 @@ async fn run_server() -> anyhow::Result<SocketAddr> { ...@@ -91,9 +99,12 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
let stream = IntervalStream::new(interval).map(move |_| item); let stream = IntervalStream::new(interval).map(move |_| item);
tokio::spawn(async move { tokio::spawn(async move {
pending.pipe_from_stream(stream).await.on_failure(|sink, err| { match sink.pipe_from_stream(stream).await {
sink.close(err); SubscriptionClosed::Failed(err) => {
}) sink.close(err);
}
_ => (),
};
}); });
Ok(()) Ok(())
......
...@@ -215,7 +215,7 @@ pub(crate) mod visitor; ...@@ -215,7 +215,7 @@ pub(crate) mod visitor;
/// ///
/// // RPC is put into a separate module to clearly show names of generated entities. /// // RPC is put into a separate module to clearly show names of generated entities.
/// mod rpc_impl { /// mod rpc_impl {
/// use jsonrpsee::{proc_macros::rpc, core::async_trait, core::RpcResult, ws_server::PendingSubscription}; /// use jsonrpsee::{proc_macros::rpc, core::async_trait, core::RpcResult, ws_server::SubscriptionSink};
/// use jsonrpsee::types::SubscriptionResult; /// use jsonrpsee::types::SubscriptionResult;
/// ///
/// // Generate both server and client implementations, prepend all the methods with `foo_` prefix. /// // Generate both server and client implementations, prepend all the methods with `foo_` prefix.
...@@ -288,10 +288,10 @@ pub(crate) mod visitor; ...@@ -288,10 +288,10 @@ pub(crate) mod visitor;
/// ///
/// // The stream API can be used to pipe items from the underlying stream /// // The stream API can be used to pipe items from the underlying stream
/// // as subscription responses. /// // as subscription responses.
/// fn sub_override_notif_method(&self, pending: PendingSubscription) -> SubscriptionResult { /// fn sub_override_notif_method(&self, mut sink: SubscriptionSink) -> SubscriptionResult {
/// tokio::spawn(async move { /// tokio::spawn(async move {
/// let stream = futures_util::stream::iter(["one", "two", "three"]); /// let stream = futures_util::stream::iter(["one", "two", "three"]);
/// pending.pipe_from_stream(stream).await; /// sink.pipe_from_stream(stream).await;
/// }); /// });
/// Ok(()) /// Ok(())
/// } /// }
...@@ -299,8 +299,7 @@ pub(crate) mod visitor; ...@@ -299,8 +299,7 @@ pub(crate) mod visitor;
/// // We could've spawned a `tokio` future that yields values while our program works, /// // We could've spawned a `tokio` future that yields values while our program works,
/// // but for simplicity of the example we will only send two values and then close /// // but for simplicity of the example we will only send two values and then close
/// // the subscription. /// // the subscription.
/// fn sub(&self, pending: PendingSubscription) -> SubscriptionResult { /// fn sub(&self, mut sink: SubscriptionSink) -> SubscriptionResult {
/// let mut sink = pending.accept().unwrap();
/// let _ = sink.send(&"Response_A"); /// let _ = sink.send(&"Response_A");
/// let _ = sink.send(&"Response_B"); /// let _ = sink.send(&"Response_B");
/// Ok(()) /// Ok(())
......
...@@ -71,7 +71,7 @@ impl RpcDescription { ...@@ -71,7 +71,7 @@ impl RpcDescription {
let subscriptions = self.subscriptions.iter().map(|sub| { let subscriptions = self.subscriptions.iter().map(|sub| {
let docs = &sub.docs; let docs = &sub.docs;
let subscription_sink_ty = self.jrps_server_item(quote! { PendingSubscription }); let subscription_sink_ty = self.jrps_server_item(quote! { SubscriptionSink });
// Add `SubscriptionSink` as the second input parameter to the signature. // Add `SubscriptionSink` as the second input parameter to the signature.
let subscription_sink: syn::FnArg = syn::parse_quote!(subscription_sink: #subscription_sink_ty); let subscription_sink: syn::FnArg = syn::parse_quote!(subscription_sink: #subscription_sink_ty);
let mut sub_sig = sub.signature.clone(); let mut sub_sig = sub.signature.clone();
......
...@@ -44,7 +44,7 @@ mod rpc_impl { ...@@ -44,7 +44,7 @@ mod rpc_impl {
use jsonrpsee::core::{async_trait, RpcResult}; use jsonrpsee::core::{async_trait, RpcResult};
use jsonrpsee::proc_macros::rpc; use jsonrpsee::proc_macros::rpc;
use jsonrpsee::types::SubscriptionResult; use jsonrpsee::types::SubscriptionResult;
use jsonrpsee::PendingSubscription; use jsonrpsee::SubscriptionSink;
#[rpc(client, server, namespace = "foo")] #[rpc(client, server, namespace = "foo")]
pub trait Rpc { pub trait Rpc {
...@@ -167,8 +167,8 @@ mod rpc_impl { ...@@ -167,8 +167,8 @@ mod rpc_impl {
Ok(10u16) Ok(10u16)
} }
fn sub(&self, pending: PendingSubscription) -> SubscriptionResult { fn sub(&self, mut sink: SubscriptionSink) -> SubscriptionResult {
let mut sink = pending.accept()?; let mut sink = sink.accept()?;
let _ = sink.send(&"Response_A"); let _ = sink.send(&"Response_A");
let _ = sink.send(&"Response_B"); let _ = sink.send(&"Response_B");
Ok(()) Ok(())
......
...@@ -36,7 +36,7 @@ use jsonrpsee::types::error::CallError; ...@@ -36,7 +36,7 @@ use jsonrpsee::types::error::CallError;
use jsonrpsee::types::SubscriptionResult; use jsonrpsee::types::SubscriptionResult;
use jsonrpsee::ws_client::WsClientBuilder; use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ws_server::{WsServerBuilder, WsServerHandle}; use jsonrpsee::ws_server::{WsServerBuilder, WsServerHandle};
use jsonrpsee::{PendingSubscription, RpcModule}; use jsonrpsee::{RpcModule, SubscriptionSink};
use tokio::time::sleep; use tokio::time::sleep;
fn module_manual() -> Result<RpcModule<()>, Error> { fn module_manual() -> Result<RpcModule<()>, Error> {
...@@ -65,8 +65,8 @@ fn module_manual() -> Result<RpcModule<()>, Error> { ...@@ -65,8 +65,8 @@ fn module_manual() -> Result<RpcModule<()>, Error> {
// Drop the `SubscriptionSink` to cause the internal `ResourceGuard` allocated per subscription call // Drop the `SubscriptionSink` to cause the internal `ResourceGuard` allocated per subscription call
// to get dropped. This is the equivalent of not having any resource limits (ie, sink is never used). // to get dropped. This is the equivalent of not having any resource limits (ie, sink is never used).
module module
.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, pending, _| { .register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, mut sink, _| {
let mut _sink = pending.accept()?; sink.accept()?;
Ok(()) Ok(())
})? })?
.resource("SUB", 3)?; .resource("SUB", 3)?;
...@@ -74,11 +74,10 @@ fn module_manual() -> Result<RpcModule<()>, Error> { ...@@ -74,11 +74,10 @@ fn module_manual() -> Result<RpcModule<()>, Error> {
// Keep the `SubscriptionSink` alive for a bit to validate that `ResourceGuard` is alive // Keep the `SubscriptionSink` alive for a bit to validate that `ResourceGuard` is alive
// and the subscription method gets limited. // and the subscription method gets limited.
module module
.register_subscription("subscribe_hello_limit", "s_hello", "unsubscribe_hello_limit", move |_, pending, _| { .register_subscription("subscribe_hello_limit", "s_hello", "unsubscribe_hello_limit", move |_, mut sink, _| {
let mut sink = pending.accept()?;
tokio::spawn(async move { tokio::spawn(async move {
for val in 0..10 { for val in 0..10 {
// Sink is accepted on the first `send` call.
sink.send(&val).unwrap(); sink.send(&val).unwrap();
sleep(Duration::from_secs(1)).await; sleep(Duration::from_secs(1)).await;
} }
...@@ -120,14 +119,12 @@ fn module_macro() -> RpcModule<()> { ...@@ -120,14 +119,12 @@ fn module_macro() -> RpcModule<()> {
} }
impl RpcServer for () { impl RpcServer for () {
fn sub_hello(&self, pending: PendingSubscription) -> SubscriptionResult { fn sub_hello(&self, mut sink: SubscriptionSink) -> SubscriptionResult {
let mut _sink = pending.accept()?; sink.accept()?;
Ok(()) Ok(())
} }
fn sub_hello_limit(&self, pending: PendingSubscription) -> SubscriptionResult { fn sub_hello_limit(&self, mut sink: SubscriptionSink) -> SubscriptionResult {
let mut sink = pending.accept()?;
tokio::spawn(async move { tokio::spawn(async move {
for val in 0..10 { for val in 0..10 {
sink.send(&val).unwrap(); sink.send(&val).unwrap();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment