Newer
Older
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Handles and monitors JSONRPC v2 method calls and subscriptions
//!
//! Definitions:
//!
//! - RequestId: request ID in the JSONRPC-v2 specification
//! > **Note**: The spec allow number, string or null but this crate only supports numbers.
//! - SubscriptionId: unique ID generated by server
use std::collections::{hash_map::Entry, HashMap};
use crate::Error;
use futures_channel::{mpsc, oneshot};
use jsonrpsee_types::{Id, SubscriptionId};
use rustc_hash::FxHashMap;
enum Kind {
PendingMethodCall(PendingCallOneshot),
PendingSubscription((RequestId, PendingSubscriptionOneshot, UnsubscribeMethod)),
Subscription((RequestId, SubscriptionSink, UnsubscribeMethod)),
/// Indicates the status of a given request/response.
pub(crate) enum RequestStatus {
/// The method call is waiting for a response,
PendingMethodCall,
/// The subscription is waiting for a response to become an active subscription.
PendingSubscription,
/// An active subscription.
Subscription,
/// Invalid request ID.
Invalid,
}
type PendingCallOneshot = Option<oneshot::Sender<Result<JsonValue, Error>>>;
type PendingBatchOneshot = oneshot::Sender<Result<Vec<JsonValue>, Error>>;
type PendingSubscriptionOneshot = oneshot::Sender<Result<(mpsc::Receiver<JsonValue>, SubscriptionId<'static>), Error>>;
type SubscriptionSink = mpsc::Sender<JsonValue>;
type UnsubscribeMethod = String;
type RequestId = Id<'static>;
#[derive(Debug)]
/// Batch state.
pub(crate) struct BatchState {
/// Order that the request was performed in.
pub(crate) order: FxHashMap<RequestId, usize>,
pub(crate) send_back: PendingBatchOneshot,
/// Manages and monitors JSONRPC v2 method calls and subscriptions.
pub(crate) struct RequestManager {
/// List of requests that are waiting for a response from the server.
// NOTE: FnvHashMap is used here because RequestId is not under the caller's control and is known to be a short
// key.
requests: FxHashMap<RequestId, Kind>,
/// Reverse lookup, to find a request ID in constant time by `subscription ID` instead of looking through all
/// requests.
subscriptions: HashMap<SubscriptionId<'static>, RequestId>,
batches: FxHashMap<Vec<RequestId>, BatchState>,
/// Registered Methods for incoming notifications.
notification_handlers: HashMap<String, SubscriptionSink>,
/// Create a new `RequestManager`.
pub(crate) fn new() -> Self {
/// Tries to insert a new pending request.
///
/// Returns `Ok` if the pending request was successfully inserted otherwise `Err`.
pub(crate) fn insert_pending_call(
&mut self,
id: RequestId,
send_back: PendingCallOneshot,
) -> Result<(), PendingCallOneshot> {
if let Entry::Vacant(v) = self.requests.entry(id) {
v.insert(Kind::PendingMethodCall(send_back));
Ok(())
} else {
Err(send_back)
}
}
///
/// Returns `Ok` if the pending request was successfully inserted otherwise `Err`.
pub(crate) fn insert_pending_batch(
send_back: PendingBatchOneshot,
) -> Result<(), PendingBatchOneshot> {
let mut order = FxHashMap::with_capacity_and_hasher(batch.len(), Default::default());
for (idx, batch_id) in batch.iter().enumerate() {
order.insert(batch_id.clone(), idx);
}
batch.sort_unstable();
if let Entry::Vacant(v) = self.batches.entry(batch) {
v.insert(BatchState { order, send_back });
Ok(())
} else {
Err(send_back)
}
}
/// Tries to insert a new pending subscription and reserves a slot for a "potential" unsubscription request.
///
/// Returns `Ok` if the pending request was successfully inserted otherwise `Err`.
pub(crate) fn insert_pending_subscription(
sub_req_id: RequestId,
unsub_req_id: RequestId,
send_back: PendingSubscriptionOneshot,
unsubscribe_method: UnsubscribeMethod,
) -> Result<(), PendingSubscriptionOneshot> {
// The request IDs are not in the manager and the `sub_id` and `unsub_id` are not equal.
if !self.requests.contains_key(&sub_req_id)
&& !self.requests.contains_key(&unsub_req_id)
&& sub_req_id != unsub_req_id
{
self.requests
.insert(sub_req_id, Kind::PendingSubscription((unsub_req_id.clone(), send_back, unsubscribe_method)));
self.requests.insert(unsub_req_id, Kind::PendingMethodCall(None));
Ok(())
} else {
Err(send_back)
}
}
/// Tries to insert a new subscription.
///
/// Returns `Ok` if the pending request was successfully inserted otherwise `Err`.
pub(crate) fn insert_subscription(
sub_req_id: RequestId,
unsub_req_id: RequestId,
subscription_id: SubscriptionId<'static>,
unsubscribe_method: UnsubscribeMethod,
) -> Result<(), SubscriptionSink> {
if let (Entry::Vacant(request), Entry::Vacant(subscription)) =
(self.requests.entry(sub_req_id.clone()), self.subscriptions.entry(subscription_id))
request.insert(Kind::Subscription((unsub_req_id, send_back, unsubscribe_method)));
subscription.insert(sub_req_id);
Ok(())
} else {
Err(send_back)
}
}
/// Inserts a handler for incoming notifications.
pub(crate) fn insert_notification_handler(
&mut self,
method: &str,
send_back: SubscriptionSink,
) -> Result<(), Error> {
if let Entry::Vacant(handle) = self.notification_handlers.entry(method.to_owned()) {
handle.insert(send_back);
Ok(())
} else {
Err(Error::MethodAlreadyRegistered(method.to_owned()))
}
}
/// Removes a notification handler.
pub(crate) fn remove_notification_handler(&mut self, method: String) -> Result<(), Error> {
if self.notification_handlers.remove(&method).is_some() {
Err(Error::UnregisteredNotification(method))
/// Tries to complete a pending subscription.
///
/// Returns `Some` if the subscription was completed otherwise `None`.
pub(crate) fn complete_pending_subscription(
) -> Option<(RequestId, PendingSubscriptionOneshot, UnsubscribeMethod)> {
match self.requests.entry(request_id) {
Entry::Occupied(request) if matches!(request.get(), Kind::PendingSubscription(_)) => {
let (_req_id, kind) = request.remove_entry();
if let Kind::PendingSubscription(send_back) = kind {
Some(send_back)
} else {
unreachable!("Pending subscription is Pending subscription checked above; qed");
}
}
_ => None,
}
}
/// Tries to complete a pending batch request.
///
/// Returns `Some` if the subscription was completed otherwise `None`.
pub(crate) fn complete_pending_batch(&mut self, batch: Vec<RequestId>) -> Option<BatchState> {
match self.batches.entry(batch) {
Entry::Occupied(request) => {
let (_digest, state) = request.remove_entry();
Some(state)
}
_ => None,
}
}
/// Tries to complete a pending call.
///
/// Returns `Some` if the call was completed otherwise `None`.
pub(crate) fn complete_pending_call(&mut self, request_id: RequestId) -> Option<PendingCallOneshot> {
match self.requests.entry(request_id) {
Entry::Occupied(request) if matches!(request.get(), Kind::PendingMethodCall(_)) => {
let (_req_id, kind) = request.remove_entry();
if let Kind::PendingMethodCall(send_back) = kind {
Some(send_back)
} else {
unreachable!("Pending call is Pending call checked above; qed");
}
}
_ => None,
}
}
/// Tries to remove a subscription.
///
/// Returns `Some` if the subscription was removed otherwise `None`.
pub(crate) fn remove_subscription(
subscription_id: SubscriptionId<'static>,
) -> Option<(RequestId, SubscriptionSink, UnsubscribeMethod, SubscriptionId)> {
match (self.requests.entry(request_id), self.subscriptions.entry(subscription_id)) {
(Entry::Occupied(request), Entry::Occupied(subscription))
if matches!(request.get(), Kind::Subscription(_)) =>
{
let (_req_id, kind) = request.remove_entry();
let (sub_id, _req_id) = subscription.remove_entry();
if let Kind::Subscription((unsub_req_id, send_back, unsub)) = kind {
Some((unsub_req_id, send_back, unsub, sub_id))
} else {
unreachable!("Subscription is Subscription checked above; qed");
}
}
_ => None,
}
}
/// Returns the status of a request ID
pub(crate) fn request_status(&mut self, id: &RequestId) -> RequestStatus {
self.requests.get(id).map_or(RequestStatus::Invalid, |kind| match kind {
Kind::PendingMethodCall(_) => RequestStatus::PendingMethodCall,
Kind::PendingSubscription(_) => RequestStatus::PendingSubscription,
Kind::Subscription(_) => RequestStatus::Subscription,
})
}
/// Get a mutable reference to underlying `Sink` in order to send messages to the subscription.
///
/// Returns `Some` if the `request_id` was registered as a subscription otherwise `None`.
pub(crate) fn as_subscription_mut(&mut self, request_id: &RequestId) -> Option<&mut SubscriptionSink> {
if let Some(Kind::Subscription((_, sink, _))) = self.requests.get_mut(request_id) {
Some(sink)
} else {
None
}
}
Niklas Adolfsson
committed
/// Get a mutable reference to underlying `Sink` in order to send incoming notifications to the subscription.
///
/// Returns `Some` if the `method` was registered as a NotificationHandler otherwise `None`.
pub(crate) fn as_notification_handler_mut(&mut self, method: String) -> Option<&mut SubscriptionSink> {
self.notification_handlers.get_mut(&method)
}
/// Reverse lookup to get the request ID for a subscription ID.
///
/// Returns `Some` if the subscription ID was registered as a subscription otherwise `None`.
pub(crate) fn get_request_id_by_subscription_id(&self, sub_id: &SubscriptionId) -> Option<RequestId> {
self.subscriptions.get(sub_id).map(|id| id.clone().into_owned())
}
}
#[cfg(test)]
mod tests {
use super::{Error, RequestManager};
use futures_channel::{mpsc, oneshot};
use jsonrpsee_types::{Id, SubscriptionId};
use serde_json::Value as JsonValue;
#[test]
fn insert_remove_pending_request_works() {
let (request_tx, _) = oneshot::channel::<Result<JsonValue, Error>>();
let mut manager = RequestManager::new();
assert!(manager.insert_pending_call(Id::Number(0), Some(request_tx)).is_ok());
assert!(manager.complete_pending_call(Id::Number(0)).is_some());
}
#[test]
fn insert_remove_subscription_works() {
let (pending_sub_tx, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (sub_tx, _) = mpsc::channel::<JsonValue>(1);
let mut manager = RequestManager::new();
.insert_pending_subscription(Id::Number(1), Id::Number(2), pending_sub_tx, "unsubscribe_method".into())
.is_ok());
let (unsub_req_id, _send_back_oneshot, unsubscribe_method) =
manager.complete_pending_subscription(Id::Number(1)).unwrap();
assert_eq!(unsub_req_id, Id::Number(2));
assert!(manager
.insert_subscription(
Id::Number(1),
Id::Number(2),
SubscriptionId::Str("uniq_id_from_server".into()),
sub_tx,
unsubscribe_method
)
assert!(manager.as_subscription_mut(&Id::Number(1)).is_some());
assert!(manager
.remove_subscription(Id::Number(1), SubscriptionId::Str("uniq_id_from_server".into()))
.is_some());
#[test]
fn insert_subscription_with_same_sub_and_unsub_id_should_err() {
let (tx1, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (tx2, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (tx3, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (tx4, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let mut manager = RequestManager::new();
assert!(manager
.insert_pending_subscription(Id::Str("1".into()), Id::Str("1".into()), tx1, "unsubscribe_method".into())
.is_err());
assert!(manager
.insert_pending_subscription(Id::Str("0".into()), Id::Str("1".into()), tx2, "unsubscribe_method".into())
.is_ok());
manager
.insert_pending_subscription(
Id::Str("99".into()),
Id::Str("0".into()),
tx3,
"unsubscribe_method".into()
)
.is_err(),
"unsub request ID already occupied"
);
assert!(
manager
.insert_pending_subscription(
Id::Str("99".into()),
Id::Str("1".into()),
tx4,
"unsubscribe_method".into()
)
.is_err(),
"sub request ID already occupied"
);
}
#[test]
fn pending_method_call_faulty() {
let (request_tx1, _) = oneshot::channel::<Result<JsonValue, Error>>();
let (request_tx2, _) = oneshot::channel::<Result<JsonValue, Error>>();
let (pending_sub_tx, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (sub_tx, _) = mpsc::channel::<JsonValue>(1);
let mut manager = RequestManager::new();
assert!(manager.insert_pending_call(Id::Number(0), Some(request_tx1)).is_ok());
assert!(manager.insert_pending_call(Id::Number(0), Some(request_tx2)).is_err());
assert!(manager
.insert_pending_subscription(Id::Number(0), Id::Number(1), pending_sub_tx, "beef".to_string())
.is_err());
assert!(manager
.insert_subscription(
Id::Number(0),
Id::Number(99),
SubscriptionId::Num(137),
sub_tx,
"bibimbap".to_string()
)
.is_err());
assert!(manager.remove_subscription(Id::Number(0), SubscriptionId::Num(137)).is_none());
assert!(manager.complete_pending_subscription(Id::Number(0)).is_none());
assert!(manager.complete_pending_call(Id::Number(0)).is_some());
}
#[test]
fn pending_subscription_faulty() {
let (request_tx, _) = oneshot::channel::<Result<JsonValue, Error>>();
let (pending_sub_tx1, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (pending_sub_tx2, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (sub_tx, _) = mpsc::channel::<JsonValue>(1);
let mut manager = RequestManager::new();
assert!(manager
.insert_pending_subscription(Id::Number(99), Id::Number(100), pending_sub_tx1, "beef".to_string())
.is_ok());
assert!(manager.insert_pending_call(Id::Number(99), Some(request_tx)).is_err());
assert!(manager
.insert_pending_subscription(Id::Number(99), Id::Number(1337), pending_sub_tx2, "vegan".to_string())
.is_err());
assert!(manager
.insert_subscription(
Id::Number(99),
Id::Number(100),
SubscriptionId::Num(0),
sub_tx,
"bibimbap".to_string()
)
.is_err());
assert!(manager.remove_subscription(Id::Number(99), SubscriptionId::Num(0)).is_none());
assert!(manager.complete_pending_call(Id::Number(99)).is_none());
assert!(manager.complete_pending_subscription(Id::Number(99)).is_some());
}
#[test]
fn active_subscriptions_faulty() {
let (request_tx, _) = oneshot::channel::<Result<JsonValue, Error>>();
let (pending_sub_tx, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (sub_tx1, _) = mpsc::channel::<JsonValue>(1);
let (sub_tx2, _) = mpsc::channel::<JsonValue>(1);
let mut manager = RequestManager::new();
assert!(manager
.insert_subscription(Id::Number(3), Id::Number(4), SubscriptionId::Num(0), sub_tx1, "bibimbap".to_string())
.is_ok());
assert!(manager
.insert_subscription(Id::Number(3), Id::Number(4), SubscriptionId::Num(1), sub_tx2, "bibimbap".to_string())
.is_err());
assert!(manager
.insert_pending_subscription(Id::Number(3), Id::Number(4), pending_sub_tx, "beef".to_string())
.is_err());
assert!(manager.insert_pending_call(Id::Number(3), Some(request_tx)).is_err());
assert!(manager.remove_subscription(Id::Number(3), SubscriptionId::Num(7)).is_none());
assert!(manager.complete_pending_call(Id::Number(3)).is_none());
assert!(manager.complete_pending_subscription(Id::Number(3)).is_none());
assert!(manager.remove_subscription(Id::Number(3), SubscriptionId::Num(1)).is_none());
assert!(manager.remove_subscription(Id::Number(3), SubscriptionId::Num(0)).is_some());