// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

Maciej Hirsz's avatar
Maciej Hirsz committed
27
28
29
use std::sync::Arc;
use std::time::Duration;

Niklas Adolfsson's avatar
Niklas Adolfsson committed
30
use crate::transport::HttpTransportClient;
31
use crate::types::{ErrorResponse, Id, NotificationSer, ParamsSer, RequestSer, Response};
32
use async_trait::async_trait;
33
use hyper::http::HeaderMap;
34
use jsonrpsee_core::client::{CertificateStore, ClientT, IdKind, RequestIdManager, Subscription, SubscriptionClientT};
35
use jsonrpsee_core::tracing::RpcTracing;
36
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
37
use jsonrpsee_types::error::CallError;
38
use rustc_hash::FxHashMap;
39
use serde::de::DeserializeOwned;
40
use tracing_futures::Instrument;
Niklas Adolfsson's avatar
Niklas Adolfsson committed
41

Niklas Adolfsson's avatar
Niklas Adolfsson committed
42
/// Http Client Builder.
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
///
/// # Examples
///
/// ```no_run
///
/// use jsonrpsee_http_client::{HttpClientBuilder, HeaderMap, HeaderValue};
///
/// #[tokio::main]
/// async fn main() {
///     // Build custom headers used for every submitted request.
///     let mut headers = HeaderMap::new();
///     headers.insert("Any-Header-You-Like", HeaderValue::from_static("42"));
///
///     // Build client
///     let client = HttpClientBuilder::default()
///          .set_headers(headers)
///          .build("wss://localhost:443")
///          .unwrap();
///
///     // use client....
/// }
///
/// ```
Niklas Adolfsson's avatar
Niklas Adolfsson committed
66
67
68
#[derive(Debug)]
pub struct HttpClientBuilder {
	max_request_body_size: u32,
David's avatar
David committed
69
	request_timeout: Duration,
70
	max_concurrent_requests: usize,
71
	certificate_store: CertificateStore,
72
	id_kind: IdKind,
73
	max_log_length: u32,
74
	headers: HeaderMap,
Niklas Adolfsson's avatar
Niklas Adolfsson committed
75
76
77
78
79
80
81
82
83
}

impl HttpClientBuilder {
	/// Sets the maximum size of a request body in bytes (default is 10 MiB).
	pub fn max_request_body_size(mut self, size: u32) -> Self {
		self.max_request_body_size = size;
		self
	}

David's avatar
David committed
84
85
86
87
88
89
	/// Set request timeout (default is 60 seconds).
	pub fn request_timeout(mut self, timeout: Duration) -> Self {
		self.request_timeout = timeout;
		self
	}

90
91
92
93
94
95
	/// Set max concurrent requests.
	pub fn max_concurrent_requests(mut self, max: usize) -> Self {
		self.max_concurrent_requests = max;
		self
	}

96
97
98
99
100
101
	/// Set which certificate store to use.
	pub fn certificate_store(mut self, certificate_store: CertificateStore) -> Self {
		self.certificate_store = certificate_store;
		self
	}

102
103
104
105
106
107
	/// Configure the data type of the request object ID (default is number).
	pub fn id_format(mut self, id_kind: IdKind) -> Self {
		self.id_kind = id_kind;
		self
	}

108
109
110
111
112
113
114
115
	/// Max length for logging for requests and responses in number characters.
	///
	/// Logs bigger than this limit will be truncated.
	pub fn set_max_logging_length(mut self, max: u32) -> Self {
		self.max_log_length = max;
		self
	}

116
117
118
119
120
121
122
123
	/// Set a custom header passed to the server with every request (default is none).
	///
	/// The caller is responsible for checking that the headers do not conflict or are duplicated.
	pub fn set_headers(mut self, headers: HeaderMap) -> Self {
		self.headers = headers;
		self
	}

Niklas Adolfsson's avatar
Niklas Adolfsson committed
124
125
	/// Build the HTTP client with target to connect to.
	pub fn build(self, target: impl AsRef<str>) -> Result<HttpClient, Error> {
126
127
128
129
130
131
132
133
		let transport = HttpTransportClient::new(
			target,
			self.max_request_body_size,
			self.certificate_store,
			self.max_log_length,
			self.headers,
		)
		.map_err(|e| Error::Transport(e.into()))?;
134
135
		Ok(HttpClient {
			transport,
136
			id_manager: Arc::new(RequestIdManager::new(self.max_concurrent_requests, self.id_kind)),
137
138
			request_timeout: self.request_timeout,
		})
Niklas Adolfsson's avatar
Niklas Adolfsson committed
139
140
141
142
143
	}
}

impl Default for HttpClientBuilder {
	fn default() -> Self {
144
145
146
147
		Self {
			max_request_body_size: TEN_MB_SIZE_BYTES,
			request_timeout: Duration::from_secs(60),
			max_concurrent_requests: 256,
148
			certificate_store: CertificateStore::Native,
149
			id_kind: IdKind::Number,
150
			max_log_length: 4096,
151
			headers: HeaderMap::new(),
152
		}
Niklas Adolfsson's avatar
Niklas Adolfsson committed
153
154
155
	}
}

Niklas Adolfsson's avatar
Niklas Adolfsson committed
156
/// JSON-RPC HTTP Client that provides functionality to perform method calls and notifications.
157
#[derive(Debug, Clone)]
Niklas Adolfsson's avatar
Niklas Adolfsson committed
158
159
160
pub struct HttpClient {
	/// HTTP transport client.
	transport: HttpTransportClient,
David's avatar
David committed
161
162
	/// Request timeout. Defaults to 60sec.
	request_timeout: Duration,
163
	/// Request ID manager.
164
	id_manager: Arc<RequestIdManager>,
Niklas Adolfsson's avatar
Niklas Adolfsson committed
165
166
}

167
#[async_trait]
168
impl ClientT for HttpClient {
169
	async fn notification<'a>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<(), Error> {
170
		let trace = RpcTracing::notification(method);
171
172
		async {
			let notif = serde_json::to_string(&NotificationSer::new(method, params)).map_err(Error::ParseError)?;
173

174
			let fut = self.transport.send(notif);
175

176
177
178
179
180
			match tokio::time::timeout(self.request_timeout, fut).await {
				Ok(Ok(ok)) => Ok(ok),
				Err(_) => Err(Error::RequestTimeout),
				Ok(Err(e)) => Err(Error::Transport(e.into())),
			}
David's avatar
David committed
181
		}
182
183
		.instrument(trace.into_span())
		.await
Niklas Adolfsson's avatar
Niklas Adolfsson committed
184
185
186
	}

	/// Perform a request towards the server.
187
	async fn request<'a, R>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<R, Error>
188
	where
189
		R: DeserializeOwned,
190
	{
191
192
193
		let guard = self.id_manager.next_request_id()?;
		let id = guard.inner();
		let request = RequestSer::new(&id, method, params);
194
195
		let trace = RpcTracing::method_call(method);

196
197
		async {
			let raw = serde_json::to_string(&request).map_err(Error::ParseError)?;
198

199
200
201
202
203
204
205
206
207
208
			let fut = self.transport.send_and_read_body(raw);
			let body = match tokio::time::timeout(self.request_timeout, fut).await {
				Ok(Ok(body)) => body,
				Err(_e) => {
					return Err(Error::RequestTimeout);
				}
				Ok(Err(e)) => {
					return Err(Error::Transport(e.into()));
				}
			};
Niklas Adolfsson's avatar
Niklas Adolfsson committed
209

210
211
212
213
214
215
216
			let response: Response<_> = match serde_json::from_slice(&body) {
				Ok(response) => response,
				Err(_) => {
					let err: ErrorResponse = serde_json::from_slice(&body).map_err(Error::ParseError)?;
					return Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned())));
				}
			};
217

218
219
220
221
222
			if response.id == id {
				Ok(response.result)
			} else {
				Err(Error::InvalidRequestId)
			}
223
		}
224
225
		.instrument(trace.into_span())
		.await
Niklas Adolfsson's avatar
Niklas Adolfsson committed
226
	}
227

228
	async fn batch_request<'a, R>(&self, batch: Vec<(&'a str, Option<ParamsSer<'a>>)>) -> Result<Vec<R>, Error>
229
	where
230
		R: DeserializeOwned + Default + Clone,
231
	{
232
233
		let guard = self.id_manager.next_request_ids(batch.len())?;
		let ids: Vec<Id> = guard.inner();
234
		let trace = RpcTracing::batch();
235

236
237
238
239
240
		async {
			let mut batch_request = Vec::with_capacity(batch.len());
			// NOTE(niklasad1): `ID` is not necessarily monotonically increasing.
			let mut ordered_requests = Vec::with_capacity(batch.len());
			let mut request_set = FxHashMap::with_capacity_and_hasher(batch.len(), Default::default());
241

242
243
244
245
246
			for (pos, (method, params)) in batch.into_iter().enumerate() {
				batch_request.push(RequestSer::new(&ids[pos], method, params));
				ordered_requests.push(&ids[pos]);
				request_set.insert(&ids[pos], pos);
			}
247

248
249
			let fut =
				self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?);
David's avatar
David committed
250

251
252
253
254
255
			let body = match tokio::time::timeout(self.request_timeout, fut).await {
				Ok(Ok(body)) => body,
				Err(_e) => return Err(Error::RequestTimeout),
				Ok(Err(e)) => return Err(Error::Transport(e.into())),
			};
256

257
258
259
260
261
			let rps: Vec<Response<_>> =
				serde_json::from_slice(&body).map_err(|_| match serde_json::from_slice::<ErrorResponse>(&body) {
					Ok(e) => Error::Call(CallError::Custom(e.error_object().clone().into_owned())),
					Err(e) => Error::ParseError(e),
				})?;
262

263
264
265
266
267
268
269
270
271
272
			// NOTE: `R::default` is placeholder and will be replaced in loop below.
			let mut responses = vec![R::default(); ordered_requests.len()];
			for rp in rps {
				let pos = match request_set.get(&rp.id) {
					Some(pos) => *pos,
					None => return Err(Error::InvalidRequestId),
				};
				responses[pos] = rp.result
			}
			Ok(responses)
273
		}
274
275
		.instrument(trace.into_span())
		.await
276
277
	}
}
278
279

#[async_trait]
280
impl SubscriptionClientT for HttpClient {
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
	/// Send a subscription request to the server. Not implemented for HTTP; will always return [`Error::HttpNotImplemented`].
	async fn subscribe<'a, N>(
		&self,
		_subscribe_method: &'a str,
		_params: Option<ParamsSer<'a>>,
		_unsubscribe_method: &'a str,
	) -> Result<Subscription<N>, Error>
	where
		N: DeserializeOwned,
	{
		Err(Error::HttpNotImplemented)
	}

	/// Subscribe to a specific method. Not implemented for HTTP; will always return [`Error::HttpNotImplemented`].
	async fn subscribe_to_method<'a, N>(&self, _method: &'a str) -> Result<Subscription<N>, Error>
	where
		N: DeserializeOwned,
	{
		Err(Error::HttpNotImplemented)
	}
}