rpc_module.rs 46 KiB
Newer Older
		match self {
			SubscriptionSinkState::Pending(id) => id.take().ok_or(SubscriptionEmptyError),
			_ => Err(SubscriptionEmptyError),
		}
	}

	/// Advance the state of the sink to the accepted state.
	fn accept(&self, rx: watch::Receiver<()>) -> Result<SubscriptionSinkState, SubscriptionEmptyError> {
		match self {
			// Cannot transition to accepted if the ID was not previously consumed.
			SubscriptionSinkState::Pending(None) => Ok(SubscriptionSinkState::Accepted(rx)),
			_ => Err(SubscriptionEmptyError),
		}
	}

	/// Reject the subscription.
	fn reject(&self) -> Result<SubscriptionSinkState, SubscriptionEmptyError> {
		match self {
			SubscriptionSinkState::Pending(None) => Ok(SubscriptionSinkState::Rejected),
			_ => Err(SubscriptionEmptyError),
		}
	}
}

/// Represents a single subscription.
pub struct SubscriptionSink {
	inner: MethodSink,
David's avatar
David committed
	/// Get notified when subscribers leave so we can exit
	close_notify: Option<SubscriptionPermit>,
Maciej Hirsz's avatar
Maciej Hirsz committed
	/// MethodCallback.
	method: &'static str,
	/// Shared Mutex of subscriptions for this method.
	subscribers: Subscribers,
	/// Unique subscription.
	uniq_sub: SubscriptionKey,
	/// The state of the subscription sink.
	state: SubscriptionSinkState,
	/// Claimed resources.
	_claimed: Option<ResourceGuard>,
	/// Reject the subscription call from [`ErrorObject`].
	pub fn reject(&mut self, err: impl Into<ErrorObjectOwned>) -> Result<(), SubscriptionEmptyError> {
		let id = self.state.id()?;

		if self.inner.send_error(id, err.into()) {
			self.state = self.state.reject()?;
			Ok(())
			Err(SubscriptionEmptyError)
		}
	}

	/// Attempt to accept the subscription and respond the subscription method call.
	///
	/// Fails if the connection was closed, or if called multiple times.
	pub fn accept(&mut self) -> Result<(), SubscriptionEmptyError> {
		let id = self.state.id()?;

		if self.inner.send_response(id, &self.uniq_sub.sub_id) {
			let (tx, rx) = watch::channel(());
			self.subscribers.lock().insert(self.uniq_sub.clone(), (self.inner.clone(), tx));
			self.state = self.state.accept(rx)?;
			Ok(())
		} else {
			Err(SubscriptionEmptyError)
		}
	}

