//! Abstract async client.

3
4
5
6
mod helpers;
mod manager;

use crate::client::{
7
8
9
	async_client::helpers::process_subscription_close_response, BatchMessage, ClientT, ReceivedMessage,
	RegisterNotificationMessage, RequestMessage, Subscription, SubscriptionClientT, SubscriptionKind,
	SubscriptionMessage, TransportReceiverT, TransportSenderT,
10
};
11
12
use crate::tracing::{rx_log_from_json, tx_log_from_str, RpcTracing};

13
use core::time::Duration;
14
use helpers::{
Maciej Hirsz's avatar
Maciej Hirsz committed
15
16
17
	build_unsubscribe_message, call_with_timeout, process_batch_response, process_error_response, process_notification,
	process_single_response, process_subscription_response, stop_subscription,
};
18
19
20
use manager::RequestManager;

use crate::error::Error;
21
use async_lock::Mutex;
22
use async_trait::async_trait;
23
use futures_channel::{mpsc, oneshot};
24
use futures_timer::Delay;
25
use futures_util::future::{self, Either, Fuse};
26
27
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
28
use futures_util::FutureExt;
29
use jsonrpsee_types::{
30
31
	response::SubscriptionError, ErrorResponse, Id, Notification, NotificationSer, ParamsSer, RequestSer, Response,
	SubscriptionResponse,
Maciej Hirsz's avatar
Maciej Hirsz committed
32
};
33
use serde::de::DeserializeOwned;
34
use tracing_futures::Instrument;
35

36
use super::{FrontToBack, IdKind, RequestIdManager};
Niklas Adolfsson's avatar
Niklas Adolfsson committed
37

38
/// Wrapper over a [`oneshot::Receiver`](futures_channel::oneshot::Receiver) that reads
/// the underlying channel once and then stores the result in String.
/// It is possible that the error is read more than once if several calls are made
/// when the background thread has been terminated.
#[derive(Debug)]
enum ErrorFromBack {
	/// Error message is already read; the cached string is replayed on later reads.
	Read(String),
	/// Error message is unread; the receiver still has to be awaited.
	Unread(oneshot::Receiver<Error>),
}

impl ErrorFromBack {
	async fn read_error(self) -> (Self, Error) {
		match self {
			Self::Unread(rx) => {
				let msg = match rx.await {
					Ok(msg) => msg.to_string(),
					// This should never happen because the receiving end is still alive.
					// Would be a bug in the logic of the background task.
					Err(_) => "Error reason could not be found. This is a bug. Please open an issue.".to_string(),
				};
				let err = Error::RestartNeeded(msg.clone());
				(Self::Read(msg), err)
			}
			Self::Read(msg) => (Self::Read(msg.clone()), Error::RestartNeeded(msg)),
		}
	}
}

68
/// Builder for [`Client`].
#[derive(Clone, Debug)]
pub struct ClientBuilder {
	/// Timeout applied to each request (default is 60 seconds).
	request_timeout: Duration,
	/// Maximum number of concurrent requests (default is 256).
	max_concurrent_requests: usize,
	/// Per-subscription notification buffer capacity; the subscription is
	/// dropped when the capacity is exceeded (default is 1024).
	max_notifs_per_subscription: usize,
	/// Data type used for the request object ID (default is number).
	id_kind: IdKind,
	/// Maximum length of logged calls and responses before truncation.
	max_log_length: u32,
	/// Interval at which ping frames are submitted; `None` disables pings.
	ping_interval: Option<Duration>,
}

79
impl Default for ClientBuilder {
80
	fn default() -> Self {
Niklas Adolfsson's avatar
Niklas Adolfsson committed
81
		Self {
David's avatar
David committed
82
			request_timeout: Duration::from_secs(60),
83
			max_concurrent_requests: 256,
84
			max_notifs_per_subscription: 1024,
85
			id_kind: IdKind::Number,
86
			max_log_length: 4096,
87
			ping_interval: None,
Niklas Adolfsson's avatar
Niklas Adolfsson committed
88
89
90
91
		}
	}
}

