rpc_module.rs 34.8 KiB
Newer Older
	}

	#[tokio::test]
	async fn subscribing_without_server() {
		let mut module = RpcModule::new(());
		module
			.register_subscription("my_sub", "my_sub", "my_unsub", |_, mut sink, _| {
				let mut stream_data = vec!['0', '1', '2'];
				std::thread::spawn(move || loop {
					tracing::debug!("This is your friendly subscription sending data.");
					if let Some(letter) = stream_data.pop() {
						if let Err(Error::SubscriptionClosed(_)) = sink.send(&letter) {
							return;
						}
					} else {
						return;
					}
					std::thread::sleep(std::time::Duration::from_millis(500));
				});
				Ok(())
			})
			.unwrap();

		let mut my_sub: TestSubscription = module.test_subscription("my_sub", Vec::<()>::new()).await;
		for i in (0..=2).rev() {
			let (val, id) = my_sub.next::<char>().await.unwrap();
			assert_eq!(val, std::char::from_digit(i, 10).unwrap());
			assert_eq!(id, v2::params::SubscriptionId::Num(my_sub.subscription_id()));
		// The subscription is now closed by the server.
		let (sub_closed_err, _) = my_sub.next::<SubscriptionClosedError>().await.unwrap();
		assert_eq!(sub_closed_err.subscription_id(), my_sub.subscription_id());
		assert_eq!(sub_closed_err.close_reason(), "Closed by the server");

	#[tokio::test]
	async fn close_test_subscribing_without_server() {
		let mut module = RpcModule::new(());
		module
			.register_subscription("my_sub", "my_sub", "my_unsub", |_, mut sink, _| {
				std::thread::spawn(move || loop {
					if let Err(Error::SubscriptionClosed(_)) = sink.send(&"lo") {
						return;
					}
					std::thread::sleep(std::time::Duration::from_millis(500));
				});
				Ok(())
			})
			.unwrap();

		let mut my_sub: TestSubscription = module.test_subscription("my_sub", Vec::<()>::new()).await;
		let (val, id) = my_sub.next::<String>().await.unwrap();
		assert_eq!(&val, "lo");
		assert_eq!(id, v2::params::SubscriptionId::Num(my_sub.subscription_id()));

		// close the subscription to ensure it doesn't return any items.
		my_sub.close();
		assert_eq!(None, my_sub.next::<String>().await);
	}