use crate::traits::Client;
use crate::transport::HttpTransportClient;
use crate::v2::request::{JsonRpcCallSer, JsonRpcNotificationSer};
use crate::v2::{
	error::JsonRpcError,
	params::{Id, JsonRpcParams},
	response::JsonRpcResponse,
};
use crate::{Error, TEN_MB_SIZE_BYTES};
use async_trait::async_trait;
use fnv::FnvHashMap;
use serde::de::DeserializeOwned;
use std::sync::atomic::{AtomicU64, Ordering};

/// Http Client Builder.
///
/// Collects the settings used to construct an [`HttpClient`]; the client itself is
/// created by `build`.
#[derive(Debug)]
pub struct HttpClientBuilder {
	/// Maximum size of a request body in bytes; handed to the transport client on `build`.
	max_request_body_size: u32,
}

impl HttpClientBuilder {
	/// Sets the maximum size of a request body in bytes (default is 10 MiB).
	pub fn max_request_body_size(mut self, size: u32) -> Self {
		self.max_request_body_size = size;
		self
	}

	/// Build the HTTP client with target to connect to.
	pub fn build(self, target: impl AsRef<str>) -> Result<HttpClient, Error> {
30
31
		let transport =
			HttpTransportClient::new(target, self.max_request_body_size).map_err(|e| Error::Transport(Box::new(e)))?;
Niklas Adolfsson's avatar
Niklas Adolfsson committed
32
33
34
35
36
37
		Ok(HttpClient { transport, request_id: AtomicU64::new(0) })
	}
}

impl Default for HttpClientBuilder {
	fn default() -> Self {
38
		Self { max_request_body_size: TEN_MB_SIZE_BYTES }
Niklas Adolfsson's avatar
Niklas Adolfsson committed
39
40
41
	}
}

Niklas Adolfsson's avatar
Niklas Adolfsson committed
42
/// JSON-RPC HTTP Client that provides functionality to perform method calls and notifications.
43
#[derive(Debug)]
Niklas Adolfsson's avatar
Niklas Adolfsson committed
44
45
46
47
48
49
50
pub struct HttpClient {
	/// HTTP transport client.
	transport: HttpTransportClient,
	/// Request ID that wraps around when overflowing.
	request_id: AtomicU64,
}

51
52
#[async_trait]
impl Client for HttpClient {
53
54
55
56
57
	async fn notification<'a>(&self, method: &'a str, params: JsonRpcParams<'a>) -> Result<(), Error> {
		let notif = JsonRpcNotificationSer::new(method, params);
		self.transport
			.send(serde_json::to_string(&notif).map_err(Error::ParseError)?)
			.await
58
			.map_err(|e| Error::Transport(Box::new(e)))
Niklas Adolfsson's avatar
Niklas Adolfsson committed
59
60
61
	}

	/// Perform a request towards the server.
62
	async fn request<'a, R>(&self, method: &'a str, params: JsonRpcParams<'a>) -> Result<R, Error>
63
	where
64
		R: DeserializeOwned,
65
	{
Niklas Adolfsson's avatar
Niklas Adolfsson committed
66
		// NOTE: `fetch_add` wraps on overflow which is intended.
David's avatar
David committed
67
		let id = self.request_id.fetch_add(1, Ordering::SeqCst);
68
69
70
		let request = JsonRpcCallSer::new(Id::Number(id), method, params);

		let body = self
Niklas Adolfsson's avatar
Niklas Adolfsson committed
71
			.transport
72
			.send_and_read_body(serde_json::to_string(&request).map_err(Error::ParseError)?)
Niklas Adolfsson's avatar
Niklas Adolfsson committed
73
			.await
74
			.map_err(|e| Error::Transport(Box::new(e)))?;
Niklas Adolfsson's avatar
Niklas Adolfsson committed
75

76
77
78
		let response: JsonRpcResponse<_> = match serde_json::from_slice(&body) {
			Ok(response) => response,
			Err(_) => {
79
80
				let err: JsonRpcError = serde_json::from_slice(&body).map_err(Error::ParseError)?;
				return Err(Error::Request(err.to_string()));
81
82
83
			}
		};

84
		let response_id = response.id.as_number().copied().ok_or(Error::InvalidRequestId)?;
85
86
87
88
89
90

		if response_id == id {
			Ok(response.result)
		} else {
			Err(Error::InvalidRequestId)
		}
Niklas Adolfsson's avatar
Niklas Adolfsson committed
91
	}
92

93
	async fn batch_request<'a, R>(&self, batch: Vec<(&'a str, JsonRpcParams<'a>)>) -> Result<Vec<R>, Error>
94
	where
95
		R: DeserializeOwned + Default + Clone,
96
	{
97
		let mut batch_request = Vec::with_capacity(batch.len());
98
99
100
101
102
103
		// NOTE(niklasad1): `ID` is not necessarily monotonically increasing.
		let mut ordered_requests = Vec::with_capacity(batch.len());
		let mut request_set = FnvHashMap::with_capacity_and_hasher(batch.len(), Default::default());

		for (pos, (method, params)) in batch.into_iter().enumerate() {
			let id = self.request_id.fetch_add(1, Ordering::SeqCst);
104
			batch_request.push(JsonRpcCallSer::new(Id::Number(id), method, params));
105
106
107
108
			ordered_requests.push(id);
			request_set.insert(id, pos);
		}

109
		let body = self
110
			.transport
111
			.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?)
112
			.await
113
			.map_err(|e| Error::Transport(Box::new(e)))?;
114

115
116
117
		let rps: Vec<JsonRpcResponse<_>> = match serde_json::from_slice(&body) {
			Ok(response) => response,
			Err(_) => {
118
119
				let err: JsonRpcError = serde_json::from_slice(&body).map_err(Error::ParseError)?;
				return Err(Error::Request(err.to_string()));
120
			}
121
122
123
124
125
		};

		// NOTE: `R::default` is placeholder and will be replaced in loop below.
		let mut responses = vec![R::default(); ordered_requests.len()];
		for rp in rps {
126
			let response_id = rp.id.as_number().copied().ok_or(Error::InvalidRequestId)?;
127
128
129
130
131
132
133
134
135
			let pos = match request_set.get(&response_id) {
				Some(pos) => *pos,
				None => return Err(Error::InvalidRequestId),
			};
			responses[pos] = rp.result
		}
		Ok(responses)
	}
}