// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::jsonrpc_transport;
use crate::manager::{RequestManager, RequestStatus};
use futures::{
channel::{mpsc, oneshot},
prelude::*,
sink::SinkExt,
};
use jsonrpsee_types::{
error::Error,
jsonrpc::{self, JsonValue, SubscriptionId},
};
use std::{borrow::Cow, convert::TryInto};
use std::{io, marker::PhantomData};
/// Client that can be cloned.
///
/// > **Note**: This struct is designed to be easy to use, but it works by maintaining a background task running in parallel.
#[derive(Clone)]
pub struct WsClient {
/// Channel to send requests to the background task.
to_back: mpsc::Sender<FrontToBack>,
/// Request timeout
request_timeout: Option<Duration>,
/// Configuration used to establish a [`WsClient`] connection.
#[derive(Clone, Debug)]
pub struct WsConfig<'a> {
    /// URL to connect to.
    ///
    /// If the port number is missing from the URL, the default port number is used.
    ///
    /// `ws://host` - port 80 is used
    ///
    /// `wss://host` - port 443 is used
    pub url: &'a str,
    /// Max request body size.
    pub max_request_body_size: usize,
    /// Request timeout. If `None`, requests wait for a response without a deadline.
    pub request_timeout: Option<Duration>,
    /// Connection timeout.
    pub connection_timeout: Duration,
    /// `Origin` header to pass during the HTTP handshake. If `None`, no
    /// `Origin` header is passed.
    pub origin: Option<Cow<'a, str>>,
    /// Url to send during the HTTP handshake.
    pub handshake_url: Cow<'a, str>,
    /// Max concurrent request capacity.
    ///
    /// **Note**: The actual capacity is `num_senders + max_concurrent_requests_capacity`
    /// because it is passed to [`futures::channel::mpsc::channel`]
    /// and the capacity may increase because the sender is cloned when new
    /// requests, notifications and subscriptions are created.
    pub max_concurrent_requests_capacity: usize,
    /// Max concurrent capacity for each subscription; when the capacity is exceeded the subscription will be dropped.
    ///
    /// You can also prevent the subscription from being dropped by calling [`WsSubscription::next()`] frequently
    /// enough that the buffer capacity is never exceeded.
    ///
    /// **Note**: The actual capacity is `num_senders + max_subscription_capacity`
    /// because it is passed to [`futures::channel::mpsc::channel`].
    pub max_subscription_capacity: usize,
}
impl<'a> WsConfig<'a> {
/// Default WebSocket configuration with a specified URL to connect to.
///
/// The defaults visible here are a 10 second connection timeout, no `Origin`
/// header, `"/"` as handshake URL, a request channel capacity of 256 and a
/// per-subscription capacity of 4.
pub fn with_url(url: &'a str) -> Self {
// NOTE(review): the `Self {` opener and the initialisers for `url`,
// `max_request_body_size` and `request_timeout` are not present in this view
// of the file; restore them from the upstream source before building.
connection_timeout: Duration::from_secs(10),
origin: None,
handshake_url: From::from("/"),
max_concurrent_requests_capacity: 256,
max_subscription_capacity: 4,
// NOTE(review): the closing braces of the struct literal, of `with_url` and of
// this `impl` block are also missing from this view.
/// Active subscription on a [`WsClient`].
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
pub struct WsSubscription<Notif> {
/// Channel to send requests to the background task.
to_back: mpsc::Sender<FrontToBack>,
/// Channel from which we receive notifications from the server, as undecoded `JsonValue`s.
notifs_rx: mpsc::Receiver<JsonValue>,
/// Subscription ID,
id: SubscriptionId,
/// Marker in order to pin the `Notif` parameter.
marker: PhantomData<Notif>,
}
/// Message that the [`WsClient`] (or a [`WsSubscription`]) can send to the background task.
enum FrontToBack {
/// Send a one-shot notification to the server. The server doesn't give back any feedback.
Notification {
/// Method for the notification.
method: String,
/// Parameters to send to the server.
params: jsonrpc::Params,
},
/// Send a request to the server.
StartRequest {
/// Method for the request.
method: String,
/// Parameters of the request.
params: jsonrpc::Params,
/// One-shot channel where to send back the outcome of that request.
send_back: oneshot::Sender<Result<JsonValue, Error>>,
},
/// Send a subscription request to the server.
Subscribe {
/// Method for the subscription request.
subscribe_method: String,
/// Parameters to send for the subscription.
params: jsonrpc::Params,
/// Method to use to unsubscribe later. Also used if the channel unexpectedly closes.
unsubscribe_method: String,
/// When we get a response from the server about that subscription, we send the result on
/// this channel. If the subscription succeeds, we return a `Receiver` that will receive
/// notifications, together with the subscription ID assigned by the server.
send_back: oneshot::Sender<Result<(mpsc::Receiver<JsonValue>, SubscriptionId), Error>>,
},
/// When a subscription channel is closed, we send this message to the background
/// task to mark it ready for garbage collection.
// NOTE: It is not possible to cancel pending subscriptions or pending requests.
// Such operations will be blocked until a response is received or the background
// thread has been terminated.
SubscriptionClosed(SubscriptionId),
}
impl WsClient {
/// Initializes a new WebSocket client
///
/// Fails when the URL is invalid.
pub async fn new(config: WsConfig<'_>) -> Result<WsClient, Error> {
let max_capacity_per_subscription = config.max_subscription_capacity;
let request_timeout = config.request_timeout;
let (to_back, from_front) = mpsc::channel(config.max_concurrent_requests_capacity);
let (sender, receiver) =
jsonrpc_transport::websocket_connection(config).await.map_err(|e| Error::TransportError(Box::new(e)))?;
background_task(sender, receiver, from_front, max_capacity_per_subscription).await;
Ok(Self { to_back, request_timeout })
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
}
/// Send a notification to the server.
pub async fn notification(
&self,
method: impl Into<String>,
params: impl Into<jsonrpc::Params>,
) -> Result<(), Error> {
let method = method.into();
let params = params.into();
log::trace!("[frontend]: send notification: method={:?}, params={:?}", method, params);
self.to_back.clone().send(FrontToBack::Notification { method, params }).await.map_err(Error::Internal)
}
/// Perform a request towards the server.
pub async fn request<Ret>(
&self,
method: impl Into<String>,
params: impl Into<jsonrpc::Params>,
) -> Result<Ret, Error>
where
Ret: jsonrpc::DeserializeOwned,
{
let method = method.into();
let params = params.into();
log::trace!("[frontend]: send request: method={:?}, params={:?}", method, params);
let (send_back_tx, send_back_rx) = oneshot::channel();
self.to_back
.clone()
.send(FrontToBack::StartRequest { method, params, send_back: send_back_tx })
.await
.map_err(Error::Internal)?;
let send_back_rx_out = if let Some(duration) = self.request_timeout {
let timeout = async_std::task::sleep(duration);
futures::pin_mut!(send_back_rx, timeout);
match future::select(send_back_rx, timeout).await {
future::Either::Left((send_back_rx_out, _)) => send_back_rx_out,
future::Either::Right((_, _)) => return Err(Error::WsRequestTimeout),
}
} else {
send_back_rx.await
};
let json_value = match send_back_rx_out {
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
Ok(Ok(v)) => v,
Ok(Err(err)) => return Err(err),
Err(_) => {
let err = io::Error::new(io::ErrorKind::Other, "background task closed");
return Err(Error::TransportError(Box::new(err)));
}
};
jsonrpc::from_value(json_value).map_err(Error::ParseError)
}
/// Send a subscription request to the server.
///
/// The `subscribe_method` and `params` are used to ask for the subscription towards the
/// server. The `unsubscribe_method` is used to close the subscription.
pub async fn subscribe<Notif>(
&self,
subscribe_method: impl Into<String>,
params: impl Into<jsonrpc::Params>,
unsubscribe_method: impl Into<String>,
) -> Result<WsSubscription<Notif>, Error> {
let subscribe_method = subscribe_method.into();
let unsubscribe_method = unsubscribe_method.into();
if subscribe_method == unsubscribe_method {
return Err(Error::Subscription(subscribe_method, unsubscribe_method));
}
log::trace!("[frontend]: subscribe: {:?}, unsubscribe: {:?}", subscribe_method, unsubscribe_method);
let (send_back_tx, send_back_rx) = oneshot::channel();
self.to_back
.clone()
.send(FrontToBack::Subscribe {
subscribe_method,
unsubscribe_method,
params: params.into(),
send_back: send_back_tx,
})
.await
.map_err(Error::Internal)?;
let (notifs_rx, id) = match send_back_rx.await {
Ok(Ok(val)) => val,
Ok(Err(err)) => return Err(err),
Err(_) => {
let err = io::Error::new(io::ErrorKind::Other, "background task closed");
return Err(Error::TransportError(Box::new(err)));
}
};
Ok(WsSubscription { to_back: self.to_back.clone(), notifs_rx, marker: PhantomData, id })
}
}
impl<Notif> WsSubscription<Notif>
where
    Notif: jsonrpc::DeserializeOwned,
{
    /// Returns the next notification from the stream.
    ///
    /// Yields `None` once the notification channel has ended, which may happen when
    /// the subscription was terminated because the channel became full or was dropped.
    ///
    /// Notifications that cannot be decoded into `Notif` are logged and skipped.
    pub async fn next(&mut self) -> Option<Notif> {
        while let Some(raw) = self.notifs_rx.next().await {
            match jsonrpc::from_value(raw) {
                Ok(notif) => return Some(notif),
                Err(e) => log::error!("Subscription response error: {:?}", e),
            }
        }
        None
    }
}
impl<Notif> Drop for WsSubscription<Notif> {
    fn drop(&mut self) {
        // Delivery of the close message is best effort only: when the channel to the
        // background task has no room, the message is simply not sent. The background
        // task still notices the closed notification channel the next time a
        // notification for this subscription arrives and unsubscribes at that point.
        let placeholder = SubscriptionId::Num(0);
        let closed_id = std::mem::replace(&mut self.id, placeholder);
        let close_msg = FrontToBack::SubscriptionClosed(closed_id);
        let _ = self.to_back.send(close_msg).now_or_never();
    }
}
/// Function being run in the background that processes messages from the frontend.
///
/// Multiplexes two event sources until one of them ends:
///
/// - `frontend`: messages sent by [`WsClient`] and [`WsSubscription`] handles;
/// - `receiver`: responses and notifications read from the server.
///
/// The loop terminates when the frontend channel is closed, when the backend stream
/// ends, when reading from the transport fails, or when a response cannot be handled
/// by [`process_response`].
///
/// `max_capacity_per_subscription` is the buffer size of the channel created for each
/// subscription's notifications.
async fn background_task(
    mut sender: jsonrpc_transport::Sender,
    receiver: jsonrpc_transport::Receiver,
    mut frontend: mpsc::Receiver<FrontToBack>,
    max_capacity_per_subscription: usize,
) {
    let mut manager = RequestManager::new();

    // Turn the transport receiver into a stream of responses.
    let backend_event = futures::stream::unfold(receiver, |mut receiver| async {
        let res = receiver.next_response().await;
        Some((res, receiver))
    });

    futures::pin_mut!(backend_event);

    loop {
        let next_frontend = frontend.next();
        let next_backend = backend_event.next();
        futures::pin_mut!(next_frontend, next_backend);

        futures::select! {
            event = next_frontend => match event {
                // User dropped the sender side of the channel.
                None => {
                    log::trace!("[backend]: frontend channel dropped; terminate client");
                    break
                }
                // User called `notification` on the front-end
                Some(FrontToBack::Notification { method, params }) => {
                    log::trace!("[backend]: client prepares to send notification");
                    let _ = sender.send_notification(method, params).await;
                }
                // User called `request` on the front-end
                Some(FrontToBack::StartRequest { method, params, send_back }) => {
                    log::trace!("[backend]: client prepares to send request={:?}", method);
                    match sender.start_request(method, params).await {
                        Ok(id) => {
                            if let Err(send_back) = manager.insert_pending_call(id, send_back) {
                                let _ = send_back.send(Err(Error::DuplicateRequestId));
                            }
                        }
                        Err(err) => {
                            log::warn!("[backend]: client request failed: {:?}", err);
                            let _ = send_back.send(Err(Error::TransportError(Box::new(err))));
                        }
                    }
                }
                // User called `subscribe` on the front-end.
                Some(FrontToBack::Subscribe { subscribe_method, unsubscribe_method, params, send_back }) => {
                    log::trace!(
                        "[backend]: client prepares to start subscription, subscribe_method={:?} unsubscribe_method:{:?}",
                        subscribe_method,
                        unsubscribe_method
                    );
                    match sender.start_subscription(subscribe_method, params).await {
                        Ok(id) => {
                            if let Err(send_back) = manager.insert_pending_subscription(id, send_back, unsubscribe_method) {
                                let _ = send_back.send(Err(Error::DuplicateRequestId));
                            }
                        }
                        Err(err) => {
                            log::warn!("[backend]: client subscription failed: {:?}", err);
                            let _ = send_back.send(Err(Error::TransportError(Box::new(err))));
                        }
                    }
                }
                // User dropped a subscription.
                Some(FrontToBack::SubscriptionClosed(sub_id)) => {
                    log::trace!("Closing in subscription: {:?}", sub_id);
                    // NOTE: The subscription may have been closed earlier if
                    // the channel was full or disconnected.
                    if let Some(request_id) = manager.get_request_id_by_subscription_id(&sub_id) {
                        if let Some((_sink, unsubscribe_method)) = manager.remove_subscription(request_id, sub_id.clone()) {
                            if let Ok(json_sub_id) = jsonrpc::to_value(sub_id) {
                                let params = jsonrpc::Params::Array(vec![json_sub_id]);
                                let _ = sender.start_request(unsubscribe_method, params).await;
                            }
                        }
                    }
                }
            },
            event = next_backend => match event {
                None => {
                    log::trace!("[backend]: backend channel dropped; terminate client");
                    break;
                }
                Some(Ok(jsonrpc::Response::Single(response))) => {
                    match process_response(&mut manager, response, max_capacity_per_subscription) {
                        // The frontend is gone for a freshly confirmed subscription: unsubscribe.
                        Ok(Some((unsubscribe, params))) => {
                            if let Err(e) = sender.start_request(unsubscribe, params).await {
                                log::error!("Failed to send unsubscription response: {:?}", e);
                            }
                        }
                        Ok(None) => (),
                        Err(e) => {
                            log::error!("Error: {:?} terminating client", e);
                            return;
                        }
                    }
                }
                Some(Ok(jsonrpc::Response::Batch(responses))) => {
                    // if any request fails, throw away entire batch.
                    for response in responses {
                        match process_response(&mut manager, response, max_capacity_per_subscription) {
                            Ok(Some((unsubscribe, params))) => {
                                if let Err(e) = sender.start_request(unsubscribe, params).await {
                                    log::error!("Failed to send unsubscription response: {:?}", e);
                                }
                            }
                            Ok(None) => (),
                            Err(e) => {
                                log::error!("Error: {:?} terminating client", e);
                                return;
                            }
                        }
                    }
                }
                Some(Ok(jsonrpc::Response::Notif(notif))) => {
                    let sub_id = notif.params.subscription;
                    let request_id = match manager.get_request_id_by_subscription_id(&sub_id) {
                        Some(r) => r,
                        None => {
                            log::error!("Subscription ID: {:?} not found", sub_id);
                            continue;
                        }
                    };

                    match manager.as_subscription_mut(&request_id) {
                        Some(send_back_sink) => {
                            // `try_send` fails when the subscription buffer is full or the
                            // receiver is gone; in both cases the subscription is dropped.
                            if let Err(e) = send_back_sink.try_send(notif.params.result) {
                                log::error!("Dropping subscription {:?} error: {:?}", sub_id, e);
                                manager.remove_subscription(request_id, sub_id).expect("subscription is active; checked above");
                            }
                        }
                        None => {
                            log::error!("Subscription ID: {:?} not an active subscription", sub_id);
                        },
                    }
                }
                Some(Err(e)) => {
                    log::error!("Error: {:?} terminating client", e);
                    return;
                }
            },
        }
    }
}
/// Process a response from the server.
///
/// Returns `Ok(_)` if the response was successful or if the error could be handled.
/// Returns `Err(_)` if the response couldn't be handled.
fn process_response(
manager: &mut RequestManager,
response: jsonrpc::Output,
max_capacity_per_subscription: usize,
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
) -> Result<Option<(String, jsonrpc::Params)>, Error> {
let response_id = *response.id().as_number().ok_or(Error::InvalidRequestId)?;
match manager.request_status(&response_id) {
RequestStatus::PendingMethodCall => {
let send_back_oneshot = manager.complete_pending_call(response_id).ok_or(Error::InvalidRequestId)?;
let response = response.try_into().map_err(Error::Request);
match send_back_oneshot.send(response) {
Err(Err(e)) => Err(e),
Err(Ok(_)) => Err(Error::Custom("Frontend channel closed".into())),
Ok(_) => Ok(None),
}
}
RequestStatus::PendingSubscription => {
let (send_back_oneshot, unsubscribe_method) =
manager.complete_pending_subscription(response_id).ok_or(Error::InvalidRequestId)?;
let json_sub_id: JsonValue = match response.try_into() {
Ok(response) => response,
Err(e) => {
return match send_back_oneshot.send(Err(Error::Request(e))) {
Err(Err(e)) => Err(e),
Err(Ok(_)) => unreachable!("Error sent above; qed"),
_ => Ok(None),
};
}
};
let sub_id: SubscriptionId = match jsonrpc::from_value(json_sub_id.clone()) {
Ok(sub_id) => sub_id,
Err(_) => {
return match send_back_oneshot.send(Err(Error::InvalidSubscriptionId)) {
Err(Err(e)) => Err(e),
Err(Ok(_)) => unreachable!("Error sent above; qed"),
_ => Ok(None),
}
}
};
let (subscribe_tx, subscribe_rx) = mpsc::channel(max_capacity_per_subscription);
if manager.insert_subscription(response_id, sub_id.clone(), subscribe_tx, unsubscribe_method).is_ok() {
match send_back_oneshot.send(Ok((subscribe_rx, sub_id.clone()))) {
Ok(_) => Ok(None),
Err(_) => {
let (_, unsubscribe_method) =
manager.remove_subscription(response_id, sub_id).expect("Subscription inserted above; qed");
let params = jsonrpc::Params::Array(vec![json_sub_id]);
Ok(Some((unsubscribe_method, params)))
}
}
} else {
match send_back_oneshot.send(Err(Error::InvalidSubscriptionId)) {
Err(Err(e)) => Err(e),
Err(Ok(_)) => unreachable!("Error sent above; qed"),
_ => Ok(None),
}
}
}
RequestStatus::Subscription | RequestStatus::Invalid => Err(Error::InvalidRequestId),
}
}