Newer
Older
/// Rejects the pending subscription call, answering it with the given error.
///
/// Returns whatever the underlying sink reports for `send_error`, or `false`
/// when the request id has already been consumed by an earlier `accept`/`reject`.
pub fn reject(&mut self, err: impl Into<ErrorObjectOwned>) -> bool {
    match self.id.take() {
        Some(request_id) => self.inner.send_error(request_id, err.into()),
        // No id left: the call was already answered, nothing can be sent.
        None => false,
    }
}
/// Tries to accept the subscription by answering the subscription method call
/// with the subscription id.
///
/// On success the sink is registered in the shared subscriber table and a fresh
/// unsubscribe channel is installed on `self`.
///
/// Fails with [`SubscriptionEmptyError`] if the response could not be sent, or if the
/// call was already answered (i.e. this is called more than once).
pub fn accept(&mut self) -> Result<(), SubscriptionEmptyError> {
    // The request id can only be consumed once.
    let request_id = match self.id.take() {
        Some(request_id) => request_id,
        None => return Err(SubscriptionEmptyError),
    };

    if !self.inner.send_response(request_id, &self.uniq_sub.sub_id) {
        return Err(SubscriptionEmptyError);
    }

    let (unsub_tx, unsub_rx) = watch::channel(());
    self.subscribers
        .lock()
        .insert(self.uniq_sub.clone(), (self.inner.clone(), unsub_tx));
    self.unsubscribe = unsub_rx;

    Ok(())
}
/// Accepts the subscription unless that has already happened.
///
/// The outcome of [`Self::accept`] is deliberately ignored.
fn maybe_accept(&mut self) {
    match self.accept() {
        Ok(()) => {}
        // Already answered or the response could not be sent: nothing to do here.
        Err(_) => {}
    }
}
/// Serializes `result` into a subscription notification and sends it on the sink.
///
/// Returns `Ok(true)` if the message could be sent.
/// Returns `Ok(false)` if the sink was closed (either because the subscription was closed or the connection was terminated).
/// Returns `Err(err)` if the message could not be serialized.
///
pub fn send<T: Serialize>(&mut self, result: &T) -> Result<bool, serde_json::Error> {
// NOTE(review): the condition guarding this early return is not visible in this
// extract — presumably a closed-sink check; confirm against the full file.
// only possible to trigger when the connection is dropped.
return Ok(false);
// Serialization errors are propagated to the caller via `?`.
let msg = self.build_message(result)?;
Niklas Adolfsson
committed
// A failed `send_raw` is reported as `Ok(false)` rather than as an error.
Ok(self.inner.send_raw(msg).is_ok())
/// Reads data from the `stream` and sends back data on the subscription
/// when items get produced by the stream.
Niklas Adolfsson
committed
/// The underlying stream must produce `Result` values, see [`futures_util::TryStream`] for further information.
///
/// Returns [`SubscriptionClosed::Success`] if the stream ended.
Niklas Adolfsson
committed
/// Returns [`SubscriptionClosed::Failed`] immediately if the underlying stream returns an error or if an item from the stream could not be serialized.
/// Returns [`SubscriptionClosed::RemotePeerAborted`] if an item could not be delivered, if one of the
/// close signals fired, or if no close notifier is attached to the sink.
///
/// # Examples
///
/// ```no_run
///
/// use jsonrpsee_core::server::rpc_module::RpcModule;
/// use jsonrpsee_core::error::{Error, SubscriptionClosed};
/// use jsonrpsee_types::ErrorObjectOwned;
Niklas Adolfsson
committed
/// use anyhow::anyhow;
///
/// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, pending, _| {
Niklas Adolfsson
committed
/// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]);
/// // This sends `1_u32` and `2_u32` to the subscriber and then stops with a failure,
/// // because after the `Err(_)` the stream is terminated.
/// tokio::spawn(async move {
/// // jsonrpsee doesn't send an error notification unless `close` is explicitly called.
/// // If we pipe messages to the sink, we can inspect why it ended:
/// pending
/// .pipe_from_try_stream(stream)
/// .await
/// .on_success(|sink| {
/// let err_obj: ErrorObjectOwned = SubscriptionClosed::Success.into();
/// sink.close(err_obj);
/// })
/// .on_failure(|sink, err| {
/// sink.close(err);
/// })
/// });
/// });
/// ```
pub async fn pipe_from_try_stream<S, T, E>(&mut self, mut stream: S) -> SubscriptionClosed
Niklas Adolfsson
committed
S: TryStream<Ok = T, Error = E> + Unpin,
T: Serialize,
Niklas Adolfsson
committed
E: std::fmt::Display,
Niklas Adolfsson
committed
// Without a close notifier there is nothing to wait on, so the peer is reported as gone.
let conn_closed = match self.close_notify.as_ref().map(|cn| cn.handle()) {
Some(cn) => cn,
Niklas Adolfsson
committed
None => {
return SubscriptionClosed::RemotePeerAborted;
}
};
Niklas Adolfsson
committed
// Two signals can end the pipe early: the `unsubscribe` channel changing and the
// close notifier being notified.
let mut sub_closed = self.unsubscribe.clone();
let sub_closed_fut = sub_closed.changed();
let conn_closed_fut = conn_closed.notified();
pin_mut!(conn_closed_fut);
pin_mut!(sub_closed_fut);
let mut stream_item = stream.try_next();
Niklas Adolfsson
committed
// Completes as soon as either of the two close signals fires.
let mut closed_fut = futures_util::future::select(conn_closed_fut, sub_closed_fut);
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
loop {
// Race the next stream item against the combined close signal.
match futures_util::future::select(stream_item, closed_fut).await {
// The app sent us a value to send back to the subscribers
Either::Left((Ok(Some(result)), next_closed_fut)) => {
match self.send(&result) {
Ok(true) => (),
// The item could not be delivered.
Ok(false) => {
break SubscriptionClosed::RemotePeerAborted;
}
// The item could not be serialized.
Err(err) => {
let err = ErrorObject::owned(SUBSCRIPTION_CLOSED_WITH_ERROR, err.to_string(), None::<()>);
break SubscriptionClosed::Failed(err);
}
};
// Re-arm: request the next item and keep waiting on the still-pending close future.
stream_item = stream.try_next();
closed_fut = next_closed_fut;
}
// Stream canceled because of error.
Either::Left((Err(err), _)) => {
let err = ErrorObject::owned(SUBSCRIPTION_CLOSED_WITH_ERROR, err.to_string(), None::<()>);
break SubscriptionClosed::Failed(err);
}
// The stream is exhausted.
Either::Left((Ok(None), _)) => break SubscriptionClosed::Success,
Niklas Adolfsson
committed
// One of the close signals fired before the next item arrived.
Either::Right((_, _)) => {
break SubscriptionClosed::RemotePeerAborted;
}
}
}
}
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
/// Similar to [`SubscriptionSink::pipe_from_try_stream`] but it doesn't require the stream return `Result`.
///
/// Warning: it's possible to pass in a stream that returns `Result` if `Result: Serialize` is satisfied
/// but it won't cancel the stream when an error occurs. If you want the stream to be canceled when an
/// error occurs use [`SubscriptionSink::pipe_from_try_stream`] instead.
///
/// # Examples
///
/// ```no_run
///
/// use jsonrpsee_core::server::rpc_module::RpcModule;
///
/// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, pending, _| {
/// let mut sink = pending.accept().unwrap();
/// let stream = futures_util::stream::iter(vec![1_usize, 2, 3]);
/// tokio::spawn(async move { sink.pipe_from_stream(stream).await; });
/// Ok(())
/// });
/// ```
async fn pipe_from_stream<S, T>(&mut self, stream: S) -> SubscriptionClosed
where
S: Stream<Item = T> + Unpin,
T: Serialize,
{
self.pipe_from_try_stream::<_, _, Error>(stream.map(|item| Ok(item))).await
}
Niklas Adolfsson
committed
/// Returns whether the subscription is closed.
///
/// The subscription counts as closed when the underlying sink reports itself closed,
/// when no close notifier is attached, or when the subscription is no longer active.
pub fn is_closed(&self) -> bool {
Niklas Adolfsson
committed
self.inner.is_closed() || self.close_notify.is_none() || !self.is_active_subscription()
fn is_active_subscription(&self) -> bool {
Niklas Adolfsson
committed
!self.unsubscribe.has_changed().is_err()
}
/// Serializes `result` to a JSON string as a `SubscriptionResponse` carrying this
/// sink's method name and subscription id.
///
/// Returns the `serde_json` error if serialization fails.
fn build_message<T: Serialize>(&self, result: &T) -> Result<String, serde_json::Error> {
serde_json::to_string(&SubscriptionResponse::new(
self.method.into(),
SubscriptionPayload { subscription: self.uniq_sub.sub_id.clone(), result },
// NOTE(review): the error is already a `serde_json::Error`, so this conversion looks like a no-op.
.map_err(Into::into)
/// Serializes `error` to a JSON string as a `SubscriptionError` carrying this
/// sink's method name and subscription id.
///
/// Returns the `serde_json` error if serialization fails.
fn build_error_message<T: Serialize>(&self, error: &T) -> Result<String, serde_json::Error> {
serde_json::to_string(&SubscriptionError::new(
self.method.into(),
SubscriptionPayloadError { subscription: self.uniq_sub.sub_id.clone(), error },
))
// NOTE(review): the error is already a `serde_json::Error`, so this conversion looks like a no-op.
.map_err(Into::into)
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
/// Close the subscription, sending a notification with a special `error` field containing the provided error.
///
/// This can be used to signal an actual error, or just to signal that the subscription has been closed,
/// depending on your preference.
///
/// If you'd like to close the subscription without sending an error, just drop it and don't call this method.
///
/// The notification has the following shape:
///
/// ```json
/// {
///   "jsonrpc": "2.0",
///   "method": "<method>",
///   "params": {
///     "subscription": "<subscriptionID>",
///     "error": { "code": <code from error>, "message": <message from error>, "data": <data from error> }
///   }
/// }
/// ```
///
/// Returns `true` if the close notification was sent, otherwise `false`.
pub fn close(self, err: impl Into<ErrorObjectOwned>) -> bool {
if self.is_active_subscription() {
// Removing the entry from the subscriber table yields the sink used to send the notification.
if let Some((sink, _)) = self.subscribers.lock().remove(&self.uniq_sub) {
tracing::debug!("Closing subscription: {:?}", self.uniq_sub.sub_id);
let msg = self.build_error_message(&err.into()).expect("valid json infallible; qed");
return sink.send_raw(msg).is_ok();
false
// Dropping the sink unregisters the subscription without sending any notification.
impl Drop for SubscriptionSink {
fn drop(&mut self) {
// Only a subscription that is still active is removed from the subscriber table.
if self.is_active_subscription() {
self.subscribers.lock().remove(&self.uniq_sub);
}
/// Wrapper struct that maintains a subscription "mainly" for testing.
pub struct Subscription {
Niklas Adolfsson
committed
// `Some` while the subscription is open; taken (left as `None`) once it is closed.
close_notify: Option<SubscriptionPermit>,
// Receives the raw JSON messages for this subscription.
rx: mpsc::UnboundedReceiver<String>,
// Identifier of this subscription.
sub_id: RpcSubscriptionId<'static>,
impl Subscription {
/// Close the subscription channel.
///
/// Takes the close notifier, if still present, and notifies it; later calls do nothing.
pub fn close(&mut self) {
tracing::trace!("[Subscription::close] Notifying");
if let Some(n) = self.close_notify.take() {
Niklas Adolfsson
committed
n.handle().notify_one()
/// Get the subscription ID.
pub fn subscription_id(&self) -> &RpcSubscriptionId {
&self.sub_id
/// Check whether the subscription is closed.
///
/// The subscription is closed once its close notifier has been taken.
pub fn is_closed(&self) -> bool {
self.close_notify.is_none()
}
Niklas Adolfsson
committed
/// Returns `Some(Ok((val, sub_id)))` for the next element of type `T` from the underlying stream,
/// otherwise `None` if the subscription was closed, the receiver yielded no more messages,
/// or the message was a subscription error notification.
Niklas Adolfsson
committed
/// # Errors
///
/// Returns `Some(Err(_))` if the message can be decoded neither as a value of type `T`
/// nor as a subscription error notification.
pub async fn next<T: DeserializeOwned>(&mut self) -> Option<Result<(T, RpcSubscriptionId<'static>), Error>> {
if self.close_notify.is_none() {
tracing::debug!("[Subscription::next] Closed.");
return None;
Niklas Adolfsson
committed
// `?` ends with `None` when the receiver has no more messages.
let raw = self.rx.next().await?;
tracing::debug!("rx: {}", raw);
// Try a regular notification first, then fall back to an error notification.
let res = match serde_json::from_str::<SubscriptionResponse<T>>(&raw) {
Ok(r) => Some(Ok((r.params.result, r.params.subscription.into_owned()))),
Err(e) => match serde_json::from_str::<SubscriptionError<serde_json::Value>>(&raw) {
Ok(_) => None,
// Report the error from the first decoding attempt.
Err(_) => Some(Err(e.into())),
impl Drop for Subscription {
fn drop(&mut self) {
self.close();
}
}