92
impl ClientBuilder {
David's avatar
David committed
93
	/// Set request timeout (default is 60 seconds).
94
	pub fn request_timeout(mut self, timeout: Duration) -> Self {
David's avatar
David committed
95
		self.request_timeout = timeout;
96
97
98
		self
	}

99
	/// Set max concurrent requests (default is 256).
100
101
102
103
104
	pub fn max_concurrent_requests(mut self, max: usize) -> Self {
		self.max_concurrent_requests = max;
		self
	}

105
	/// Set max concurrent notification capacity for each subscription; when the capacity is exceeded the subscription
106
	/// will be dropped (default is 1024).
107
	///
108
109
110
	/// You may prevent the subscription from being dropped by polling often enough
	/// [`Subscription::next()`](../../jsonrpsee_core/client/struct.Subscription.html#method.next) such that
	/// it can keep with the rate as server produces new items on the subscription.
111
112
	///
	/// **Note**: The actual capacity is `num_senders + max_subscription_capacity`
113
	/// because it is passed to [`futures_channel::mpsc::channel`].
114
115
116
117
118
	pub fn max_notifs_per_subscription(mut self, max: usize) -> Self {
		self.max_notifs_per_subscription = max;
		self
	}

119
120
121
122
123
124
	/// Configure the data type of the request object ID (default is number).
	pub fn id_format(mut self, id_kind: IdKind) -> Self {
		self.id_kind = id_kind;
		self
	}

125
126
127
128
129
130
131
	/// Set maximum length for logging calls and responses.
	///
	/// Logs bigger than this limit will be truncated.
	pub fn set_max_logging_length(mut self, max: u32) -> Self {
		self.max_log_length = max;
		self
	}
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
	/// Set the interval at which pings frames are submitted (disabled by default).
	///
	/// Periodically submitting pings at a defined interval has mainly two benefits:
	///  - Directly, it acts as a "keep-alive" alternative in the WebSocket world.
	///  - Indirectly by inspecting debug logs, it ensures that the endpoint is still responding to messages.
	///
	/// The underlying implementation does not make any assumptions about at which intervals pongs are received.
	///
	/// Note: The interval duration is restarted when
	///  - a frontend command is submitted
	///  - a reply is received from the backend
	///  - the interval duration expires
	pub fn ping_interval(mut self, interval: Duration) -> Self {
		self.ping_interval = Some(interval);
		self
	}

149
	/// Build the client with given transport.
150
151
152
	///
	/// ## Panics
	///
153
154
	/// Panics if called outside of `tokio` runtime context.
	#[cfg(feature = "async-client")]
155
	#[cfg_attr(docsrs, doc(cfg(feature = "async-client")))]
156
157
158
159
160
	pub fn build_with_tokio<S, R>(self, sender: S, receiver: R) -> Client
	where
		S: TransportSenderT + Send,
		R: TransportReceiverT + Send,
	{
161
		let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests);
162
		let (err_tx, err_rx) = oneshot::channel();
163
		let max_notifs_per_subscription = self.max_notifs_per_subscription;
164
		let ping_interval = self.ping_interval;
Niklas Adolfsson's avatar
Niklas Adolfsson committed
165

166
		tokio::spawn(async move {
167
			background_task(sender, receiver, from_front, err_tx, max_notifs_per_subscription, ping_interval).await;
Niklas Adolfsson's avatar
Niklas Adolfsson committed
168
		});
169
		Client {
170
			to_back,
171
			request_timeout: self.request_timeout,
172
			error: Mutex::new(ErrorFromBack::Unread(err_rx)),
173
			id_manager: RequestIdManager::new(self.max_concurrent_requests, self.id_kind),
174
			max_log_length: self.max_log_length,
175
		}
176
	}
177
178
179

	/// Build the client with given transport.
	#[cfg(all(feature = "async-wasm-client", target_arch = "wasm32"))]
180
	#[cfg_attr(docsrs, doc(cfg(feature = "async-wasm-client")))]
181
182
183
184
185
186
187
188
189
190
	pub fn build_with_wasm<S, R>(self, sender: S, receiver: R) -> Client
	where
		S: TransportSenderT,
		R: TransportReceiverT,
	{
		let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests);
		let (err_tx, err_rx) = oneshot::channel();
		let max_notifs_per_subscription = self.max_notifs_per_subscription;

		wasm_bindgen_futures::spawn_local(async move {
191
			background_task(sender, receiver, from_front, err_tx, max_notifs_per_subscription, None).await;
192
193
194
195
196
197
		});
		Client {
			to_back,
			request_timeout: self.request_timeout,
			error: Mutex::new(ErrorFromBack::Unread(err_rx)),
			id_manager: RequestIdManager::new(self.max_concurrent_requests, self.id_kind),
198
			max_log_length: self.max_log_length,
199
200
		}
	}
201
}
202

203
/// Generic asynchronous client.
#[derive(Debug)]
pub struct Client {
	/// Channel to send requests to the background task.
	to_back: mpsc::Sender<FrontToBack>,
	/// If the background thread terminates the error is sent to this channel.
	// NOTE(niklasad1): This is a Mutex to circumvent that the async fns takes immutable references.
	error: Mutex<ErrorFromBack>,
	/// Request timeout. Defaults to 60sec.
	request_timeout: Duration,
	/// Request ID manager.
	id_manager: RequestIdManager,
	/// Max length for logging for requests and responses.
	///
	/// Entries bigger than this limit will be truncated.
	max_log_length: u32,
}

impl Client {
222
223
224
225
226
	/// Checks if the client is connected to the target.
	pub fn is_connected(&self) -> bool {
		!self.to_back.is_closed()
	}

227
228
229
230
231
232
233
	// Reads the error message from the backend thread.
	async fn read_error_from_backend(&self) -> Error {
		let mut err_lock = self.error.lock().await;
		let from_back = std::mem::replace(&mut *err_lock, ErrorFromBack::Read(String::new()));
		let (next_state, err) = from_back.read_error().await;
		*err_lock = next_state;
		err
Niklas Adolfsson's avatar
Niklas Adolfsson committed
234
	}
235
}
Niklas Adolfsson's avatar
Niklas Adolfsson committed
236

237
impl Drop for Client {
	fn drop(&mut self) {
		// Closing the channel signals the background task to shut down.
		self.to_back.close_channel();
	}
}

243
#[async_trait]
impl ClientT for Client {
	/// Send a JSON-RPC notification (no response expected from the server).
	async fn notification<'a>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<(), Error> {
		// NOTE: we use this to guard against max number of concurrent requests.
		let _req_id = self.id_manager.next_request_id()?;
		let notif = NotificationSer::new(method, params);

		// NOTE(review): this enters the *batch* tracing span for a single
		// notification — presumably a placeholder; confirm whether a
		// notification-specific span should be used instead.
		let trace = RpcTracing::batch();
		let _enter = trace.span().enter();

		let raw = serde_json::to_string(&notif).map_err(Error::ParseError)?;
		tx_log_from_str(&raw, self.max_log_length);

		let mut sender = self.to_back.clone();
		let fut = sender.send(FrontToBack::Notification(raw)).in_current_span();

