Unverified Commit 64f45e93 authored by Alexandru Vasile's avatar Alexandru Vasile
Browse files

Make `unsubscribe` channel optional on accepting the connection


Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>
parent 277bbd71
......@@ -39,7 +39,7 @@ use futures_channel::mpsc;
use futures_util::future::Either;
use futures_util::pin_mut;
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt, TryStream, TryStreamExt};
use jsonrpsee_types::error::{CallError, ErrorCode, ErrorObject, ErrorObjectOwned, SUBSCRIPTION_CLOSED_WITH_ERROR};
use jsonrpsee_types::error::{CallError, ErrorCode, ErrorObject, ErrorObjectOwned, INTERNAL_ERROR_CODE, SUBSCRIPTION_CLOSED_WITH_ERROR};
use jsonrpsee_types::response::{SubscriptionError, SubscriptionPayloadError};
use jsonrpsee_types::{
ErrorResponse, Id, Params, Request, Response, SubscriptionResult, SubscriptionEmptyError,
......@@ -827,7 +827,7 @@ impl PendingSubscription {
if sink.send_response(id, &uniq_sub.sub_id) {
let (tx, rx) = watch::channel(());
subscribers.lock().insert(uniq_sub.clone(), (sink.clone(), tx));
Ok(SubscriptionSink { inner: sink, close_notify, method, uniq_sub, subscribers, unsubscribe: rx, _claimed: claimed, id: None })
Ok(SubscriptionSink { inner: sink, close_notify, method, uniq_sub, subscribers, unsubscribe: Some(rx), _claimed: claimed, id: None })
} else {
Err(SubscriptionEmptyError)
}
......@@ -986,7 +986,7 @@ pub struct SubscriptionSink {
/// Shared Mutex of subscriptions for this method.
subscribers: Subscribers,
/// Future that returns when the unsubscribe method has been called.
unsubscribe: watch::Receiver<()>,
unsubscribe: Option<watch::Receiver<()>>,
/// Unique subscription.
uniq_sub: SubscriptionKey,
/// Request ID of the subscription.
......@@ -1016,7 +1016,7 @@ impl SubscriptionSink {
if self.inner.send_response(id, &self.uniq_sub.sub_id) {
let (tx, rx) = watch::channel(());
self.subscribers.lock().insert(self.uniq_sub.clone(), (self.inner.clone(), tx));
self.unsubscribe = rx;
self.unsubscribe = Some(rx);
Ok(())
} else {
Err(SubscriptionEmptyError)
......@@ -1099,7 +1099,18 @@ impl SubscriptionSink {
}
};
let mut sub_closed = self.unsubscribe.clone();
let mut sub_closed = match self.unsubscribe.clone() {
Some(sub_closed) => sub_closed,
None => {
let err = ErrorObject::owned(
INTERNAL_ERROR_CODE,
"Unsubscribe watcher not set after accepting the subscription".to_string(),
None::<()>
);
return SubscriptionClosed::Failed(err);
}
};
let sub_closed_fut = sub_closed.changed();
let conn_closed_fut = conn_closed.notified();
......@@ -1173,7 +1184,7 @@ impl SubscriptionSink {
}
fn is_active_subscription(&self) -> bool {
!self.unsubscribe.has_changed().is_err()
self.unsubscribe.as_ref().map(|u| !u.has_changed().is_err()).unwrap_or(false)
}
fn build_message<T: Serialize>(&self, result: &T) -> Result<String, serde_json::Error> {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment