Unverified Commit 708d4213 authored by Niklas Adolfsson's avatar Niklas Adolfsson Committed by GitHub
Browse files

feat(client): support request id as Strings. (#659)

* feat(client): support request id as Strings.

* add tests for Id::String

* address grumbles: move id_kind to RequestManager

* Update client/http-client/src/client.rs

* types: take ref to `ID` get rid of some `Clone`

* remove more clone

* grumbles: rename tests
parent c0f343d4
......@@ -61,7 +61,7 @@ pub fn jsonrpsee_types_v2(crit: &mut Criterion) {
b.iter(|| {
let params = &[1_u64.into(), 2_u32.into()];
let params = ParamsSer::ArrayRef(params);
let request = RequestSer::new(Id::Number(0), "say_hello", Some(params));
let request = RequestSer::new(&Id::Number(0), "say_hello", Some(params));
v2_serialize(request);
})
});
......@@ -69,7 +69,7 @@ pub fn jsonrpsee_types_v2(crit: &mut Criterion) {
crit.bench_function("jsonrpsee_types_v2_vec", |b| {
b.iter(|| {
let params = ParamsSer::Array(vec![1_u64.into(), 2_u32.into()]);
let request = RequestSer::new(Id::Number(0), "say_hello", Some(params));
let request = RequestSer::new(&Id::Number(0), "say_hello", Some(params));
v2_serialize(request);
})
});
......
......@@ -30,7 +30,7 @@ use std::time::Duration;
use crate::transport::HttpTransportClient;
use crate::types::{ErrorResponse, Id, NotificationSer, ParamsSer, RequestSer, Response};
use async_trait::async_trait;
use jsonrpsee_core::client::{CertificateStore, ClientT, RequestIdManager, Subscription, SubscriptionClientT};
use jsonrpsee_core::client::{CertificateStore, ClientT, IdKind, RequestIdManager, Subscription, SubscriptionClientT};
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
use rustc_hash::FxHashMap;
use serde::de::DeserializeOwned;
......@@ -42,6 +42,7 @@ pub struct HttpClientBuilder {
request_timeout: Duration,
max_concurrent_requests: usize,
certificate_store: CertificateStore,
id_kind: IdKind,
}
impl HttpClientBuilder {
......@@ -69,13 +70,19 @@ impl HttpClientBuilder {
self
}
/// Configure the data type of the request object ID (default is number).
pub fn id_format(mut self, id_kind: IdKind) -> Self {
self.id_kind = id_kind;
self
}
/// Build the HTTP client with target to connect to.
pub fn build(self, target: impl AsRef<str>) -> Result<HttpClient, Error> {
let transport = HttpTransportClient::new(target, self.max_request_body_size, self.certificate_store)
.map_err(|e| Error::Transport(e.into()))?;
Ok(HttpClient {
transport,
id_manager: Arc::new(RequestIdManager::new(self.max_concurrent_requests)),
id_manager: Arc::new(RequestIdManager::new(self.max_concurrent_requests, self.id_kind)),
request_timeout: self.request_timeout,
})
}
......@@ -88,6 +95,7 @@ impl Default for HttpClientBuilder {
request_timeout: Duration::from_secs(60),
max_concurrent_requests: 256,
certificate_store: CertificateStore::Native,
id_kind: IdKind::Number,
}
}
}
......@@ -120,8 +128,9 @@ impl ClientT for HttpClient {
where
R: DeserializeOwned,
{
let id = self.id_manager.next_request_id()?;
let request = RequestSer::new(Id::Number(*id.inner()), method, params);
let guard = self.id_manager.next_request_id()?;
let id = guard.inner();
let request = RequestSer::new(&id, method, params);
let fut = self.transport.send_and_read_body(serde_json::to_string(&request).map_err(Error::ParseError)?);
let body = match tokio::time::timeout(self.request_timeout, fut).await {
......@@ -142,9 +151,7 @@ impl ClientT for HttpClient {
}
};
let response_id = response.id.as_number().copied().ok_or(Error::InvalidRequestId)?;
if response_id == *id.inner() {
if response.id == id {
Ok(response.result)
} else {
Err(Error::InvalidRequestId)
......@@ -155,16 +162,18 @@ impl ClientT for HttpClient {
where
R: DeserializeOwned + Default + Clone,
{
let guard = self.id_manager.next_request_ids(batch.len())?;
let ids: Vec<Id> = guard.inner();
let mut batch_request = Vec::with_capacity(batch.len());
// NOTE(niklasad1): `ID` is not necessarily monotonically increasing.
let mut ordered_requests = Vec::with_capacity(batch.len());
let mut request_set = FxHashMap::with_capacity_and_hasher(batch.len(), Default::default());
let ids = self.id_manager.next_request_ids(batch.len())?;
for (pos, (method, params)) in batch.into_iter().enumerate() {
batch_request.push(RequestSer::new(Id::Number(ids.inner()[pos]), method, params));
ordered_requests.push(ids.inner()[pos]);
request_set.insert(ids.inner()[pos], pos);
batch_request.push(RequestSer::new(&ids[pos], method, params));
ordered_requests.push(&ids[pos]);
request_set.insert(&ids[pos], pos);
}
let fut = self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?);
......@@ -184,8 +193,7 @@ impl ClientT for HttpClient {
// NOTE: `R::default` is placeholder and will be replaced in loop below.
let mut responses = vec![R::default(); ordered_requests.len()];
for rp in rps {
let response_id = rp.id.as_number().copied().ok_or(Error::InvalidRequestId)?;
let pos = match request_set.get(&response_id) {
let pos = match request_set.get(&rp.id) {
Some(pos) => *pos,
None => return Err(Error::InvalidRequestId),
};
......
......@@ -27,13 +27,12 @@
use crate::types::error::{ErrorCode, ErrorObject, ErrorResponse};
use crate::types::ParamsSer;
use crate::HttpClientBuilder;
use jsonrpsee_core::client::ClientT;
use jsonrpsee_core::client::{ClientT, IdKind};
use jsonrpsee_core::rpc_params;
use jsonrpsee_core::Error;
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::mocks::Id;
use jsonrpsee_test_utils::TimeoutFutureExt;
use serde_json::value::Value as JsonValue;
#[tokio::test]
async fn method_call_works() {
......@@ -42,7 +41,33 @@ async fn method_call_works() {
.await
.unwrap()
.unwrap();
assert_eq!(JsonValue::String("hello".into()), result);
assert_eq!("hello", &result);
}
#[tokio::test]
async fn method_call_with_wrong_id_kind() {
let exp = "id as string";
let server_addr =
http_server_with_hardcoded_response(ok_response(exp.into(), Id::Num(0))).with_default_timeout().await.unwrap();
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().id_format(IdKind::String).build(&uri).unwrap();
assert!(matches!(
client.request::<String>("o", None).with_default_timeout().await.unwrap(),
Err(Error::InvalidRequestId)
));
}
#[tokio::test]
async fn method_call_with_id_str() {
let exp = "id as string";
let server_addr = http_server_with_hardcoded_response(ok_response(exp.into(), Id::Str("0".into())))
.with_default_timeout()
.await
.unwrap();
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().id_format(IdKind::String).build(&uri).unwrap();
let response: String = client.request::<String>("o", None).with_default_timeout().await.unwrap().unwrap();
assert_eq!(&response, exp);
}
#[tokio::test]
......@@ -139,7 +164,7 @@ async fn run_batch_request_with_response<'a>(
client.batch_request(batch).with_default_timeout().await.unwrap()
}
async fn run_request_with_response(response: String) -> Result<JsonValue, Error> {
async fn run_request_with_response(response: String) -> Result<String, Error> {
let server_addr = http_server_with_hardcoded_response(response).with_default_timeout().await.unwrap();
let uri = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().build(&uri).unwrap();
......
......@@ -43,7 +43,7 @@ pub use jsonrpsee_types as types;
use std::time::Duration;
use jsonrpsee_client_transport::ws::{Header, InvalidUri, Uri, WsTransportClientBuilder};
use jsonrpsee_core::client::{CertificateStore, ClientBuilder};
use jsonrpsee_core::client::{CertificateStore, ClientBuilder, IdKind};
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
/// Builder for [`WsClient`].
......@@ -77,6 +77,7 @@ pub struct WsClientBuilder<'a> {
max_concurrent_requests: usize,
max_notifs_per_subscription: usize,
max_redirections: usize,
id_kind: IdKind,
}
impl<'a> Default for WsClientBuilder<'a> {
......@@ -90,6 +91,7 @@ impl<'a> Default for WsClientBuilder<'a> {
max_concurrent_requests: 256,
max_notifs_per_subscription: 1024,
max_redirections: 5,
id_kind: IdKind::Number,
}
}
}
......@@ -143,6 +145,12 @@ impl<'a> WsClientBuilder<'a> {
self
}
/// See documentation for [`ClientBuilder::id_format`] (default is Number).
pub fn id_format(mut self, kind: IdKind) -> Self {
self.id_kind = kind;
self
}
/// Build the client with specified URL to connect to.
/// You must provide the port number in the URL.
///
......@@ -165,6 +173,7 @@ impl<'a> WsClientBuilder<'a> {
.max_notifs_per_subscription(self.max_notifs_per_subscription)
.request_timeout(self.request_timeout)
.max_concurrent_requests(self.max_concurrent_requests)
.id_format(self.id_kind)
.build(sender, receiver))
}
}
......@@ -28,8 +28,8 @@
use crate::types::error::{ErrorCode, ErrorObject, ErrorResponse};
use crate::types::ParamsSer;
use crate::WsClientBuilder;
use jsonrpsee_core::client::Subscription;
use jsonrpsee_core::client::{ClientT, SubscriptionClientT};
use jsonrpsee_core::client::{IdKind, Subscription};
use jsonrpsee_core::rpc_params;
use jsonrpsee_core::Error;
use jsonrpsee_test_utils::helpers::*;
......@@ -44,7 +44,42 @@ async fn method_call_works() {
.await
.unwrap()
.unwrap();
assert_eq!(JsonValue::String("hello".into()), result);
assert_eq!("hello", &result);
}
#[tokio::test]
async fn method_call_with_wrong_id_kind() {
let exp = "id as string";
let server = WebSocketTestServer::with_hardcoded_response(
"127.0.0.1:0".parse().unwrap(),
ok_response(exp.into(), Id::Num(0)),
)
.with_default_timeout()
.await
.unwrap();
let uri = format!("ws://{}", server.local_addr());
let client =
WsClientBuilder::default().id_format(IdKind::String).build(&uri).with_default_timeout().await.unwrap().unwrap();
let err = client.request::<String>("o", None).with_default_timeout().await.unwrap();
assert!(matches!(err, Err(Error::RestartNeeded(e)) if e == "Invalid request ID"));
}
#[tokio::test]
async fn method_call_with_id_str() {
let exp = "id as string";
let server = WebSocketTestServer::with_hardcoded_response(
"127.0.0.1:0".parse().unwrap(),
ok_response(exp.into(), Id::Str("0".into())),
)
.with_default_timeout()
.await
.unwrap();
let uri = format!("ws://{}", server.local_addr());
let client =
WsClientBuilder::default().id_format(IdKind::String).build(&uri).with_default_timeout().await.unwrap().unwrap();
let response: String = client.request::<String>("o", None).with_default_timeout().await.unwrap().unwrap();
assert_eq!(&response, exp);
}
#[tokio::test]
......@@ -237,7 +272,7 @@ async fn run_batch_request_with_response<'a>(
client.batch_request(batch).with_default_timeout().await.unwrap()
}
async fn run_request_with_response(response: String) -> Result<JsonValue, Error> {
async fn run_request_with_response(response: String) -> Result<String, Error> {
let server = WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), response)
.with_default_timeout()
.await
......
......@@ -45,8 +45,8 @@ pub(crate) fn process_batch_response(manager: &mut RequestManager, rps: Vec<Resp
let mut rps_unordered: Vec<_> = Vec::with_capacity(rps.len());
for rp in rps {
let id = rp.id.as_number().copied().ok_or(Error::InvalidRequestId)?;
digest.push(id);
let id = rp.id.into_owned();
digest.push(id.clone());
rps_unordered.push((id, rp.result));
}
......@@ -131,7 +131,7 @@ pub(crate) fn process_single_response(
response: Response<JsonValue>,
max_capacity_per_subscription: usize,
) -> Result<Option<RequestMessage>, Error> {
let response_id = response.id.as_number().copied().ok_or(Error::InvalidRequestId)?;
let response_id = response.id.into_owned();
match manager.request_status(&response_id) {
RequestStatus::PendingMethodCall => {
let send_back_oneshot = match manager.complete_pending_call(response_id) {
......@@ -144,7 +144,7 @@ pub(crate) fn process_single_response(
}
RequestStatus::PendingSubscription => {
let (unsub_id, send_back_oneshot, unsubscribe_method) =
manager.complete_pending_subscription(response_id).ok_or(Error::InvalidRequestId)?;
manager.complete_pending_subscription(response_id.clone()).ok_or(Error::InvalidRequestId)?;
let sub_id: Result<SubscriptionId, _> = response.result.try_into();
let sub_id = match sub_id {
......@@ -157,7 +157,7 @@ pub(crate) fn process_single_response(
let (subscribe_tx, subscribe_rx) = mpsc::channel(max_capacity_per_subscription);
if manager
.insert_subscription(response_id, unsub_id, sub_id.clone(), subscribe_tx, unsubscribe_method)
.insert_subscription(response_id.clone(), unsub_id, sub_id.clone(), subscribe_tx, unsubscribe_method)
.is_ok()
{
match send_back_oneshot.send(Ok((subscribe_rx, sub_id.clone()))) {
......@@ -191,14 +191,14 @@ pub(crate) async fn stop_subscription(
/// Builds an unsubscription message.
pub(crate) fn build_unsubscribe_message(
manager: &mut RequestManager,
sub_req_id: u64,
sub_req_id: Id<'static>,
sub_id: SubscriptionId<'static>,
) -> Option<RequestMessage> {
let (unsub_req_id, _, unsub, sub_id) = manager.remove_subscription(sub_req_id, sub_id)?;
let sub_id_slice: &[JsonValue] = &[sub_id.into()];
// TODO: https://github.com/paritytech/jsonrpsee/issues/275
let params = ParamsSer::ArrayRef(sub_id_slice);
let raw = serde_json::to_string(&RequestSer::new(Id::Number(unsub_req_id), &unsub, Some(params))).ok()?;
let raw = serde_json::to_string(&RequestSer::new(&unsub_req_id, &unsub, Some(params))).ok()?;
Some(RequestMessage { raw, id: unsub_req_id, send_back: None })
}
......@@ -207,7 +207,7 @@ pub(crate) fn build_unsubscribe_message(
/// Returns `Ok` if the response was successfully sent.
/// Returns `Err(_)` if the response ID was not found.
pub(crate) fn process_error_response(manager: &mut RequestManager, err: ErrorResponse) -> Result<(), Error> {
let id = err.id.as_number().copied().ok_or(Error::InvalidRequestId)?;
let id = err.id.clone().into_owned();
match manager.request_status(&id) {
RequestStatus::PendingMethodCall => {
let send_back = manager.complete_pending_call(id).expect("State checked above; qed");
......
......@@ -36,7 +36,7 @@ use std::collections::{hash_map::Entry, HashMap};
use crate::Error;
use futures_channel::{mpsc, oneshot};
use jsonrpsee_types::SubscriptionId;
use jsonrpsee_types::{Id, SubscriptionId};
use rustc_hash::FxHashMap;
use serde_json::value::Value as JsonValue;
......@@ -65,7 +65,7 @@ type PendingBatchOneshot = oneshot::Sender<Result<Vec<JsonValue>, Error>>;
type PendingSubscriptionOneshot = oneshot::Sender<Result<(mpsc::Receiver<JsonValue>, SubscriptionId<'static>), Error>>;
type SubscriptionSink = mpsc::Sender<JsonValue>;
type UnsubscribeMethod = String;
type RequestId = u64;
type RequestId = Id<'static>;
#[derive(Debug)]
/// Batch state.
......@@ -124,7 +124,7 @@ impl RequestManager {
) -> Result<(), PendingBatchOneshot> {
let mut order = FxHashMap::with_capacity_and_hasher(batch.len(), Default::default());
for (idx, batch_id) in batch.iter().enumerate() {
order.insert(*batch_id, idx);
order.insert(batch_id.clone(), idx);
}
batch.sort_unstable();
if let Entry::Vacant(v) = self.batches.entry(batch) {
......@@ -149,7 +149,8 @@ impl RequestManager {
&& !self.requests.contains_key(&unsub_req_id)
&& sub_req_id != unsub_req_id
{
self.requests.insert(sub_req_id, Kind::PendingSubscription((unsub_req_id, send_back, unsubscribe_method)));
self.requests
.insert(sub_req_id, Kind::PendingSubscription((unsub_req_id.clone(), send_back, unsubscribe_method)));
self.requests.insert(unsub_req_id, Kind::PendingMethodCall(None));
Ok(())
} else {
......@@ -169,7 +170,7 @@ impl RequestManager {
unsubscribe_method: UnsubscribeMethod,
) -> Result<(), SubscriptionSink> {
if let (Entry::Vacant(request), Entry::Vacant(subscription)) =
(self.requests.entry(sub_req_id), self.subscriptions.entry(subscription_id))
(self.requests.entry(sub_req_id.clone()), self.subscriptions.entry(subscription_id))
{
request.insert(Kind::Subscription((unsub_req_id, send_back, unsubscribe_method)));
subscription.insert(sub_req_id);
......@@ -307,7 +308,7 @@ impl RequestManager {
///
/// Returns `Some` if the subscription ID was registered as a subscription otherwise `None`.
pub(crate) fn get_request_id_by_subscription_id(&self, sub_id: &SubscriptionId) -> Option<RequestId> {
self.subscriptions.get(sub_id).copied()
self.subscriptions.get(sub_id).map(|id| id.clone().into_owned())
}
}
......@@ -315,7 +316,7 @@ impl RequestManager {
mod tests {
use super::{Error, RequestManager};
use futures_channel::{mpsc, oneshot};
use jsonrpsee_types::SubscriptionId;
use jsonrpsee_types::{Id, SubscriptionId};
use serde_json::Value as JsonValue;
#[test]
......@@ -323,8 +324,8 @@ mod tests {
let (request_tx, _) = oneshot::channel::<Result<JsonValue, Error>>();
let mut manager = RequestManager::new();
assert!(manager.insert_pending_call(0, Some(request_tx)).is_ok());
assert!(manager.complete_pending_call(0).is_some());
assert!(manager.insert_pending_call(Id::Number(0), Some(request_tx)).is_ok());
assert!(manager.complete_pending_call(Id::Number(0)).is_some());
}
#[test]
......@@ -332,15 +333,26 @@ mod tests {
let (pending_sub_tx, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (sub_tx, _) = mpsc::channel::<JsonValue>(1);
let mut manager = RequestManager::new();
assert!(manager.insert_pending_subscription(1, 2, pending_sub_tx, "unsubscribe_method".into()).is_ok());
let (unsub_req_id, _send_back_oneshot, unsubscribe_method) = manager.complete_pending_subscription(1).unwrap();
assert_eq!(unsub_req_id, 2);
assert!(manager
.insert_subscription(1, 2, SubscriptionId::Str("uniq_id_from_server".into()), sub_tx, unsubscribe_method)
.insert_pending_subscription(Id::Number(1), Id::Number(2), pending_sub_tx, "unsubscribe_method".into())
.is_ok());
let (unsub_req_id, _send_back_oneshot, unsubscribe_method) =
manager.complete_pending_subscription(Id::Number(1)).unwrap();
assert_eq!(unsub_req_id, Id::Number(2));
assert!(manager
.insert_subscription(
Id::Number(1),
Id::Number(2),
SubscriptionId::Str("uniq_id_from_server".into()),
sub_tx,
unsubscribe_method
)
.is_ok());
assert!(manager.as_subscription_mut(&1).is_some());
assert!(manager.remove_subscription(1, SubscriptionId::Str("uniq_id_from_server".into())).is_some());
assert!(manager.as_subscription_mut(&Id::Number(1)).is_some());
assert!(manager
.remove_subscription(Id::Number(1), SubscriptionId::Str("uniq_id_from_server".into()))
.is_some());
}
#[test]
......@@ -350,14 +362,32 @@ mod tests {
let (tx3, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let (tx4, _) = oneshot::channel::<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>();
let mut manager = RequestManager::new();
assert!(manager.insert_pending_subscription(1, 1, tx1, "unsubscribe_method".into()).is_err());
assert!(manager.insert_pending_subscription(0, 1, tx2, "unsubscribe_method".into()).is_ok());
assert!(manager
.insert_pending_subscription(Id::Str("1".into()), Id::Str("1".into()), tx1, "unsubscribe_method".into())
.is_err());
assert!(manager
.insert_pending_subscription(Id::Str("0".into()), Id::Str("1".into()), tx2, "unsubscribe_method".into())
.is_ok());
assert!(
manager.insert_pending_subscription(99, 0, tx3, "unsubscribe_method".into()).is_err(),
manager
.insert_pending_subscription(
Id::Str("99".into()),
Id::Str("0".into()),
tx3,
"unsubscribe_method".into()
)
.is_err(),
"unsub request ID already occupied"
);
assert!(
manager.insert_pending_subscription(99, 1, tx4, "unsubscribe_method".into()).is_err(),
manager
.insert_pending_subscription(
Id::Str("99".into()),
Id::Str("1".into()),
tx4,
"unsubscribe_method".into()
)
.is_err(),
"sub request ID already occupied"
);
}
......@@ -370,14 +400,24 @@ mod tests {
let (sub_tx, _) = mpsc::channel::<JsonValue>(1);
let mut manager = RequestManager::new();
assert!(manager.insert_pending_call(0, Some(request_tx1)).is_ok());
assert!(manager.insert_pending_call(0, Some(request_tx2)).is_err());
assert!(manager.insert_pending_subscription(0, 1, pending_sub_tx, "beef".to_string()).is_err());
assert!(manager.insert_subscription(0, 99, SubscriptionId::Num(137), sub_tx, "bibimbap".to_string()).is_err());
assert!(manager.remove_subscription(0, SubscriptionId::Num(137)).is_none());
assert!(manager.complete_pending_subscription(0).is_none());
assert!(manager.complete_pending_call(0).is_some());
assert!(manager.insert_pending_call(Id::Number(0), Some(request_tx1)).is_ok());
assert!(manager.insert_pending_call(Id::Number(0), Some(request_tx2)).is_err());
assert!(manager
.insert_pending_subscription(Id::Number(0), Id::Number(1), pending_sub_tx, "beef".to_string())
.is_err());
assert!(manager
.insert_subscription(
Id::Number(0),
Id::Number(99),
SubscriptionId::Num(137),
sub_tx,
"bibimbap".to_string()
)
.is_err());
assert!(manager.remove_subscription(Id::Number(0), SubscriptionId::Num(137)).is_none());
assert!(manager.complete_pending_subscription(Id::Number(0)).is_none());
assert!(manager.complete_pending_call(Id::Number(0)).is_some());
}
#[test]
......@@ -388,15 +428,27 @@ mod tests {
let (sub_tx, _) = mpsc::channel::<JsonValue>(1);
let mut manager = RequestManager::new();
assert!(manager.insert_pending_subscription(99, 100, pending_sub_tx1, "beef".to_string()).is_ok());
assert!(manager.insert_pending_call(99, Some(request_tx)).is_err());
assert!(manager.insert_pending_subscription(99, 1337, pending_sub_tx2, "vegan".to_string()).is_err());
assert!(manager.insert_subscription(99, 100, SubscriptionId::Num(0), sub_tx, "bibimbap".to_string()).is_err());
assert!(manager
.insert_pending_subscription(Id::Number(99), Id::Number(100), pending_sub_tx1, "beef".to_string())
.is_ok());
assert!(manager.insert_pending_call(Id::Number(99), Some(request_tx)).is_err());
assert!(manager
.insert_pending_subscription(Id::Number(99), Id::Number(1337), pending_sub_tx2, "vegan".to_string())
.is_err());
assert!(manager.remove_subscription(99, SubscriptionId::Num(0)).is_none());
assert!(manager.complete_pending_call(99).is_none());
assert!(manager.complete_pending_subscription(99).is_some());
assert!(manager
.insert_subscription(
Id::Number(99),
Id::Number(100),
SubscriptionId::Num(0),
sub_tx,
"bibimbap".to_string()
)
.is_err());
assert!(manager.remove_subscription(Id::Number(99), SubscriptionId::Num(0)).is_none());
assert!(manager.complete_pending_call(Id::Number(99)).is_none());
assert!(manager.complete_pending_subscription(Id::Number(99)).is_some());
}
#[test]
......@@ -408,15 +460,21 @@ mod tests {
let mut manager = RequestManager::new();
assert!(manager.insert_subscription(3, 4, SubscriptionId::Num(0), sub_tx1, "bibimbap".to_string()).is_ok());
assert!(manager.insert_subscription(3, 4, SubscriptionId::Num(1), sub_tx2, "bibimbap".to_string()).is_err());
assert!(manager.insert_pending_subscription(3, 4, pending_sub_tx, "beef".to_string()).is_err());
assert!(manager.insert_pending_call(3, Some(request_tx)).is_err());
assert!(manager.remove_subscription(3, SubscriptionId::Num(7)).is_none());
assert!(manager.complete_pending_call(3).is_none());
assert!(manager.complete_pending_subscription(3).is_none());
assert!(manager.remove_subscription(3, SubscriptionId::Num(1)).is_none());
assert!(manager.remove_subscription(3, SubscriptionId::Num(0)).is_some());
assert!(manager
.insert_subscription(Id::Number(3), Id::Number(4), SubscriptionId::Num(0), sub_tx1, "bibimbap".to_string())
.is_ok());
assert!(manager
.insert_subscription(Id::Number(3), Id::Number(4), SubscriptionId::Num(1), sub_tx2, "bibimbap".to_string())
.is_err());
assert!(manager
.insert_pending_subscription(Id::Number(3), Id::Number(4), pending_sub_tx, "beef".to_string())
.is_err());
assert!(manager.insert_pending_call(Id::Number(3), Some(request_tx)).is_err());
assert!(manager.remove_subscription(Id::Number(3), SubscriptionId::Num(7)).is_none());
assert!(manager.complete_pending_call(Id::Number(3)).is_none());
assert!(manager.complete_pending_subscription(Id::Number(3)).is_none());
assert!(manager.remove_subscription(Id::Number(3), SubscriptionId::Num(1)).is_none());
assert!(manager.remove_subscription(Id::Number(3), SubscriptionId::Num(0)).is_some());
}