		// Race the send against the request timeout; a failed send means the
		// background task died, so surface its termination reason.
		match future::select(fut, Delay::new(self.request_timeout)).await {
			Either::Left((Ok(()), _)) => Ok(()),
			Either::Left((Err(_), _)) => Err(self.read_error_from_backend().await),
			Either::Right((_, _)) => Err(Error::RequestTimeout),
		}
	}

	/// Send a method call and decode the JSON response into `R`.
	async fn request<'a, R>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<R, Error>
	where
		R: DeserializeOwned,
	{
		let (send_back_tx, send_back_rx) = oneshot::channel();
		// Guard also caps the number of concurrent requests.
		let guard = self.id_manager.next_request_id()?;
		let id = guard.inner();
		let trace = RpcTracing::method_call(method, &id);
		let _enter = trace.span().enter();

		let raw = serde_json::to_string(&RequestSer::new(&id, method, params)).map_err(Error::ParseError)?;
		tx_log_from_str(&raw, self.max_log_length);

		// A send error means the background task has terminated.
		if self
			.to_back
			.clone()
			.send(FrontToBack::Request(RequestMessage { raw, id: id.clone(), send_back: Some(send_back_tx) }))
			.await
			.is_err()
		{
			return Err(self.read_error_from_backend().await);
		}

		let res = call_with_timeout(self.request_timeout, send_back_rx).in_current_span().await;
		let json_value = match res {
			Ok(Ok(v)) => v,
			Ok(Err(err)) => return Err(err),
			// Oneshot sender dropped: background task is gone.
			Err(_) => return Err(self.read_error_from_backend().await),
		};

		rx_log_from_json(&Response::new(&json_value, id), self.max_log_length);

		serde_json::from_value(json_value).map_err(Error::ParseError)
	}

	/// Send a batch of method calls; results are returned in request order.
	async fn batch_request<'a, R>(&self, batch: Vec<(&'a str, Option<ParamsSer<'a>>)>) -> Result<Vec<R>, Error>
	where
		R: DeserializeOwned + Default + Clone,
	{
		// Reserve one request ID per batch entry up front.
		let guard = self.id_manager.next_request_ids(batch.len())?;
		let batch_ids: Vec<Id> = guard.inner();
		let mut batches = Vec::with_capacity(batch.len());

		let log = RpcTracing::batch();
		let _enter = log.span().enter();

		for (idx, (method, params)) in batch.into_iter().enumerate() {
			batches.push(RequestSer::new(&batch_ids[idx], method, params));
		}

		let (send_back_tx, send_back_rx) = oneshot::channel();

		let raw = serde_json::to_string(&batches).map_err(Error::ParseError)?;

		tx_log_from_str(&raw, self.max_log_length);

		// A send error means the background task has terminated.
		if self
			.to_back
			.clone()
			.send(FrontToBack::Batch(BatchMessage { raw, ids: batch_ids, send_back: send_back_tx }))
			.await
			.is_err()
		{
			return Err(self.read_error_from_backend().await);
		}

		let res = call_with_timeout(self.request_timeout, send_back_rx).in_current_span().await;
		let json_values = match res {
			Ok(Ok(v)) => v,
			Ok(Err(err)) => return Err(err),
			Err(_) => return Err(self.read_error_from_backend().await),
		};

		rx_log_from_json(&json_values, self.max_log_length);

		// Short-circuits on the first value that fails to deserialize.
		let values: Result<_, _> =
			json_values.into_iter().map(|val| serde_json::from_value(val).map_err(Error::ParseError)).collect();
		Ok(values?)
	}
}
Niklas Adolfsson's avatar
Niklas Adolfsson committed
344

345
#[async_trait]
impl SubscriptionClientT for Client {
	/// Send a subscription request to the server.
	///
	/// The `subscribe_method` and `params` are used to ask for the subscription towards the
	/// server. The `unsubscribe_method` is used to close the subscription.
	async fn subscribe<'a, N>(
		&self,
		subscribe_method: &'a str,
		params: Option<ParamsSer<'a>>,
		unsubscribe_method: &'a str,
	) -> Result<Subscription<N>, Error>
	where
		N: DeserializeOwned,
	{
		// Identical names would make subscribe/unsubscribe indistinguishable.
		if subscribe_method == unsubscribe_method {
			return Err(Error::SubscriptionNameConflict(unsubscribe_method.to_owned()));
		}

		// Two IDs: one for the subscribe call, one reserved for the unsubscribe call.
		let guard = self.id_manager.next_request_ids(2)?;
		let mut ids: Vec<Id> = guard.inner();
		let id = ids[0].clone();

		let trace = RpcTracing::method_call(subscribe_method, &id);
		let _enter = trace.span().enter();

		let raw = serde_json::to_string(&RequestSer::new(&id, subscribe_method, params)).map_err(Error::ParseError)?;

		tx_log_from_str(&raw, self.max_log_length);

		let (send_back_tx, send_back_rx) = oneshot::channel();
		if self
			.to_back
			.clone()
			.send(FrontToBack::Subscribe(SubscriptionMessage {
				raw,
				// `swap_remove(0)` twice on a two-element vec: the first call
				// yields ids[0] (moving ids[1] to the front), the second
				// yields the original ids[1].
				subscribe_id: ids.swap_remove(0),
				unsubscribe_id: ids.swap_remove(0),
				unsubscribe_method: unsubscribe_method.to_owned(),
				send_back: send_back_tx,
			}))
			.await
			.is_err()
		{
			// Send failure: the background task is gone; report its error.
			return Err(self.read_error_from_backend().await);
		}

		let res = call_with_timeout(self.request_timeout, send_back_rx).in_current_span().await;

		let (notifs_rx, sub_id) = match res {
			Ok(Ok(val)) => val,
			Ok(Err(err)) => return Err(err),
			Err(_) => return Err(self.read_error_from_backend().await),
		};

		rx_log_from_json(&Response::new(&sub_id, id), self.max_log_length);

		Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(sub_id)))
	}

	/// Subscribe to a specific method.
	async fn subscribe_to_method<'a, N>(&self, method: &'a str) -> Result<Subscription<N>, Error>
	where
		N: DeserializeOwned,
	{
		let (send_back_tx, send_back_rx) = oneshot::channel();
		if self
			.to_back
			.clone()
			.send(FrontToBack::RegisterNotification(RegisterNotificationMessage {
				send_back: send_back_tx,
				method: method.to_owned(),
			}))
			.await
			.is_err()
		{
			return Err(self.read_error_from_backend().await);
		}

		let res = call_with_timeout(self.request_timeout, send_back_rx).await;

		let (notifs_rx, method) = match res {
			Ok(Ok(val)) => val,
			Ok(Err(err)) => return Err(err),
			Err(_) => return Err(self.read_error_from_backend().await),
		};

		Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Method(method)))
	}
}

437
438
439
440
441
442
443
444
445
446
447
448
449
450
/// Handle backend messages.
///
/// Returns an error if the main background loop should be terminated.
async fn handle_backend_messages<S: TransportSenderT, R: TransportReceiverT>(
	message: Option<Result<ReceivedMessage, R::Error>>,
	manager: &mut RequestManager,
	sender: &mut S,
	max_notifs_per_subscription: usize,
) -> Result<(), Error> {
	// Handle raw messages of form `ReceivedMessage::Bytes` (Vec<u8>) or ReceivedMessage::Data` (String).
	//
	// NOTE: the decode attempts below are ordered; a payload is handled by the
	// first variant it successfully deserializes into, so the order is
	// behavior-significant.
	async fn handle_recv_message<S: TransportSenderT>(
		raw: &[u8],
		manager: &mut RequestManager,
		sender: &mut S,
		max_notifs_per_subscription: usize,
	) -> Result<(), Error> {
		// Single response to a request.
		if let Ok(single) = serde_json::from_slice::<Response<_>>(raw) {
			match process_single_response(manager, single, max_notifs_per_subscription) {
				// The response completed a subscription that must now be closed
				// on the server side as well.
				Ok(Some(unsub)) => {
					stop_subscription(sender, manager, unsub).await;
				}
				Ok(None) => (),
				Err(err) => return Err(err),
			}
		}
		// Subscription response.
		else if let Ok(response) = serde_json::from_slice::<SubscriptionResponse<_>>(raw) {
			if let Err(Some(unsub)) = process_subscription_response(manager, response) {
				stop_subscription(sender, manager, unsub).await;
			}
		}
		// Subscription error response.
		else if let Ok(response) = serde_json::from_slice::<SubscriptionError<_>>(raw) {
			let _ = process_subscription_close_response(manager, response);
		}
		// Incoming Notification
		else if let Ok(notif) = serde_json::from_slice::<Notification<_>>(raw) {
			// Failures here are ignored: there may simply be no registered handler.
			let _ = process_notification(manager, notif);
		}
		// Batch response.
		else if let Ok(batch) = serde_json::from_slice::<Vec<Response<_>>>(raw) {
			if let Err(e) = process_batch_response(manager, batch) {
				return Err(e);
			}
		}
		// Error response
		else if let Ok(err) = serde_json::from_slice::<ErrorResponse>(raw) {
			if let Err(e) = process_error_response(manager, err) {
				return Err(e);
			}
		}
		// Unparsable response
		else {
			let json = serde_json::from_slice::<serde_json::Value>(raw);

			// Include the raw payload (re-serialized) or the parse error in the
			// terminal error message for diagnostics.
			let json_str = match json {
				Ok(json) => serde_json::to_string(&json).expect("valid JSON; qed"),
				Err(e) => e.to_string(),
			};

			return Err(Error::Custom(format!("Unparseable message: {}", json_str)));
		}
		Ok(())
	}

	match message {
		Some(Ok(ReceivedMessage::Pong)) => {
			tracing::debug!("recv pong");
		}
		Some(Ok(ReceivedMessage::Bytes(raw))) => {
			handle_recv_message(raw.as_ref(), manager, sender, max_notifs_per_subscription).await?;
		}
		Some(Ok(ReceivedMessage::Text(raw))) => {
			handle_recv_message(raw.as_ref(), manager, sender, max_notifs_per_subscription).await?;
		}
		Some(Err(e)) => {
			tracing::error!("Error: {:?} terminating client", e);
			return Err(Error::Transport(e.into()));
		}
		// Stream ended: the transport receiver was dropped.
		None => {
			tracing::error!("[backend]: WebSocket receiver dropped; terminate client");
			return Err(Error::Custom("WebSocket receiver dropped".into()));
		}
	}

	Ok(())
}

/// Handle frontend messages.
///
/// Returns an error if the main background loop should be terminated.
async fn handle_frontend_messages<S: TransportSenderT>(
	message: Option<FrontToBack>,
	manager: &mut RequestManager,
	sender: &mut S,
	max_notifs_per_subscription: usize,
) -> Result<(), Error> {
	match message {
		// User dropped the sender side of the channel.
		// There is nothing to do just terminate.
		None => {
			return Err(Error::Custom("[backend]: frontend dropped; terminate client".into()));
		}

		Some(FrontToBack::Batch(batch)) => {
			// Duplicate batch IDs: reply with an error instead of sending.
			if let Err(send_back) = manager.insert_pending_batch(batch.ids.clone(), batch.send_back) {
				tracing::warn!("[backend]: batch request: {:?} already pending", batch.ids);
				let _ = send_back.send(Err(Error::InvalidRequestId));
				return Ok(());
			}

			// On transport failure, unregister the batch so the IDs are reusable.
			if let Err(e) = sender.send(batch.raw).await {
				tracing::warn!("[backend]: client batch request failed: {:?}", e);
				manager.complete_pending_batch(batch.ids);
			}
		}
		// User called `notification` on the front-end
		Some(FrontToBack::Notification(notif)) => {
			// Best-effort: a failed notification is only logged.
			if let Err(e) = sender.send(notif).await {
				tracing::warn!("[backend]: client notif failed: {:?}", e);
			}
		}
		// User called `request` on the front-end
		Some(FrontToBack::Request(request)) => match sender.send(request.raw).await {
			Ok(_) => manager.insert_pending_call(request.id, request.send_back).expect("ID unused checked above; qed"),
			Err(e) => {
				tracing::warn!("[backend]: client request failed: {:?}", e);
				let _ = request.send_back.map(|s| s.send(Err(Error::Transport(e.into()))));
			}
		},
		// User called `subscribe` on the front-end.
		Some(FrontToBack::Subscribe(sub)) => match sender.send(sub.raw).await {
			Ok(_) => manager
				.insert_pending_subscription(
					sub.subscribe_id,
					sub.unsubscribe_id,
					sub.send_back,
					sub.unsubscribe_method,
				)
				.expect("Request ID unused checked above; qed"),
			Err(e) => {
				tracing::warn!("[backend]: client subscription failed: {:?}", e);
				let _ = sub.send_back.send(Err(Error::Transport(e.into())));
			}
		},
		// User dropped a subscription.
		Some(FrontToBack::SubscriptionClosed(sub_id)) => {
			tracing::trace!("Closing subscription: {:?}", sub_id);
			// NOTE: The subscription may have been closed earlier if
			// the channel was full or disconnected.
			if let Some(unsub) = manager
				.get_request_id_by_subscription_id(&sub_id)
				.and_then(|req_id| build_unsubscribe_message(manager, req_id, sub_id))
			{
				stop_subscription(sender, manager, unsub).await;
			}
		}
		// User called `register_notification` on the front-end.
		Some(FrontToBack::RegisterNotification(reg)) => {
			let (subscribe_tx, subscribe_rx) = mpsc::channel(max_notifs_per_subscription);

			if manager.insert_notification_handler(&reg.method, subscribe_tx).is_ok() {
				let _ = reg.send_back.send(Ok((subscribe_rx, reg.method)));
			} else {
				let _ = reg.send_back.send(Err(Error::MethodAlreadyRegistered(reg.method)));
			}
		}
		// User dropped the notificationHandler for this method
		Some(FrontToBack::UnregisterNotification(method)) => {
			let _ = manager.remove_notification_handler(method);
		}
	}

	Ok(())
}

Niklas Adolfsson's avatar
Niklas Adolfsson committed
614
/// Function being run in the background that processes messages from the frontend.
///
/// Runs until either side errors or disconnects, then forwards the terminal
/// error (if any) through `front_error` and closes the transport.
async fn background_task<S, R>(
	mut sender: S,
	receiver: R,
	mut frontend: mpsc::Receiver<FrontToBack>,
	front_error: oneshot::Sender<Error>,
	max_notifs_per_subscription: usize,
	ping_interval: Option<Duration>,
) where
	S: TransportSenderT,
	R: TransportReceiverT,
{
	let mut manager = RequestManager::new();

	// Turn the transport receiver into a stream of received messages.
	let backend_event = futures_util::stream::unfold(receiver, |mut receiver| async {
		let res = receiver.receive().await;
		Some((res, receiver))
	});
	futures_util::pin_mut!(backend_event);

	// Place frontend and backend messages into their own select.
	// This implies that either messages are received (both front or backend),
	// or the submitted ping timer expires (if provided).
	let next_frontend = frontend.next();
	let next_backend = backend_event.next();
	let mut message_fut = future::select(next_frontend, next_backend);

	loop {
		// Create either a valid delay fuse triggered every provided `duration`,
		// or create a terminated fuse that's never selected if the provided `duration` is None.
		// Recreated each iteration, so the ping timer restarts whenever any
		// branch below fires.
		let submit_ping = if let Some(duration) = ping_interval {
			Delay::new(duration).fuse()
		} else {
			// The select macro bypasses terminated futures, and the `submit_ping` branch is never selected.
			Fuse::<Delay>::terminated()
		};

		match future::select(message_fut, submit_ping).await {
			// Message received from the frontend.
			Either::Left((Either::Left((frontend_value, backend)), _)) => {
				if let Err(err) =
					handle_frontend_messages(frontend_value, &mut manager, &mut sender, max_notifs_per_subscription)
						.await
				{
					tracing::warn!("{:?}", err);
					let _ = front_error.send(err);
					break;
				}
				// Advance frontend, save backend.
				message_fut = future::select(frontend.next(), backend);
			}
			// Message received from the backend.
			Either::Left((Either::Right((backend_value, frontend)), _)) => {
				if let Err(err) = handle_backend_messages::<S, R>(
					backend_value,
					&mut manager,
					&mut sender,
					max_notifs_per_subscription,
				)
				.await
				{
					tracing::warn!("{:?}", err);
					let _ = front_error.send(err);
					break;
				}
				// Advance backend, save frontend.
				message_fut = future::select(frontend, backend_event.next());
			}
			// Submit ping interval was triggered if enabled.
			Either::Right((_, next_message_fut)) => {
				if let Err(e) = sender.send_ping().await {
					tracing::warn!("[backend]: client send ping failed: {:?}", e);
					let _ = front_error.send(Error::Custom("Could not send ping frame".into()));
					break;
				}
				// Ping sent; keep waiting on the untouched message future.
				message_fut = next_message_fut;
			}
		};
	}
	// Send close message to the server.
	let _ = sender.close().await;
}