Newer
Older
use crate::error::Error;
use crate::jsonrpc::{self, DeserializeOwned, JsonValue, Params, SubscriptionId};
use core::marker::PhantomData;
use futures::channel::{mpsc, oneshot};
use futures::prelude::*;
/// Active subscription on a Client.
pub struct Subscription<Notif> {
/// Channel to send requests to the background task.
pub to_back: mpsc::Sender<FrontToBack>,
/// Channel from which we receive notifications from the server, as encoded `JsonValue`s.
pub notifs_rx: mpsc::Receiver<JsonValue>,
/// Subscription ID,
pub id: SubscriptionId,
/// Marker in order to pin the `Notif` parameter.
pub marker: PhantomData<Notif>,
}
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/// Notification message.
#[derive(Debug)]
pub struct NotificationMessage {
/// Method for the notification.
pub method: String,
/// Parameters to send to the server.
pub params: Params,
}
/// Request message.
#[derive(Debug)]
pub struct RequestMessage {
/// Method for the request.
pub method: String,
/// Parameters of the request.
pub params: Params,
/// One-shot channel over which we send back the result of this request.
pub send_back: Option<oneshot::Sender<Result<JsonValue, Error>>>,
}
/// Subscription message.
#[derive(Debug)]
pub struct SubscriptionMessage {
/// Method for the subscription request.
pub subscribe_method: String,
/// Parameters to send for the subscription.
pub params: Params,
/// Method to use to unsubscribe later. Used if the channel unexpectedly closes.
pub unsubscribe_method: String,
/// If the subscription succeeds, we return a [`mpsc::Receiver`] that will receive notifications.
/// When we get a response from the server about that subscription, we send the result over
/// this channel.
pub send_back: oneshot::Sender<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>,
}
/// Message that the Client can send to the background task.
pub enum FrontToBack {
/// Send a notification to the server.
Notification(NotificationMessage),
/// Send a request to the server.
StartRequest(RequestMessage),
/// Send a subscription request to the server.
Subscribe(SubscriptionMessage),
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
/// When a subscription channel is closed, we send this message to the background
/// task to mark it ready for garbage collection.
// NOTE: It is not possible to cancel pending subscriptions or pending requests.
// Such operations will be blocked until a response is received or the background
// thread has been terminated.
SubscriptionClosed(SubscriptionId),
}
impl<Notif> Subscription<Notif>
where
Notif: DeserializeOwned,
{
/// Returns the next notification from the stream
/// This may return `None` if the subscription has been terminated,
/// may happen if the channel becomes full or is dropped.
///
/// Ignores any malformed packet.
pub async fn next(&mut self) -> Option<Notif> {
loop {
match self.notifs_rx.next().await {
Some(n) => match jsonrpc::from_value(n) {
Ok(parsed) => return Some(parsed),
Err(e) => log::debug!("Subscription response error: {:?}", e),
},
None => return None,
}
}
}
}
impl<Notif> Drop for Subscription<Notif> {
fn drop(&mut self) {
// We can't actually guarantee that this goes through. If the background task is busy, then
// the channel's buffer will be full, and our unsubscription request will never make it.
// However, when a notification arrives, the background task will realize that the channel
// to the `Subscription` has been closed, and will perform the unsubscribe.
let id = std::mem::replace(&mut self.id, SubscriptionId::Num(0));
let _ = self.to_back.send(FrontToBack::SubscriptionClosed(id)).now_or_never();
}
}