	/// Accepts the subscription if previously not accepted.
	fn maybe_accept(&mut self) {
		let _ = self.accept();
	}

David's avatar
David committed
	/// Send a message back to subscribers.
	///
	/// Returns `Ok(true)` if the message could be send
	/// Returns `Ok(false)` if the sink was closed (either because the subscription was closed or the connection was terminated)
	/// Return `Err(err)` if the message could not be serialized.
	///
	pub fn send<T: Serialize>(&mut self, result: &T) -> Result<bool, serde_json::Error> {
		self.maybe_accept();

		// only possible to trigger when the connection is dropped.
		if self.is_closed() {
		let msg = self.build_message(result)?;
	/// Reads data from the `stream` and sends back data on the subscription
	/// when items gets produced by the stream.
	/// The underlying stream must produce `Result values, see [`futures_util::TryStream`] for further information.
	///
	/// Returns `Ok(())` if the stream or connection was terminated.
	/// Returns `Err(_)` immediately if the underlying stream returns an error or if an item from the stream could not be serialized.
	/// use jsonrpsee_core::server::rpc_module::RpcModule;
	/// use jsonrpsee_core::error::{Error, SubscriptionClosed};
	/// use jsonrpsee_types::ErrorObjectOwned;
	///
	/// let mut m = RpcModule::new(());
	/// m.register_subscription("sub", "_", "unsub", |params, pending, _| {
	///     let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]);
	///     // This will return send `[Ok(1_u32), Ok(2_u32), Err(Error::SubscriptionClosed))]` to the subscriber
	///     // because after the `Err(_)` the stream is terminated.
	///     tokio::spawn(async move {
	///         // jsonrpsee doesn't send an error notification unless `close` is explicitly called.
	///         // If we pipe messages to the sink, we can inspect why it ended:
	///         pending
	///             .pipe_from_try_stream(stream)
	///             .await
	///             .on_success(|sink| {
	///                 let err_obj: ErrorObjectOwned = SubscriptionClosed::Success.into();
	///                 sink.close(err_obj);
	///             })
	///             .on_failure(|sink, err| {
	///                 sink.close(err);
	///             })
	pub async fn pipe_from_try_stream<S, T, E>(&mut self, mut stream: S) -> SubscriptionClosed
		self.maybe_accept();

		let conn_closed = match self.close_notify.as_ref().map(|cn| cn.handle()) {
			Some(cn) => cn,
			None => {
				return SubscriptionClosed::RemotePeerAborted;
			}
		let mut sub_closed = match &self.state {
			SubscriptionSinkState::Accepted(rx) => rx.clone(),
			_ => {
				let err = ErrorObject::owned(
					INTERNAL_ERROR_CODE,
					"Unsubscribe watcher not set after accepting the subscription".to_string(),
					None::<()>
				);
				return SubscriptionClosed::Failed(err);
			}
		};

		let sub_closed_fut = sub_closed.changed();

		let conn_closed_fut = conn_closed.notified();
		pin_mut!(conn_closed_fut);
		pin_mut!(sub_closed_fut);

		let mut stream_item = stream.try_next();
		let mut closed_fut = futures_util::future::select(conn_closed_fut, sub_closed_fut);

		loop {
			match futures_util::future::select(stream_item, closed_fut).await {
				// The app sent us a value to send back to the subscribers
				Either::Left((Ok(Some(result)), next_closed_fut)) => {
					match self.send(&result) {
						Ok(true) => (),
						Ok(false) => {
							break SubscriptionClosed::RemotePeerAborted;
						}
						Err(err) => {
							let err = ErrorObject::owned(SUBSCRIPTION_CLOSED_WITH_ERROR, err.to_string(), None::<()>);
							break SubscriptionClosed::Failed(err);
						}
					};
					stream_item = stream.try_next();
					closed_fut = next_closed_fut;
				}
				// Stream canceled because of error.
				Either::Left((Err(err), _)) => {
					let err = ErrorObject::owned(SUBSCRIPTION_CLOSED_WITH_ERROR, err.to_string(), None::<()>);
					break SubscriptionClosed::Failed(err);
				}
				Either::Left((Ok(None), _)) => break SubscriptionClosed::Success,
					break SubscriptionClosed::RemotePeerAborted;
	/// Similar to [`SubscriptionSink::pipe_from_try_stream`] but it doesn't require the stream return `Result`.
	///
	/// Warning: it's possible to pass in a stream that returns `Result` if `Result: Serialize` is satisfied
	/// but it won't cancel the stream when an error occurs. If you want the stream to be canceled when an
	/// error occurs use [`SubscriptionSink::pipe_from_try_stream`] instead.
	///
	/// # Examples
	///
	/// ```no_run
	///
	/// use jsonrpsee_core::server::rpc_module::RpcModule;
	///
	/// let mut m = RpcModule::new(());
	/// m.register_subscription("sub", "_", "unsub", |params, pending, _| {
	///     let mut sink = pending.accept().unwrap();
	///     let stream = futures_util::stream::iter(vec![1_usize, 2, 3]);
	///     tokio::spawn(async move { sink.pipe_from_stream(stream).await; });
	///     Ok(())
	/// });
	/// ```
	pub async fn pipe_from_stream<S, T>(&mut self, stream: S) -> SubscriptionClosed
	where
		S: Stream<Item = T> + Unpin,
		T: Serialize,
	{
		self.pipe_from_try_stream::<_, _, Error>(stream.map(|item| Ok(item))).await
	}

	/// Returns whether the subscription is closed.
	pub fn is_closed(&self) -> bool {
		self.inner.is_closed() || self.close_notify.is_none() || !self.is_active_subscription()
	fn is_active_subscription(&self) -> bool {
		match &self.state {
			SubscriptionSinkState::Accepted(unsubscribe) => !unsubscribe.has_changed().is_err(),
			_ => false
		}
	}

	fn build_message<T: Serialize>(&self, result: &T) -> Result<String, serde_json::Error> {
		serde_json::to_string(&SubscriptionResponse::new(
			self.method.into(),
			SubscriptionPayload { subscription: self.uniq_sub.sub_id.clone(), result },
	fn build_error_message<T: Serialize>(&self, error: &T) -> Result<String, serde_json::Error> {
		serde_json::to_string(&SubscriptionError::new(
			self.method.into(),
			SubscriptionPayloadError { subscription: self.uniq_sub.sub_id.clone(), error },
		))
		.map_err(Into::into)
	/// Close the subscription, sending a notification with a special `error` field containing the provided error.
	///
	/// This can be used to signal an actual error, or just to signal that the subscription has been closed,
	/// depending on your preference.
	///
	/// If you'd like to to close the subscription without sending an error, just drop it and don't call this method.
	///
	///
	/// ```json
	/// {
	///  "jsonrpc": "2.0",
	///  "method": "<method>",
	///  "params": {
	///    "subscription": "<subscriptionID>",
	///    "error": { "code": <code from error>, "message": <message from error>, "data": <data from error> }
	///    }
	///  }
	/// }
	/// ```
	///
	pub fn close(self, err: impl Into<ErrorObjectOwned>) -> bool {
		if self.is_active_subscription() {
			if let Some((sink, _)) = self.subscribers.lock().remove(&self.uniq_sub) {
				tracing::debug!("Closing subscription: {:?}", self.uniq_sub.sub_id);
				let msg = self.build_error_message(&err.into()).expect("valid json infallible; qed");
				return sink.send_raw(msg).is_ok();
impl Drop for SubscriptionSink {
	fn drop(&mut self) {
		// Subscription was never accepted.
		if let Ok(id) = self.state.id() {
			self.inner.send_error(id, ErrorCode::InvalidParams.into());
		} else if self.is_active_subscription() {
			self.subscribers.lock().remove(&self.uniq_sub);
		}
/// Wrapper struct that maintains a subscription "mainly" for testing.
	close_notify: Option<SubscriptionPermit>,
	rx: mpsc::UnboundedReceiver<String>,
	sub_id: RpcSubscriptionId<'static>,
	/// Close the subscription channel.
	pub fn close(&mut self) {
David's avatar
David committed
		tracing::trace!("[Subscription::close] Notifying");
		if let Some(n) = self.close_notify.take() {
David's avatar
David committed
		}
	/// Get the subscription ID
	pub fn subscription_id(&self) -> &RpcSubscriptionId {
		&self.sub_id
	/// Check whether the subscription is closed.
	pub fn is_closed(&self) -> bool {
		self.close_notify.is_none()
	}

	/// Returns `Some((val, sub_id))` for the next element of type T from the underlying stream,
	/// otherwise `None` if the subscription was closed.
	/// # Panics
	///
	/// If the decoding the value as `T` fails.
Maciej Hirsz's avatar
Maciej Hirsz committed
	pub async fn next<T: DeserializeOwned>(&mut self) -> Option<Result<(T, RpcSubscriptionId<'static>), Error>> {
David's avatar
David committed
		if self.close_notify.is_none() {
			tracing::debug!("[Subscription::next] Closed.");
David's avatar
David committed
		}
		let res = match serde_json::from_str::<SubscriptionResponse<T>>(&raw) {
			Ok(r) => Some(Ok((r.params.result, r.params.subscription.into_owned()))),
			Err(e) => match serde_json::from_str::<SubscriptionError<serde_json::Value>>(&raw) {
				Ok(_) => None,
				Err(_) => Some(Err(e.into())),
impl Drop for Subscription {
	fn drop(&mut self) {
		self.close();
	}
}