Unverified Commit a0813cbd authored by Alexandru Vasile's avatar Alexandru Vasile Committed by GitHub
Browse files

Implement `ping-pong` for WebSocket clients (#772)



* ws: Implement ping for `TransportSenderT` trait
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* ws/client: Receive pong frames
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* core/client: Use `select!` macro for the background task
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* client: Propagate ping interval to background task
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* async_client: Submit ping requests
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* async_client: Handle pong replies
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* client: Handle frontend messages to dedicated fn
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* client: Handle backend messages in dedicated fn
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* client: Add terminated fuse for opt-out pings
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Set opt-out behavior for client pings
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* client: Move imports
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* client: Handle handle_frontend_messages errors
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* client: Add custom error related to byteslice conversions
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* client: Modify `send_ping` to send empty slices
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Fix `cargo hack check` and use `select_biased`
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Handle sending pings with lowest priority
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* core: Add proper number of params to `background_task`
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Fix wasm client
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Handle raw bytes and string received messages
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Fix Cargo.toml feature
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Panic when empty slice does not fit into `ByteSlice125`
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* wasm: Add operation not supported for pings
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Rename `ReceivedMessage` from Data to Text
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Rename test variable
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Add documentation
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* client: Use `future::select` for  cancel safety
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* client: Remove `pong` handling logic
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* client: Update ping documentation
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Update core/src/client/async_client/mod.rs
Co-authored-by: default avatarTarik Gul <47201679+TarikGul@users.noreply.github.com>

* Update core/src/client/async_client/mod.rs
Co-authored-by: default avatarTarik Gul <47201679+TarikGul@users.noreply.github.com>

* Update core/src/client/async_client/mod.rs
Co-authored-by: default avatarTarik Gul <47201679+TarikGul@users.noreply.github.com>

* Update core/src/client/async_client/mod.rs
Co-authored-by: default avatarTarik Gul <47201679+TarikGul@users.noreply.github.com>

* Update core/src/client/async_client/mod.rs
Co-authored-by: default avatarTarik Gul <47201679+TarikGul@users.noreply.github.com>

* Update core/Cargo.toml
Co-authored-by: Niklas Adolfsson's avatarNiklas Adolfsson <niklasadolfsson1@gmail.com>

* Update core/Cargo.toml
Co-authored-by: Niklas Adolfsson's avatarNiklas Adolfsson <niklasadolfsson1@gmail.com>

* logs: Keep debug log for submitting `Ping` frames
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Print debug logs when receiving `Pong` frames
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Update core/src/client/async_client/mod.rs
Co-authored-by: default avatarTarik Gul <47201679+TarikGul@users.noreply.github.com>
Co-authored-by: Niklas Adolfsson's avatarNiklas Adolfsson <niklasadolfsson1@gmail.com>
parent 2ac4cb26
Pipeline #196851 passed with stages
in 5 minutes and 27 seconds
......@@ -5,7 +5,7 @@ use futures_util::sink::SinkExt;
use futures_util::stream::{SplitSink, SplitStream, StreamExt};
use gloo_net::websocket::{futures::WebSocket, Message, WebSocketError};
use jsonrpsee_core::async_trait;
use jsonrpsee_core::client::{TransportReceiverT, TransportSenderT};
use jsonrpsee_core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT};
/// Web-sys transport error that can occur.
#[derive(Debug, thiserror::Error)]
......@@ -22,6 +22,9 @@ pub enum Error {
/// WebSocket error
#[error("WebSocket Error: {0:?}")]
WebSocket(WebSocketError),
/// Operation not supported
#[error("Operation not supported")]
NotSupported,
}
/// Sender.
......@@ -52,6 +55,11 @@ impl TransportSenderT for Sender {
Ok(())
}
async fn send_ping(&mut self) -> Result<(), Self::Error> {
tracing::trace!("send ping - not implemented for wasm");
Err(Error::NotSupported)
}
async fn close(&mut self) -> Result<(), Error> {
Ok(())
}
......@@ -61,17 +69,15 @@ impl TransportSenderT for Sender {
impl TransportReceiverT for Receiver {
type Error = Error;
async fn receive(&mut self) -> Result<String, Self::Error> {
async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
match self.0.next().await {
Some(Ok(msg)) => {
tracing::trace!("rx: {:?}", msg);
let txt = match msg {
Message::Bytes(bytes) => String::from_utf8(bytes).expect("WebSocket message is valid utf8; qed"),
Message::Text(txt) => txt,
};
Ok(txt)
match msg {
Message::Bytes(bytes) => Ok(ReceivedMessage::Bytes(bytes)),
Message::Text(txt) => Ok(ReceivedMessage::Text(txt)),
}
}
Some(Err(err)) => Err(Error::WebSocket(err)),
None => Err(Error::SenderDisconnected),
......
......@@ -31,11 +31,13 @@ use std::net::{SocketAddr, ToSocketAddrs};
use std::time::Duration;
use futures_util::io::{BufReader, BufWriter};
use jsonrpsee_core::client::{CertificateStore, TransportReceiverT, TransportSenderT};
use jsonrpsee_core::client::{CertificateStore, ReceivedMessage, TransportReceiverT, TransportSenderT};
use jsonrpsee_core::TEN_MB_SIZE_BYTES;
use jsonrpsee_core::{async_trait, Cow};
use soketto::connection;
use soketto::connection::Error::Utf8;
use soketto::data::ByteSlice125;
use soketto::handshake::client::{Client as WsHandshakeClient, ServerResponse};
use soketto::{connection, Data, Incoming};
use stream::EitherStream;
use thiserror::Error;
use tokio::net::TcpStream;
......@@ -195,6 +197,20 @@ impl TransportSenderT for Sender {
Ok(())
}
/// Sends out a ping request. Returns a `Future` that finishes when the request has been
/// successfully sent.
async fn send_ping(&mut self) -> Result<(), Self::Error> {
tracing::debug!("send ping");
// Submit empty slice as "optional" parameter.
let slice: &[u8] = &[];
// Byte slice fails if the provided slice is larger than 125 bytes.
let byte_slice = ByteSlice125::try_from(slice).expect("Empty slice should fit into ByteSlice125");
self.inner.send_ping(byte_slice).await?;
self.inner.flush().await?;
Ok(())
}
/// Send a close message and close the connection.
async fn close(&mut self) -> Result<(), WsError> {
self.inner.close().await.map_err(Into::into)
......@@ -206,11 +222,21 @@ impl TransportReceiverT for Receiver {
type Error = WsError;
/// Returns a `Future` resolving when the server sent us something back.
async fn receive(&mut self) -> Result<String, Self::Error> {
let mut message = Vec::new();
self.inner.receive_data(&mut message).await?;
let s = String::from_utf8(message).expect("Found invalid UTF-8");
Ok(s)
async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
loop {
let mut message = Vec::new();
let recv = self.inner.receive(&mut message).await?;
match recv {
Incoming::Data(Data::Text(_)) => {
let s = String::from_utf8(message).map_err(|err| WsError::Connection(Utf8(err.utf8_error())))?;
break Ok(ReceivedMessage::Text(s));
}
Incoming::Data(Data::Binary(_)) => break Ok(ReceivedMessage::Bytes(message)),
Incoming::Pong(_) => break Ok(ReceivedMessage::Pong),
_ => continue,
}
}
}
}
......
......@@ -73,6 +73,7 @@ pub struct WsClientBuilder<'a> {
max_request_body_size: u32,
request_timeout: Duration,
connection_timeout: Duration,
ping_interval: Option<Duration>,
headers: Vec<Header<'a>>,
max_concurrent_requests: usize,
max_notifs_per_subscription: usize,
......@@ -87,6 +88,7 @@ impl<'a> Default for WsClientBuilder<'a> {
max_request_body_size: TEN_MB_SIZE_BYTES,
request_timeout: Duration::from_secs(60),
connection_timeout: Duration::from_secs(10),
ping_interval: None,
headers: Vec::new(),
max_concurrent_requests: 256,
max_notifs_per_subscription: 1024,
......@@ -121,6 +123,12 @@ impl<'a> WsClientBuilder<'a> {
self
}
/// See documentation [`ClientBuilder::ping_interval`] (disabled by default).
pub fn ping_interval(mut self, interval: Duration) -> Self {
self.ping_interval = Some(interval);
self
}
/// See documentation [`WsTransportClientBuilder::add_header`] (default is none).
pub fn add_header(mut self, name: &'a str, value: &'a str) -> Self {
self.headers.push(Header { name, value: value.as_bytes() });
......@@ -169,11 +177,16 @@ impl<'a> WsClientBuilder<'a> {
let uri: Uri = url.as_ref().parse().map_err(|e: InvalidUri| Error::Transport(e.into()))?;
let (sender, receiver) = transport_builder.build(uri).await.map_err(|e| Error::Transport(e.into()))?;
Ok(ClientBuilder::default()
let mut client = ClientBuilder::default()
.max_notifs_per_subscription(self.max_notifs_per_subscription)
.request_timeout(self.request_timeout)
.max_concurrent_requests(self.max_concurrent_requests)
.id_format(self.id_kind)
.build_with_tokio(sender, receiver))
.id_format(self.id_kind);
if let Some(interval) = self.ping_interval {
client = client.ping_interval(interval);
}
Ok(client.build_with_tokio(sender, receiver))
}
}
......@@ -3,13 +3,12 @@
mod helpers;
mod manager;
use core::time::Duration;
use crate::client::{
async_client::helpers::process_subscription_close_response, BatchMessage, ClientT, RegisterNotificationMessage,
RequestMessage, Subscription, SubscriptionClientT, SubscriptionKind, SubscriptionMessage, TransportReceiverT,
TransportSenderT,
async_client::helpers::process_subscription_close_response, BatchMessage, ClientT, ReceivedMessage,
RegisterNotificationMessage, RequestMessage, Subscription, SubscriptionClientT, SubscriptionKind,
SubscriptionMessage, TransportReceiverT, TransportSenderT,
};
use core::time::Duration;
use helpers::{
build_unsubscribe_message, call_with_timeout, process_batch_response, process_error_response, process_notification,
process_single_response, process_subscription_response, stop_subscription,
......@@ -21,9 +20,10 @@ use async_lock::Mutex;
use async_trait::async_trait;
use futures_channel::{mpsc, oneshot};
use futures_timer::Delay;
use futures_util::future::{self, Either};
use futures_util::future::{self, Either, Fuse};
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use futures_util::FutureExt;
use jsonrpsee_types::{
response::SubscriptionError, ErrorResponse, Id, Notification, NotificationSer, ParamsSer, RequestSer, Response,
SubscriptionResponse,
......@@ -69,6 +69,7 @@ pub struct ClientBuilder {
max_concurrent_requests: usize,
max_notifs_per_subscription: usize,
id_kind: IdKind,
ping_interval: Option<Duration>,
}
impl Default for ClientBuilder {
......@@ -78,6 +79,7 @@ impl Default for ClientBuilder {
max_concurrent_requests: 256,
max_notifs_per_subscription: 1024,
id_kind: IdKind::Number,
ping_interval: None,
}
}
}
......@@ -115,6 +117,23 @@ impl ClientBuilder {
self
}
/// Set the interval at which pings frames are submitted (disabled by default).
///
/// Periodically submitting pings at a defined interval has mainly two benefits:
/// - Directly, it acts as a "keep-alive" alternative in the WebSocket world.
/// - Indirectly by inspecting debug logs, it ensures that the endpoint is still responding to messages.
///
/// The underlying implementation does not make any assumptions about at which intervals pongs are received.
///
/// Note: The interval duration is restarted when
/// - a frontend command is submitted
/// - a reply is received from the backend
/// - the interval duration expires
pub fn ping_interval(mut self, interval: Duration) -> Self {
self.ping_interval = Some(interval);
self
}
/// Build the client with given transport.
///
/// ## Panics
......@@ -130,9 +149,10 @@ impl ClientBuilder {
let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests);
let (err_tx, err_rx) = oneshot::channel();
let max_notifs_per_subscription = self.max_notifs_per_subscription;
let ping_interval = self.ping_interval;
tokio::spawn(async move {
background_task(sender, receiver, from_front, err_tx, max_notifs_per_subscription).await;
background_task(sender, receiver, from_front, err_tx, max_notifs_per_subscription, ping_interval).await;
});
Client {
to_back,
......@@ -155,7 +175,7 @@ impl ClientBuilder {
let max_notifs_per_subscription = self.max_notifs_per_subscription;
wasm_bindgen_futures::spawn_local(async move {
background_task(sender, receiver, from_front, err_tx, max_notifs_per_subscription).await;
background_task(sender, receiver, from_front, err_tx, max_notifs_per_subscription, None).await;
});
Client {
to_back,
......@@ -378,6 +398,196 @@ impl SubscriptionClientT for Client {
}
}
/// Handle backend messages.
///
/// Returns an error if the main background loop should be terminated.
async fn handle_backend_messages<S: TransportSenderT, R: TransportReceiverT>(
message: Option<Result<ReceivedMessage, R::Error>>,
manager: &mut RequestManager,
sender: &mut S,
max_notifs_per_subscription: usize,
) -> Result<(), Error> {
// Handle raw messages of form `ReceivedMessage::Bytes` (Vec<u8>) or ReceivedMessage::Data` (String).
async fn handle_recv_message<S: TransportSenderT>(
raw: &[u8],
manager: &mut RequestManager,
sender: &mut S,
max_notifs_per_subscription: usize
) -> Result<(), Error> {
// Single response to a request.
if let Ok(single) = serde_json::from_slice::<Response<_>>(&raw) {
tracing::debug!("[backend]: recv method_call {:?}", single);
match process_single_response(manager, single, max_notifs_per_subscription) {
Ok(Some(unsub)) => {
stop_subscription(sender, manager, unsub).await;
}
Ok(None) => (),
Err(err) => return Err(err),
}
}
// Subscription response.
else if let Ok(response) = serde_json::from_slice::<SubscriptionResponse<_>>(&raw) {
tracing::debug!("[backend]: recv subscription {:?}", response);
if let Err(Some(unsub)) = process_subscription_response(manager, response) {
let _ = stop_subscription(sender, manager, unsub).await;
}
}
// Subscription error response.
else if let Ok(response) = serde_json::from_slice::<SubscriptionError<_>>(&raw) {
tracing::debug!("[backend]: recv subscription closed {:?}", response);
let _ = process_subscription_close_response(manager, response);
}
// Incoming Notification
else if let Ok(notif) = serde_json::from_slice::<Notification<_>>(&raw) {
tracing::debug!("[backend]: recv notification {:?}", notif);
let _ = process_notification(manager, notif);
}
// Batch response.
else if let Ok(batch) = serde_json::from_slice::<Vec<Response<_>>>(&raw) {
tracing::debug!("[backend]: recv batch {:?}", batch);
if let Err(e) = process_batch_response(manager, batch) {
return Err(e);
}
}
// Error response
else if let Ok(err) = serde_json::from_slice::<ErrorResponse>(&raw) {
tracing::debug!("[backend]: recv error response {:?}", err);
if let Err(e) = process_error_response(manager, err) {
return Err(e);
}
}
// Unparsable response
else {
tracing::debug!(
"[backend]: recv unparseable message: {:?}",
serde_json::from_slice::<serde_json::Value>(&raw)
);
return Err(Error::Custom("Unparsable response".into()));
}
Ok(())
}
match message {
Some(Ok(ReceivedMessage::Pong)) => {
tracing::debug!("recv pong");
}
Some(Ok(ReceivedMessage::Bytes(raw))) => {
handle_recv_message(raw.as_ref(), manager, sender, max_notifs_per_subscription).await?;
}
Some(Ok(ReceivedMessage::Text(raw))) => {
handle_recv_message(raw.as_ref(), manager, sender, max_notifs_per_subscription).await?;
}
Some(Err(e)) => {
tracing::error!("Error: {:?} terminating client", e);
return Err(Error::Transport(e.into()));
}
None => {
tracing::error!("[backend]: WebSocket receiver dropped; terminate client");
return Err(Error::Custom("WebSocket receiver dropped".into()));
}
}
return Ok(());
}
/// Handle frontend messages.
///
/// Returns an error if the main background loop should be terminated.
async fn handle_frontend_messages<S: TransportSenderT>(
message: Option<FrontToBack>,
manager: &mut RequestManager,
sender: &mut S,
max_notifs_per_subscription: usize,
) -> Result<(), Error> {
match message {
// User dropped the sender side of the channel.
// There is nothing to do just terminate.
None => {
return Err(Error::Custom("[backend]: frontend dropped; terminate client".into()));
}
Some(FrontToBack::Batch(batch)) => {
tracing::trace!("[backend]: client prepares to send batch request: {:?}", batch.raw);
if let Err(send_back) = manager.insert_pending_batch(batch.ids.clone(), batch.send_back) {
tracing::warn!("[backend]: batch request: {:?} already pending", batch.ids);
let _ = send_back.send(Err(Error::InvalidRequestId));
return Ok(());
}
if let Err(e) = sender.send(batch.raw).await {
tracing::warn!("[backend]: client batch request failed: {:?}", e);
manager.complete_pending_batch(batch.ids);
}
}
// User called `notification` on the front-end
Some(FrontToBack::Notification(notif)) => {
tracing::trace!("[backend]: client prepares to send notification: {:?}", notif);
if let Err(e) = sender.send(notif).await {
tracing::warn!("[backend]: client notif failed: {:?}", e);
}
}
// User called `request` on the front-end
Some(FrontToBack::Request(request)) => {
tracing::trace!("[backend]: client prepares to send request={:?}", request);
match sender.send(request.raw).await {
Ok(_) => {
manager.insert_pending_call(request.id, request.send_back).expect("ID unused checked above; qed")
}
Err(e) => {
tracing::warn!("[backend]: client request failed: {:?}", e);
let _ = request.send_back.map(|s| s.send(Err(Error::Transport(e.into()))));
}
}
}
// User called `subscribe` on the front-end.
Some(FrontToBack::Subscribe(sub)) => match sender.send(sub.raw).await {
Ok(_) => manager
.insert_pending_subscription(
sub.subscribe_id,
sub.unsubscribe_id,
sub.send_back,
sub.unsubscribe_method,
)
.expect("Request ID unused checked above; qed"),
Err(e) => {
tracing::warn!("[backend]: client subscription failed: {:?}", e);
let _ = sub.send_back.send(Err(Error::Transport(e.into())));
}
},
// User dropped a subscription.
Some(FrontToBack::SubscriptionClosed(sub_id)) => {
tracing::trace!("Closing subscription: {:?}", sub_id);
// NOTE: The subscription may have been closed earlier if
// the channel was full or disconnected.
if let Some(unsub) = manager
.get_request_id_by_subscription_id(&sub_id)
.and_then(|req_id| build_unsubscribe_message(manager, req_id, sub_id))
{
stop_subscription(sender, manager, unsub).await;
}
}
// User called `register_notification` on the front-end.
Some(FrontToBack::RegisterNotification(reg)) => {
tracing::trace!("[backend] registering notification handler: {:?}", reg.method);
let (subscribe_tx, subscribe_rx) = mpsc::channel(max_notifs_per_subscription);
if manager.insert_notification_handler(&reg.method, subscribe_tx).is_ok() {
let _ = reg.send_back.send(Ok((subscribe_rx, reg.method)));
} else {
let _ = reg.send_back.send(Err(Error::MethodAlreadyRegistered(reg.method)));
}
}
// User dropped the notificationHandler for this method
Some(FrontToBack::UnregisterNotification(method)) => {
tracing::trace!("[backend] unregistering notification handler: {:?}", method);
let _ = manager.remove_notification_handler(method);
}
}
Ok(())
}
/// Function being run in the background that processes messages from the frontend.
async fn background_task<S, R>(
mut sender: S,
......@@ -385,6 +595,7 @@ async fn background_task<S, R>(
mut frontend: mpsc::Receiver<FrontToBack>,
front_error: oneshot::Sender<Error>,
max_notifs_per_subscription: usize,
ping_interval: Option<Duration>,
) where
S: TransportSenderT,
R: TransportReceiverT,
......@@ -395,172 +606,58 @@ async fn background_task<S, R>(
let res = receiver.receive().await;
Some((res, receiver))
});
futures_util::pin_mut!(backend_event);
loop {
let next_frontend = frontend.next();
let next_backend = backend_event.next();
futures_util::pin_mut!(next_frontend, next_backend);
match futures_util::future::select(next_frontend, next_backend).await {
// User dropped the sender side of the channel.
// There is nothing to do just terminate.
Either::Left((None, _)) => {
tracing::trace!("[backend]: frontend dropped; terminate client");
break;
}
Either::Left((Some(FrontToBack::Batch(batch)), _)) => {
tracing::trace!("[backend]: client prepares to send batch request: {:?}", batch.raw);
// NOTE(niklasad1): annoying allocation.
if let Err(send_back) = manager.insert_pending_batch(batch.ids.clone(), batch.send_back) {
tracing::warn!("[backend]: batch request: {:?} already pending", batch.ids);
let _ = send_back.send(Err(Error::InvalidRequestId));
continue;
}
if let Err(e) = sender.send(batch.raw).await {
tracing::warn!("[backend]: client batch request failed: {:?}", e);
manager.complete_pending_batch(batch.ids);
}
}
// User called `notification` on the front-end
Either::Left((Some(FrontToBack::Notification(notif)), _)) => {
tracing::trace!("[backend]: client prepares to send notification: {:?}", notif);
if let Err(e) = sender.send(notif).await {
tracing::warn!("[backend]: client notif failed: {:?}", e);
}
}
// Place frontend and backend messages into their own select.
// This implies that either messages are received (both front or backend),
// or the submitted ping timer expires (if provided).
let next_frontend = frontend.next();
let next_backend = backend_event.next();
let mut message_fut = future::select(next_frontend, next_backend);
// User called `request` on the front-end
Either::Left((Some(FrontToBack::Request(request)), _)) => {
tracing::trace!("[backend]: client prepares to send request={:?}", request);
match sender.send(request.raw).await {
Ok(_) => manager
.insert_pending_call(request.id, request.send_back)
.expect("ID unused checked above; qed"),
Err(e) => {
tracing::warn!("[backend]: client request failed: {:?}", e);
let _ = request.send_back.map(|s| s.send(Err(Error::Transport(e.into()))));
}
}
}
loop {
// Create either a valid delay fuse triggered every provided `duration`,
// or create a terminated fuse that's never selected if the provided `duration` is None.
let submit_ping = if let Some(duration) = ping_interval {
Delay::new(duration).fuse()
} else {
// The select macro bypasses terminated futures, and the `submit_ping` branch is never selected.
Fuse::<Delay>::terminated()
};
// User called `subscribe` on the front-end.
Either::Left((Some(FrontToBack::Subscribe(sub)), _)) => match sender.send(sub.raw).await {
Ok(_) => manager
.insert_pending_subscription(
sub.subscribe_id,
sub.unsubscribe_id,
sub.send_back,
sub.unsubscribe_method,
)
.expect("Request ID unused checked above; qed"),
Err(e) => {
tracing::warn!("[backend]: client subscription failed: {:?}", e);
let _ = sub.send_back.send(Err(Error::Transport(e.into())));
}
},
// User dropped a subscription.
Either::Left((Some(FrontToBack::SubscriptionClosed(sub_id)), _)) => {
tracing::trace!("Closing subscription: {:?}", sub_id);
// NOTE: The subscription may have been closed earlier if
// the channel was full or disconnected.
if let Some(unsub) = manager
.get_request_id_by_subscription_id(&sub_id)
.and_then(|req_id| build_unsubscribe_message(&mut manager, req_id, sub_id))
{
stop_subscription(&mut sender, &mut manager, unsub).await;
match future::select(message_fut, submit_ping).await {
// Message received from the frontend.
Either::Left((Either::Left((frontend_value, backend)), _)) => {
if let Err(err) = handle_frontend_messages(frontend_value, &mut manager, &mut sender, max_notifs_per_subscription).await {
tracing::warn!("{:?}", err);
let _ = front_error.send(err);
break;
}
// Advance frontend, save backend.
message_fut = future::select(frontend.next(), backend);
}
// User called `register_notification` on the front-end.
Either::Left((Some(FrontToBack::RegisterNotification(reg)), _)) => {
tracing::trace!("[backend] registering notification handler: {:?}", reg.method);
let (subscribe_tx, subscribe_rx) = mpsc::channel(max_notifs_per_subscription);
if manager.insert_notification_handler(&reg.method, subscribe_tx).is_ok() {
let _ = reg.send_back.send(Ok((subscribe_rx, reg.method)));
} else {
let _ = reg.send_back.send(Err(Error::MethodAlreadyRegistered(reg.method)));
// Message received from the backend.
Either::Left((Either::Right((backend_value, frontend)), _))=> {
if let Err(err) = handle_backend_messages::<S, R>(
backend_value, &mut manager, &mut sender, max_notifs_per_subscription
).await {
tracing::warn!("{:?}", err);