rpc_module.rs 43 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

Maciej Hirsz's avatar
Maciej Hirsz committed
27
28
29
30
31
32
use std::collections::hash_map::Entry;
use std::fmt::{self, Debug};
use std::future::Future;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

33
use crate::error::{Error, SubscriptionClosed};
34
use crate::id_providers::RandomIntegerIdProvider;
35
use crate::server::helpers::{BoundedSubscriptions, MethodSink, SubscriptionPermit};
Maciej Hirsz's avatar
Maciej Hirsz committed
36
use crate::server::resource_limiting::{ResourceGuard, ResourceTable, ResourceVec, Resources};
37
use crate::traits::{IdProvider, ToRpcParams};
38
use futures_channel::mpsc;
39
use futures_util::future::Either;
David's avatar
David committed
40
use futures_util::pin_mut;
41
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt, TryStream, TryStreamExt};
42
43
use jsonrpsee_types::error::{CallError, ErrorCode, ErrorObject, ErrorObjectOwned, SUBSCRIPTION_CLOSED_WITH_ERROR};
use jsonrpsee_types::response::{SubscriptionError, SubscriptionPayloadError};
David's avatar
David committed
44
use jsonrpsee_types::{
45
	ErrorResponse, Id, Params, Request, Response, SubscriptionResult, SubscriptionEmptyError,
46
	SubscriptionId as RpcSubscriptionId, SubscriptionPayload, SubscriptionResponse
47
};
48
49
use parking_lot::Mutex;
use rustc_hash::FxHashMap;
Maciej Hirsz's avatar
Maciej Hirsz committed
50
use serde::{de::DeserializeOwned, Serialize};
51
use tokio::sync::watch;
52

Maciej Hirsz's avatar
Maciej Hirsz committed
53
/// A `MethodCallback` is an RPC endpoint, callable with a standard JSON-RPC request,
54
55
56
/// implemented as a function pointer to a `Fn` function taking four arguments:
/// the `id`, `params`, a channel the function uses to communicate the result (or error)
/// back to `jsonrpsee`, and the connection ID (useful for the websocket transport).
57
pub type SyncMethod = Arc<dyn Send + Sync + Fn(Id, Params, &MethodSink) -> bool>;
Maciej Hirsz's avatar
Maciej Hirsz committed
58
/// Similar to [`SyncMethod`], but represents an asynchronous handler and takes an additional argument containing a [`ResourceGuard`] if configured.
59
pub type AsyncMethod<'a> = Arc<
60
	dyn Send + Sync + Fn(Id<'a>, Params<'a>, MethodSink, ConnectionId, Option<ResourceGuard>) -> BoxFuture<'a, bool>,
61
>;
62
/// Method callback for subscriptions.
63
pub type SubscriptionMethod = Arc<dyn Send + Sync + Fn(Id, Params, MethodSink, ConnState, Option<ResourceGuard>) -> bool>;
64
65
// Method callback to unsubscribe.
// Receives the request `id`, the `params`, a borrowed sink used to send the response
// and the ID of the connection the call arrived on; returns a `bool` success flag.
type UnsubscriptionMethod = Arc<dyn Send + Sync + Fn(Id, Params, &MethodSink, ConnectionId) -> bool>;
66

67
68
69
/// Connection ID, used for stateful protocols such as WebSockets.
/// For stateless protocols such as HTTP it is unused, so feel free to set it to some hardcoded value.
pub type ConnectionId = usize;
70

David's avatar
David committed
71
72
73
74
/// Raw response from an RPC
/// A 3-tuple containing:
///   - Call result as a `String`,
///   - a [`mpsc::UnboundedReceiver<String>`] to receive future subscription results
75
76
///   - a [`crate::server::helpers::SubscriptionPermit`] to allow subscribers to notify their [`SubscriptionSink`] when they disconnect.
pub type RawRpcResponse = (String, mpsc::UnboundedReceiver<String>, SubscriptionPermit);
David's avatar
David committed
77
78

/// Helper struct to manage subscriptions.
79
80
81
pub struct ConnState<'a> {
	/// Connection ID
	pub conn_id: ConnectionId,
David's avatar
David committed
82
	/// Get notified when the connection to subscribers is closed.
83
	pub close_notify: SubscriptionPermit,
84
85
86
87
	/// ID provider.
	pub id_provider: &'a dyn IdProvider,
}

88
89
/// Outcome of a successfully terminated subscription.
#[derive(Debug)]
pub enum InnerSubscriptionResult {
	/// The subscription stream was executed successfully.
	Success,
	/// The subscription was aborted by the remote peer.
	Aborted,
}

97
98
impl<'a> std::fmt::Debug for ConnState<'a> {
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
David's avatar
David committed
99
		f.debug_struct("ConnState").field("conn_id", &self.conn_id).field("close", &self.close_notify).finish()
100
101
	}
}
102

103
/// Shared, mutex-protected table of subscriptions, keyed by [`SubscriptionKey`].
/// Each entry holds the [`MethodSink`] of the subscriber and a `watch::Sender<()>`.
type Subscribers = Arc<Mutex<FxHashMap<SubscriptionKey, (MethodSink, watch::Sender<()>)>>>;
104

105
106
/// Represent a unique subscription entry based on [`RpcSubscriptionId`] and [`ConnectionId`].
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
107
108
struct SubscriptionKey {
	conn_id: ConnectionId,
109
	sub_id: RpcSubscriptionId<'static>,
110
}
111

112
/// Callback wrapper that can be either sync or async.
113
#[derive(Clone)]
114
pub enum MethodKind {
115
	/// Synchronous method handler.
116
	Sync(SyncMethod),
117
	/// Asynchronous method handler.
118
	Async(AsyncMethod<'static>),
119
	/// Subscription method handler.
120
	Subscription(SubscriptionMethod),
121
122
	/// Unsubscription method handler.
	Unsubscription(UnsubscriptionMethod),
123
124
}

Maciej Hirsz's avatar
Maciej Hirsz committed
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
/// Information about resources the method uses during its execution. Initialized when the server starts.
#[derive(Clone, Debug)]
enum MethodResources {
	/// Uninitialized resource table, mapping string label to units.
	Uninitialized(Box<[(&'static str, u16)]>),
	/// Initialized resource table containing units for each `ResourceId`.
	Initialized(ResourceTable),
}

/// Method callback wrapper that contains a sync or async closure,
/// plus a table with resources it needs to claim to run
#[derive(Clone, Debug)]
pub struct MethodCallback {
	/// The handler itself.
	callback: MethodKind,
	/// Resources the handler claims; starts out uninitialized.
	resources: MethodResources,
}

Maciej Hirsz's avatar
Maciej Hirsz committed
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
/// Result of a method, either a direct value or a future of one.
pub enum MethodResult<T> {
	/// Result by value
	Sync(T),
	/// Future of a value
	Async(BoxFuture<'static, T>),
}

impl<T: Debug> Debug for MethodResult<T> {
	/// A ready value is formatted with its own `Debug` implementation;
	/// a pending future is rendered as the fixed placeholder `<future>`.
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		match self {
			MethodResult::Async(_) => f.write_str("<future>"),
			MethodResult::Sync(value) => Debug::fmt(value, f),
		}
	}
}

Maciej Hirsz's avatar
Maciej Hirsz committed
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
/// Builder for configuring resources used by a method.
///
/// The collected `(label, units)` pairs are written back to the callback when the builder is dropped.
#[derive(Debug)]
pub struct MethodResourcesBuilder<'a> {
	/// `(label, units)` pairs collected so far.
	build: ResourceVec<(&'static str, u16)>,
	/// The callback whose resources are being configured.
	callback: &'a mut MethodCallback,
}

impl<'a> MethodResourcesBuilder<'a> {
	/// Define how many units of a given named resource the method uses during its execution.
	pub fn resource(mut self, label: &'static str, units: u16) -> Result<Self, Error> {
		self.build.try_push((label, units)).map_err(|_| Error::MaxResourcesReached)?;
		Ok(self)
	}
}

impl<'a> Drop for MethodResourcesBuilder<'a> {
	/// Stores everything collected by the builder on the callback as its
	/// (still uninitialized) resource table.
	fn drop(&mut self) {
		let collected: Box<[(&'static str, u16)]> = Box::from(&self.build[..]);
		self.callback.resources = MethodResources::Uninitialized(collected);
	}
}

180
impl MethodCallback {
Maciej Hirsz's avatar
Maciej Hirsz committed
181
182
183
184
185
186
187
188
	fn new_sync(callback: SyncMethod) -> Self {
		MethodCallback { callback: MethodKind::Sync(callback), resources: MethodResources::Uninitialized([].into()) }
	}

	fn new_async(callback: AsyncMethod<'static>) -> Self {
		MethodCallback { callback: MethodKind::Async(callback), resources: MethodResources::Uninitialized([].into()) }
	}

189
190
191
192
193
194
195
	fn new_subscription(callback: SubscriptionMethod) -> Self {
		MethodCallback {
			callback: MethodKind::Subscription(callback),
			resources: MethodResources::Uninitialized([].into()),
		}
	}

196
197
198
199
200
201
202
	fn new_unsubscription(callback: UnsubscriptionMethod) -> Self {
		MethodCallback {
			callback: MethodKind::Unsubscription(callback),
			resources: MethodResources::Uninitialized([].into()),
		}
	}

Maciej Hirsz's avatar
Maciej Hirsz committed
203
204
205
206
207
208
209
210
211
	/// Attempt to claim resources prior to executing a method. On success returns a guard that releases
	/// claimed resources when dropped.
	pub fn claim(&self, name: &str, resources: &Resources) -> Result<ResourceGuard, Error> {
		match self.resources {
			MethodResources::Uninitialized(_) => Err(Error::UninitializedMethod(name.into())),
			MethodResources::Initialized(units) => resources.claim(units),
		}
	}

212
213
214
	/// Get handle to the callback.
	pub fn inner(&self) -> &MethodKind {
		&self.callback
215
	}
216
217
}

Maciej Hirsz's avatar
Maciej Hirsz committed
218
impl Debug for MethodKind {
219
220
221
222
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		match self {
			Self::Async(_) => write!(f, "Async"),
			Self::Sync(_) => write!(f, "Sync"),
223
			Self::Subscription(_) => write!(f, "Subscription"),
224
			Self::Unsubscription(_) => write!(f, "Unsubscription"),
225
226
227
228
		}
	}
}

229
230
/// Reference-counted, clone-on-write collection of synchronous and asynchronous methods.
#[derive(Default, Debug, Clone)]
David's avatar
David committed
231
pub struct Methods {
232
	callbacks: Arc<FxHashMap<&'static str, MethodCallback>>,
233
234
}

David's avatar
David committed
235
236
impl Methods {
	/// Creates a new empty [`Methods`].
237
238
	pub fn new() -> Self {
		Self::default()
239
240
	}

Maciej Hirsz's avatar
Maciej Hirsz committed
241
	fn verify_method_name(&mut self, name: &'static str) -> Result<(), Error> {
242
		if self.callbacks.contains_key(name) {
243
244
245
246
247
248
			return Err(Error::MethodAlreadyRegistered(name.into()));
		}

		Ok(())
	}

Maciej Hirsz's avatar
Maciej Hirsz committed
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
	/// Inserts the method callback for a given name, or returns an error if the name was already taken.
	/// On success it returns a mut reference to the [`MethodCallback`] just inserted.
	fn verify_and_insert(
		&mut self,
		name: &'static str,
		callback: MethodCallback,
	) -> Result<&mut MethodCallback, Error> {
		match self.mut_callbacks().entry(name) {
			Entry::Occupied(_) => Err(Error::MethodAlreadyRegistered(name.into())),
			Entry::Vacant(vacant) => Ok(vacant.insert(callback)),
		}
	}

	/// Initialize resources for all methods in this collection. This method has no effect if called more than once.
	pub fn initialize_resources(mut self, resources: &Resources) -> Result<Self, Error> {
		let callbacks = self.mut_callbacks();

		for (&method_name, callback) in callbacks.iter_mut() {
			if let MethodResources::Uninitialized(uninit) = &callback.resources {
				let mut map = resources.defaults;

				for &(label, units) in uninit.iter() {
					let idx = match resources.labels.iter().position(|&l| l == label) {
						Some(idx) => idx,
						None => return Err(Error::ResourceNameNotFoundForMethod(label, method_name)),
					};

276
277
278
279
280
281
282
					// If resource capacity set to `0`, we ignore the unit value of the method
					// and set it to `0` as well, effectively making the resource unlimited.
					if resources.capacities[idx] == 0 {
						map[idx] = 0;
					} else {
						map[idx] = units;
					}
Maciej Hirsz's avatar
Maciej Hirsz committed
283
284
285
286
287
288
289
290
291
				}

				callback.resources = MethodResources::Initialized(map);
			}
		}

		Ok(self)
	}

292
293
294
295
296
	/// Gives mutable access to the callback table via [`Arc::make_mut`].
	fn mut_callbacks(&mut self) -> &mut FxHashMap<&'static str, MethodCallback> {
		let shared = &mut self.callbacks;
		Arc::make_mut(shared)
	}

297
	/// Merge two [`Methods`]'s by adding all [`MethodCallback`]s from `other` into `self`.
298
	/// Fails if any of the methods in `other` is present already.
Maciej Hirsz's avatar
Maciej Hirsz committed
299
300
301
	pub fn merge(&mut self, other: impl Into<Methods>) -> Result<(), Error> {
		let mut other = other.into();

302
		for name in other.callbacks.keys() {
303
304
305
			self.verify_method_name(name)?;
		}

306
307
308
309
		let callbacks = self.mut_callbacks();

		for (name, callback) in other.mut_callbacks().drain() {
			callbacks.insert(name, callback);
310
311
312
313
314
		}

		Ok(())
	}

315
316
317
	/// Returns the method callback registered under `method_name`, if any.
	pub fn method(&self, method_name: &str) -> Option<&MethodCallback> {
		self.callbacks.get(method_name)
	}

Maciej Hirsz's avatar
Maciej Hirsz committed
320
321
322
323
324
325
	/// Looks up `method_name` and returns the callback together with the registered name.
	/// The returned name equals `method_name` but carries a `'static` lifetime.
	pub fn method_with_name(&self, method_name: &str) -> Option<(&'static str, &MethodCallback)> {
		let (registered_name, callback) = self.callbacks.get_key_value(method_name)?;
		Some((*registered_name, callback))
	}

326
	/// Helper to call a method on the `RPC module` without having to spin up a server.
327
328
	///
	/// The params must be serializable as JSON array, see [`ToRpcParams`] for further documentation.
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
	///
	/// Returns the decoded value of the `result field` in JSON-RPC response if succesful.
	///
	/// # Examples
	///
	/// ```
	/// #[tokio::main]
	/// async fn main() {
	///     use jsonrpsee::RpcModule;
	///
	///     let mut module = RpcModule::new(());
	///     module.register_method("echo_call", |params, _| {
	///         params.one::<u64>().map_err(Into::into)
	///     }).unwrap();
	///
	///     let echo: u64 = module.call("echo_call", [1_u64]).await.unwrap();
	///     assert_eq!(echo, 1);
	/// }
	/// ```
	pub async fn call<Params: ToRpcParams, T: DeserializeOwned>(
		&self,
		method: &str,
		params: Params,
	) -> Result<T, Error> {
		let params = params.to_rpc_params()?;
		let req = Request::new(method.into(), Some(&params), Id::Number(0));
		tracing::trace!("[Methods::call] Calling method: {:?}, params: {:?}", method, params);
		let (resp, _, _) = self.inner_call(req).await;
357

358
359
360
361
362
363
364
365
366
367
368
		let res = match serde_json::from_str::<Response<T>>(&resp) {
			Ok(res) => Ok(res.result),
			Err(e) => {
				if let Ok(err) = serde_json::from_str::<ErrorResponse>(&resp) {
					Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned())))
				} else {
					Err(e.into())
				}
			}
		};
		res
369
370
	}

371
	/// Make a request (JSON-RPC method call or subscription) by using raw JSON.
372
	///
373
	/// Returns the raw JSON response to the call and a stream to receive notifications if the call was a subscription.
374
	///
375
	/// # Examples
376
377
378
379
	///
	/// ```
	/// #[tokio::main]
	/// async fn main() {
Maciej Hirsz's avatar
Maciej Hirsz committed
380
381
	///     use jsonrpsee::RpcModule;
	///     use jsonrpsee::types::Response;
382
383
384
	///     use futures_util::StreamExt;
	///
	///     let mut module = RpcModule::new(());
385
386
	///     module.register_subscription("hi", "hi", "goodbye", |_, pending, _| {
	///         pending.accept().unwrap().send(&"one answer").unwrap();
387
	///         Ok(())
388
	///     }).unwrap();
389
390
391
392
393
394
395
	///     let (resp, mut stream) = module.raw_json_request(r#"{"jsonrpc":"2.0","method":"hi","id":0}"#).await.unwrap();
	///     let resp = serde_json::from_str::<Response<u64>>(&resp).unwrap();
	///     let sub_resp = stream.next().await.unwrap();
	///     assert_eq!(
	///         format!(r#"{{"jsonrpc":"2.0","method":"hi","params":{{"subscription":{},"result":"one answer"}}}}"#, resp.result),
	///         sub_resp
	///     );
396
397
	/// }
	/// ```
398
399
400
401
402
	pub async fn raw_json_request(&self, call: &str) -> Result<(String, mpsc::UnboundedReceiver<String>), Error> {
		tracing::trace!("[Methods::raw_json_request] {:?}", call);
		let req: Request = serde_json::from_str(call)?;
		let (resp, rx, _) = self.inner_call(req).await;
		Ok((resp, rx))
403
404
	}

405
	/// Execute a callback.
406
	async fn inner_call(&self, req: Request<'_>) -> RawRpcResponse {
David's avatar
David committed
407
408
		let (tx_sink, mut rx_sink) = mpsc::unbounded();
		let sink = MethodSink::new(tx_sink);
409
410
		let id = req.id.clone();
		let params = Params::new(req.params.map(|params| params.get()));
411
412
413
		let bounded_subs = BoundedSubscriptions::new(u32::MAX);
		let close_notify = bounded_subs.acquire().expect("u32::MAX permits is sufficient; qed");
		let notify = bounded_subs.acquire().expect("u32::MAX permits is sufficient; qed");
414

415
		let result = match self.method(&req.method).map(|c| &c.callback) {
416
417
418
419
			None => sink.send_error(req.id, ErrorCode::MethodNotFound.into()),
			Some(MethodKind::Sync(cb)) => (cb)(id, params, &sink),
			Some(MethodKind::Async(cb)) => (cb)(id.into_owned(), params.into_owned(), sink, 0, None).await,
			Some(MethodKind::Subscription(cb)) => {
David's avatar
David committed
420
				let conn_state = ConnState { conn_id: 0, close_notify, id_provider: &RandomIntegerIdProvider };
421
				(cb)(id, params, sink, conn_state, None)
422
			}
423
			Some(MethodKind::Unsubscription(cb)) => (cb)(id, params, &sink, 0),
424
		};
425

426
427
		tracing::trace!("[Methods::inner_call]: method: `{}` success: {}", req.method, result);

David's avatar
David committed
428
429
430
		let resp = rx_sink.next().await.expect("tx and rx still alive; qed");

		(resp, rx_sink, notify)
431
432
433
434
435
436
	}

	/// Helper to create a subscription on the `RPC module` without having to spin up a server.
	///
	/// The params must be serializable as JSON array, see [`ToRpcParams`] for further documentation.
	///
437
	/// Returns [`Subscription`] on success which can used to get results from the subscriptions.
438
439
440
441
442
443
444
445
446
	///
	/// # Examples
	///
	/// ```
	/// #[tokio::main]
	/// async fn main() {
	///     use jsonrpsee::{RpcModule, types::EmptyParams};
	///
	///     let mut module = RpcModule::new(());
447
448
	///     module.register_subscription("hi", "hi", "goodbye", |_, pending, _| {
	///         pending.accept().unwrap().send(&"one answer").unwrap();
449
	///         Ok(())
450
451
452
453
454
455
456
457
458
459
460
461
	///     }).unwrap();
	///
	///     let mut sub = module.subscribe("hi", EmptyParams::new()).await.unwrap();
	///     // In this case we ignore the subscription ID,
	///     let (sub_resp, _sub_id) = sub.next::<String>().await.unwrap().unwrap();
	///     assert_eq!(&sub_resp, "one answer");
	/// }
	/// ```
	pub async fn subscribe(&self, sub_method: &str, params: impl ToRpcParams) -> Result<Subscription, Error> {
		let params = params.to_rpc_params()?;
		let req = Request::new(sub_method.into(), Some(&params), Id::Number(0));
		tracing::trace!("[Methods::subscribe] Calling subscription method: {:?}, params: {:?}", sub_method, params);
David's avatar
David committed
462
		let (response, rx, close_notify) = self.inner_call(req).await;
463
464
465
466
467
468
469
470
		tracing::trace!("[Methods::subscribe] response {:?}", response);
		let subscription_response = match serde_json::from_str::<Response<RpcSubscriptionId>>(&response) {
			Ok(r) => r,
			Err(_) => match serde_json::from_str::<ErrorResponse>(&response) {
				Ok(err) => return Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned()))),
				Err(err) => return Err(err.into()),
			},
		};
471
		let sub_id = subscription_response.result.into_owned();
David's avatar
David committed
472
473
		let close_notify = Some(close_notify);
		Ok(Subscription { sub_id, rx, close_notify })
474
475
	}

Maciej Hirsz's avatar
Maciej Hirsz committed
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
	/// Iterates over the names of all methods registered in this collection.
	pub fn method_names(&self) -> impl Iterator<Item = &'static str> + '_ {
		let registered = self.callbacks.keys();
		registered.copied()
	}
}

impl<Context> Deref for RpcModule<Context> {
	type Target = Methods;

	/// Exposes the wrapped [`Methods`] so its API can be called directly on the module.
	fn deref(&self) -> &Self::Target {
		let Self { methods, .. } = self;
		methods
	}
}

impl<Context> DerefMut for RpcModule<Context> {
	/// Gives mutable access to the wrapped [`Methods`].
	fn deref_mut(&mut self) -> &mut Methods {
		&mut self.methods
	}
}

/// Sets of JSON-RPC methods can be organized into a "module"s that are in turn registered on the server or,
/// alternatively, merged with other modules to construct a cohesive API. [`RpcModule`] wraps an additional context
/// argument that can be used to access data during call execution.
499
#[derive(Debug, Clone)]
500
501
pub struct RpcModule<Context> {
	ctx: Arc<Context>,
David's avatar
David committed
502
	methods: Methods,
503
504
}

505
impl<Context> RpcModule<Context> {
506
507
	/// Create a new module with a given shared `Context`.
	pub fn new(ctx: Context) -> Self {
508
		Self { ctx: Arc::new(ctx), methods: Default::default() }
509
	}
510
511
512
513
514
515
516

	/// Transform a module into an `RpcModule<()>` (unit context).
	pub fn remove_context(self) -> RpcModule<()> {
		let mut module = RpcModule::new(());
		module.methods = self.methods;
		module
	}
Maciej Hirsz's avatar
Maciej Hirsz committed
517
}
518

Maciej Hirsz's avatar
Maciej Hirsz committed
519
520
521
impl<Context> From<RpcModule<Context>> for Methods {
	/// Extracts the registered methods from the module, discarding its context handle.
	fn from(module: RpcModule<Context>) -> Methods {
		module.methods
	}
}

impl<Context: Send + Sync + 'static> RpcModule<Context> {
David's avatar
David committed
526
	/// Register a new synchronous RPC method, which computes the response with the given callback.
Maciej Hirsz's avatar
Maciej Hirsz committed
527
528
529
530
531
	pub fn register_method<R, F>(
		&mut self,
		method_name: &'static str,
		callback: F,
	) -> Result<MethodResourcesBuilder, Error>
532
	where
David's avatar
David committed
533
		Context: Send + Sync + 'static,
534
		R: Serialize,
David's avatar
David committed
535
		F: Fn(Params, &Context) -> Result<R, Error> + Send + Sync + 'static,
536
	{
David's avatar
David committed
537
		let ctx = self.ctx.clone();
Maciej Hirsz's avatar
Maciej Hirsz committed
538
		let callback = self.methods.verify_and_insert(
539
			method_name,
540
			MethodCallback::new_sync(Arc::new(move |id, params, sink| match callback(params, &*ctx) {
Maciej Hirsz's avatar
Maciej Hirsz committed
541
542
				Ok(res) => sink.send_response(id, res),
				Err(err) => sink.send_call_error(id, err),
543
			})),
Maciej Hirsz's avatar
Maciej Hirsz committed
544
		)?;
545

Maciej Hirsz's avatar
Maciej Hirsz committed
546
		Ok(MethodResourcesBuilder { build: ResourceVec::new(), callback })
547
548
	}

David's avatar
David committed
549
	/// Register a new asynchronous RPC method, which computes the response with the given callback.
Maciej Hirsz's avatar
Maciej Hirsz committed
550
551
552
553
554
	pub fn register_async_method<R, Fun, Fut>(
		&mut self,
		method_name: &'static str,
		callback: Fun,
	) -> Result<MethodResourcesBuilder, Error>
555
556
	where
		R: Serialize + Send + Sync + 'static,
Maciej Hirsz's avatar
Maciej Hirsz committed
557
558
		Fut: Future<Output = Result<R, Error>> + Send,
		Fun: (Fn(Params<'static>, Arc<Context>) -> Fut) + Copy + Send + Sync + 'static,
559
560
	{
		let ctx = self.ctx.clone();
Maciej Hirsz's avatar
Maciej Hirsz committed
561
		let callback = self.methods.verify_and_insert(
562
			method_name,
563
			MethodCallback::new_async(Arc::new(move |id, params, sink, _, claimed| {
564
565
				let ctx = ctx.clone();
				let future = async move {
Maciej Hirsz's avatar
Maciej Hirsz committed
566
567
568
					let result = match callback(params, ctx).await {
						Ok(res) => sink.send_response(id, res),
						Err(err) => sink.send_call_error(id, err),
569
					};
Maciej Hirsz's avatar
Maciej Hirsz committed
570
571
572

					// Release claimed resources
					drop(claimed);
Maciej Hirsz's avatar
Maciej Hirsz committed
573
574

					result
575
576
				};
				future.boxed()
577
			})),
Maciej Hirsz's avatar
Maciej Hirsz committed
578
		)?;
579

Maciej Hirsz's avatar
Maciej Hirsz committed
580
		Ok(MethodResourcesBuilder { build: ResourceVec::new(), callback })
581
582
	}

583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
	/// Register a new **blocking** synchronous RPC method, which computes the response with the given callback.
	/// Unlike the regular [`register_method`](RpcModule::register_method), this method can block its thread and perform expensive computations.
	pub fn register_blocking_method<R, F>(
		&mut self,
		method_name: &'static str,
		callback: F,
	) -> Result<MethodResourcesBuilder, Error>
	where
		Context: Send + Sync + 'static,
		R: Serialize,
		F: Fn(Params, Arc<Context>) -> Result<R, Error> + Copy + Send + Sync + 'static,
	{
		let ctx = self.ctx.clone();
		let callback = self.methods.verify_and_insert(
			method_name,
598
			MethodCallback::new_async(Arc::new(move |id, params, sink, _, claimed| {
599
600
601
				let ctx = ctx.clone();

				tokio::task::spawn_blocking(move || {
Maciej Hirsz's avatar
Maciej Hirsz committed
602
603
604
					let result = match callback(params, ctx) {
						Ok(res) => sink.send_response(id, res),
						Err(err) => sink.send_call_error(id, err),
605
606
607
608
					};

					// Release claimed resources
					drop(claimed);
Maciej Hirsz's avatar
Maciej Hirsz committed
609
610

					result
611
				})
Maciej Hirsz's avatar
Maciej Hirsz committed
612
613
614
615
616
617
				.map(|result| match result {
					Ok(r) => r,
					Err(err) => {
						tracing::error!("Join error for blocking RPC method: {:?}", err);
						false
					}
618
619
620
621
622
623
624
625
				})
				.boxed()
			})),
		)?;

		Ok(MethodResourcesBuilder { build: ResourceVec::new(), callback })
	}

626
627
628
629
630
631
632
633
	/// Register a new publish/subscribe interface using JSON-RPC notifications.
	///
	/// It implements the [ethereum pubsub specification](https://geth.ethereum.org/docs/rpc/pubsub)
	/// with an option to choose custom subscription ID generation.
	///
	/// Furthermore, it generates the `unsubscribe implementation` where a `bool` is used as
	/// the result to indicate whether the subscription was successfully unsubscribed to or not.
	/// For instance an `unsubscribe call` may fail if a non-existent subscriptionID is used in the call.
634
635
636
637
638
639
640
641
642
643
644
	///
	/// This method ensures that the `subscription_method_name` and `unsubscription_method_name` are unique.
	/// The `notif_method_name` argument sets the content of the `method` field in the JSON document that
	/// the server sends back to the client. The uniqueness of this value is not machine checked and it's up to
	/// the user to ensure it is not used in any other [`RpcModule`] used in the server.
	///
	/// # Arguments
	///
	/// * `subscription_method_name` - name of the method to call to initiate a subscription
	/// * `notif_method_name` - name of method to be used in the subscription payload (technically a JSON-RPC notification)
	/// * `unsubscription_method` - name of the method to call to terminate a subscription
645
	/// * `callback` - A callback to invoke on each subscription; it takes three parameters:
646
	///     - [`Params`]: JSON-RPC parameters in the subscription call.
David's avatar
David committed
647
648
	///     - [`SubscriptionSink`]: A sink to send messages to the subscriber.
	///     - Context: Any type that can be embedded into the [`RpcModule`].
649
650
651
652
653
	///
	/// # Examples
	///
	/// ```no_run
	///
654
655
	/// use jsonrpsee_core::server::rpc_module::{RpcModule, SubscriptionSink};
	/// use jsonrpsee_core::Error;
656
	///
David's avatar
David committed
657
	/// let mut ctx = RpcModule::new(99_usize);
658
659
660
661
662
663
	/// ctx.register_subscription("sub", "notif_name", "unsub", |params, pending, ctx| {
	///     let x = match params.one::<usize>() {
	///         Ok(x) => x,
	///         Err(e) => {
	///             let err: Error = e.into();
	///             pending.reject(err);
Alexandru Vasile's avatar
Alexandru Vasile committed
664
	///             return Ok(());
665
666
667
	///         }
	///     };
	///
668
	///     let mut sink = pending.accept()?;
669
	///
670
	///     std::thread::spawn(move || {
David's avatar
David committed
671
	///         let sum = x + (*ctx);
672
	///         let _ = sink.send(&sum);
673
	///     });
674
	///
675
	///     Ok(())
676
677
678
	/// });
	/// ```
	pub fn register_subscription<F>(
679
680
		&mut self,
		subscribe_method_name: &'static str,
681
		notif_method_name: &'static str,
682
		unsubscribe_method_name: &'static str,
683
		callback: F,
684
	) -> Result<MethodResourcesBuilder, Error>
685
	where
David's avatar
David committed
686
		Context: Send + Sync + 'static,
687
		F: Fn(Params, PendingSubscription, Arc<Context>) -> SubscriptionResult + Send + Sync + 'static,
688
	{
689
690
691
692
		if subscribe_method_name == unsubscribe_method_name {
			return Err(Error::SubscriptionNameConflict(subscribe_method_name.into()));
		}

693
694
		self.methods.verify_method_name(subscribe_method_name)?;
		self.methods.verify_method_name(unsubscribe_method_name)?;
695

David's avatar
David committed
696
		let ctx = self.ctx.clone();
697
698
		let subscribers = Subscribers::default();

David's avatar
David committed
699
		// Unsubscribe
700
		{
701
			let subscribers = subscribers.clone();
702
			self.methods.mut_callbacks().insert(
703
				unsubscribe_method_name,
704
				MethodCallback::new_unsubscription(Arc::new(move |id, params, sink, conn_id| {
705
					let sub_id = match params.one::<RpcSubscriptionId>() {
706
707
						Ok(sub_id) => sub_id,
						Err(_) => {
708
							tracing::warn!(
709
								"unsubscribe call '{}' failed: couldn't parse subscription id={:?} request id={:?}",
710
								unsubscribe_method_name,
711
								params,
712
713
								id
							);
714
							return sink.send_response(id, false);
715
716
						}
					};
717
718
719
					let key = SubscriptionKey { conn_id, sub_id: sub_id.into_owned() };

					let result = subscribers.lock().remove(&key).is_some();
720

721
722
723
724
725
726
727
					if !result {
						tracing::warn!(
							"unsubscribe call `{}` subscription key={:?} not an active subscription",
							unsubscribe_method_name,
							key,
						);
					}
728

729
730
					// if both the message was successful and the subscription was removed.
					sink.send_response(id, result) && result
731
				})),
732
733
734
			);
		}

735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
		// Subscribe
		let callback = {
			self.methods.verify_and_insert(
				subscribe_method_name,
				MethodCallback::new_subscription(Arc::new(move |id, params, method_sink, conn, claimed| {
					let sub_id: RpcSubscriptionId = conn.id_provider.next_id();

					let sink = PendingSubscription(Some(InnerPendingSubscription {
						sink: method_sink.clone(),
						close_notify: Some(conn.close_notify),
						method: notif_method_name,
						subscribers: subscribers.clone(),
						uniq_sub: SubscriptionKey { conn_id: conn.conn_id, sub_id },
						id: id.clone().into_owned(),
						claimed,
					}));

Alexandru Vasile's avatar
Alexandru Vasile committed
752
					// The callback returns an empty `SubscriptionError` for improved API ergonomics.
753
754
755
					if let Err(err) = callback(params, sink, ctx.clone()) {
						tracing::warn!("subscribe call `{}` failed with err={:?}", subscribe_method_name, err);
					}
756
757
758
759
760
761
762

					true
				})),
			)?
		};

		Ok(MethodResourcesBuilder { build: ResourceVec::new(), callback })
763
764
	}

David's avatar
David committed
765
	/// Register an alias for an existing_method. Alias uniqueness is enforced.
766
767
	pub fn register_alias(&mut self, alias: &'static str, existing_method: &'static str) -> Result<(), Error> {
		self.methods.verify_method_name(alias)?;
768

769
770
771
772
773
774
		let callback = match self.methods.callbacks.get(existing_method) {
			Some(callback) => callback.clone(),
			None => return Err(Error::MethodNotFound(existing_method.into())),
		};

		self.methods.mut_callbacks().insert(alias, callback);
775
776
777
778
779

		Ok(())
	}
}

780
781
782
783
784
785
786
787
788
/// Represent a pending subscription which waits to be accepted or rejected.
///
/// Note: you need to call either `PendingSubscription::accept` or `PendingSubscription::reject` otherwise
/// the subscription will be dropped with an `InvalidParams` error.
#[derive(Debug)]
struct InnerPendingSubscription {
	/// Sink.
	sink: MethodSink,
	/// Get notified when subscribers leave so we can exit
789
	close_notify: Option<SubscriptionPermit>,
790
791
792
793
794
795
796
797
	/// MethodCallback.
	method: &'static str,
	/// Unique subscription.
	uniq_sub: SubscriptionKey,
	/// Shared Mutex of subscriptions
	subscribers: Subscribers,
	/// Request ID.
	id: Id<'static>,
798
799
	/// Claimed resources.
	claimed: Option<ResourceGuard>,
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
}

/// Represents a pending subscription which waits until it is either accepted or rejected.
///
/// This type implements `Drop` for ease of use, e.g. when dropped in error short circuiting via
/// `map_err()?`: a pending subscription that is dropped without having been accepted or rejected answers
/// the subscribe call with an `InvalidParams` error.
///
/// The wrapped `Option` is `None` once the pending subscription has been consumed.
#[derive(Debug)]
pub struct PendingSubscription(Option<InnerPendingSubscription>);

impl PendingSubscription {
	/// Reject the subscription call by answering it with the given error.
	///
	/// Returns the outcome of sending the error response, or `false` if this pending subscription
	/// has already been consumed.
	pub fn reject(mut self, err: impl Into<ErrorObjectOwned>) -> bool {
		self.0.take().map_or(false, |pending| {
			let InnerPendingSubscription { sink, id, .. } = pending;
			sink.send_error(id, err.into())
		})
	}

	/// Attempt to accept the subscription and respond the subscription method call.
	///
	/// Fails if the connection was closed
Alexandru Vasile's avatar
Alexandru Vasile committed
822
823
	pub fn accept(mut self) -> Result<SubscriptionSink, SubscriptionEmptyError> {
		let inner = self.0.take().ok_or(SubscriptionEmptyError)?;
824

825
		let InnerPendingSubscription { sink, close_notify, method, uniq_sub, subscribers, id, claimed } = inner;
826
827

		if sink.send_response(id, &uniq_sub.sub_id) {
828
829
			let (tx, rx) = watch::channel(());
			subscribers.lock().insert(uniq_sub.clone(), (sink.clone(), tx));
Alexandru Vasile's avatar
Alexandru Vasile committed
830
			Ok(SubscriptionSink { inner: sink, close_notify, method, uniq_sub, subscribers, unsubscribe: rx, _claimed: claimed })
831
		} else {
Alexandru Vasile's avatar
Alexandru Vasile committed
832
			Err(SubscriptionEmptyError)
833
834
		}
	}
835
836
837
838
839
840
841
842
843
844
845
846
847
848

	/// Accepts the subscription connection and wraps the [`SubscriptionSink::pipe_from_try_stream`] for
	/// better ergonomics.
	///
	/// Returns `(Ok(sink), SubscriptionClosed)` if the connection was accepted successfully. The returned
	/// sink can be used to send error notifications back.
	///
	/// Returns `(None, SubscriptionClosed::RemotePeerAborted)` if the connection was not accepted, or the
	/// client disconnected while piping the stream.
	///
	/// # Examples
	///
	/// ```no_run
	///
849
	/// use jsonrpsee_core::server::rpc_module::RpcModule;
850
851
852
853
854
855
856
857
858
859
860
861
	/// use jsonrpsee_core::error::{Error, SubscriptionClosed};
	/// use jsonrpsee_types::ErrorObjectOwned;
	/// use anyhow::anyhow;
	///
	/// let mut m = RpcModule::new(());
	/// m.register_subscription("sub", "_", "unsub", |params, pending, _| {
	///     let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]);
	///     // This will return send `[Ok(1_u32), Ok(2_u32), Err(Error::SubscriptionClosed))]` to the subscriber
	///     // because after the `Err(_)` the stream is terminated.
	///     tokio::spawn(async move {
	///         // jsonrpsee doesn't send an error notification unless `close` is explicitly called.
	///         // If we pipe messages to the sink, we can inspect why it ended:
862
	///         pending
863
	///             .pipe_from_try_stream(stream)
864
865
866
867
868
869
870
871
	///             .await
	///             .on_success(|sink| {
	///                 let err_obj: ErrorObjectOwned = SubscriptionClosed::Success.into();
	///                 sink.close(err_obj);
	///             })
	///             .on_failure(|sink, err| {
	///                 sink.close(err);
	///             })
872
873
874
875
	///     });
	///     Ok(())
	/// });
	/// ```
876
	pub async fn pipe_from_try_stream<S, T, E>(self, stream: S) -> PipeFromStreamResult
877
878
879
880
881
882
883
		where
			S: TryStream<Ok = T, Error = E> + Unpin,
			T: Serialize,
			E: std::fmt::Display,
	{
		if let Ok(mut sink) = self.accept() {
			let result = sink.pipe_from_try_stream(stream).await;
884
			match result {
885
886
				SubscriptionClosed::Success => PipeFromStreamResult::Success(Some(sink)),
				SubscriptionClosed::Failed(error) => PipeFromStreamResult::Failure(Some((sink, error))),
887
888
				SubscriptionClosed::RemotePeerAborted => PipeFromStreamResult::RemotePeerAborted,
			}
889
		} else {
890
			PipeFromStreamResult::RemotePeerAborted
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
		}
	}

	/// Similar to [`PendingSubscription::pipe_from_try_stream`] but it doesn't require the stream return `Result`.
	///
	/// # Examples
	///
	/// ```no_run
	///
	/// use jsonrpsee_core::server::rpc_module::RpcModule;
	///
	/// let mut m = RpcModule::new(());
	/// m.register_subscription("sub", "_", "unsub", |params, pending, _| {
	///     let stream = futures_util::stream::iter(vec![1_usize, 2, 3]);
	///     tokio::spawn(async move { pending.pipe_from_stream(stream).await; });
	///     Ok(())
	/// });
	/// ```
909
	pub async fn pipe_from_stream<S, T>(self, stream: S) -> PipeFromStreamResult
910
911
912
913
914
915
		where
			S: Stream<Item = T> + Unpin,
			T: Serialize,
	{
		self.pipe_from_try_stream::<_, _, Error>(stream.map(|item| Ok(item))).await
	}
916
917
918
919
920
921
922
923
924
925
926
927
}

// A pending subscription that is dropped without having been consumed answers the
// subscribe call with an [`InvalidParams`] error.
impl Drop for PendingSubscription {
	fn drop(&mut self) {
		// `None` means the call has already been answered, so there is nothing left to send.
		let inner = match self.0.take() {
			Some(inner) => inner,
			None => return,
		};

		let InnerPendingSubscription { sink, id, .. } = inner;
		sink.send_error(id, ErrorCode::InvalidParams.into());
	}
}

928
929
930
931
932
/// The result obtain from calling [`PendingSubscription::pipe_from_try_stream`] that
/// can be utilized to execute specific operations depending on the result.
#[derive(Debug)]
pub enum PipeFromStreamResult {
	/// The connection was accepted and the pipe returned [`SubscriptionClosed::Success`].
933
	Success(Option<SubscriptionSink>),
934
935
	/// The connection was accepted and the pipe returned [`SubscriptionClosed::Failed`]
	/// with the provided error.
936
	Failure(Option<(SubscriptionSink, ErrorObjectOwned)>),
937
938
939
940
941
942
943
944
945
946
947
948
949
950
	/// The remote peer closed the connection or called the unsubscribe method.
	RemotePeerAborted,
}

impl PipeFromStreamResult {
	/// Callback that will run the provided function if the result is [`PipeFromStreamResult::Success`].
	/// After the function runs a new [`PipeFromStreamResult::RemotePeerAborted`] is returned.
	///
	/// Otherwise, it leaves the object untouched.
	pub fn on_success<F>(self, func: F) -> PipeFromStreamResult
		where
			F: FnOnce(SubscriptionSink) -> (),
	{
		match self {
951
			PipeFromStreamResult::Success(Some(sink)) => {
952
				func(sink);
953
				PipeFromStreamResult::Success(None)
954
955
956
957
958
959
960
961
962
963
964
965
966
967
			}
			_ => self
		}
	}

	/// Callback that will run the provided function if the result is [`PipeFromStreamResult::Failure`].
	/// After the function runs a new [`PipeFromStreamResult::RemotePeerAborted`] is returned.
	///
	/// Otherwise, it leaves the object untouched.
	pub fn on_failure<F>(self, func: F) -> PipeFromStreamResult
		where
			F: FnOnce(SubscriptionSink, ErrorObjectOwned) -> (),
	{
		match self {
968
			PipeFromStreamResult::Failure(Some((sink, error))) => {
969
				func(sink, error);
970
				PipeFromStreamResult::Failure(None)
971
972
973
974
975
976
			}
			_ => self
		}
	}
}

977
/// Represents a single subscription.
978
#[derive(Debug)]
979
pub struct SubscriptionSink {
980
	/// Sink.
Maciej Hirsz's avatar
Maciej Hirsz committed
981
	inner: MethodSink,
David's avatar
David committed
982
	/// Get notified when subscribers leave so we can exit
983
	close_notify: Option<SubscriptionPermit>,
Maciej Hirsz's avatar
Maciej Hirsz committed
984
	/// MethodCallback.
985
	method: &'static str,
986
987
988
989
	/// Unique subscription.
	uniq_sub: SubscriptionKey,
	/// Shared Mutex of subscriptions for this method.
	subscribers: Subscribers,
990
991
	/// Future that returns when the unsubscribe method has been called.
	unsubscribe: watch::Receiver<()>,
992
993
	/// Claimed resources.
	_claimed: Option<ResourceGuard>,
994
995
996
}

impl SubscriptionSink {
David's avatar
David committed
997
	/// Send a message back to subscribers.
998
999
1000
	///
	/// Returns `Ok(true)` if the message could be send
	/// Returns `Ok(false)` if the sink was closed (either because the subscription was closed or the connection was terminated)
For faster browsing, not all history is shown. View entire blame