client.rs 7.33 KB
Newer Older
1
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
David's avatar
David committed
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

Maciej Hirsz's avatar
Maciej Hirsz committed
27
28
29
use std::sync::Arc;
use std::time::Duration;

Niklas Adolfsson's avatar
Niklas Adolfsson committed
30
use crate::transport::HttpTransportClient;
31
use crate::types::{ErrorResponse, Id, NotificationSer, ParamsSer, RequestSer, Response};
32
use async_trait::async_trait;
33
34
use jsonrpsee_core::client::{CertificateStore, ClientT, RequestIdManager, Subscription, SubscriptionClientT};
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
35
use rustc_hash::FxHashMap;
36
use serde::de::DeserializeOwned;
Niklas Adolfsson's avatar
Niklas Adolfsson committed
37

Niklas Adolfsson's avatar
Niklas Adolfsson committed
38
39
40
41
/// Http Client Builder.
#[derive(Debug)]
pub struct HttpClientBuilder {
	max_request_body_size: u32,
David's avatar
David committed
42
	request_timeout: Duration,
43
	max_concurrent_requests: usize,
44
	certificate_store: CertificateStore,
Niklas Adolfsson's avatar
Niklas Adolfsson committed
45
46
47
48
49
50
51
52
53
}

impl HttpClientBuilder {
	/// Sets the maximum size of a request body in bytes (default is 10 MiB).
	pub fn max_request_body_size(mut self, size: u32) -> Self {
		self.max_request_body_size = size;
		self
	}

David's avatar
David committed
54
55
56
57
58
59
	/// Set request timeout (default is 60 seconds).
	pub fn request_timeout(mut self, timeout: Duration) -> Self {
		self.request_timeout = timeout;
		self
	}

60
61
62
63
64
65
	/// Set max concurrent requests.
	pub fn max_concurrent_requests(mut self, max: usize) -> Self {
		self.max_concurrent_requests = max;
		self
	}

66
67
68
69
70
71
	/// Set which certificate store to use.
	pub fn certificate_store(mut self, certificate_store: CertificateStore) -> Self {
		self.certificate_store = certificate_store;
		self
	}

Niklas Adolfsson's avatar
Niklas Adolfsson committed
72
73
	/// Build the HTTP client with target to connect to.
	pub fn build(self, target: impl AsRef<str>) -> Result<HttpClient, Error> {
74
75
		let transport = HttpTransportClient::new(target, self.max_request_body_size, self.certificate_store)
			.map_err(|e| Error::Transport(e.into()))?;
76
77
		Ok(HttpClient {
			transport,
78
			id_manager: Arc::new(RequestIdManager::new(self.max_concurrent_requests)),
79
80
			request_timeout: self.request_timeout,
		})
Niklas Adolfsson's avatar
Niklas Adolfsson committed
81
82
83
84
85
	}
}

impl Default for HttpClientBuilder {
	fn default() -> Self {
86
87
88
89
		Self {
			max_request_body_size: TEN_MB_SIZE_BYTES,
			request_timeout: Duration::from_secs(60),
			max_concurrent_requests: 256,
90
			certificate_store: CertificateStore::Native,
91
		}
Niklas Adolfsson's avatar
Niklas Adolfsson committed
92
93
94
	}
}

Niklas Adolfsson's avatar
Niklas Adolfsson committed
95
/// JSON-RPC HTTP Client that provides functionality to perform method calls and notifications.
96
#[derive(Debug, Clone)]
Niklas Adolfsson's avatar
Niklas Adolfsson committed
97
98
99
pub struct HttpClient {
	/// HTTP transport client.
	transport: HttpTransportClient,
David's avatar
David committed
100
101
	/// Request timeout. Defaults to 60sec.
	request_timeout: Duration,
102
	/// Request ID manager.
103
	id_manager: Arc<RequestIdManager>,
Niklas Adolfsson's avatar
Niklas Adolfsson committed
104
105
}

106
#[async_trait]
107
impl ClientT for HttpClient {
108
	async fn notification<'a>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<(), Error> {
David's avatar
David committed
109
		let notif = NotificationSer::new(method, params);
David's avatar
David committed
110
		let fut = self.transport.send(serde_json::to_string(&notif).map_err(Error::ParseError)?);
111
		match tokio::time::timeout(self.request_timeout, fut).await {
David's avatar
David committed
112
113
			Ok(Ok(ok)) => Ok(ok),
			Err(_) => Err(Error::RequestTimeout),
114
			Ok(Err(e)) => Err(Error::Transport(e.into())),
David's avatar
David committed
115
		}
Niklas Adolfsson's avatar
Niklas Adolfsson committed
116
117
118
	}

	/// Perform a request towards the server.
119
	async fn request<'a, R>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<R, Error>
120
	where
121
		R: DeserializeOwned,
122
	{
123
124
125
126
		let id = self.id_manager.next_request_id()?;
		let request = RequestSer::new(Id::Number(*id.inner()), method, params);

		let fut = self.transport.send_and_read_body(serde_json::to_string(&request).map_err(Error::ParseError)?);
127
		let body = match tokio::time::timeout(self.request_timeout, fut).await {
David's avatar
David committed
128
			Ok(Ok(body)) => body,
129
130
131
132
133
134
			Err(_e) => {
				return Err(Error::RequestTimeout);
			}
			Ok(Err(e)) => {
				return Err(Error::Transport(e.into()));
			}
David's avatar
David committed
135
		};
Niklas Adolfsson's avatar
Niklas Adolfsson committed
136

David's avatar
David committed
137
		let response: Response<_> = match serde_json::from_slice(&body) {
138
139
			Ok(response) => response,
			Err(_) => {
Maciej Hirsz's avatar
Maciej Hirsz committed
140
				let err: ErrorResponse = serde_json::from_slice(&body).map_err(Error::ParseError)?;
141
				return Err(Error::Request(err.to_string()));
142
143
144
			}
		};

145
		let response_id = response.id.as_number().copied().ok_or(Error::InvalidRequestId)?;
146

147
		if response_id == *id.inner() {
148
149
150
151
			Ok(response.result)
		} else {
			Err(Error::InvalidRequestId)
		}
Niklas Adolfsson's avatar
Niklas Adolfsson committed
152
	}
153

154
	async fn batch_request<'a, R>(&self, batch: Vec<(&'a str, Option<ParamsSer<'a>>)>) -> Result<Vec<R>, Error>
155
	where
156
		R: DeserializeOwned + Default + Clone,
157
	{
158
		let mut batch_request = Vec::with_capacity(batch.len());
159
160
		// NOTE(niklasad1): `ID` is not necessarily monotonically increasing.
		let mut ordered_requests = Vec::with_capacity(batch.len());
161
		let mut request_set = FxHashMap::with_capacity_and_hasher(batch.len(), Default::default());
162

163
		let ids = self.id_manager.next_request_ids(batch.len())?;
164
		for (pos, (method, params)) in batch.into_iter().enumerate() {
165
166
167
			batch_request.push(RequestSer::new(Id::Number(ids.inner()[pos]), method, params));
			ordered_requests.push(ids.inner()[pos]);
			request_set.insert(ids.inner()[pos], pos);
168
169
		}

170
		let fut = self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?);
David's avatar
David committed
171

172
		let body = match tokio::time::timeout(self.request_timeout, fut).await {
David's avatar
David committed
173
174
			Ok(Ok(body)) => body,
			Err(_e) => return Err(Error::RequestTimeout),
175
			Ok(Err(e)) => return Err(Error::Transport(e.into())),
David's avatar
David committed
176
		};
177

178
		let rps: Vec<Response<_>> =
Maciej Hirsz's avatar
Maciej Hirsz committed
179
			serde_json::from_slice(&body).map_err(|_| match serde_json::from_slice::<ErrorResponse>(&body) {
180
181
182
				Ok(e) => Error::Request(e.to_string()),
				Err(e) => Error::ParseError(e),
			})?;
183
184
185
186

		// NOTE: `R::default` is placeholder and will be replaced in loop below.
		let mut responses = vec![R::default(); ordered_requests.len()];
		for rp in rps {
187
			let response_id = rp.id.as_number().copied().ok_or(Error::InvalidRequestId)?;
188
189
190
191
192
193
194
195
196
			let pos = match request_set.get(&response_id) {
				Some(pos) => *pos,
				None => return Err(Error::InvalidRequestId),
			};
			responses[pos] = rp.result
		}
		Ok(responses)
	}
}
197
198

#[async_trait]
199
impl SubscriptionClientT for HttpClient {
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
	/// Send a subscription request to the server. Not implemented for HTTP; will always return [`Error::HttpNotImplemented`].
	async fn subscribe<'a, N>(
		&self,
		_subscribe_method: &'a str,
		_params: Option<ParamsSer<'a>>,
		_unsubscribe_method: &'a str,
	) -> Result<Subscription<N>, Error>
	where
		N: DeserializeOwned,
	{
		Err(Error::HttpNotImplemented)
	}

	/// Subscribe to a specific method. Not implemented for HTTP; will always return [`Error::HttpNotImplemented`].
	async fn subscribe_to_method<'a, N>(&self, _method: &'a str) -> Result<Subscription<N>, Error>
	where
		N: DeserializeOwned,
	{
		Err(Error::HttpNotImplemented)
	}
}