rpc_module.rs 40.1 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

Maciej Hirsz's avatar
Maciej Hirsz committed
27
28
29
30
31
32
use std::collections::hash_map::Entry;
use std::fmt::{self, Debug};
use std::future::Future;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

33
use crate::error::{Error, SubscriptionClosed};
34
use crate::id_providers::RandomIntegerIdProvider;
35
use crate::server::helpers::{BoundedSubscriptions, MethodSink, SubscriptionPermit};
Maciej Hirsz's avatar
Maciej Hirsz committed
36
use crate::server::resource_limiting::{ResourceGuard, ResourceTable, ResourceVec, Resources};
37
use crate::traits::{IdProvider, ToRpcParams};
38
use futures_channel::{mpsc, oneshot};
39
use futures_util::future::Either;
David's avatar
David committed
40
use futures_util::pin_mut;
41
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt, TryStream, TryStreamExt};
42
use jsonrpsee_types::error::{
43
44
	CallError, ErrorCode, ErrorObject, ErrorObjectOwned, SubscriptionAcceptRejectError, INTERNAL_ERROR_CODE,
	SUBSCRIPTION_CLOSED_WITH_ERROR,
45
};
46
use jsonrpsee_types::response::{SubscriptionError, SubscriptionPayloadError};
David's avatar
David committed
47
use jsonrpsee_types::{
48
49
	ErrorResponse, Id, Params, Request, Response, SubscriptionId as RpcSubscriptionId, SubscriptionPayload,
	SubscriptionResponse, SubscriptionResult,
50
};
51
52
use parking_lot::Mutex;
use rustc_hash::FxHashMap;
Maciej Hirsz's avatar
Maciej Hirsz committed
53
use serde::{de::DeserializeOwned, Serialize};
54
use tokio::sync::watch;
55

56
57
use super::helpers::MethodResponse;

Maciej Hirsz's avatar
Maciej Hirsz committed
58
/// A `MethodCallback` is an RPC endpoint, callable with a standard JSON-RPC request,
59
60
61
/// implemented as a function pointer to a `Fn` function taking four arguments:
/// the `id`, `params`, a channel the function uses to communicate the result (or error)
/// back to `jsonrpsee`, and the connection ID (useful for the websocket transport).
62
pub type SyncMethod = Arc<dyn Send + Sync + Fn(Id, Params, MaxResponseSize) -> MethodResponse>;
Maciej Hirsz's avatar
Maciej Hirsz committed
63
/// Similar to [`SyncMethod`], but represents an asynchronous handler and takes an additional argument containing a [`ResourceGuard`] if configured.
64
pub type AsyncMethod<'a> = Arc<
65
66
67
	dyn Send
		+ Sync
		+ Fn(Id<'a>, Params<'a>, ConnectionId, MaxResponseSize, Option<ResourceGuard>) -> BoxFuture<'a, MethodResponse>,
68
>;
69
/// Method callback for subscriptions.
70
71
72
pub type SubscriptionMethod<'a> = Arc<
	dyn Send + Sync + Fn(Id, Params, MethodSink, ConnState, Option<ResourceGuard>) -> BoxFuture<'a, MethodResponse>,
>;
73
// Method callback to unsubscribe.
74
type UnsubscriptionMethod = Arc<dyn Send + Sync + Fn(Id, Params, ConnectionId, MaxResponseSize) -> MethodResponse>;
75

76
77
78
/// Connection ID, used for stateful protocol such as WebSockets.
/// For stateless protocols such as http it's unused, so feel free to set it some hardcoded value.
pub type ConnectionId = usize;
79

80
81
82
/// Max response size.
pub type MaxResponseSize = usize;

David's avatar
David committed
83
84
85
86
/// Raw response from an RPC
/// A 3-tuple containing:
///   - Call result as a `String`,
///   - a [`mpsc::UnboundedReceiver<String>`] to receive future subscription results
87
///   - a [`crate::server::helpers::SubscriptionPermit`] to allow subscribers to notify their [`SubscriptionSink`] when they disconnect.
88
pub type RawRpcResponse = (MethodResponse, mpsc::UnboundedReceiver<String>, SubscriptionPermit);
David's avatar
David committed
89
90

/// Helper struct to manage subscriptions.
91
92
93
pub struct ConnState<'a> {
	/// Connection ID
	pub conn_id: ConnectionId,
David's avatar
David committed
94
	/// Get notified when the connection to subscribers is closed.
95
	pub close_notify: SubscriptionPermit,
96
97
98
99
	/// ID provider.
	pub id_provider: &'a dyn IdProvider,
}

100
101
/// Outcome of a successful terminated subscription.
#[derive(Debug)]
102
pub enum InnerSubscriptionResult {
103
104
105
106
107
108
	/// The subscription stream was executed successfully.
	Success,
	/// The subscription was aborted by the remote peer.
	Aborted,
}

109
110
impl<'a> std::fmt::Debug for ConnState<'a> {
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
David's avatar
David committed
111
		f.debug_struct("ConnState").field("conn_id", &self.conn_id).field("close", &self.close_notify).finish()
112
113
	}
}
114

115
type Subscribers = Arc<Mutex<FxHashMap<SubscriptionKey, (MethodSink, watch::Sender<()>)>>>;
116

117
118
/// Represent a unique subscription entry based on [`RpcSubscriptionId`] and [`ConnectionId`].
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
119
120
struct SubscriptionKey {
	conn_id: ConnectionId,
121
	sub_id: RpcSubscriptionId<'static>,
122
}
123

124
/// Callback wrapper that can be either sync or async.
125
#[derive(Clone)]
126
pub enum MethodKind {
127
	/// Synchronous method handler.
128
	Sync(SyncMethod),
129
	/// Asynchronous method handler.
130
	Async(AsyncMethod<'static>),
131
	/// Subscription method handler.
132
	Subscription(SubscriptionMethod<'static>),
133
134
	/// Unsubscription method handler.
	Unsubscription(UnsubscriptionMethod),
135
136
}

Maciej Hirsz's avatar
Maciej Hirsz committed
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
/// Information about resources the method uses during its execution. Initialized when the the server starts.
#[derive(Clone, Debug)]
enum MethodResources {
	/// Uninitialized resource table, mapping string label to units.
	Uninitialized(Box<[(&'static str, u16)]>),
	/// Initialized resource table containing units for each `ResourceId`.
	Initialized(ResourceTable),
}

/// Method callback wrapper that contains a sync or async closure,
/// plus a table with resources it needs to claim to run
#[derive(Clone, Debug)]
pub struct MethodCallback {
	callback: MethodKind,
	resources: MethodResources,
}

Maciej Hirsz's avatar
Maciej Hirsz committed
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
/// Result of a method, either direct value or a future of one.
pub enum MethodResult<T> {
	/// Result by value
	Sync(T),
	/// Future of a value
	Async(BoxFuture<'static, T>),
}

impl<T: Debug> Debug for MethodResult<T> {
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		match self {
			MethodResult::Sync(result) => result.fmt(f),
			MethodResult::Async(_) => f.write_str("<future>"),
		}
	}
}

Maciej Hirsz's avatar
Maciej Hirsz committed
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
/// Builder for configuring resources used by a method.
#[derive(Debug)]
pub struct MethodResourcesBuilder<'a> {
	build: ResourceVec<(&'static str, u16)>,
	callback: &'a mut MethodCallback,
}

impl<'a> MethodResourcesBuilder<'a> {
	/// Define how many units of a given named resource the method uses during its execution.
	pub fn resource(mut self, label: &'static str, units: u16) -> Result<Self, Error> {
		self.build.try_push((label, units)).map_err(|_| Error::MaxResourcesReached)?;
		Ok(self)
	}
}

impl<'a> Drop for MethodResourcesBuilder<'a> {
	fn drop(&mut self) {
		self.callback.resources = MethodResources::Uninitialized(self.build[..].into());
	}
}

192
impl MethodCallback {
Maciej Hirsz's avatar
Maciej Hirsz committed
193
194
195
196
197
198
199
200
	fn new_sync(callback: SyncMethod) -> Self {
		MethodCallback { callback: MethodKind::Sync(callback), resources: MethodResources::Uninitialized([].into()) }
	}

	fn new_async(callback: AsyncMethod<'static>) -> Self {
		MethodCallback { callback: MethodKind::Async(callback), resources: MethodResources::Uninitialized([].into()) }
	}

201
	fn new_subscription(callback: SubscriptionMethod<'static>) -> Self {
202
203
204
205
206
207
		MethodCallback {
			callback: MethodKind::Subscription(callback),
			resources: MethodResources::Uninitialized([].into()),
		}
	}

208
209
210
211
212
213
214
	fn new_unsubscription(callback: UnsubscriptionMethod) -> Self {
		MethodCallback {
			callback: MethodKind::Unsubscription(callback),
			resources: MethodResources::Uninitialized([].into()),
		}
	}

Maciej Hirsz's avatar
Maciej Hirsz committed
215
216
217
218
219
220
221
222
223
	/// Attempt to claim resources prior to executing a method. On success returns a guard that releases
	/// claimed resources when dropped.
	pub fn claim(&self, name: &str, resources: &Resources) -> Result<ResourceGuard, Error> {
		match self.resources {
			MethodResources::Uninitialized(_) => Err(Error::UninitializedMethod(name.into())),
			MethodResources::Initialized(units) => resources.claim(units),
		}
	}

224
225
226
	/// Get handle to the callback.
	pub fn inner(&self) -> &MethodKind {
		&self.callback
227
	}
228
229
}

Maciej Hirsz's avatar
Maciej Hirsz committed
230
impl Debug for MethodKind {
231
232
233
234
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		match self {
			Self::Async(_) => write!(f, "Async"),
			Self::Sync(_) => write!(f, "Sync"),
235
			Self::Subscription(_) => write!(f, "Subscription"),
236
			Self::Unsubscription(_) => write!(f, "Unsubscription"),
237
238
239
240
		}
	}
}

241
242
/// Reference-counted, clone-on-write collection of synchronous and asynchronous methods.
#[derive(Default, Debug, Clone)]
David's avatar
David committed
243
pub struct Methods {
244
	callbacks: Arc<FxHashMap<&'static str, MethodCallback>>,
245
246
}

David's avatar
David committed
247
248
impl Methods {
	/// Creates a new empty [`Methods`].
249
250
	pub fn new() -> Self {
		Self::default()
251
252
	}

Maciej Hirsz's avatar
Maciej Hirsz committed
253
	fn verify_method_name(&mut self, name: &'static str) -> Result<(), Error> {
254
		if self.callbacks.contains_key(name) {
255
256
257
258
259
260
			return Err(Error::MethodAlreadyRegistered(name.into()));
		}

		Ok(())
	}

Maciej Hirsz's avatar
Maciej Hirsz committed
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
	/// Inserts the method callback for a given name, or returns an error if the name was already taken.
	/// On success it returns a mut reference to the [`MethodCallback`] just inserted.
	fn verify_and_insert(
		&mut self,
		name: &'static str,
		callback: MethodCallback,
	) -> Result<&mut MethodCallback, Error> {
		match self.mut_callbacks().entry(name) {
			Entry::Occupied(_) => Err(Error::MethodAlreadyRegistered(name.into())),
			Entry::Vacant(vacant) => Ok(vacant.insert(callback)),
		}
	}

	/// Initialize resources for all methods in this collection. This method has no effect if called more than once.
	pub fn initialize_resources(mut self, resources: &Resources) -> Result<Self, Error> {
		let callbacks = self.mut_callbacks();

		for (&method_name, callback) in callbacks.iter_mut() {
			if let MethodResources::Uninitialized(uninit) = &callback.resources {
				let mut map = resources.defaults;

				for &(label, units) in uninit.iter() {
					let idx = match resources.labels.iter().position(|&l| l == label) {
						Some(idx) => idx,
						None => return Err(Error::ResourceNameNotFoundForMethod(label, method_name)),
					};

288
289
290
291
292
293
294
					// If resource capacity set to `0`, we ignore the unit value of the method
					// and set it to `0` as well, effectively making the resource unlimited.
					if resources.capacities[idx] == 0 {
						map[idx] = 0;
					} else {
						map[idx] = units;
					}
Maciej Hirsz's avatar
Maciej Hirsz committed
295
296
297
298
299
300
301
302
303
				}

				callback.resources = MethodResources::Initialized(map);
			}
		}

		Ok(self)
	}

304
305
306
307
308
	/// Helper for obtaining a mut ref to the callbacks HashMap.
	fn mut_callbacks(&mut self) -> &mut FxHashMap<&'static str, MethodCallback> {
		Arc::make_mut(&mut self.callbacks)
	}

309
	/// Merge two [`Methods`]'s by adding all [`MethodCallback`]s from `other` into `self`.
310
	/// Fails if any of the methods in `other` is present already.
Maciej Hirsz's avatar
Maciej Hirsz committed
311
312
313
	pub fn merge(&mut self, other: impl Into<Methods>) -> Result<(), Error> {
		let mut other = other.into();

314
		for name in other.callbacks.keys() {
315
316
317
			self.verify_method_name(name)?;
		}

318
319
320
321
		let callbacks = self.mut_callbacks();

		for (name, callback) in other.mut_callbacks().drain() {
			callbacks.insert(name, callback);
322
323
324
325
326
		}

		Ok(())
	}

327
328
329
	/// Returns the method callback.
	pub fn method(&self, method_name: &str) -> Option<&MethodCallback> {
		self.callbacks.get(method_name)
330
331
	}

Maciej Hirsz's avatar
Maciej Hirsz committed
332
333
334
335
336
337
	/// Returns the method callback along with its name. The returned name is same as the
	/// `method_name`, but its lifetime bound is `'static`.
	pub fn method_with_name(&self, method_name: &str) -> Option<(&'static str, &MethodCallback)> {
		self.callbacks.get_key_value(method_name).map(|(k, v)| (*k, v))
	}

338
	/// Helper to call a method on the `RPC module` without having to spin up a server.
339
340
	///
	/// The params must be serializable as JSON array, see [`ToRpcParams`] for further documentation.
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
	///
	/// Returns the decoded value of the `result field` in JSON-RPC response if succesful.
	///
	/// # Examples
	///
	/// ```
	/// #[tokio::main]
	/// async fn main() {
	///     use jsonrpsee::RpcModule;
	///
	///     let mut module = RpcModule::new(());
	///     module.register_method("echo_call", |params, _| {
	///         params.one::<u64>().map_err(Into::into)
	///     }).unwrap();
	///
	///     let echo: u64 = module.call("echo_call", [1_u64]).await.unwrap();
	///     assert_eq!(echo, 1);
	/// }
	/// ```
	pub async fn call<Params: ToRpcParams, T: DeserializeOwned>(
		&self,
		method: &str,
		params: Params,
	) -> Result<T, Error> {
		let params = params.to_rpc_params()?;
		let req = Request::new(method.into(), Some(&params), Id::Number(0));
		tracing::trace!("[Methods::call] Calling method: {:?}, params: {:?}", method, params);
		let (resp, _, _) = self.inner_call(req).await;
369

370
371
372
373
374
375
		if resp.success {
			serde_json::from_str::<Response<T>>(&resp.result).map(|r| r.result).map_err(Into::into)
		} else {
			match serde_json::from_str::<ErrorResponse>(&resp.result) {
				Ok(err) => Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned()))),
				Err(e) => Err(e.into()),
376
			}
377
		}
378
379
	}

380
	/// Make a request (JSON-RPC method call or subscription) by using raw JSON.
381
	///
382
	/// Returns the raw JSON response to the call and a stream to receive notifications if the call was a subscription.
383
	///
384
	/// # Examples
385
386
387
388
	///
	/// ```
	/// #[tokio::main]
	/// async fn main() {
Maciej Hirsz's avatar
Maciej Hirsz committed
389
390
	///     use jsonrpsee::RpcModule;
	///     use jsonrpsee::types::Response;
391
392
393
	///     use futures_util::StreamExt;
	///
	///     let mut module = RpcModule::new(());
394
395
396
	///     module.register_subscription("hi", "hi", "goodbye", |_, mut sink, _| {
	///         sink.send(&"one answer").unwrap();
	///         Ok(())
397
	///     }).unwrap();
398
	///     let (resp, mut stream) = module.raw_json_request(r#"{"jsonrpc":"2.0","method":"hi","id":0}"#).await.unwrap();
399
	///     let resp = serde_json::from_str::<Response<u64>>(&resp.result).unwrap();
400
401
402
403
404
	///     let sub_resp = stream.next().await.unwrap();
	///     assert_eq!(
	///         format!(r#"{{"jsonrpc":"2.0","method":"hi","params":{{"subscription":{},"result":"one answer"}}}}"#, resp.result),
	///         sub_resp
	///     );
405
406
	/// }
	/// ```
407
408
409
410
	pub async fn raw_json_request(
		&self,
		call: &str,
	) -> Result<(MethodResponse, mpsc::UnboundedReceiver<String>), Error> {
411
412
413
414
		tracing::trace!("[Methods::raw_json_request] {:?}", call);
		let req: Request = serde_json::from_str(call)?;
		let (resp, rx, _) = self.inner_call(req).await;
		Ok((resp, rx))
415
416
	}

417
	/// Execute a callback.
418
	async fn inner_call(&self, req: Request<'_>) -> RawRpcResponse {
David's avatar
David committed
419
420
		let (tx_sink, mut rx_sink) = mpsc::unbounded();
		let sink = MethodSink::new(tx_sink);
421
422
		let id = req.id.clone();
		let params = Params::new(req.params.map(|params| params.get()));
423
424
425
		let bounded_subs = BoundedSubscriptions::new(u32::MAX);
		let close_notify = bounded_subs.acquire().expect("u32::MAX permits is sufficient; qed");
		let notify = bounded_subs.acquire().expect("u32::MAX permits is sufficient; qed");
426

427
		let result = match self.method(&req.method).map(|c| &c.callback) {
428
429
430
			None => MethodResponse::error(req.id, ErrorObject::from(ErrorCode::MethodNotFound)),
			Some(MethodKind::Sync(cb)) => (cb)(id, params, usize::MAX),
			Some(MethodKind::Async(cb)) => (cb)(id.into_owned(), params.into_owned(), 0, usize::MAX, None).await,
431
			Some(MethodKind::Subscription(cb)) => {
David's avatar
David committed
432
				let conn_state = ConnState { conn_id: 0, close_notify, id_provider: &RandomIntegerIdProvider };
433
434
435
436
437
438
439
440
441
				let res = (cb)(id, params, sink.clone(), conn_state, None).await;

				// This message is not used because it's used for middleware so we discard in other to
				// not read once this is used for subscriptions.
				//
				// The same information is part of `res` above.
				let _ = rx_sink.next().await.expect("Every call must at least produce one reponse; qed");

				res
442
			}
443
			Some(MethodKind::Unsubscription(cb)) => (cb)(id, params, 0, usize::MAX),
444
		};
445

446
		tracing::trace!("[Methods::inner_call]: method: `{}` result: {:?}", req.method, result);
447

448
		(result, rx_sink, notify)
449
450
451
452
453
454
	}

	/// Helper to create a subscription on the `RPC module` without having to spin up a server.
	///
	/// The params must be serializable as JSON array, see [`ToRpcParams`] for further documentation.
	///
455
	/// Returns [`Subscription`] on success which can used to get results from the subscriptions.
456
457
458
459
460
461
462
463
464
	///
	/// # Examples
	///
	/// ```
	/// #[tokio::main]
	/// async fn main() {
	///     use jsonrpsee::{RpcModule, types::EmptyParams};
	///
	///     let mut module = RpcModule::new(());
465
466
467
	///     module.register_subscription("hi", "hi", "goodbye", |_, mut sink, _| {
	///         sink.send(&"one answer").unwrap();
	///         Ok(())
468
469
470
471
472
473
474
475
476
477
478
	///     }).unwrap();
	///
	///     let mut sub = module.subscribe("hi", EmptyParams::new()).await.unwrap();
	///     // In this case we ignore the subscription ID,
	///     let (sub_resp, _sub_id) = sub.next::<String>().await.unwrap().unwrap();
	///     assert_eq!(&sub_resp, "one answer");
	/// }
	/// ```
	pub async fn subscribe(&self, sub_method: &str, params: impl ToRpcParams) -> Result<Subscription, Error> {
		let params = params.to_rpc_params()?;
		let req = Request::new(sub_method.into(), Some(&params), Id::Number(0));
479

480
		tracing::trace!("[Methods::subscribe] Calling subscription method: {:?}, params: {:?}", sub_method, params);
481

David's avatar
David committed
482
		let (response, rx, close_notify) = self.inner_call(req).await;
483

484
		tracing::trace!("[Methods::subscribe] response {:?}", response);
485
486

		let subscription_response = match serde_json::from_str::<Response<RpcSubscriptionId>>(&response.result) {
487
			Ok(r) => r,
488
			Err(_) => match serde_json::from_str::<ErrorResponse>(&response.result) {
489
490
491
492
				Ok(err) => return Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned()))),
				Err(err) => return Err(err.into()),
			},
		};
493

494
		let sub_id = subscription_response.result.into_owned();
David's avatar
David committed
495
		let close_notify = Some(close_notify);
496

David's avatar
David committed
497
		Ok(Subscription { sub_id, rx, close_notify })
498
499
	}

Maciej Hirsz's avatar
Maciej Hirsz committed
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
	/// Returns an `Iterator` with all the method names registered on this server.
	pub fn method_names(&self) -> impl Iterator<Item = &'static str> + '_ {
		self.callbacks.keys().copied()
	}
}

impl<Context> Deref for RpcModule<Context> {
	type Target = Methods;

	fn deref(&self) -> &Methods {
		&self.methods
	}
}

impl<Context> DerefMut for RpcModule<Context> {
	fn deref_mut(&mut self) -> &mut Methods {
		&mut self.methods
517
518
519
520
521
522
	}
}

/// Sets of JSON-RPC methods can be organized into a "module"s that are in turn registered on the server or,
/// alternatively, merged with other modules to construct a cohesive API. [`RpcModule`] wraps an additional context
/// argument that can be used to access data during call execution.
523
#[derive(Debug, Clone)]
524
525
pub struct RpcModule<Context> {
	ctx: Arc<Context>,
David's avatar
David committed
526
	methods: Methods,
527
528
}

529
impl<Context> RpcModule<Context> {
530
531
	/// Create a new module with a given shared `Context`.
	pub fn new(ctx: Context) -> Self {
532
		Self { ctx: Arc::new(ctx), methods: Default::default() }
533
	}
534
535
536
537
538
539
540

	/// Transform a module into an `RpcModule<()>` (unit context).
	pub fn remove_context(self) -> RpcModule<()> {
		let mut module = RpcModule::new(());
		module.methods = self.methods;
		module
	}
Maciej Hirsz's avatar
Maciej Hirsz committed
541
}
542

Maciej Hirsz's avatar
Maciej Hirsz committed
543
544
545
impl<Context> From<RpcModule<Context>> for Methods {
	fn from(module: RpcModule<Context>) -> Methods {
		module.methods
546
547
548
549
	}
}

impl<Context: Send + Sync + 'static> RpcModule<Context> {
David's avatar
David committed
550
	/// Register a new synchronous RPC method, which computes the response with the given callback.
Maciej Hirsz's avatar
Maciej Hirsz committed
551
552
553
554
555
	pub fn register_method<R, F>(
		&mut self,
		method_name: &'static str,
		callback: F,
	) -> Result<MethodResourcesBuilder, Error>
556
	where
David's avatar
David committed
557
		Context: Send + Sync + 'static,
558
		R: Serialize,
David's avatar
David committed
559
		F: Fn(Params, &Context) -> Result<R, Error> + Send + Sync + 'static,
560
	{
David's avatar
David committed
561
		let ctx = self.ctx.clone();
Maciej Hirsz's avatar
Maciej Hirsz committed
562
		let callback = self.methods.verify_and_insert(
563
			method_name,
564
565
566
			MethodCallback::new_sync(Arc::new(move |id, params, max_response_size| match callback(params, &*ctx) {
				Ok(res) => MethodResponse::response(id, res, max_response_size),
				Err(err) => MethodResponse::error(id, err),
567
			})),
Maciej Hirsz's avatar
Maciej Hirsz committed
568
		)?;
569

Maciej Hirsz's avatar
Maciej Hirsz committed
570
		Ok(MethodResourcesBuilder { build: ResourceVec::new(), callback })
571
572
	}

David's avatar
David committed
573
	/// Register a new asynchronous RPC method, which computes the response with the given callback.
Maciej Hirsz's avatar
Maciej Hirsz committed
574
575
576
577
578
	pub fn register_async_method<R, Fun, Fut>(
		&mut self,
		method_name: &'static str,
		callback: Fun,
	) -> Result<MethodResourcesBuilder, Error>
579
580
	where
		R: Serialize + Send + Sync + 'static,
Maciej Hirsz's avatar
Maciej Hirsz committed
581
582
		Fut: Future<Output = Result<R, Error>> + Send,
		Fun: (Fn(Params<'static>, Arc<Context>) -> Fut) + Copy + Send + Sync + 'static,
583
584
	{
		let ctx = self.ctx.clone();
Maciej Hirsz's avatar
Maciej Hirsz committed
585
		let callback = self.methods.verify_and_insert(
586
			method_name,
587
			MethodCallback::new_async(Arc::new(move |id, params, _, max_response_size, claimed| {
588
589
				let ctx = ctx.clone();
				let future = async move {
Maciej Hirsz's avatar
Maciej Hirsz committed
590
					let result = match callback(params, ctx).await {
591
592
						Ok(res) => MethodResponse::response(id, res, max_response_size),
						Err(err) => MethodResponse::error(id, err),
593
					};
Maciej Hirsz's avatar
Maciej Hirsz committed
594
595
596

					// Release claimed resources
					drop(claimed);
Maciej Hirsz's avatar
Maciej Hirsz committed
597
598

					result
599
600
				};
				future.boxed()
601
			})),
Maciej Hirsz's avatar
Maciej Hirsz committed
602
		)?;
603

Maciej Hirsz's avatar
Maciej Hirsz committed
604
		Ok(MethodResourcesBuilder { build: ResourceVec::new(), callback })
605
606
	}

607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
	/// Register a new **blocking** synchronous RPC method, which computes the response with the given callback.
	/// Unlike the regular [`register_method`](RpcModule::register_method), this method can block its thread and perform expensive computations.
	pub fn register_blocking_method<R, F>(
		&mut self,
		method_name: &'static str,
		callback: F,
	) -> Result<MethodResourcesBuilder, Error>
	where
		Context: Send + Sync + 'static,
		R: Serialize,
		F: Fn(Params, Arc<Context>) -> Result<R, Error> + Copy + Send + Sync + 'static,
	{
		let ctx = self.ctx.clone();
		let callback = self.methods.verify_and_insert(
			method_name,
622
			MethodCallback::new_async(Arc::new(move |id, params, _, max_response_size, claimed| {
623
624
625
				let ctx = ctx.clone();

				tokio::task::spawn_blocking(move || {
Maciej Hirsz's avatar
Maciej Hirsz committed
626
					let result = match callback(params, ctx) {
627
628
						Ok(result) => MethodResponse::response(id, result, max_response_size),
						Err(err) => MethodResponse::error(id, err),
629
630
631
632
					};

					// Release claimed resources
					drop(claimed);
Maciej Hirsz's avatar
Maciej Hirsz committed
633
634

					result
635
				})
Maciej Hirsz's avatar
Maciej Hirsz committed
636
637
638
639
				.map(|result| match result {
					Ok(r) => r,
					Err(err) => {
						tracing::error!("Join error for blocking RPC method: {:?}", err);
640
						MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InternalError))
Maciej Hirsz's avatar
Maciej Hirsz committed
641
					}
642
643
644
645
646
647
648
649
				})
				.boxed()
			})),
		)?;

		Ok(MethodResourcesBuilder { build: ResourceVec::new(), callback })
	}

650
651
652
653
654
655
656
657
	/// Register a new publish/subscribe interface using JSON-RPC notifications.
	///
	/// It implements the [ethereum pubsub specification](https://geth.ethereum.org/docs/rpc/pubsub)
	/// with an option to choose custom subscription ID generation.
	///
	/// Furthermore, it generates the `unsubscribe implementation` where a `bool` is used as
	/// the result to indicate whether the subscription was successfully unsubscribed to or not.
	/// For instance an `unsubscribe call` may fail if a non-existent subscriptionID is used in the call.
658
659
660
661
662
663
664
665
666
667
668
	///
	/// This method ensures that the `subscription_method_name` and `unsubscription_method_name` are unique.
	/// The `notif_method_name` argument sets the content of the `method` field in the JSON document that
	/// the server sends back to the client. The uniqueness of this value is not machine checked and it's up to
	/// the user to ensure it is not used in any other [`RpcModule`] used in the server.
	///
	/// # Arguments
	///
	/// * `subscription_method_name` - name of the method to call to initiate a subscription
	/// * `notif_method_name` - name of method to be used in the subscription payload (technically a JSON-RPC notification)
	/// * `unsubscription_method` - name of the method to call to terminate a subscription
669
	/// * `callback` - A callback to invoke on each subscription; it takes three parameters:
670
	///     - [`Params`]: JSON-RPC parameters in the subscription call.
David's avatar
David committed
671
672
	///     - [`SubscriptionSink`]: A sink to send messages to the subscriber.
	///     - Context: Any type that can be embedded into the [`RpcModule`].
673
674
675
676
677
	///
	/// # Examples
	///
	/// ```no_run
	///
678
679
	/// use jsonrpsee_core::server::rpc_module::{RpcModule, SubscriptionSink};
	/// use jsonrpsee_core::Error;
680
	///
David's avatar
David committed
681
	/// let mut ctx = RpcModule::new(99_usize);
682
	/// ctx.register_subscription("sub", "notif_name", "unsub", |params, mut sink, ctx| {
683
684
685
686
	///     let x = match params.one::<usize>() {
	///         Ok(x) => x,
	///         Err(e) => {
	///             let err: Error = e.into();
687
688
	///             sink.reject(err);
	///             return Ok(());
689
690
	///         }
	///     };
691
	///     // Sink is accepted on the first `send` call.
692
	///     std::thread::spawn(move || {
David's avatar
David committed
693
	///         let sum = x + (*ctx);
694
	///         let _ = sink.send(&sum);
695
	///     });
696
697
	///
	///     Ok(())
698
699
700
	/// });
	/// ```
	pub fn register_subscription<F>(
701
702
		&mut self,
		subscribe_method_name: &'static str,
703
		notif_method_name: &'static str,
704
		unsubscribe_method_name: &'static str,
705
		callback: F,
706
	) -> Result<MethodResourcesBuilder, Error>
707
	where
David's avatar
David committed
708
		Context: Send + Sync + 'static,
709
		F: Fn(Params, SubscriptionSink, Arc<Context>) -> SubscriptionResult + Send + Sync + 'static,
710
	{
711
712
713
714
		if subscribe_method_name == unsubscribe_method_name {
			return Err(Error::SubscriptionNameConflict(subscribe_method_name.into()));
		}

715
716
		self.methods.verify_method_name(subscribe_method_name)?;
		self.methods.verify_method_name(unsubscribe_method_name)?;
717

David's avatar
David committed
718
		let ctx = self.ctx.clone();
719
720
		let subscribers = Subscribers::default();

David's avatar
David committed
721
		// Unsubscribe
722
		{
723
			let subscribers = subscribers.clone();
724
			self.methods.mut_callbacks().insert(
725
				unsubscribe_method_name,
726
				MethodCallback::new_unsubscription(Arc::new(move |id, params, conn_id, max_response_size| {
727
					let sub_id = match params.one::<RpcSubscriptionId>() {
728
729
						Ok(sub_id) => sub_id,
						Err(_) => {
730
							tracing::warn!(
731
								"unsubscribe call '{}' failed: couldn't parse subscription id={:?} request id={:?}",
732
								unsubscribe_method_name,
733
								params,
734
735
								id
							);
736
737

							return MethodResponse::response(id, false, max_response_size);
738
739
						}
					};
740

741
					let key = SubscriptionKey { conn_id, sub_id: sub_id.into_owned() };
742
					let result = subscribers.lock().remove(&key).is_some();
743

744
745
746
747
748
749
750
					if !result {
						tracing::warn!(
							"unsubscribe call `{}` subscription key={:?} not an active subscription",
							unsubscribe_method_name,
							key,
						);
					}
751

752
753
					// TODO: register as failed in !result.
					MethodResponse::response(id, result, max_response_size)
754
				})),
755
756
757
			);
		}

758
759
760
761
762
		// Subscribe
		let callback = {
			self.methods.verify_and_insert(
				subscribe_method_name,
				MethodCallback::new_subscription(Arc::new(move |id, params, method_sink, conn, claimed| {
763
764
765
766
					let uniq_sub = SubscriptionKey { conn_id: conn.conn_id, sub_id: conn.id_provider.next_id() };

					// response to the subscription call.
					let (tx, rx) = oneshot::channel();
767

768
					let sink = SubscriptionSink {
Niklas Adolfsson's avatar
Niklas Adolfsson committed
769
						inner: method_sink,
770
771
772
						close_notify: Some(conn.close_notify),
						method: notif_method_name,
						subscribers: subscribers.clone(),
773
774
						uniq_sub,
						id: Some((id.clone().into_owned(), tx)),
775
776
777
						unsubscribe: None,
						_claimed: claimed,
					};
778

779
					// The callback returns a `SubscriptionResult` for better ergonomics and is not propagated further.
Niklas Adolfsson's avatar
Niklas Adolfsson committed
780
					if callback(params, sink, ctx.clone()).is_err() {
781
782
						tracing::warn!("subscribe call `{}` failed", subscribe_method_name);
					}
783

784
785
786
787
788
789
790
791
792
793
					let id = id.clone().into_owned();

					let result = async move {
						match rx.await {
							Ok(result) => result,
							Err(_) => MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError)),
						}
					};

					Box::pin(result)
794
795
796
797
798
				})),
			)?
		};

		Ok(MethodResourcesBuilder { build: ResourceVec::new(), callback })
799
800
	}

David's avatar
David committed
801
	/// Register an alias for an existing_method. Alias uniqueness is enforced.
802
803
	pub fn register_alias(&mut self, alias: &'static str, existing_method: &'static str) -> Result<(), Error> {
		self.methods.verify_method_name(alias)?;
804

805
806
807
808
809
810
		let callback = match self.methods.callbacks.get(existing_method) {
			Some(callback) => callback.clone(),
			None => return Err(Error::MethodNotFound(existing_method.into())),
		};

		self.methods.mut_callbacks().insert(alias, callback);
811
812
813
814
815

		Ok(())
	}
}

816
817
818
819
/// Returns once the unsubscribe method has been called.
type UnsubscribeCall = Option<watch::Receiver<()>>;

/// Represents a single subscription.
820
#[derive(Debug)]
821
pub struct SubscriptionSink {
822
	/// Sink.
823
	inner: MethodSink,
824
	/// Get notified when subscribers leave so we can exit
825
	close_notify: Option<SubscriptionPermit>,
826
827
	/// MethodCallback.
	method: &'static str,
828
829
	/// Shared Mutex of subscriptions for this method.
	subscribers: Subscribers,
830
831
	/// Unique subscription.
	uniq_sub: SubscriptionKey,
832
833
834
835
	/// Id of the `subscription call` (i.e. not the same as subscription id) which is used
	/// to reply to subscription method call and must only be used once.
	///
	/// *Note*: Having some value means the subscription was not accepted or rejected yet.
836
	id: Option<(Id<'static>, oneshot::Sender<MethodResponse>)>,
837
838
	/// Having some value means the subscription was accepted.
	unsubscribe: UnsubscribeCall,
839
	/// Claimed resources.
840
	_claimed: Option<ResourceGuard>,
841
842
}

843
impl SubscriptionSink {
844
	/// Reject the subscription call from [`ErrorObject`].
845
	pub fn reject(&mut self, err: impl Into<ErrorObjectOwned>) -> Result<(), SubscriptionAcceptRejectError> {
846
847
848
		let (id, subscribe_call) = self.id.take().ok_or(SubscriptionAcceptRejectError::AlreadyCalled)?;

		let err = MethodResponse::error(id, err.into());
849

850
		if self.answer_subscription(err, subscribe_call) {
851
			Ok(())
852
		} else {
853
			Err(SubscriptionAcceptRejectError::RemotePeerAborted)
854
855
856
857
858
		}
	}

	/// Attempt to accept the subscription and respond the subscription method call.
	///
859
860
	/// Fails if the connection was closed, or if called multiple times.
	pub fn accept(&mut self) -> Result<(), SubscriptionAcceptRejectError> {
861
862
863
864
865
866
		let (id, subscribe_call) = self.id.take().ok_or(SubscriptionAcceptRejectError::AlreadyCalled)?;

		let response = MethodResponse::response(id, &self.uniq_sub.sub_id, self.inner.max_response_size() as usize);
		let success = response.success;

		let sent = self.answer_subscription(response, subscribe_call);
867

868
		if sent && success {
869
			let (tx, rx) = watch::channel(());
870
871
872
			self.subscribers.lock().insert(self.uniq_sub.clone(), (self.inner.clone(), tx));
			self.unsubscribe = Some(rx);
			Ok(())
873
		} else {
874
			Err(SubscriptionAcceptRejectError::RemotePeerAborted)
875
876
		}
	}
877

David's avatar
David committed
878
	/// Send a message back to subscribers.
879
	///
880
881
882
883
884
	/// Returns
	/// - `Ok(true)` if the message could be send.
	/// - `Ok(false)` if the sink was closed (either because the subscription was closed or the connection was terminated),
	/// or the subscription could not be accepted.
	/// - `Err(err)` if the message could not be serialized.
885
	pub fn send<T: Serialize>(&mut self, result: &T) -> Result<bool, serde_json::Error> {
886
887
888
889
890
891
		// Cannot accept the subscription.
		if let Err(SubscriptionAcceptRejectError::RemotePeerAborted) = self.accept() {
			return Ok(false);
		}

		// Only possible to trigger when the connection is dropped.
892
		if self.is_closed() {
893
			return Ok(false);
894
		}
895

896
		let msg = self.build_message(result)?;
897
		Ok(self.inner.send_raw(msg).is_ok())
898
	}
899

900
	/// Reads data from the `stream` and sends back data on the subscription
901
	/// when items gets produced by the stream.
902
	/// The underlying stream must produce `Result values, see [`futures_util::TryStream`] for further information.
903
904
	///
	/// Returns `Ok(())` if the stream or connection was terminated.
905
	/// Returns `Err(_)` immediately if the underlying stream returns an error or if an item from the stream could not be serialized.
906
907
908
909
910
	///
	/// # Examples
	///
	/// ```no_run
	///
911
	/// use jsonrpsee_core::server::rpc_module::RpcModule;
912
913
	/// use jsonrpsee_core::error::{Error, SubscriptionClosed};
	/// use jsonrpsee_types::ErrorObjectOwned;
914
	/// use anyhow::anyhow;
915
916
	///
	/// let mut m = RpcModule::new(());
917
	/// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| {
918
919
920
	///     let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]);
	///     // This will return send `[Ok(1_u32), Ok(2_u32), Err(Error::SubscriptionClosed))]` to the subscriber
	///     // because after the `Err(_)` the stream is terminated.
921
922
	///     let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]);
	///
923
	///     tokio::spawn(async move {
924
	///
925
926
927
928
929
930
931
932
933
934
935
936
	///         // jsonrpsee doesn't send an error notification unless `close` is explicitly called.
	///         // If we pipe messages to the sink, we can inspect why it ended:
	///         match sink.pipe_from_try_stream(stream).await {
	///            SubscriptionClosed::Success => {
	///                let err_obj: ErrorObjectOwned = SubscriptionClosed::Success.into();
	///                sink.close(err_obj);
	///            }
	///            // we don't want to send close reason when the client is unsubscribed or disconnected.
	///            SubscriptionClosed::RemotePeerAborted => (),
	///            SubscriptionClosed::Failed(e) => {
	///                sink.close(e);
	///            }
937
	///         }
938
	///     });
939
	///     Ok(())
940
941
	/// });
	/// ```
942
	pub async fn pipe_from_try_stream<S, T, E>(&mut self, mut stream: S) -> SubscriptionClosed
943
	where
944
		S: TryStream<Ok = T, Error = E> + Unpin,
945
		T: Serialize,
946
		E: std::fmt::Display,
947
	{
948
949
950
951
		if let Err(SubscriptionAcceptRejectError::RemotePeerAborted) = self.accept() {
			return SubscriptionClosed::RemotePeerAborted;
		}

952
953
		let conn_closed = match self.close_notify.as_ref().map(|cn| cn.handle()) {
			Some(cn) => cn,
954
955
956
957
958
			None => return SubscriptionClosed::RemotePeerAborted,
		};

		let mut sub_closed = match self.unsubscribe.as_ref() {
			Some(rx) => rx.clone(),
959
960
			_ => {
				return SubscriptionClosed::Failed(ErrorObject::owned(
961
962
					INTERNAL_ERROR_CODE,
					"Unsubscribe watcher not set after accepting the subscription".to_string(),
963
964
965
					None::<()>,
				))
			}
966
967
		};

968
969
970
971
972
973
		let sub_closed_fut = sub_closed.changed();

		let conn_closed_fut = conn_closed.notified();
		pin_mut!(conn_closed_fut);
		pin_mut!(sub_closed_fut);

974
		let mut stream_item = stream.try_next();
975
976
		let mut closed_fut = futures_util::future::select(conn_closed_fut, sub_closed_fut);

977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
		loop {
			match futures_util::future::select(stream_item, closed_fut).await {
				// The app sent us a value to send back to the subscribers
				Either::Left((Ok(Some(result)), next_closed_fut)) => {
					match self.send(&result) {
						Ok(true) => (),
						Ok(false) => {
							break SubscriptionClosed::RemotePeerAborted;
						}
						Err(err) => {
							let err = ErrorObject::owned(SUBSCRIPTION_CLOSED_WITH_ERROR, err.to_string(), None::<()>);
							break SubscriptionClosed::Failed(err);
						}
					};
					stream_item = stream.try_next();
					closed_fut = next_closed_fut;
				}
				// Stream canceled because of error.
				Either::Left((Err(err), _)) => {
					let err = ErrorObject::owned(SUBSCRIPTION_CLOSED_WITH_ERROR, err.to_string(), None::<()>);
					break SubscriptionClosed::Failed(err);
				}
				Either::Left((Ok(None), _)) => break SubscriptionClosed::Success,
1000
				Either::Right((_, _)) => {