Unverified Commit 348f3668 authored by Alexandru Vasile's avatar Alexandru Vasile
Browse files

Add `accept-reject` API on `SubscriptionSink`


Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>
parent 8cf401c3
......@@ -827,7 +827,7 @@ impl PendingSubscription {
if sink.send_response(id, &uniq_sub.sub_id) {
let (tx, rx) = watch::channel(());
subscribers.lock().insert(uniq_sub.clone(), (sink.clone(), tx));
Ok(SubscriptionSink { inner: sink, close_notify, method, uniq_sub, subscribers, unsubscribe: rx, _claimed: claimed })
Ok(SubscriptionSink { inner: sink, close_notify, method, uniq_sub, subscribers, unsubscribe: rx, _claimed: claimed, id: None })
} else {
Err(SubscriptionEmptyError)
}
......@@ -983,17 +983,46 @@ pub struct SubscriptionSink {
close_notify: Option<SubscriptionPermit>,
/// MethodCallback.
method: &'static str,
/// Unique subscription.
uniq_sub: SubscriptionKey,
/// Shared Mutex of subscriptions for this method.
subscribers: Subscribers,
/// Future that returns when the unsubscribe method has been called.
unsubscribe: watch::Receiver<()>,
/// Unique subscription.
uniq_sub: SubscriptionKey,
/// Request ID of the subscription.
///
/// **Note**: This field becomes `None` when the subscription is accepted, or rejected.
id: Option<Id<'static>>,
/// Claimed resources.
_claimed: Option<ResourceGuard>,
}
impl SubscriptionSink {
/// Reject the subscription call from [`ErrorObject`].
pub fn reject(&mut self, err: impl Into<ErrorObjectOwned>) -> bool {
if let Some(id) = self.id.take() {
self.inner.send_error(id, err.into())
} else {
false
}
}
/// Attempt to accept the subscription and respond the subscription method call.
///
/// Fails if the connection was closed
pub fn accept(&mut self) -> Result<(), SubscriptionEmptyError> {
let id = self.id.take().ok_or(SubscriptionEmptyError)?;
if self.inner.send_response(id, &self.uniq_sub.sub_id) {
let (tx, rx) = watch::channel(());
self.subscribers.lock().insert(self.uniq_sub.clone(), (self.inner.clone(), tx));
self.unsubscribe = rx;
Ok(())
} else {
Err(SubscriptionEmptyError)
}
}
/// Send a message back to subscribers.
///
/// Returns `Ok(true)` if the message could be send
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment