}
#[tokio::test]
async fn subscribing_without_server() {
let mut module = RpcModule::new(());
module
.register_subscription("my_sub", "my_sub", "my_unsub", |_, mut sink, _| {
let mut stream_data = vec!['0', '1', '2'];
std::thread::spawn(move || loop {
tracing::debug!("This is your friendly subscription sending data.");
if let Some(letter) = stream_data.pop() {
if let Err(Error::SubscriptionClosed(_)) = sink.send(&letter) {
return;
}
} else {
return;
}
std::thread::sleep(std::time::Duration::from_millis(500));
});
Ok(())
})
.unwrap();
let mut my_sub: TestSubscription = module.test_subscription("my_sub", Vec::<()>::new()).await;
Niklas Adolfsson
committed
let (val, id) = my_sub.next::<char>().await.unwrap();
assert_eq!(val, std::char::from_digit(i, 10).unwrap());
assert_eq!(id, v2::params::SubscriptionId::Num(my_sub.subscription_id()));
Niklas Adolfsson
committed
Niklas Adolfsson
committed
// The subscription is now closed by the server.
let (sub_closed_err, _) = my_sub.next::<SubscriptionClosedError>().await.unwrap();
assert_eq!(sub_closed_err.subscription_id(), my_sub.subscription_id());
assert_eq!(sub_closed_err.close_reason(), "Closed by the server");
Niklas Adolfsson
committed
}
#[tokio::test]
async fn close_test_subscribing_without_server() {
let mut module = RpcModule::new(());
module
.register_subscription("my_sub", "my_sub", "my_unsub", |_, mut sink, _| {
Niklas Adolfsson
committed
std::thread::spawn(move || loop {
if let Err(Error::SubscriptionClosed(_)) = sink.send(&"lo") {
return;
}
std::thread::sleep(std::time::Duration::from_millis(500));
});
Ok(())
})
.unwrap();
let mut my_sub: TestSubscription = module.test_subscription("my_sub", Vec::<()>::new()).await;
let (val, id) = my_sub.next::<String>().await.unwrap();
assert_eq!(&val, "lo");
assert_eq!(id, v2::params::SubscriptionId::Num(my_sub.subscription_id()));
// close the subscription to ensure it doesn't return any items.
my_sub.close();
assert_eq!(None, my_sub.next::<String>().await);
}