rpc_module.rs 36.6 KiB
Newer Older
	///
	/// ```json
	/// {
	///  "jsonrpc": "2.0",
	///  "method": "<method>",
	///  "params": {
	///    "subscription": "<subscriptionID>",
	///    "error": { "code": <code from error>, "message": <message from error>, "data": <data from error> }
	///  }
	/// }
	/// ```
	///
	pub fn close(self, err: impl Into<ErrorObjectOwned>) -> bool {
		if self.is_active_subscription() {
			if let Some((sink, _)) = self.subscribers.lock().remove(&self.uniq_sub) {
				tracing::debug!("Closing subscription: {:?}", self.uniq_sub.sub_id);
				let msg = self.build_error_message(&err.into()).expect("valid json infallible; qed");
				return sink.send_raw(msg).is_ok();
impl Drop for SubscriptionSink {
	fn drop(&mut self) {
		if self.is_active_subscription() {
			self.subscribers.lock().remove(&self.uniq_sub);
		}
/// Wrapper struct that maintains a subscription "mainly" for testing.
David's avatar
David committed
	close_notify: Option<Arc<Notify>>,
	rx: mpsc::UnboundedReceiver<String>,
	sub_id: RpcSubscriptionId<'static>,
	/// Close the subscription channel.
	pub fn close(&mut self) {
David's avatar
David committed
		tracing::trace!("[Subscription::close] Notifying");
		if let Some(n) = self.close_notify.take() {
			n.notify_one()
		}
	}
	/// Get the subscription ID
	pub fn subscription_id(&self) -> &RpcSubscriptionId {
		&self.sub_id
	/// Returns `Some((val, sub_id))` for the next element of type T from the underlying stream,
	/// otherwise `None` if the subscription was closed.
	/// # Panics
	///
	/// If decoding the value as `T` fails.
Maciej Hirsz's avatar
Maciej Hirsz committed
	pub async fn next<T: DeserializeOwned>(&mut self) -> Option<Result<(T, RpcSubscriptionId<'static>), Error>> {
David's avatar
David committed
		if self.close_notify.is_none() {
			tracing::debug!("[Subscription::next] Closed.");
David's avatar
David committed
		}
		let res = match serde_json::from_str::<SubscriptionResponse<T>>(&raw) {
			Ok(r) => Some(Ok((r.params.result, r.params.subscription.into_owned()))),
			Err(e) => match serde_json::from_str::<SubscriptionError<serde_json::Value>>(&raw) {
				Ok(_) => None,
				Err(_) => Some(Err(e.into())),
impl Drop for Subscription {
	/// Runs when the `Subscription` goes out of scope and delegates to its
	/// `close` method, so dropping the value has the same effect as closing
	/// it explicitly.
	fn drop(&mut self) {
		Self::close(self);
	}
}