Newer
Older
}
}
let module = CoolServerImpl.into_rpc();
// Call sync method with no params
let res: bool = module.call("rebel_without_cause", EmptyParams::new()).await.unwrap();
assert!(!res);
let res: String = module.call("rebel", (Gun { shoots: true }, HashMap::<u8, u8>::default())).await.unwrap();
assert_eq!(&res, "0 Gun { shoots: true }");
// Call sync method with bad params
let err = module.call::<_, ()>("rebel", (Gun { shoots: true }, false)).await.unwrap_err();
assert!(matches!(
err,
Error::Request(err) if err == r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"invalid type: boolean `false`, expected a map at line 1 column 5"},"id":0}"#
));
// Call async method with params and context
let result: String = module.call("revolution", (Beverage { ice: true }, vec![1, 2, 3])).await.unwrap();
assert_eq!(&result, "drink: Beverage { ice: true }, phases: [1, 2, 3]");
}
#[tokio::test]
async fn subscribing_without_server() {
let mut module = RpcModule::new(());
module
.register_subscription("my_sub", "my_sub", "my_unsub", |_, mut sink, _| {
let mut stream_data = vec!['0', '1', '2'];
std::thread::spawn(move || loop {
tracing::debug!("This is your friendly subscription sending data.");
if let Some(letter) = stream_data.pop() {
if let Err(Error::SubscriptionClosed(_)) = sink.send(&letter) {
return;
}
} else {
return;
}
std::thread::sleep(std::time::Duration::from_millis(500));
});
Ok(())
})
.unwrap();
let mut my_sub = module.subscribe("my_sub", EmptyParams::new()).await.unwrap();
let (val, id) = my_sub.next::<char>().await.unwrap().unwrap();
assert_eq!(val, std::char::from_digit(i, 10).unwrap());
assert_eq!(id, v2::params::SubscriptionId::Num(my_sub.subscription_id()));
Niklas Adolfsson
committed
Niklas Adolfsson
committed
// The subscription is now closed by the server.
let (sub_closed_err, _) = my_sub.next::<SubscriptionClosedError>().await.unwrap().unwrap();
assert_eq!(sub_closed_err.subscription_id(), my_sub.subscription_id());
assert_eq!(sub_closed_err.close_reason(), "Closed by the server");
Niklas Adolfsson
committed
}
Niklas Adolfsson
committed
#[tokio::test]
async fn close_test_subscribing_without_server() {
let mut module = RpcModule::new(());
module
.register_subscription("my_sub", "my_sub", "my_unsub", |_, mut sink, _| {
Niklas Adolfsson
committed
std::thread::spawn(move || loop {
if let Err(Error::SubscriptionClosed(_)) = sink.send(&"lo") {
return;
}
std::thread::sleep(std::time::Duration::from_millis(500));
});
Ok(())
})
.unwrap();
let mut my_sub = module.subscribe("my_sub", EmptyParams::new()).await.unwrap();
let (val, id) = my_sub.next::<String>().await.unwrap().unwrap();
Niklas Adolfsson
committed
assert_eq!(&val, "lo");
assert_eq!(id, v2::params::SubscriptionId::Num(my_sub.subscription_id()));
// close the subscription to ensure it doesn't return any items.
my_sub.close();
assert!(matches!(my_sub.next::<String>().await, None));
Niklas Adolfsson
committed
}