Unverified Commit 24fcf3ef authored by Alexandru Vasile's avatar Alexandru Vasile
Browse files

Remove `PendingSubscription`


Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>
parent 76572020
......@@ -777,154 +777,6 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
}
}
/// Represent a pending subscription which waits to be accepted or rejected.
///
/// Note: you need to call either `PendingSubscription::accept` or `PendingSubscription::reject` otherwise
/// the subscription will be dropped with an `InvalidParams` error.
#[derive(Debug)]
struct InnerPendingSubscription {
/// Sink.
sink: MethodSink,
/// Get notified when subscribers leave so we can exit
close_notify: Option<SubscriptionPermit>,
/// MethodCallback.
method: &'static str,
/// Unique subscription.
uniq_sub: SubscriptionKey,
/// Shared Mutex of subscriptions
subscribers: Subscribers,
/// Request ID.
id: Id<'static>,
/// Claimed resources.
claimed: Option<ResourceGuard>,
}
/// Represent a pending subscription which waits until it's either accepted or rejected.
///
/// This type implements `Drop` for ease of use, e.g. when dropped in error short circuiting via `map_err()?`.
#[derive(Debug)]
pub struct PendingSubscription(Option<InnerPendingSubscription>);
impl PendingSubscription {
/// Reject the subscription call from [`ErrorObject`].
pub fn reject(mut self, err: impl Into<ErrorObjectOwned>) -> bool {
if let Some(inner) = self.0.take() {
let InnerPendingSubscription { sink, id, .. } = inner;
sink.send_error(id, err.into())
} else {
false
}
}
/// Attempt to accept the subscription and respond the subscription method call.
///
/// Fails if the connection was closed
pub fn accept(mut self) -> Result<SubscriptionSink, SubscriptionEmptyError> {
let inner = self.0.take().ok_or(SubscriptionEmptyError)?;
let InnerPendingSubscription { sink, close_notify, method, uniq_sub, subscribers, id, claimed } = inner;
if sink.send_response(id, &uniq_sub.sub_id) {
let (tx, rx) = watch::channel(());
subscribers.lock().insert(uniq_sub.clone(), (sink.clone(), tx));
Ok(SubscriptionSink { inner: sink, close_notify, method, uniq_sub, subscribers, _claimed: claimed, state: SubscriptionSinkState::Rejected })
} else {
Err(SubscriptionEmptyError)
}
}
/// Accepts the subscription connection and wraps the [`SubscriptionSink::pipe_from_try_stream`] for
/// better ergonomics.
///
/// Returns `(Ok(sink), SubscriptionClosed)` if the connection was accepted successfully. The returned
/// sink can be used to send error notifications back.
///
/// Returns `(None, SubscriptionClosed::RemotePeerAborted)` if the connection was not accepted, or the
/// client disconnected while piping the stream.
///
/// # Examples
///
/// ```no_run
///
/// use jsonrpsee_core::server::rpc_module::RpcModule;
/// use jsonrpsee_core::error::{Error, SubscriptionClosed};
/// use jsonrpsee_types::ErrorObjectOwned;
/// use anyhow::anyhow;
///
/// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, pending, _| {
/// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]);
/// // This will return send `[Ok(1_u32), Ok(2_u32), Err(Error::SubscriptionClosed))]` to the subscriber
/// // because after the `Err(_)` the stream is terminated.
/// tokio::spawn(async move {
/// // jsonrpsee doesn't send an error notification unless `close` is explicitly called.
/// // If we pipe messages to the sink, we can inspect why it ended:
/// pending
/// .pipe_from_try_stream(stream)
/// .await
/// .on_success(|sink| {
/// let err_obj: ErrorObjectOwned = SubscriptionClosed::Success.into();
/// sink.close(err_obj);
/// })
/// .on_failure(|sink, err| {
/// sink.close(err);
/// })
/// });
/// Ok(())
/// });
/// ```
pub async fn pipe_from_try_stream<S, T, E>(self, stream: S) -> PipeFromStreamResult
where
S: TryStream<Ok = T, Error = E> + Unpin,
T: Serialize,
E: std::fmt::Display,
{
if let Ok(mut sink) = self.accept() {
let result = sink.pipe_from_try_stream(stream).await;
match result {
SubscriptionClosed::Success => PipeFromStreamResult::Success(Some(sink)),
SubscriptionClosed::Failed(error) => PipeFromStreamResult::Failure(Some((sink, error))),
SubscriptionClosed::RemotePeerAborted => PipeFromStreamResult::RemotePeerAborted,
}
} else {
PipeFromStreamResult::RemotePeerAborted
}
}
/// Similar to [`PendingSubscription::pipe_from_try_stream`] but it doesn't require the stream return `Result`.
///
/// # Examples
///
/// ```no_run
///
/// use jsonrpsee_core::server::rpc_module::RpcModule;
///
/// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, pending, _| {
/// let stream = futures_util::stream::iter(vec![1_usize, 2, 3]);
/// tokio::spawn(async move { pending.pipe_from_stream(stream).await; });
/// Ok(())
/// });
/// ```
pub async fn pipe_from_stream<S, T>(self, stream: S) -> PipeFromStreamResult
where
S: Stream<Item = T> + Unpin,
T: Serialize,
{
self.pipe_from_try_stream::<_, _, Error>(stream.map(|item| Ok(item))).await
}
}
// When dropped it returns an [`InvalidParams`] error to the subscriber
impl Drop for PendingSubscription {
fn drop(&mut self) {
if let Some(inner) = self.0.take() {
let InnerPendingSubscription { sink, id, .. } = inner;
sink.send_error(id, ErrorCode::InvalidParams.into());
}
}
}
/// The result obtain from calling [`PendingSubscription::pipe_from_try_stream`] that
/// can be utilized to execute specific operations depending on the result.
#[derive(Debug)]
......
......@@ -92,7 +92,7 @@ cfg_types! {
}
cfg_server! {
pub use jsonrpsee_core::server::rpc_module::{PendingSubscription, RpcModule, SubscriptionSink};
pub use jsonrpsee_core::server::rpc_module::{RpcModule, SubscriptionSink};
}
cfg_client_or_server! {
......
......@@ -39,7 +39,7 @@ mod server;
mod tests;
pub use future::{ServerHandle as WsServerHandle, ShutdownWaiter as WsShutdownWaiter};
pub use jsonrpsee_core::server::rpc_module::{PendingSubscription, RpcModule, SubscriptionSink};
pub use jsonrpsee_core::server::rpc_module::{RpcModule, SubscriptionSink};
pub use jsonrpsee_core::{id_providers::*, traits::IdProvider};
pub use jsonrpsee_types as types;
pub use server::{Builder as WsServerBuilder, Server as WsServer};
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment