Newer
Older
Either::Left((Err(err), _)) => {
let err = ErrorObject::owned(SUBSCRIPTION_CLOSED_WITH_ERROR, err.to_string(), None::<()>);
break SubscriptionClosed::Failed(err);
}
Either::Left((Ok(None), _)) => break SubscriptionClosed::Success,
Niklas Adolfsson
committed
Either::Right((_, _)) => {
break SubscriptionClosed::RemotePeerAborted;
}
}
}
}
/// Similar to [`SubscriptionSink::pipe_from_try_stream`] but it doesn't require the stream return `Result`.
///
/// Warning: it's possible to pass in a stream that returns `Result` if `Result: Serialize` is satisfied
/// but it won't cancel the stream when an error occurs. If you want the stream to be canceled when an
/// error occurs use [`SubscriptionSink::pipe_from_try_stream`] instead.
///
/// # Examples
///
/// ```no_run
///
/// use jsonrpsee_core::server::rpc_module::RpcModule;
///
/// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| {
/// let stream = futures_util::stream::iter(vec![1_usize, 2, 3]);
/// tokio::spawn(async move { sink.pipe_from_stream(stream).await; });
/// Ok(())
/// });
/// ```
pub async fn pipe_from_stream<S, T>(&mut self, stream: S) -> SubscriptionClosed
where
S: Stream<Item = T> + Unpin,
T: Serialize,
{
self.pipe_from_try_stream::<_, _, Error>(stream.map(|item| Ok(item))).await
}
Niklas Adolfsson
committed
/// Returns whether the subscription is closed.
pub fn is_closed(&self) -> bool {
Niklas Adolfsson
committed
self.inner.is_closed() || self.close_notify.is_none() || !self.is_active_subscription()
/// Reports whether this sink belongs to an accepted subscription that is still live.
///
/// Only the `Accepted` state can be active, and only while `has_changed()` on the unsubscribe
/// receiver it stores does not return an error. Every other state yields `false`.
fn is_active_subscription(&self) -> bool {
    matches!(
        &self.state,
        SubscriptionSinkState::Accepted(unsubscribe) if unsubscribe.has_changed().is_ok()
    )
}
fn build_message<T: Serialize>(&self, result: &T) -> Result<String, serde_json::Error> {
serde_json::to_string(&SubscriptionResponse::new(
self.method.into(),
SubscriptionPayload { subscription: self.uniq_sub.sub_id.clone(), result },
.map_err(Into::into)
fn build_error_message<T: Serialize>(&self, error: &T) -> Result<String, serde_json::Error> {
serde_json::to_string(&SubscriptionError::new(
self.method.into(),
SubscriptionPayloadError { subscription: self.uniq_sub.sub_id.clone(), error },
))
.map_err(Into::into)
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
/// Close the subscription, sending a notification with a special `error` field containing the provided error.
///
/// This can be used to signal an actual error, or just to signal that the subscription has been closed,
/// depending on your preference.
///
/// If you'd like to to close the subscription without sending an error, just drop it and don't call this method.
///
///
/// ```json
/// {
/// "jsonrpc": "2.0",
/// "method": "<method>",
/// "params": {
/// "subscription": "<subscriptionID>",
/// "error": { "code": <code from error>, "message": <message from error>, "data": <data from error> }
/// }
/// }
/// }
/// ```
///
pub fn close(self, err: impl Into<ErrorObjectOwned>) -> bool {
if self.is_active_subscription() {
if let Some((sink, _)) = self.subscribers.lock().remove(&self.uniq_sub) {
tracing::debug!("Closing subscription: {:?}", self.uniq_sub.sub_id);
let msg = self.build_error_message(&err.into()).expect("valid json infallible; qed");
return sink.send_raw(msg).is_ok();
false
impl Drop for SubscriptionSink {
fn drop(&mut self) {
// Subscription was never accepted.
if let Ok(id) = self.state.id() {
self.inner.send_error(id, ErrorCode::InvalidParams.into());
} else if self.is_active_subscription() {
self.subscribers.lock().remove(&self.uniq_sub);
}
/// Wrapper struct that maintains a subscription "mainly" for testing.
pub struct Subscription {
Niklas Adolfsson
committed
close_notify: Option<SubscriptionPermit>,
rx: mpsc::UnboundedReceiver<String>,
sub_id: RpcSubscriptionId<'static>,
impl Subscription {
/// Close the subscription channel.
pub fn close(&mut self) {
tracing::trace!("[Subscription::close] Notifying");
if let Some(n) = self.close_notify.take() {
Niklas Adolfsson
committed
n.handle().notify_one()
/// Get the subscription ID
pub fn subscription_id(&self) -> &RpcSubscriptionId {
&self.sub_id
/// Check whether the subscription is closed.
pub fn is_closed(&self) -> bool {
self.close_notify.is_none()
}
Niklas Adolfsson
committed
/// Returns `Some((val, sub_id))` for the next element of type T from the underlying stream,
/// otherwise `None` if the subscription was closed.
Niklas Adolfsson
committed
/// # Panics
///
/// If the decoding the value as `T` fails.
pub async fn next<T: DeserializeOwned>(&mut self) -> Option<Result<(T, RpcSubscriptionId<'static>), Error>> {
if self.close_notify.is_none() {
tracing::debug!("[Subscription::next] Closed.");
return None;
Niklas Adolfsson
committed
let raw = self.rx.next().await?;
tracing::debug!("rx: {}", raw);
let res = match serde_json::from_str::<SubscriptionResponse<T>>(&raw) {
Ok(r) => Some(Ok((r.params.result, r.params.subscription.into_owned()))),
Err(e) => match serde_json::from_str::<SubscriptionError<serde_json::Value>>(&raw) {
Ok(_) => None,
Err(_) => Some(Err(e.into())),
impl Drop for Subscription {
fn drop(&mut self) {
self.close();
}
}
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
#[cfg(test)]
mod tests {
    use super::*;

    /// A pending state hands out its request ID exactly once.
    #[test]
    fn sink_state_pending() {
        let mut pending = SubscriptionSinkState::new(Id::Number(0));
        assert!(matches!(pending, SubscriptionSinkState::Pending(Some(Id::Number(0)))));

        // Taking the ID leaves the pending state empty.
        assert!(matches!(pending.id(), Ok(Id::Number(0))));
        assert!(matches!(pending, SubscriptionSinkState::Pending(None)));

        // Nothing is left for a second take.
        assert!(matches!(pending.id(), Err(SubscriptionEmptyError)));
    }

    /// Accepting requires the ID to have been taken first and is a one-way transition.
    #[test]
    fn sink_state_accepted() {
        let mut pending = SubscriptionSinkState::new(Id::Number(0));
        let (_, unsubscribe_rx) = watch::channel(());

        // Accepting while the ID is still held is refused and leaves the state untouched.
        assert!(matches!(pending.accept(unsubscribe_rx.clone()), Err(SubscriptionEmptyError)));
        assert!(matches!(pending, SubscriptionSinkState::Pending(Some(Id::Number(0)))));

        // Take the ID, then accept.
        assert!(matches!(pending.id(), Ok(Id::Number(0))));
        let transition = pending.accept(unsubscribe_rx.clone());
        assert!(matches!(transition, Ok(SubscriptionSinkState::Accepted(_))));
        let mut accepted = transition.unwrap();

        // Accepting twice is refused.
        assert!(matches!(accepted.accept(unsubscribe_rx.clone()), Err(SubscriptionEmptyError)));
        // No ID is available once accepted.
        assert!(matches!(accepted.id(), Err(SubscriptionEmptyError)));
        // An accepted state cannot be rejected.
        assert!(matches!(accepted.reject(), Err(SubscriptionEmptyError)));
    }

    /// Rejecting requires the ID to have been taken first and is a one-way transition.
    #[test]
    fn sink_state_rejected() {
        let mut pending = SubscriptionSinkState::new(Id::Number(0));

        // Rejecting while the ID is still held is refused and leaves the state untouched.
        assert!(matches!(pending.reject(), Err(SubscriptionEmptyError)));
        assert!(matches!(pending, SubscriptionSinkState::Pending(Some(Id::Number(0)))));

        // Take the ID, then reject.
        assert!(matches!(pending.id(), Ok(Id::Number(0))));
        let transition = pending.reject();
        assert!(matches!(transition, Ok(SubscriptionSinkState::Rejected)));
        let mut rejected = transition.unwrap();

        // Rejecting twice is refused.
        assert!(matches!(rejected.reject(), Err(SubscriptionEmptyError)));
        // No ID is available once rejected.
        assert!(matches!(rejected.id(), Err(SubscriptionEmptyError)));
        // A rejected state cannot be accepted.
        let (_, unsubscribe_rx) = watch::channel(());
        assert!(matches!(rejected.accept(unsubscribe_rx), Err(SubscriptionEmptyError)));
    }
}