Newer
Older
}
fn build_message<T: Serialize>(&self, result: &T) -> Result<String, serde_json::Error> {
serde_json::to_string(&SubscriptionResponse::new(
self.method.into(),
SubscriptionPayload { subscription: self.uniq_sub.sub_id.clone(), result },
.map_err(Into::into)
fn build_error_message<T: Serialize>(&self, error: &T) -> Result<String, serde_json::Error> {
serde_json::to_string(&SubscriptionError::new(
self.method.into(),
SubscriptionPayloadError { subscription: self.uniq_sub.sub_id.clone(), error },
))
.map_err(Into::into)
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
/// Close the subscription, sending a notification with a special `error` field containing the provided error.
///
/// This can be used to signal an actual error, or just to signal that the subscription has been closed,
/// depending on your preference.
///
/// If you'd like to to close the subscription without sending an error, just drop it and don't call this method.
///
///
/// ```json
/// {
/// "jsonrpc": "2.0",
/// "method": "<method>",
/// "params": {
/// "subscription": "<subscriptionID>",
/// "error": { "code": <code from error>, "message": <message from error>, "data": <data from error> }
/// }
/// }
/// }
/// ```
///
pub fn close(self, err: impl Into<ErrorObjectOwned>) -> bool {
if self.is_active_subscription() {
if let Some((sink, _)) = self.subscribers.lock().remove(&self.uniq_sub) {
tracing::debug!("Closing subscription: {:?}", self.uniq_sub.sub_id);
let msg = self.build_error_message(&err.into()).expect("valid json infallible; qed");
return sink.send_raw(msg).is_ok();
false
impl Drop for SubscriptionSink {
fn drop(&mut self) {
// Subscription was never accepted.
if let Some(id) = self.id.take() {
self.inner.send_error(id, ErrorCode::InvalidParams.into());
} else if self.is_active_subscription() {
self.subscribers.lock().remove(&self.uniq_sub);
}
/// Wrapper struct that maintains a subscription "mainly" for testing.
pub struct Subscription {
Niklas Adolfsson
committed
close_notify: Option<SubscriptionPermit>,
rx: mpsc::UnboundedReceiver<String>,
sub_id: RpcSubscriptionId<'static>,
impl Subscription {
/// Close the subscription channel.
pub fn close(&mut self) {
tracing::trace!("[Subscription::close] Notifying");
if let Some(n) = self.close_notify.take() {
Niklas Adolfsson
committed
n.handle().notify_one()
/// Get the subscription ID
pub fn subscription_id(&self) -> &RpcSubscriptionId {
&self.sub_id
/// Check whether the subscription is closed.
pub fn is_closed(&self) -> bool {
self.close_notify.is_none()
}
Niklas Adolfsson
committed
/// Returns `Some((val, sub_id))` for the next element of type T from the underlying stream,
/// otherwise `None` if the subscription was closed.
Niklas Adolfsson
committed
/// # Panics
///
/// If the decoding the value as `T` fails.
pub async fn next<T: DeserializeOwned>(&mut self) -> Option<Result<(T, RpcSubscriptionId<'static>), Error>> {
if self.close_notify.is_none() {
tracing::debug!("[Subscription::next] Closed.");
return None;
Niklas Adolfsson
committed
let raw = self.rx.next().await?;
tracing::debug!("rx: {}", raw);
let res = match serde_json::from_str::<SubscriptionResponse<T>>(&raw) {
Ok(r) => Some(Ok((r.params.result, r.params.subscription.into_owned()))),
Err(e) => match serde_json::from_str::<SubscriptionError<serde_json::Value>>(&raw) {
Ok(_) => None,
Err(_) => Some(Err(e.into())),
/// Dropping a `Subscription` closes it, exactly as an explicit call to `close` would.
impl Drop for Subscription {
    fn drop(&mut self) {
        Self::close(self);
    }
}