Unverified Commit 1f66edf1 authored by Alexandru Vasile's avatar Alexandru Vasile
Browse files

Implement subscription sink state


Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>
parent 712112bc
......@@ -744,9 +744,8 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
close_notify: Some(conn.close_notify),
method: notif_method_name,
subscribers: subscribers.clone(),
unsubscribe: None,
uniq_sub: SubscriptionKey { conn_id: conn.conn_id, sub_id },
id: Some(id.clone().into_owned()),
state: SubscriptionSinkState::new(id.clone().into_owned()),
_claimed: claimed,
};
......@@ -828,7 +827,7 @@ impl PendingSubscription {
if sink.send_response(id, &uniq_sub.sub_id) {
let (tx, rx) = watch::channel(());
subscribers.lock().insert(uniq_sub.clone(), (sink.clone(), tx));
Ok(SubscriptionSink { inner: sink, close_notify, method, uniq_sub, subscribers, unsubscribe: Some(rx), _claimed: claimed, id: None })
Ok(SubscriptionSink { inner: sink, close_notify, method, uniq_sub, subscribers, _claimed: claimed, state: SubscriptionSinkState::Rejected })
} else {
Err(SubscriptionEmptyError)
}
......@@ -975,6 +974,54 @@ impl PipeFromStreamResult {
}
}
/// The state of the [`SubscriptionSink`].
#[derive(Debug)]
enum SubscriptionSinkState {
/// The subscription is pending and needs to be accepted or rejected
/// before utilizing the sink. This is the initial state of the sink.
///
/// This state contains the request ID of the subscription.
Pending(Option<Id<'static>>),
/// The subscription was accepted via the [`SubscriptionSink::accept`] call.
/// This state contains the future that returns when the unsubscribe method has been called.
Accepted(watch::Receiver<()>),
/// The subscription was rejected via the [`SubscriptionSink::reject`] call.
/// In this state, the subscription cannot be utilized.
Rejected,
}
impl SubscriptionSinkState {
/// Initialize the sink's state from the subscription ID.
fn new(id: Id<'static>) -> Self {
SubscriptionSinkState::Pending(Some(id))
}
/// Takes the ID out of the `Pending` state.
fn id(&mut self) -> Result<Id<'static>, SubscriptionEmptyError> {
match self {
SubscriptionSinkState::Pending(id) => id.take().ok_or(SubscriptionEmptyError),
_ => Err(SubscriptionEmptyError),
}
}
/// Advance the state of the sink to the accepted state.
fn accept(&self, rx: watch::Receiver<()>) -> Result<SubscriptionSinkState, SubscriptionEmptyError> {
match self {
// Cannot transition to accepted if the ID was not previously consumed.
SubscriptionSinkState::Pending(None) => Ok(SubscriptionSinkState::Accepted(rx)),
_ => Err(SubscriptionEmptyError),
}
}
/// Reject the subscription.
fn reject(&self) -> Result<SubscriptionSinkState, SubscriptionEmptyError> {
match self {
SubscriptionSinkState::Pending(None) => Ok(SubscriptionSinkState::Rejected),
_ => Err(SubscriptionEmptyError),
}
}
}
/// Represents a single subscription.
#[derive(Debug)]
pub struct SubscriptionSink {
......@@ -986,25 +1033,24 @@ pub struct SubscriptionSink {
method: &'static str,
/// Shared Mutex of subscriptions for this method.
subscribers: Subscribers,
/// Future that returns when the unsubscribe method has been called.
unsubscribe: Option<watch::Receiver<()>>,
/// Unique subscription.
uniq_sub: SubscriptionKey,
/// Request ID of the subscription.
///
/// **Note**: This field becomes `None` when the subscription is accepted, or rejected.
id: Option<Id<'static>>,
/// The state of the subscription sink.
state: SubscriptionSinkState,
/// Claimed resources.
_claimed: Option<ResourceGuard>,
}
impl SubscriptionSink {
/// Reject the subscription call from [`ErrorObject`].
pub fn reject(&mut self, err: impl Into<ErrorObjectOwned>) -> bool {
if let Some(id) = self.id.take() {
self.inner.send_error(id, err.into())
pub fn reject(&mut self, err: impl Into<ErrorObjectOwned>) -> Result<(), SubscriptionEmptyError> {
let id = self.state.id()?;
if self.inner.send_error(id, err.into()) {
self.state = self.state.reject()?;
Ok(())
} else {
false
Err(SubscriptionEmptyError)
}
}
......@@ -1012,12 +1058,12 @@ impl SubscriptionSink {
///
/// Fails if the connection was closed, or if called multiple times.
pub fn accept(&mut self) -> Result<(), SubscriptionEmptyError> {
let id = self.id.take().ok_or(SubscriptionEmptyError)?;
let id = self.state.id()?;
if self.inner.send_response(id, &self.uniq_sub.sub_id) {
let (tx, rx) = watch::channel(());
self.subscribers.lock().insert(self.uniq_sub.clone(), (self.inner.clone(), tx));
self.unsubscribe = Some(rx);
self.state = self.state.accept(rx)?;
Ok(())
} else {
Err(SubscriptionEmptyError)
......@@ -1100,9 +1146,9 @@ impl SubscriptionSink {
}
};
let mut sub_closed = match self.unsubscribe.clone() {
Some(sub_closed) => sub_closed,
None => {
let mut sub_closed = match &self.state {
SubscriptionSinkState::Accepted(rx) => rx.clone(),
_ => {
let err = ErrorObject::owned(
INTERNAL_ERROR_CODE,
"Unsubscribe watcher not set after accepting the subscription".to_string(),
......@@ -1185,7 +1231,10 @@ impl SubscriptionSink {
}
fn is_active_subscription(&self) -> bool {
self.unsubscribe.as_ref().map(|u| !u.has_changed().is_err()).unwrap_or(false)
match &self.state {
SubscriptionSinkState::Accepted(unsubscribe) => !unsubscribe.has_changed().is_err(),
_ => false
}
}
fn build_message<T: Serialize>(&self, result: &T) -> Result<String, serde_json::Error> {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment