Unverified Commit 8d3f7e53 authored by Alexandru Vasile's avatar Alexandru Vasile
Browse files

Remodel state machine using `Option`s for `SubscriptionSink`s


Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>
parent abbea2c0
......@@ -743,7 +743,8 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
method: notif_method_name,
subscribers: subscribers.clone(),
uniq_sub: SubscriptionKey { conn_id: conn.conn_id, sub_id },
state: SubscriptionSinkState::new(id.clone().into_owned()),
id: Some(id.clone().into_owned()),
unsubscribe: None,
_claimed: claimed,
};
......@@ -775,54 +776,6 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
}
}
/// The state of the [`SubscriptionSink`].
#[derive(Debug)]
enum SubscriptionSinkState {
/// The subscription is pending and needs to be accepted or rejected
/// before utilizing the sink. This is the initial state of the sink.
///
/// This state contains the request ID of the subscription.
Pending(Option<Id<'static>>),
/// The subscription was accepted via the [`SubscriptionSink::accept`] call.
/// This state contains the future that returns when the unsubscribe method has been called.
Accepted(watch::Receiver<()>),
/// The subscription was rejected via the [`SubscriptionSink::reject`] call.
/// In this state, the subscription cannot be utilized.
Rejected,
}
impl SubscriptionSinkState {
/// Initialize the sink's state from the subscription ID.
fn new(id: Id<'static>) -> Self {
SubscriptionSinkState::Pending(Some(id))
}
/// Takes the ID out of the `Pending` state.
fn id(&mut self) -> Result<Id<'static>, SubscriptionEmptyError> {
match self {
SubscriptionSinkState::Pending(id) => id.take().ok_or(SubscriptionEmptyError),
_ => Err(SubscriptionEmptyError),
}
}
/// Advance the state of the sink to the accepted state.
fn accept(&self, rx: watch::Receiver<()>) -> Result<SubscriptionSinkState, SubscriptionEmptyError> {
match self {
// Cannot transition to accepted if the ID was not previously consumed.
SubscriptionSinkState::Pending(None) => Ok(SubscriptionSinkState::Accepted(rx)),
_ => Err(SubscriptionEmptyError),
}
}
/// Reject the subscription.
fn reject(&self) -> Result<SubscriptionSinkState, SubscriptionEmptyError> {
match self {
SubscriptionSinkState::Pending(None) => Ok(SubscriptionSinkState::Rejected),
_ => Err(SubscriptionEmptyError),
}
}
}
/// Represents a single subscription.
#[derive(Debug)]
pub struct SubscriptionSink {
......@@ -836,8 +789,14 @@ pub struct SubscriptionSink {
subscribers: Subscribers,
/// Unique subscription.
uniq_sub: SubscriptionKey,
/// The state of the subscription sink.
state: SubscriptionSinkState,
/// Id of the subscription.
///
/// *Note*: Having some value means the subscription was not accepted or rejected yet.
id: Option<Id<'static>>,
/// Returns when the unsubscribe method has been called.
///
/// *Note*: Have some values means the subscription was accepted.
unsubscribe: Option<watch::Receiver<()>>,
/// Claimed resources.
_claimed: Option<ResourceGuard>,
}
......@@ -845,10 +804,9 @@ pub struct SubscriptionSink {
impl SubscriptionSink {
/// Reject the subscription call from [`ErrorObject`].
pub fn reject(&mut self, err: impl Into<ErrorObjectOwned>) -> Result<(), SubscriptionEmptyError> {
let id = self.state.id()?;
let id = self.id.take().ok_or(SubscriptionEmptyError)?;
if self.inner.send_error(id, err.into()) {
self.state = self.state.reject()?;
Ok(())
} else {
Err(SubscriptionEmptyError)
......@@ -859,12 +817,12 @@ impl SubscriptionSink {
///
/// Fails if the connection was closed, or if called multiple times.
pub fn accept(&mut self) -> Result<(), SubscriptionEmptyError> {
let id = self.state.id()?;
let id = self.id.take().ok_or(SubscriptionEmptyError)?;
if self.inner.send_response(id, &self.uniq_sub.sub_id) {
let (tx, rx) = watch::channel(());
self.subscribers.lock().insert(self.uniq_sub.clone(), (self.inner.clone(), tx));
self.state = self.state.accept(rx)?;
self.unsubscribe = Some(rx);
Ok(())
} else {
Err(SubscriptionEmptyError)
......@@ -875,10 +833,16 @@ impl SubscriptionSink {
///
/// Fails if the accept function fails internally, or if the subscription was rejected.
fn maybe_accept(&mut self) -> Result<(), SubscriptionEmptyError> {
match self.state {
SubscriptionSinkState::Pending(_) => self.accept(),
SubscriptionSinkState::Accepted(_) => Ok(()),
SubscriptionSinkState::Rejected => Err(SubscriptionEmptyError),
// Pending subscription.
if self.id.is_some() {
return self.accept();
}
// Subscription accepted.
if self.unsubscribe.is_some() {
Ok(())
} else {
Err(SubscriptionEmptyError)
}
}
......@@ -959,8 +923,8 @@ impl SubscriptionSink {
}
};
let mut sub_closed = match &self.state {
SubscriptionSinkState::Accepted(rx) => rx.clone(),
let mut sub_closed = match self.unsubscribe.as_ref() {
Some(rx) => rx.clone(),
_ => {
let err = ErrorObject::owned(
INTERNAL_ERROR_CODE,
......@@ -1043,9 +1007,9 @@ impl SubscriptionSink {
}
fn is_active_subscription(&self) -> bool {
match &self.state {
SubscriptionSinkState::Accepted(unsubscribe) => !unsubscribe.has_changed().is_err(),
_ => false
match self.unsubscribe.as_ref() {
Some(unsubscribe) => !unsubscribe.has_changed().is_err(),
_ => false,
}
}
......@@ -1101,7 +1065,7 @@ impl SubscriptionSink {
impl Drop for SubscriptionSink {
fn drop(&mut self) {
// Subscription was never accepted.
if let Ok(id) = self.state.id() {
if let Some(id) = self.id.take() {
self.inner.send_error(id, ErrorCode::InvalidParams.into());
} else if self.is_active_subscription() {
self.subscribers.lock().remove(&self.uniq_sub);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment