//! Handles and monitors JSONRPC v2 method calls and subscriptions //! //! Definitions: //! //! - RequestId: request ID in the JSONRPC-v2 specification //! > **Note**: The spec allow number, string or null but this crate only supports numbers. //! - SubscriptionId: unique ID generated by server use fnv::FnvHashMap; use futures::channel::{mpsc, oneshot}; use jsonrpsee_types::{ error::Error, jsonrpc::{JsonValue, SubscriptionId}, }; use std::collections::hash_map::{Entry, HashMap}; #[derive(Debug)] enum Kind { PendingMethodCall(PendingCallOneshot), PendingSubscription((PendingSubscriptionOneshot, UnsubscribeMethod)), Subscription((SubscriptionSink, UnsubscribeMethod)), } #[derive(Debug)] /// Indicates the status of a given request/response. pub enum RequestStatus { /// The method call is waiting for a response, PendingMethodCall, /// The subscription is waiting for a response to become an active subscription. PendingSubscription, /// An active subscription. Subscription, /// Invalid request ID. Invalid, } type PendingCallOneshot = oneshot::Sender>; type PendingSubscriptionOneshot = oneshot::Sender, SubscriptionId), Error>>; type SubscriptionSink = mpsc::Sender; type UnsubscribeMethod = String; type RequestId = u64; #[derive(Debug, Default)] /// Manages and monitors JSONRPC v2 method calls and subscriptions. pub struct RequestManager { /// List of requests that are waiting for a response from the server. // NOTE: FnvHashMap is used here because RequestId is not under the caller's control and is known to be a short key (u64). requests: FnvHashMap, /// Reverse lookup, to find a request ID in constant time by `subscription ID` instead of looking through all requests. subscriptions: HashMap, } impl RequestManager { /// Create an empty `RequestManager`. pub fn new() -> Self { Self::default() } /// Tries to insert a new pending call. /// /// Returns `Ok` if the pending request was successfully inserted otherwise `Err`. pub fn insert_pending_call(&mut self, id: u64, send_back: PendingCallOneshot) -> Result<(), PendingCallOneshot> { if let Entry::Vacant(v) = self.requests.entry(id) { v.insert(Kind::PendingMethodCall(send_back)); Ok(()) } else { Err(send_back) } } /// Tries to insert a new pending subscription. /// /// Returns `Ok` if the pending request was successfully inserted otherwise `Err`. pub fn insert_pending_subscription( &mut self, id: RequestId, send_back: PendingSubscriptionOneshot, unsubscribe_method: UnsubscribeMethod, ) -> Result<(), PendingSubscriptionOneshot> { if let Entry::Vacant(v) = self.requests.entry(id) { v.insert(Kind::PendingSubscription((send_back, unsubscribe_method))); Ok(()) } else { Err(send_back) } } /// Tries to insert a new subscription. /// /// Returns `Ok` if the pending request was successfully inserted otherwise `Err`. pub fn insert_subscription( &mut self, request_id: RequestId, subscription_id: SubscriptionId, send_back: SubscriptionSink, unsubscribe_method: String, ) -> Result<(), SubscriptionSink> { if let (Entry::Vacant(request), Entry::Vacant(subscription)) = (self.requests.entry(request_id), self.subscriptions.entry(subscription_id)) { request.insert(Kind::Subscription((send_back, unsubscribe_method))); subscription.insert(request_id); Ok(()) } else { Err(send_back) } } /// Tries to complete a pending subscription. /// /// Returns `Some` if the subscription was completed otherwise `None`. pub fn complete_pending_subscription( &mut self, request_id: RequestId, ) -> Option<(PendingSubscriptionOneshot, UnsubscribeMethod)> { match self.requests.entry(request_id) { Entry::Occupied(request) if matches!(request.get(), Kind::PendingSubscription(_)) => { let (_req_id, kind) = request.remove_entry(); if let Kind::PendingSubscription(send_back) = kind { Some(send_back) } else { unreachable!("Pending subscription is Pending subscription checked above; qed"); } } _ => None, } } /// Tries to complete a pending call.. /// /// Returns `Some` if the call was completed otherwise `None`. pub fn complete_pending_call(&mut self, request_id: RequestId) -> Option { match self.requests.entry(request_id) { Entry::Occupied(request) if matches!(request.get(), Kind::PendingMethodCall(_)) => { let (_req_id, kind) = request.remove_entry(); if let Kind::PendingMethodCall(send_back) = kind { Some(send_back) } else { unreachable!("Pending call is Pending call checked above; qed"); } } _ => None, } } /// Tries to remove a subscription. /// /// Returns `Some` if the subscription was removed otherwise `None`. pub fn remove_subscription( &mut self, request_id: RequestId, subscription_id: SubscriptionId, ) -> Option<(SubscriptionSink, UnsubscribeMethod)> { match (self.requests.entry(request_id), self.subscriptions.entry(subscription_id)) { (Entry::Occupied(request), Entry::Occupied(subscription)) if matches!(request.get(), Kind::Subscription(_)) => { let (_req_id, kind) = request.remove_entry(); let _sub_id = subscription.remove_entry(); if let Kind::Subscription(send_back) = kind { Some(send_back) } else { unreachable!("Subscription is Subscription checked above; qed"); } } _ => None, } } /// Returns the status of a request ID pub fn request_status(&mut self, id: &RequestId) -> RequestStatus { self.requests.get(id).map_or(RequestStatus::Invalid, |kind| match kind { Kind::PendingMethodCall(_) => RequestStatus::PendingMethodCall, Kind::PendingSubscription(_) => RequestStatus::PendingSubscription, Kind::Subscription(_) => RequestStatus::Subscription, }) } /// Get a mutable reference to underlying `Sink` in order to send messages to the subscription. /// /// Returns `Some` if the `request_id` was registered as a subscription otherwise `None`. pub fn as_subscription_mut(&mut self, request_id: &RequestId) -> Option<&mut SubscriptionSink> { if let Some(Kind::Subscription((sink, _))) = self.requests.get_mut(request_id) { Some(sink) } else { None } } /// Reverse lookup to get the request ID for a subscription ID. /// /// Returns `Some` if the subscription ID was registered as a subscription otherwise `None`. pub fn get_request_id_by_subscription_id(&self, sub_id: &SubscriptionId) -> Option { self.subscriptions.get(sub_id).copied() } } #[cfg(test)] mod tests { use super::{Error, RequestManager}; use futures::channel::{mpsc, oneshot}; use jsonrpsee_types::jsonrpc::{JsonValue, SubscriptionId}; #[test] fn insert_remove_pending_request_works() { let (request_tx, _) = oneshot::channel::>(); let mut manager = RequestManager::new(); assert!(manager.insert_pending_call(0, request_tx).is_ok()); assert!(manager.complete_pending_call(0).is_some()); } #[test] fn insert_remove_subscription_works() { let (pending_sub_tx, _) = oneshot::channel::, SubscriptionId), Error>>(); let (sub_tx, _) = mpsc::channel::(1); let mut manager = RequestManager::new(); assert!(manager.insert_pending_subscription(1, pending_sub_tx, "unsubscribe_method".into()).is_ok()); let (_send_back_oneshot, unsubscribe_method) = manager.complete_pending_subscription(1).unwrap(); assert!(manager .insert_subscription(1, SubscriptionId::Str("uniq_id_from_server".to_string()), sub_tx, unsubscribe_method) .is_ok()); assert!(manager.as_subscription_mut(&1).is_some()); assert!(manager.remove_subscription(1, SubscriptionId::Str("uniq_id_from_server".to_string())).is_some()); } #[test] fn pending_method_call_faulty() { let (request_tx1, _) = oneshot::channel::>(); let (request_tx2, _) = oneshot::channel::>(); let (pending_sub_tx, _) = oneshot::channel::, SubscriptionId), Error>>(); let (sub_tx, _) = mpsc::channel::(1); let mut manager = RequestManager::new(); assert!(manager.insert_pending_call(0, request_tx1).is_ok()); assert!(manager.insert_pending_call(0, request_tx2).is_err()); assert!(manager.insert_pending_subscription(0, pending_sub_tx, "beef".to_string()).is_err()); assert!(manager.insert_subscription(0, SubscriptionId::Num(137), sub_tx, "bibimbap".to_string()).is_err()); assert!(manager.remove_subscription(0, SubscriptionId::Num(137)).is_none()); assert!(manager.complete_pending_subscription(0).is_none()); assert!(manager.complete_pending_call(0).is_some()); } #[test] fn pending_subscription_faulty() { let (request_tx, _) = oneshot::channel::>(); let (pending_sub_tx1, _) = oneshot::channel::, SubscriptionId), Error>>(); let (pending_sub_tx2, _) = oneshot::channel::, SubscriptionId), Error>>(); let (sub_tx, _) = mpsc::channel::(1); let mut manager = RequestManager::new(); assert!(manager.insert_pending_subscription(99, pending_sub_tx1, "beef".to_string()).is_ok()); assert!(manager.insert_pending_call(99, request_tx).is_err()); assert!(manager.insert_pending_subscription(99, pending_sub_tx2, "vegan".to_string()).is_err()); assert!(manager.insert_subscription(99, SubscriptionId::Num(0), sub_tx, "bibimbap".to_string()).is_err()); assert!(manager.remove_subscription(99, SubscriptionId::Num(0)).is_none()); assert!(manager.complete_pending_call(99).is_none()); assert!(manager.complete_pending_subscription(99).is_some()); } #[test] fn active_subscriptions_faulty() { let (request_tx, _) = oneshot::channel::>(); let (pending_sub_tx, _) = oneshot::channel::, SubscriptionId), Error>>(); let (sub_tx1, _) = mpsc::channel::(1); let (sub_tx2, _) = mpsc::channel::(1); let mut manager = RequestManager::new(); assert!(manager.insert_subscription(3, SubscriptionId::Num(0), sub_tx1, "bibimbap".to_string()).is_ok()); assert!(manager.insert_subscription(3, SubscriptionId::Num(1), sub_tx2, "bibimbap".to_string()).is_err()); assert!(manager.insert_pending_subscription(3, pending_sub_tx, "beef".to_string()).is_err()); assert!(manager.insert_pending_call(3, request_tx).is_err()); assert!(manager.remove_subscription(3, SubscriptionId::Num(7)).is_none()); assert!(manager.complete_pending_call(3).is_none()); assert!(manager.complete_pending_subscription(3).is_none()); assert!(manager.remove_subscription(3, SubscriptionId::Num(1)).is_none()); assert!(manager.remove_subscription(3, SubscriptionId::Num(0)).is_some()); } }