// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::transport::HttpTransportClient;
use crate::types::{
	traits::Client,
	v2::{Id, NotificationSer, ParamsSer, RequestSer, Response, RpcError},
	Error, TEN_MB_SIZE_BYTES,
};
use async_trait::async_trait;
use fnv::FnvHashMap;
use serde::de::DeserializeOwned;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

/// Http Client Builder.
///
/// Holds the settings that [`HttpClientBuilder::build`] uses to construct an [`HttpClient`].
#[derive(Debug)]
pub struct HttpClientBuilder {
	/// Maximum size of a request body in bytes; passed to the transport when building.
	max_request_body_size: u32,
	/// Time limit handed to the built client and applied to each of its requests.
	request_timeout: Duration,
}

impl HttpClientBuilder {
	/// Sets the maximum size of a request body in bytes (default is 10 MiB).
	pub fn max_request_body_size(mut self, size: u32) -> Self {
		self.max_request_body_size = size;
		self
	}

David's avatar
David committed
53
54
55
56
57
58
	/// Set request timeout (default is 60 seconds).
	pub fn request_timeout(mut self, timeout: Duration) -> Self {
		self.request_timeout = timeout;
		self
	}

Niklas Adolfsson's avatar
Niklas Adolfsson committed
59
60
	/// Build the HTTP client with target to connect to.
	pub fn build(self, target: impl AsRef<str>) -> Result<HttpClient, Error> {
61
		let transport =
62
			HttpTransportClient::new(target, self.max_request_body_size).map_err(|e| Error::Transport(e.into()))?;
David's avatar
David committed
63
		Ok(HttpClient { transport, request_id: AtomicU64::new(0), request_timeout: self.request_timeout })
Niklas Adolfsson's avatar
Niklas Adolfsson committed
64
65
66
67
68
	}
}

impl Default for HttpClientBuilder {
	fn default() -> Self {
David's avatar
David committed
69
		Self { max_request_body_size: TEN_MB_SIZE_BYTES, request_timeout: Duration::from_secs(60) }
Niklas Adolfsson's avatar
Niklas Adolfsson committed
70
71
72
	}
}

Niklas Adolfsson's avatar
Niklas Adolfsson committed
73
/// JSON-RPC HTTP Client that provides functionality to perform method calls and notifications.
74
#[derive(Debug)]
Niklas Adolfsson's avatar
Niklas Adolfsson committed
75
76
77
78
79
pub struct HttpClient {
	/// HTTP transport client.
	transport: HttpTransportClient,
	/// Request ID that wraps around when overflowing.
	request_id: AtomicU64,
David's avatar
David committed
80
81
	/// Request timeout. Defaults to 60sec.
	request_timeout: Duration,
Niklas Adolfsson's avatar
Niklas Adolfsson committed
82
83
}

84
85
#[async_trait]
impl Client for HttpClient {
David's avatar
David committed
86
87
	async fn notification<'a>(&self, method: &'a str, params: ParamsSer<'a>) -> Result<(), Error> {
		let notif = NotificationSer::new(method, params);
David's avatar
David committed
88
		let fut = self.transport.send(serde_json::to_string(&notif).map_err(Error::ParseError)?);
89
		match tokio::time::timeout(self.request_timeout, fut).await {
David's avatar
David committed
90
91
			Ok(Ok(ok)) => Ok(ok),
			Err(_) => Err(Error::RequestTimeout),
92
			Ok(Err(e)) => Err(Error::Transport(e.into())),
David's avatar
David committed
93
		}
Niklas Adolfsson's avatar
Niklas Adolfsson committed
94
95
96
	}

	/// Perform a request towards the server.
David's avatar
David committed
97
	async fn request<'a, R>(&self, method: &'a str, params: ParamsSer<'a>) -> Result<R, Error>
98
	where
99
		R: DeserializeOwned,
100
	{
Niklas Adolfsson's avatar
Niklas Adolfsson committed
101
		// NOTE: `fetch_add` wraps on overflow which is intended.
David's avatar
David committed
102
		let id = self.request_id.fetch_add(1, Ordering::SeqCst);
David's avatar
David committed
103
		let request = RequestSer::new(Id::Number(id), method, params);
104

David's avatar
David committed
105
		let fut = self.transport.send_and_read_body(serde_json::to_string(&request).map_err(Error::ParseError)?);
106
		let body = match tokio::time::timeout(self.request_timeout, fut).await {
David's avatar
David committed
107
108
			Ok(Ok(body)) => body,
			Err(_e) => return Err(Error::RequestTimeout),
109
			Ok(Err(e)) => return Err(Error::Transport(e.into())),
David's avatar
David committed
110
		};
Niklas Adolfsson's avatar
Niklas Adolfsson committed
111

David's avatar
David committed
112
		let response: Response<_> = match serde_json::from_slice(&body) {
113
114
			Ok(response) => response,
			Err(_) => {
David's avatar
David committed
115
				let err: RpcError = serde_json::from_slice(&body).map_err(Error::ParseError)?;
116
				return Err(Error::Request(err.to_string()));
117
118
119
			}
		};

120
		let response_id = response.id.as_number().copied().ok_or(Error::InvalidRequestId)?;
121
122
123
124
125
126

		if response_id == id {
			Ok(response.result)
		} else {
			Err(Error::InvalidRequestId)
		}
Niklas Adolfsson's avatar
Niklas Adolfsson committed
127
	}
128

David's avatar
David committed
129
	async fn batch_request<'a, R>(&self, batch: Vec<(&'a str, ParamsSer<'a>)>) -> Result<Vec<R>, Error>
130
	where
131
		R: DeserializeOwned + Default + Clone,
132
	{
133
		let mut batch_request = Vec::with_capacity(batch.len());
134
135
136
137
138
139
		// NOTE(niklasad1): `ID` is not necessarily monotonically increasing.
		let mut ordered_requests = Vec::with_capacity(batch.len());
		let mut request_set = FnvHashMap::with_capacity_and_hasher(batch.len(), Default::default());

		for (pos, (method, params)) in batch.into_iter().enumerate() {
			let id = self.request_id.fetch_add(1, Ordering::SeqCst);
David's avatar
David committed
140
			batch_request.push(RequestSer::new(Id::Number(id), method, params));
141
142
143
144
			ordered_requests.push(id);
			request_set.insert(id, pos);
		}

David's avatar
David committed
145
146
		let fut = self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?);

147
		let body = match tokio::time::timeout(self.request_timeout, fut).await {
David's avatar
David committed
148
149
			Ok(Ok(body)) => body,
			Err(_e) => return Err(Error::RequestTimeout),
150
			Ok(Err(e)) => return Err(Error::Transport(e.into())),
David's avatar
David committed
151
		};
152

David's avatar
David committed
153
		let rps: Vec<Response<_>> = match serde_json::from_slice(&body) {
154
155
			Ok(response) => response,
			Err(_) => {
David's avatar
David committed
156
				let err: RpcError = serde_json::from_slice(&body).map_err(Error::ParseError)?;
157
				return Err(Error::Request(err.to_string()));
158
			}
159
160
161
162
163
		};

		// NOTE: `R::default` is placeholder and will be replaced in loop below.
		let mut responses = vec![R::default(); ordered_requests.len()];
		for rp in rps {
164
			let response_id = rp.id.as_number().copied().ok_or(Error::InvalidRequestId)?;
165
166
167
168
169
170
171
172
173
			let pos = match request_set.get(&response_id) {
				Some(pos) => *pos,
				None => return Err(Error::InvalidRequestId),
			};
			responses[pos] = rp.result
		}
		Ok(responses)
	}
}