Unverified Commit 76572020 authored by Alexandru Vasile's avatar Alexandru Vasile
Browse files

Handle rejected sinks


Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>
parent 99ffeb7b
......@@ -1071,20 +1071,29 @@ impl SubscriptionSink {
}
/// Accepts the subscription if previously not accepted.
fn maybe_accept(&mut self) {
let _ = self.accept();
///
/// Fails if the accept function fails internally, or if the subscription was rejected.
fn maybe_accept(&mut self) -> Result<(), SubscriptionEmptyError> {
match self.state {
SubscriptionSinkState::Pending(_) => self.accept(),
SubscriptionSinkState::Accepted(_) => Ok(()),
SubscriptionSinkState::Rejected => Err(SubscriptionEmptyError),
}
}
/// Send a message back to subscribers.
///
/// Returns `Ok(true)` if the message could be send
/// Returns `Ok(false)` if the sink was closed (either because the subscription was closed or the connection was terminated)
/// Return `Err(err)` if the message could not be serialized.
///
/// Returns
/// - `Ok(true)` if the message could be send.
/// - `Ok(false)` if the sink was closed (either because the subscription was closed or the connection was terminated),
/// or the subscription could not be accepted.
/// - `Err(err)` if the message could not be serialized.
pub fn send<T: Serialize>(&mut self, result: &T) -> Result<bool, serde_json::Error> {
self.maybe_accept();
// only possible to trigger when the connection is dropped.
// Cannot accept the subscription.
if self.maybe_accept().is_err() {
return Ok(false);
}
// Only possible to trigger when the connection is dropped.
if self.is_closed() {
return Ok(false);
}
......@@ -1137,7 +1146,9 @@ impl SubscriptionSink {
T: Serialize,
E: std::fmt::Display,
{
self.maybe_accept();
if self.maybe_accept().is_err() {
return SubscriptionClosed::RemotePeerAborted;
}
let conn_closed = match self.close_notify.as_ref().map(|cn| cn.handle()) {
Some(cn) => cn,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment