rpc_module.rs 35.7 KiB
Newer Older
		// Stateless stand-in implementing `CoolServer`; every method returns a fixed or
		// purely input-derived value so the assertions below can compare exact output.
		struct CoolServerImpl;

		#[async_trait]
		impl CoolServer for CoolServerImpl {
			// Takes no parameters and always answers `false`.
			fn rebel_without_cause(&self) -> Result<bool, Error> {
				Ok(false)
			}

			// Answers with the number of entries in `map`, a space, then the `Debug` form of `gun`.
			fn rebel(&self, gun: Gun, map: HashMap<u8, u8>) -> Result<String, Error> {
				let entry_count = map.len();
				Ok(format!("{} {:?}", entry_count, gun))
			}

			// Async variant: echoes both arguments back using their `Debug` representations.
			async fn can_have_any_name(&self, beverage: Beverage, some_bytes: Vec<u8>) -> Result<String, Error> {
				let reply = format!("drink: {:?}, phases: {:?}", beverage, some_bytes);
				Ok(reply)
			}
		}
		let module = CoolServerImpl.into_rpc();

		// Call sync method with no params
		let res: bool = module.call("rebel_without_cause", EmptyParams::new()).await.unwrap();
		assert!(!res);

		// Call sync method with params
		let res: String = module.call("rebel", (Gun { shoots: true }, HashMap::<u8, u8>::default())).await.unwrap();
		assert_eq!(&res, "0 Gun { shoots: true }");

		// Call sync method with bad params
		let err = module.call::<_, ()>("rebel", (Gun { shoots: true }, false)).await.unwrap_err();
		assert!(matches!(
			err,
			Error::Request(err) if err == r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"invalid type: boolean `false`, expected a map at line 1 column 5"},"id":0}"#
		));

		// Call async method with params and context
		let result: String = module.call("revolution", (Beverage { ice: true }, vec![1, 2, 3])).await.unwrap();
		assert_eq!(&result, "drink: Beverage { ice: true }, phases: [1, 2, 3]");
	}

	#[tokio::test]
	async fn subscribing_without_server() {
		let mut module = RpcModule::new(());
		module
			.register_subscription("my_sub", "my_sub", "my_unsub", |_, mut sink, _| {
				let mut stream_data = vec!['0', '1', '2'];
				std::thread::spawn(move || {
					while let Some(letter) = stream_data.pop() {
						tracing::debug!("This is your friendly subscription sending data.");
						if let Err(Error::SubscriptionClosed(_)) = sink.send(&letter) {
							return;
						}
						std::thread::sleep(std::time::Duration::from_millis(500));
		let mut my_sub = module.subscribe("my_sub", EmptyParams::new()).await.unwrap();
		for i in (0..=2).rev() {
			let (val, id) = my_sub.next::<char>().await.unwrap().unwrap();
			assert_eq!(val, std::char::from_digit(i, 10).unwrap());
			assert_eq!(id, v2::params::SubscriptionId::Num(my_sub.subscription_id()));
		let sub_err = my_sub.next::<char>().await.unwrap().unwrap_err();

		// The subscription is now closed by the server.
		assert!(matches!(sub_err, Error::SubscriptionClosed(_)));

	#[tokio::test]
	async fn close_test_subscribing_without_server() {
		let mut module = RpcModule::new(());
		module
			.register_subscription("my_sub", "my_sub", "my_unsub", |_, mut sink, _| {
				std::thread::spawn(move || loop {
					if let Err(Error::SubscriptionClosed(_)) = sink.send(&"lo") {
						return;
					}
					std::thread::sleep(std::time::Duration::from_millis(500));
				});
				Ok(())
			})
			.unwrap();

		let mut my_sub = module.subscribe("my_sub", EmptyParams::new()).await.unwrap();
		let (val, id) = my_sub.next::<String>().await.unwrap().unwrap();
		assert_eq!(&val, "lo");
		assert_eq!(id, v2::params::SubscriptionId::Num(my_sub.subscription_id()));

		// close the subscription to ensure it doesn't return any items.
		my_sub.close();
		assert!(matches!(my_sub.next::<String>().await, None));