Unverified Commit c8ffaa4c authored by Alexandru Vasile's avatar Alexandru Vasile
Browse files

client: Implement `notify_on_disconnect`



Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>
parent 4a7d7252
Pipeline #205775 failed with stages
in 4 minutes and 29 seconds
...@@ -11,6 +11,7 @@ use crate::client::{ ...@@ -11,6 +11,7 @@ use crate::client::{
use crate::tracing::{rx_log_from_json, tx_log_from_str, RpcTracing}; use crate::tracing::{rx_log_from_json, tx_log_from_str, RpcTracing};
use core::time::Duration; use core::time::Duration;
use std::sync::Arc;
use helpers::{ use helpers::{
build_unsubscribe_message, call_with_timeout, process_batch_response, process_error_response, process_notification, build_unsubscribe_message, call_with_timeout, process_batch_response, process_error_response, process_notification,
process_single_response, process_subscription_response, stop_subscription, process_single_response, process_subscription_response, stop_subscription,
...@@ -31,6 +32,7 @@ use jsonrpsee_types::{ ...@@ -31,6 +32,7 @@ use jsonrpsee_types::{
SubscriptionResponse, SubscriptionResponse,
}; };
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use tokio::sync::Notify;
use tracing_futures::Instrument; use tracing_futures::Instrument;
use super::{FrontToBack, IdKind, RequestIdManager}; use super::{FrontToBack, IdKind, RequestIdManager};
...@@ -162,9 +164,20 @@ impl ClientBuilder { ...@@ -162,9 +164,20 @@ impl ClientBuilder {
let (err_tx, err_rx) = oneshot::channel(); let (err_tx, err_rx) = oneshot::channel();
let max_notifs_per_subscription = self.max_notifs_per_subscription; let max_notifs_per_subscription = self.max_notifs_per_subscription;
let ping_interval = self.ping_interval; let ping_interval = self.ping_interval;
let notify_on_close = Arc::new(Notify::new());
let notify_on_close_back = notify_on_close.clone();
tokio::spawn(async move { tokio::spawn(async move {
background_task(sender, receiver, from_front, err_tx, max_notifs_per_subscription, ping_interval).await; background_task(
sender,
receiver,
from_front,
err_tx,
max_notifs_per_subscription,
ping_interval,
notify_on_close_back,
)
.await;
}); });
Client { Client {
to_back, to_back,
...@@ -172,6 +185,7 @@ impl ClientBuilder { ...@@ -172,6 +185,7 @@ impl ClientBuilder {
error: Mutex::new(ErrorFromBack::Unread(err_rx)), error: Mutex::new(ErrorFromBack::Unread(err_rx)),
id_manager: RequestIdManager::new(self.max_concurrent_requests, self.id_kind), id_manager: RequestIdManager::new(self.max_concurrent_requests, self.id_kind),
max_log_length: self.max_log_length, max_log_length: self.max_log_length,
notify: notify_on_close,
} }
} }
...@@ -216,6 +230,8 @@ pub struct Client { ...@@ -216,6 +230,8 @@ pub struct Client {
/// ///
/// Entries bigger than this limit will be truncated. /// Entries bigger than this limit will be truncated.
max_log_length: u32, max_log_length: u32,
/// Notify when the client is disconnected or encountered an error.
notify: Arc<Notify>,
} }
impl Client { impl Client {
...@@ -232,6 +248,16 @@ impl Client { ...@@ -232,6 +248,16 @@ impl Client {
*err_lock = next_state; *err_lock = next_state;
err err
} }
/// Completes when the client is disconnected or the client's background task encountered an error.
///
/// # Cancel safety
///
/// This method is cancel safe. Once the client is disconnected, it stays disconnected forever and all
/// future calls to this method will return immediately.
pub async fn notify_on_disconnect(&self) {
self.notify.notified().await;
}
} }
impl Drop for Client { impl Drop for Client {
...@@ -621,6 +647,7 @@ async fn background_task<S, R>( ...@@ -621,6 +647,7 @@ async fn background_task<S, R>(
front_error: oneshot::Sender<Error>, front_error: oneshot::Sender<Error>,
max_notifs_per_subscription: usize, max_notifs_per_subscription: usize,
ping_interval: Option<Duration>, ping_interval: Option<Duration>,
notify_on_close: Arc<Notify>,
) where ) where
S: TransportSenderT, S: TransportSenderT,
R: TransportReceiverT, R: TransportReceiverT,
...@@ -692,6 +719,7 @@ async fn background_task<S, R>( ...@@ -692,6 +719,7 @@ async fn background_task<S, R>(
} }
}; };
} }
notify_on_close.notify_waiters();
// Send close message to the server. // Send close message to the server.
let _ = sender.close().await; let _ = sender.close().await;
} }
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment