Unverified commit 6fdb67f9 authored by Alexandru Vasile, committed by GitHub

client: Implement `notify_on_disconnect` (#837)



* client: Implement `notify_on_disconnect`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Revert "client: Implement `notify_on_disconnect`"

This reverts commit c8ffaa4c.

* client: Rely on `tokio::sync::mpsc::Sender` to notify on disconnect

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Implement `InternalError` for tokio and futures_channel's `SendError`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* client: Remove the tokio-stream dependency

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* client: Rename `notify_on_disconnect` to `on_disconnect`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Add `tokio/sync` as a dependency of the client feature

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* wasm-client: Use `tokio::sync::mpsc::channel` to notify on disconnect

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Add the tokio/sync dependency to satisfy the `--no-default-features` check

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Make a best effort to close the channel when the client drops

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Revert to: client: Implement `notify_on_disconnect`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Replace tokio/sync with a oneshot channel

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tests: Check that `client.on_disconnect()` returns when the server shuts down

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tests: Fix a comment typo

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tests: Call `on_disconnect()` multiple times

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tests: Call `on_disconnect` with a closed server

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
parent 1ebaf626
Pipeline #208511 passed with stages in 5 minutes and 26 seconds
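For reference, the user-facing result of this change is a single new async method, `Client::on_disconnect`, that resolves once the client's background task exits. Below is a minimal usage sketch, not part of the diff: the URL is a placeholder, a WebSocket JSON-RPC server is assumed to be listening there, and import paths may differ between jsonrpsee versions.

use jsonrpsee::ws_client::WsClientBuilder;

#[tokio::main]
async fn main() {
	// Placeholder address; assumes a server is already running there.
	let client = WsClientBuilder::default().build("ws://127.0.0.1:9944").await.unwrap();

	// Resolves when the connection is lost or the background task hits an error.
	// Calling it again after it has resolved returns immediately.
	client.on_disconnect().await;
	eprintln!("connection lost, reconnect or bail out here");
}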
@@ -138,7 +138,7 @@ pub(crate) fn process_notification(manager: &mut RequestManager, notif: Notifica
 		Err(err) => {
 			tracing::error!("Error sending notification, dropping handler for {:?} error: {:?}", notif.method, err);
 			let _ = manager.remove_notification_handler(notif.method.into_owned());
-			Err(Error::Internal(err.into_send_error()))
+			Err(err.into_send_error().into())
 		}
 	},
 	None => {
@@ -162,9 +162,19 @@ impl ClientBuilder {
 		let (err_tx, err_rx) = oneshot::channel();
 		let max_notifs_per_subscription = self.max_notifs_per_subscription;
 		let ping_interval = self.ping_interval;
+		let (on_close_tx, on_close_rx) = oneshot::channel();

 		tokio::spawn(async move {
-			background_task(sender, receiver, from_front, err_tx, max_notifs_per_subscription, ping_interval).await;
+			background_task(
+				sender,
+				receiver,
+				from_front,
+				err_tx,
+				max_notifs_per_subscription,
+				ping_interval,
+				on_close_tx,
+			)
+			.await;
 		});
 		Client {
 			to_back,
@@ -172,6 +182,7 @@ impl ClientBuilder {
 			error: Mutex::new(ErrorFromBack::Unread(err_rx)),
 			id_manager: RequestIdManager::new(self.max_concurrent_requests, self.id_kind),
 			max_log_length: self.max_log_length,
+			notify: Mutex::new(Some(on_close_rx)),
 		}
 	}
@@ -186,9 +197,10 @@ impl ClientBuilder {
 		let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests);
 		let (err_tx, err_rx) = oneshot::channel();
 		let max_notifs_per_subscription = self.max_notifs_per_subscription;
+		let (on_close_tx, on_close_rx) = oneshot::channel();

 		wasm_bindgen_futures::spawn_local(async move {
-			background_task(sender, receiver, from_front, err_tx, max_notifs_per_subscription, None).await;
+			background_task(sender, receiver, from_front, err_tx, max_notifs_per_subscription, None, on_close_tx).await;
 		});
 		Client {
 			to_back,
@@ -196,6 +208,7 @@ impl ClientBuilder {
 			error: Mutex::new(ErrorFromBack::Unread(err_rx)),
 			id_manager: RequestIdManager::new(self.max_concurrent_requests, self.id_kind),
 			max_log_length: self.max_log_length,
+			notify: Mutex::new(Some(on_close_rx)),
 		}
 	}
 }
@@ -216,6 +229,10 @@ pub struct Client {
 	///
 	/// Entries bigger than this limit will be truncated.
 	max_log_length: u32,
+	/// Notify when the client is disconnected or has encountered an error.
+	// NOTE: Similar to `error`, the async fns take immutable references, so the `Receiver` is
+	// wrapped in an `Option` to ensure that `on_disconnect` awaits it only once.
+	notify: Mutex<Option<oneshot::Receiver<()>>>,
 }

 impl Client {
@@ -232,6 +249,20 @@ impl Client {
 		*err_lock = next_state;
 		err
 	}

+	/// Completes when the client is disconnected or the client's background task has encountered an error.
+	/// If the client is already disconnected, the future produced by this method completes immediately.
+	///
+	/// # Cancel safety
+	///
+	/// This method is cancel safe.
+	pub async fn on_disconnect(&self) {
+		// Wait until the `background_task` exits.
+		let mut notify_lock = self.notify.lock().await;
+
+		if let Some(notify) = notify_lock.take() {
+			let _ = notify.await;
+		}
+	}
 }

 impl Drop for Client {
@@ -628,6 +659,7 @@ async fn background_task<S, R>(
 	front_error: oneshot::Sender<Error>,
 	max_notifs_per_subscription: usize,
 	ping_interval: Option<Duration>,
+	on_close: oneshot::Sender<()>,
 ) where
 	S: TransportSenderT,
 	R: TransportReceiverT,
@@ -699,6 +731,8 @@ async fn background_task<S, R>(
 			}
 		};
 	}
+	// Wake the `on_disconnect` method.
+	let _ = on_close.send(());
 	// Send close message to the server.
 	let _ = sender.close().await;
 }
@@ -458,6 +458,78 @@ async fn ws_server_should_stop_subscription_after_client_drop() {
 	assert_eq!(close_err, ErrorObject::borrowed(0, &"Subscription terminated successfully", None));
 }

+#[tokio::test]
+async fn ws_server_notify_client_on_disconnect() {
+	use futures::channel::oneshot;
+
+	init_logger();
+
+	let (server_addr, server_handle) = websocket_server_with_subscription().await;
+	let server_url = format!("ws://{}", server_addr);
+
+	let (up_tx, up_rx) = oneshot::channel();
+	let (dis_tx, mut dis_rx) = oneshot::channel();
+	let (multiple_tx, multiple_rx) = oneshot::channel();
+
+	tokio::spawn(async move {
+		let client = WsClientBuilder::default().build(&server_url).await.unwrap();
+		// Validate that the server is up.
+		client.request::<String>("say_hello", None).await.unwrap();
+
+		// Signal that the client is waiting for the server to disconnect.
+		up_tx.send(()).unwrap();
+
+		client.on_disconnect().await;
+
+		// Signal that the disconnect was detected.
+		dis_tx.send(()).unwrap();
+
+		// Call `on_disconnect` a few more times to ensure it does not block.
+		client.on_disconnect().await;
+		client.on_disconnect().await;
+		multiple_tx.send(()).unwrap();
+	});
+
+	// Ensure the client validated the server and is waiting for the disconnect.
+	up_rx.await.unwrap();
+
+	// Let A = the `dis_rx.try_recv()` check plus the server stop,
+	//     B = the client's `on_disconnect` call.
+	//
+	// Precautionary wait so that a buggy `on_disconnect` (B), one that returns before
+	// the server is stopped (A), would be caught by the assertion below.
+	tokio::time::sleep(Duration::from_secs(5)).await;
+
+	// Make sure `on_disconnect` did not return before the server was stopped.
+	assert_eq!(dis_rx.try_recv().unwrap(), None);
+	server_handle.stop().unwrap().await;

+	// The `on_disconnect()` method returned.
+	let _ = dis_rx.await.unwrap();
+
+	// Multiple `on_disconnect()` calls did not block.
+	let _ = multiple_rx.await.unwrap();
+}
+
+#[tokio::test]
+async fn ws_server_notify_client_on_disconnect_with_closed_server() {
+	init_logger();
+
+	let (server_addr, server_handle) = websocket_server_with_subscription().await;
+	let server_url = format!("ws://{}", server_addr);
+
+	let client = WsClientBuilder::default().build(&server_url).await.unwrap();
+
+	// Validate that the server is up.
+	client.request::<String>("say_hello", None).await.unwrap();
+
+	// Stop the server.
+	server_handle.stop().unwrap().await;
+
+	// Ensure `on_disconnect` returns when it is called after the server is closed.
+	client.on_disconnect().await;
+}
+
 #[tokio::test]
 async fn ws_server_cancels_subscriptions_on_reset_conn() {
 	init_logger();