Unverified Commit 98c23fc1 authored by Alexandru Vasile's avatar Alexandru Vasile Committed by GitHub
Browse files

Return error from subscription callbacks (#799)



* subscription: Allow errors in subscription callbacks
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* subscription: Remove the need to own the error
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* error: Build `ErrorObject` from `CallError` for improved ergonomics
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Update examples for the new subscription API
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Add alias for subscription result
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* macros: Render server subscription method with `ResultSubscription`
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Port `proc_macro` example to new API
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Rename `ResultSubscription` to `ReturnTypeSubscription` to avoid confusion
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Port all tests to new subscription API
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Update documentation
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Port benches
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Replace tabs with spaces & add documentation
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Add dummy error for subscriptions
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Implement `From` for `SubscriptionError`
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Return `SubscriptionError` when parsing params
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Rename `SubscriptionError` to `SubscriptionEmptyError`
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Change `accept` signature
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Port tests to new `accept` api
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Implement `pipe_from_try_stream` and `pipe_from_stream` for `PendingSubscription`
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Modify examples to ilustrate new API
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Fix docs tests
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Rename previously `SubscriptionResult` -> `InnerSubscriptionResult`
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Rename `ReturnTypeSubscription` -> `SubscriptionResult`
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Remove documentation line
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Implement `PipeFromStreamResult`
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Add comment for empty error
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Update proc-macros/src/lib.rs
Co-authored-by: Niklas Adolfsson's avatarNiklas Adolfsson <niklasadolfsson1@gmail.com>

* Update proc-macros/src/lib.rs
Co-authored-by: Niklas Adolfsson's avatarNiklas Adolfsson <niklasadolfsson1@gmail.com>

* Update proc-macros/src/lib.rs
Co-authored-by: Niklas Adolfsson's avatarNiklas Adolfsson <niklasadolfsson1@gmail.com>

* Change `ReturnTypeSubscription` -> `SubscriptionResult`
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Add `ResultConsumed` for `PipeFromStreamResult`
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Update examples to use `PipeFromStreamResult`
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Replace ConsumedResult with Options
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Log warning when subscription callback fails
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Change ubuntu test names
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* server: Make `pipe` methods of `SubscriptionSink` private
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* server: Remove `pipe_from_stream` method of `SubscriptionSink`
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* server: Update PipeFromStreamResult documentation
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Adjust tests to `SubscriptionSink::pipe_from_stream` private interface
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Add `accept-reject` API on `SubscriptionSink`
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Make `pipe_from_try_stream` public
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Maybe accept the subscription
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Revert "server: Remove `pipe_from_stream` method of `SubscriptionSink`"

This reverts commit d3c3ce9c

.

* Make `unsubscribe` channel optional on accepting the connection
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Pass `SubscriptionSink` to subscription callbacks
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Implement subscription sink state
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Submit `InvalidParams` if sink was never accepted
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Handle rejected sinks
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Remove `PendingSubscription`
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Fix doc tests
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* macro: Make subscription sink mutable
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Fix tests and examples
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* macro: Return `sink.reject()` result
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* tests: Add test for `SubscriptionSinkState`
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Test internal subscription sink state
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Fix `send_error` to not always return `false`
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Fix benches
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Remove `PipeFromStreamResult`
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Use valid Json-RPC return code for test errors
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Remove `SubscriptionSinkState`"

* Remodel state machine using `Option`s for `SubscriptionSink`s
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* tests: Double accept / reject API for `SubscriptionSink`
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Implement `SubscriptionAcceptRejectError` for error propagation
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Remove `maybe_accept` wrapper
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Update comments and documentation
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Update core/src/server/rpc_module.rs
Co-authored-by: Niklas Adolfsson's avatarNiklas Adolfsson <niklasadolfsson1@gmail.com>

* Update core/src/server/rpc_module.rs
Co-authored-by: Niklas Adolfsson's avatarNiklas Adolfsson <niklasadolfsson1@gmail.com>

* rpc_server: Add type alias for unsubscription calls
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* rpc_server: Improve comment regarding dropped error
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* style: Single line return errors
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Make comment more verbose
Co-authored-by: Niklas Adolfsson's avatarNiklas Adolfsson <niklasadolfsson1@gmail.com>
Co-authored-by: James Wilson's avatarJames Wilson <james@jsdw.me>
parent a35f8c30
Pipeline #200820 passed with stages
in 5 minutes and 42 seconds
...@@ -92,7 +92,7 @@ jobs: ...@@ -92,7 +92,7 @@ jobs:
run: cargo hack check --workspace --each-feature --all-targets run: cargo hack check --workspace --each-feature --all-targets
tests_ubuntu: tests_ubuntu:
name: Run nextests on Ubuntu name: Run tests Ubuntu
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: Checkout sources - name: Checkout sources
......
...@@ -141,13 +141,10 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::ws ...@@ -141,13 +141,10 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::ws
let mut module = gen_rpc_module(); let mut module = gen_rpc_module();
module module
.register_subscription(SUB_METHOD_NAME, SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, pending, _ctx| { .register_subscription(SUB_METHOD_NAME, SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| {
let mut sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
let x = "Hello"; let x = "Hello";
tokio::spawn(async move { sink.send(&x) }); tokio::spawn(async move { sink.send(&x) });
Ok(())
}) })
.unwrap(); .unwrap();
......
...@@ -156,9 +156,10 @@ impl MethodSink { ...@@ -156,9 +156,10 @@ impl MethodSink {
if let Err(err) = self.send_raw(json) { if let Err(err) = self.send_raw(json) {
tracing::warn!("Error sending response {:?}", err); tracing::warn!("Error sending response {:?}", err);
false
} else {
true
} }
false
} }
/// Helper for sending the general purpose `Error` as a JSON-RPC errors to the client /// Helper for sending the general purpose `Error` as a JSON-RPC errors to the client
......
...@@ -39,11 +39,14 @@ use futures_channel::mpsc; ...@@ -39,11 +39,14 @@ use futures_channel::mpsc;
use futures_util::future::Either; use futures_util::future::Either;
use futures_util::pin_mut; use futures_util::pin_mut;
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt, TryStream, TryStreamExt}; use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt, TryStream, TryStreamExt};
use jsonrpsee_types::error::{CallError, ErrorCode, ErrorObject, ErrorObjectOwned, SUBSCRIPTION_CLOSED_WITH_ERROR}; use jsonrpsee_types::error::{
CallError, ErrorCode, ErrorObject, ErrorObjectOwned, INTERNAL_ERROR_CODE,
SUBSCRIPTION_CLOSED_WITH_ERROR, SubscriptionAcceptRejectError
};
use jsonrpsee_types::response::{SubscriptionError, SubscriptionPayloadError}; use jsonrpsee_types::response::{SubscriptionError, SubscriptionPayloadError};
use jsonrpsee_types::{ use jsonrpsee_types::{
ErrorResponse, Id, Params, Request, Response, SubscriptionId as RpcSubscriptionId, SubscriptionPayload, ErrorResponse, Id, Params, Request, Response, SubscriptionResult,
SubscriptionResponse, SubscriptionId as RpcSubscriptionId, SubscriptionPayload, SubscriptionResponse
}; };
use parking_lot::Mutex; use parking_lot::Mutex;
use rustc_hash::FxHashMap; use rustc_hash::FxHashMap;
...@@ -87,7 +90,7 @@ pub struct ConnState<'a> { ...@@ -87,7 +90,7 @@ pub struct ConnState<'a> {
/// Outcome of a successful terminated subscription. /// Outcome of a successful terminated subscription.
#[derive(Debug)] #[derive(Debug)]
pub enum SubscriptionResult { pub enum InnerSubscriptionResult {
/// The subscription stream was executed successfully. /// The subscription stream was executed successfully.
Success, Success,
/// The subscription was aborted by the remote peer. /// The subscription was aborted by the remote peer.
...@@ -382,8 +385,9 @@ impl Methods { ...@@ -382,8 +385,9 @@ impl Methods {
/// use futures_util::StreamExt; /// use futures_util::StreamExt;
/// ///
/// let mut module = RpcModule::new(()); /// let mut module = RpcModule::new(());
/// module.register_subscription("hi", "hi", "goodbye", |_, pending, _| { /// module.register_subscription("hi", "hi", "goodbye", |_, mut sink, _| {
/// pending.accept().unwrap().send(&"one answer").unwrap(); /// sink.send(&"one answer").unwrap();
/// Ok(())
/// }).unwrap(); /// }).unwrap();
/// let (resp, mut stream) = module.raw_json_request(r#"{"jsonrpc":"2.0","method":"hi","id":0}"#).await.unwrap(); /// let (resp, mut stream) = module.raw_json_request(r#"{"jsonrpc":"2.0","method":"hi","id":0}"#).await.unwrap();
/// let resp = serde_json::from_str::<Response<u64>>(&resp).unwrap(); /// let resp = serde_json::from_str::<Response<u64>>(&resp).unwrap();
...@@ -443,8 +447,9 @@ impl Methods { ...@@ -443,8 +447,9 @@ impl Methods {
/// use jsonrpsee::{RpcModule, types::EmptyParams}; /// use jsonrpsee::{RpcModule, types::EmptyParams};
/// ///
/// let mut module = RpcModule::new(()); /// let mut module = RpcModule::new(());
/// module.register_subscription("hi", "hi", "goodbye", |_, pending, _| { /// module.register_subscription("hi", "hi", "goodbye", |_, mut sink, _| {
/// pending.accept().unwrap().send(&"one answer").unwrap(); /// sink.send(&"one answer").unwrap();
/// Ok(())
/// }).unwrap(); /// }).unwrap();
/// ///
/// let mut sub = module.subscribe("hi", EmptyParams::new()).await.unwrap(); /// let mut sub = module.subscribe("hi", EmptyParams::new()).await.unwrap();
...@@ -653,27 +658,22 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> { ...@@ -653,27 +658,22 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
/// use jsonrpsee_core::Error; /// use jsonrpsee_core::Error;
/// ///
/// let mut ctx = RpcModule::new(99_usize); /// let mut ctx = RpcModule::new(99_usize);
/// ctx.register_subscription("sub", "notif_name", "unsub", |params, pending, ctx| { /// ctx.register_subscription("sub", "notif_name", "unsub", |params, mut sink, ctx| {
/// let x = match params.one::<usize>() { /// let x = match params.one::<usize>() {
/// Ok(x) => x, /// Ok(x) => x,
/// Err(e) => { /// Err(e) => {
/// let err: Error = e.into(); /// let err: Error = e.into();
/// pending.reject(err); /// sink.reject(err);
/// return; /// return Ok(());
/// }
/// };
///
/// let mut sink = match pending.accept() {
/// Some(sink) => sink,
/// _ => {
/// return;
/// } /// }
/// }; /// };
/// /// // Sink is accepted on the first `send` call.
/// std::thread::spawn(move || { /// std::thread::spawn(move || {
/// let sum = x + (*ctx); /// let sum = x + (*ctx);
/// let _ = sink.send(&sum); /// let _ = sink.send(&sum);
/// }); /// });
///
/// Ok(())
/// }); /// });
/// ``` /// ```
pub fn register_subscription<F>( pub fn register_subscription<F>(
...@@ -685,7 +685,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> { ...@@ -685,7 +685,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
) -> Result<MethodResourcesBuilder, Error> ) -> Result<MethodResourcesBuilder, Error>
where where
Context: Send + Sync + 'static, Context: Send + Sync + 'static,
F: Fn(Params, PendingSubscription, Arc<Context>) + Send + Sync + 'static, F: Fn(Params, SubscriptionSink, Arc<Context>) -> SubscriptionResult + Send + Sync + 'static,
{ {
if subscribe_method_name == unsubscribe_method_name { if subscribe_method_name == unsubscribe_method_name {
return Err(Error::SubscriptionNameConflict(subscribe_method_name.into())); return Err(Error::SubscriptionNameConflict(subscribe_method_name.into()));
...@@ -740,17 +740,21 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> { ...@@ -740,17 +740,21 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
MethodCallback::new_subscription(Arc::new(move |id, params, method_sink, conn, claimed| { MethodCallback::new_subscription(Arc::new(move |id, params, method_sink, conn, claimed| {
let sub_id: RpcSubscriptionId = conn.id_provider.next_id(); let sub_id: RpcSubscriptionId = conn.id_provider.next_id();
let sink = PendingSubscription(Some(InnerPendingSubscription { let sink = SubscriptionSink {
sink: method_sink.clone(), inner: method_sink.clone(),
close_notify: Some(conn.close_notify), close_notify: Some(conn.close_notify),
method: notif_method_name, method: notif_method_name,
subscribers: subscribers.clone(), subscribers: subscribers.clone(),
uniq_sub: SubscriptionKey { conn_id: conn.conn_id, sub_id }, uniq_sub: SubscriptionKey { conn_id: conn.conn_id, sub_id },
id: id.clone().into_owned(), id: Some(id.clone().into_owned()),
claimed, unsubscribe: None,
})); _claimed: claimed,
};
callback(params, sink, ctx.clone()); // The callback returns a `SubscriptionResult` for better ergonomics and is not propagated further.
if let Err(_) = callback(params, sink, ctx.clone()) {
tracing::warn!("subscribe call `{}` failed", subscribe_method_name);
}
true true
})), })),
...@@ -775,101 +779,75 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> { ...@@ -775,101 +779,75 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
} }
} }
/// Represent a pending subscription which waits to be accepted or rejected. /// Returns once the unsubscribe method has been called.
/// type UnsubscribeCall = Option<watch::Receiver<()>>;
/// Note: you need to call either `PendingSubscription::accept` or `PendingSubscription::reject` otherwise
/// the subscription will be dropped with an `InvalidParams` error. /// Represents a single subscription.
#[derive(Debug)] #[derive(Debug)]
struct InnerPendingSubscription { pub struct SubscriptionSink {
/// Sink. /// Sink.
sink: MethodSink, inner: MethodSink,
/// Get notified when subscribers leave so we can exit /// Get notified when subscribers leave so we can exit
close_notify: Option<SubscriptionPermit>, close_notify: Option<SubscriptionPermit>,
/// MethodCallback. /// MethodCallback.
method: &'static str, method: &'static str,
/// Shared Mutex of subscriptions for this method.
subscribers: Subscribers,
/// Unique subscription. /// Unique subscription.
uniq_sub: SubscriptionKey, uniq_sub: SubscriptionKey,
/// Shared Mutex of subscriptions /// Id of the `subscription call` (i.e. not the same as subscription id) which is used
subscribers: Subscribers, /// to reply to subscription method call and must only be used once.
/// Request ID. ///
id: Id<'static>, /// *Note*: Having some value means the subscription was not accepted or rejected yet.
id: Option<Id<'static>>,
/// Having some value means the subscription was accepted.
unsubscribe: UnsubscribeCall,
/// Claimed resources. /// Claimed resources.
claimed: Option<ResourceGuard>, _claimed: Option<ResourceGuard>,
} }
/// Represent a pending subscription which waits until it's either accepted or rejected. impl SubscriptionSink {
///
/// This type implements `Drop` for ease of use, e.g. when dropped in error short circuiting via `map_err()?`.
#[derive(Debug)]
pub struct PendingSubscription(Option<InnerPendingSubscription>);
impl PendingSubscription {
/// Reject the subscription call from [`ErrorObject`]. /// Reject the subscription call from [`ErrorObject`].
pub fn reject(mut self, err: impl Into<ErrorObjectOwned>) -> bool { pub fn reject(&mut self, err: impl Into<ErrorObjectOwned>) -> Result<(), SubscriptionAcceptRejectError> {
if let Some(inner) = self.0.take() { let id = self.id.take().ok_or(SubscriptionAcceptRejectError::AlreadyCalled)?;
let InnerPendingSubscription { sink, id, .. } = inner;
sink.send_error(id, err.into()) if self.inner.send_error(id, err.into()) {
Ok(())
} else { } else {
false Err(SubscriptionAcceptRejectError::RemotePeerAborted)
} }
} }
/// Attempt to accept the subscription and respond the subscription method call. /// Attempt to accept the subscription and respond the subscription method call.
/// ///
/// Fails if the connection was closed /// Fails if the connection was closed, or if called multiple times.
pub fn accept(mut self) -> Option<SubscriptionSink> { pub fn accept(&mut self) -> Result<(), SubscriptionAcceptRejectError> {
let inner = self.0.take()?; let id = self.id.take().ok_or(SubscriptionAcceptRejectError::AlreadyCalled)?;
let InnerPendingSubscription { sink, close_notify, method, uniq_sub, subscribers, id, claimed } = inner;
if sink.send_response(id, &uniq_sub.sub_id) { if self.inner.send_response(id, &self.uniq_sub.sub_id) {
let (tx, rx) = watch::channel(()); let (tx, rx) = watch::channel(());
subscribers.lock().insert(uniq_sub.clone(), (sink.clone(), tx)); self.subscribers.lock().insert(self.uniq_sub.clone(), (self.inner.clone(), tx));
Some(SubscriptionSink { inner: sink, close_notify, method, uniq_sub, subscribers, unsubscribe: rx, _claimed: claimed }) self.unsubscribe = Some(rx);
Ok(())
} else { } else {
None Err(SubscriptionAcceptRejectError::RemotePeerAborted)
} }
} }
}
// When dropped it returns an [`InvalidParams`] error to the subscriber
impl Drop for PendingSubscription {
fn drop(&mut self) {
if let Some(inner) = self.0.take() {
let InnerPendingSubscription { sink, id, .. } = inner;
sink.send_error(id, ErrorCode::InvalidParams.into());
}
}
}
/// Represents a single subscription.
#[derive(Debug)]
pub struct SubscriptionSink {
/// Sink.
inner: MethodSink,
/// Get notified when subscribers leave so we can exit
close_notify: Option<SubscriptionPermit>,
/// MethodCallback.
method: &'static str,
/// Unique subscription.
uniq_sub: SubscriptionKey,
/// Shared Mutex of subscriptions for this method.
subscribers: Subscribers,
/// Future that returns when the unsubscribe method has been called.
unsubscribe: watch::Receiver<()>,
/// Claimed resources.
_claimed: Option<ResourceGuard>,
}
impl SubscriptionSink {
/// Send a message back to subscribers. /// Send a message back to subscribers.
/// ///
/// Returns `Ok(true)` if the message could be send /// Returns
/// Returns `Ok(false)` if the sink was closed (either because the subscription was closed or the connection was terminated) /// - `Ok(true)` if the message could be send.
/// Return `Err(err)` if the message could not be serialized. /// - `Ok(false)` if the sink was closed (either because the subscription was closed or the connection was terminated),
/// /// or the subscription could not be accepted.
/// - `Err(err)` if the message could not be serialized.
pub fn send<T: Serialize>(&mut self, result: &T) -> Result<bool, serde_json::Error> { pub fn send<T: Serialize>(&mut self, result: &T) -> Result<bool, serde_json::Error> {
// only possible to trigger when the connection is dropped. // Cannot accept the subscription.
if let Err(SubscriptionAcceptRejectError::RemotePeerAborted) = self.accept() {
return Ok(false);
}
// Only possible to trigger when the connection is dropped.
if self.is_closed() { if self.is_closed() {
return Ok(false); return Ok(false);
} }
...@@ -889,14 +867,13 @@ impl SubscriptionSink { ...@@ -889,14 +867,13 @@ impl SubscriptionSink {
/// ///
/// ```no_run /// ```no_run
/// ///
/// use jsonrpsee_core::server::rpc_module::{RpcModule, SubscriptionResult}; /// use jsonrpsee_core::server::rpc_module::RpcModule;
/// use jsonrpsee_core::error::{Error, SubscriptionClosed}; /// use jsonrpsee_core::error::{Error, SubscriptionClosed};
/// use jsonrpsee_types::ErrorObjectOwned; /// use jsonrpsee_types::ErrorObjectOwned;
/// use anyhow::anyhow; /// use anyhow::anyhow;
/// ///
/// let mut m = RpcModule::new(()); /// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, pending, _| { /// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| {
/// let mut sink = pending.accept().unwrap();
/// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]); /// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]);
/// // This will return send `[Ok(1_u32), Ok(2_u32), Err(Error::SubscriptionClosed))]` to the subscriber /// // This will return send `[Ok(1_u32), Ok(2_u32), Err(Error::SubscriptionClosed))]` to the subscriber
/// // because after the `Err(_)` the stream is terminated. /// // because after the `Err(_)` the stream is terminated.
...@@ -915,6 +892,7 @@ impl SubscriptionSink { ...@@ -915,6 +892,7 @@ impl SubscriptionSink {
/// } /// }
/// } /// }
/// }); /// });
/// Ok(())
/// }); /// });
/// ``` /// ```
pub async fn pipe_from_try_stream<S, T, E>(&mut self, mut stream: S) -> SubscriptionClosed pub async fn pipe_from_try_stream<S, T, E>(&mut self, mut stream: S) -> SubscriptionClosed
...@@ -923,14 +901,24 @@ impl SubscriptionSink { ...@@ -923,14 +901,24 @@ impl SubscriptionSink {
T: Serialize, T: Serialize,
E: std::fmt::Display, E: std::fmt::Display,
{ {
if let Err(SubscriptionAcceptRejectError::RemotePeerAborted) = self.accept() {
return SubscriptionClosed::RemotePeerAborted;
}
let conn_closed = match self.close_notify.as_ref().map(|cn| cn.handle()) { let conn_closed = match self.close_notify.as_ref().map(|cn| cn.handle()) {
Some(cn) => cn, Some(cn) => cn,
None => { None => return SubscriptionClosed::RemotePeerAborted,
return SubscriptionClosed::RemotePeerAborted; };
}
let mut sub_closed = match self.unsubscribe.as_ref() {
Some(rx) => rx.clone(),
_ => return SubscriptionClosed::Failed(ErrorObject::owned(
INTERNAL_ERROR_CODE,
"Unsubscribe watcher not set after accepting the subscription".to_string(),
None::<()>
)),
}; };
let mut sub_closed = self.unsubscribe.clone();
let sub_closed_fut = sub_closed.changed(); let sub_closed_fut = sub_closed.changed();
let conn_closed_fut = conn_closed.notified(); let conn_closed_fut = conn_closed.notified();
...@@ -983,10 +971,10 @@ impl SubscriptionSink { ...@@ -983,10 +971,10 @@ impl SubscriptionSink {
/// use jsonrpsee_core::server::rpc_module::RpcModule; /// use jsonrpsee_core::server::rpc_module::RpcModule;
/// ///
/// let mut m = RpcModule::new(()); /// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, pending, _| { /// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| {
/// let mut sink = pending.accept().unwrap();
/// let stream = futures_util::stream::iter(vec![1_usize, 2, 3]); /// let stream = futures_util::stream::iter(vec![1_usize, 2, 3]);
/// tokio::spawn(async move { sink.pipe_from_stream(stream).await; }); /// tokio::spawn(async move { sink.pipe_from_stream(stream).await; });
/// Ok(())
/// }); /// });
/// ``` /// ```
pub async fn pipe_from_stream<S, T>(&mut self, stream: S) -> SubscriptionClosed pub async fn pipe_from_stream<S, T>(&mut self, stream: S) -> SubscriptionClosed
...@@ -1003,7 +991,10 @@ impl SubscriptionSink { ...@@ -1003,7 +991,10 @@ impl SubscriptionSink {
} }
fn is_active_subscription(&self) -> bool { fn is_active_subscription(&self) -> bool {
!self.unsubscribe.has_changed().is_err() match self.unsubscribe.as_ref() {
Some(unsubscribe) => !unsubscribe.has_changed().is_err(),
_ => false,
}
} }
fn build_message<T: Serialize>(&self, result: &T) -> Result<String, serde_json::Error> { fn build_message<T: Serialize>(&self, result: &T) -> Result<String, serde_json::Error> {
...@@ -1057,7 +1048,13 @@ impl SubscriptionSink { ...@@ -1057,7 +1048,13 @@ impl SubscriptionSink {
impl Drop for SubscriptionSink { impl Drop for SubscriptionSink {
fn drop(&mut self) { fn drop(&mut self) {
if self.is_active_subscription() { if let Some(id) = self.id.take() {
// Subscription was never accepted / rejected. As such,
// we default to assuming that the params were invalid,
// because that's how the previous PendingSubscription logic
// worked.
self.inner.send_error(id, ErrorCode::InvalidParams.into());
} else if self.is_active_subscription() {
self.subscribers.lock().remove(&self.uniq_sub); self.subscribers.lock().remove(&self.uniq_sub);
} }
} }
......
...@@ -28,8 +28,9 @@ use std::net::SocketAddr; ...@@ -28,8 +28,9 @@ use std::net::SocketAddr;
use jsonrpsee::core::{async_trait, client::Subscription, Error}; use jsonrpsee::core::{async_trait, client::Subscription, Error};
use jsonrpsee::proc_macros::rpc; use jsonrpsee::proc_macros::rpc;
use jsonrpsee::types::SubscriptionResult;
use jsonrpsee::ws_client::WsClientBuilder; use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ws_server::{PendingSubscription, WsServerBuilder, WsServerHandle}; use jsonrpsee::ws_server::{SubscriptionSink, WsServerBuilder, WsServerHandle};
type ExampleHash = [u8; 32]; type ExampleHash = [u8; 32];
type ExampleStorageKey = Vec<u8>; type ExampleStorageKey = Vec<u8>;
...@@ -60,10 +61,14 @@ impl RpcServer<ExampleHash, ExampleStorageKey> for RpcServerImpl { ...@@ -60,10 +61,14 @@ impl RpcServer<ExampleHash, ExampleStorageKey> for RpcServerImpl {
Ok(vec![storage_key]) Ok(vec![storage_key])
} }
fn subscribe_storage(&self, pending: PendingSubscription, _keys: Option<Vec<ExampleStorageKey>>) { // Note that the server's subscription method must return `SubscriptionResult`.
if let Some(mut sink) = pending.accept() { fn subscribe_storage(
let _ = sink.send(&vec![[0; 32]]); &self,
} mut sink: SubscriptionSink,
_keys: Option<Vec<ExampleStorageKey>>,
) -> SubscriptionResult {
let _ = sink.send(&vec![[0; 32]]);
Ok(())
} }
} }
......
...@@ -71,12 +71,8 @@ async fn run_server() -> anyhow::Result<SocketAddr> { ...@@ -71,12 +71,8 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
std::thread::spawn(move || produce_items(tx2)); std::thread::spawn(move || produce_items(tx2));
module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, pending, _| { module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, mut sink, _| {
let rx = BroadcastStream::new(tx.clone().subscribe()); let rx = BroadcastStream::new(tx.clone().subscribe());
let mut sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
tokio::spawn(async move { tokio::spawn(async move {
match sink.pipe_from_try_stream(rx).await { match sink.pipe_from_try_stream(rx).await {
...@@ -89,6 +85,7 @@ async fn run_server() -> anyhow::Result<SocketAddr> { ...@@ -89,6 +85,7 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
} }
}; };
}); });