Unverified Commit 8b04cd15 authored by Alexandru Vasile's avatar Alexandru Vasile
Browse files

Implement `SubscriptionAcceptRejectError` for error propagation


Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>
parent f5952de6
......@@ -39,10 +39,13 @@ use futures_channel::mpsc;
use futures_util::future::Either;
use futures_util::pin_mut;
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt, TryStream, TryStreamExt};
use jsonrpsee_types::error::{CallError, ErrorCode, ErrorObject, ErrorObjectOwned, INTERNAL_ERROR_CODE, SUBSCRIPTION_CLOSED_WITH_ERROR};
use jsonrpsee_types::error::{
CallError, ErrorCode, ErrorObject, ErrorObjectOwned, INTERNAL_ERROR_CODE,
SUBSCRIPTION_CLOSED_WITH_ERROR, SubscriptionAcceptRejectError
};
use jsonrpsee_types::response::{SubscriptionError, SubscriptionPayloadError};
use jsonrpsee_types::{
ErrorResponse, Id, Params, Request, Response, SubscriptionResult, SubscriptionEmptyError,
ErrorResponse, Id, Params, Request, Response, SubscriptionResult,
SubscriptionId as RpcSubscriptionId, SubscriptionPayload, SubscriptionResponse
};
use parking_lot::Mutex;
......@@ -803,21 +806,21 @@ pub struct SubscriptionSink {
impl SubscriptionSink {
/// Reject the subscription call from [`ErrorObject`].
pub fn reject(&mut self, err: impl Into<ErrorObjectOwned>) -> Result<(), SubscriptionEmptyError> {
let id = self.id.take().ok_or(SubscriptionEmptyError)?;
pub fn reject(&mut self, err: impl Into<ErrorObjectOwned>) -> Result<(), SubscriptionAcceptRejectError> {
let id = self.id.take().ok_or(SubscriptionAcceptRejectError::AlreadyCalled)?;
if self.inner.send_error(id, err.into()) {
Ok(())
} else {
Err(SubscriptionEmptyError)
Err(SubscriptionAcceptRejectError::RemotePeerAborted)
}
}
/// Attempt to accept the subscription and respond the subscription method call.
///
/// Fails if the connection was closed, or if called multiple times.
pub fn accept(&mut self) -> Result<(), SubscriptionEmptyError> {
let id = self.id.take().ok_or(SubscriptionEmptyError)?;
pub fn accept(&mut self) -> Result<(), SubscriptionAcceptRejectError> {
let id = self.id.take().ok_or(SubscriptionAcceptRejectError::AlreadyCalled)?;
if self.inner.send_response(id, &self.uniq_sub.sub_id) {
let (tx, rx) = watch::channel(());
......@@ -825,14 +828,14 @@ impl SubscriptionSink {
self.unsubscribe = Some(rx);
Ok(())
} else {
Err(SubscriptionEmptyError)
Err(SubscriptionAcceptRejectError::RemotePeerAborted)
}
}
/// Accepts the subscription if previously not accepted.
///
/// Fails if the accept function fails internally, or if the subscription was rejected.
fn maybe_accept(&mut self) -> Result<(), SubscriptionEmptyError> {
fn maybe_accept(&mut self) -> Result<(), SubscriptionAcceptRejectError> {
// Pending subscription.
if self.id.is_some() {
return self.accept();
......@@ -842,7 +845,7 @@ impl SubscriptionSink {
if self.unsubscribe.is_some() {
Ok(())
} else {
Err(SubscriptionEmptyError)
Err(SubscriptionAcceptRejectError::RemotePeerAborted)
}
}
......
......@@ -110,6 +110,21 @@ impl<'a> From<ErrorObject<'a>> for SubscriptionEmptyError {
}
}
impl From<SubscriptionAcceptRejectError> for SubscriptionEmptyError {
fn from(_: SubscriptionAcceptRejectError) -> Self {
SubscriptionEmptyError
}
}
/// The error returned while accepting or rejecting a subscription.
#[derive(Debug)]
pub enum SubscriptionAcceptRejectError {
/// The method was already called.
AlreadyCalled,
/// The remote peer closed the connection or called the unsubscribe method.
RemotePeerAborted,
}
/// Owned variant of [`ErrorObject`].
pub type ErrorObjectOwned = ErrorObject<'static>;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment