use crate::{common, server::batch, server::params::ServerRequestParams};
use fnv::FnvHashMap;
use futures::prelude::*;
use smallvec::SmallVec;
use std::{collections::hash_map::Entry, fmt, iter};
/// Collection of multiple batches.
///
/// This struct manages the state of the requests that have been received by the server and that
/// are waiting for a response. Due to the batching mechanism in the JSON-RPC protocol, one single
/// message can contain multiple requests and notifications that must all be answered at once.
///
/// # Usage
///
/// - Create a new empty [`BatchesState`] with [`new`](BatchesState::new).
/// - Whenever the server receives a JSON message, call [`inject`](BatchesState::inject).
/// - Call [`next_event`](BatchesState::next_event) in a loop and process the events buffered
/// within the object.
///
/// The [`BatchesState`] also acts as a collection of pending requests, which you can query using
/// [`request_by_id`](BatchesState::request_by_id).
pub struct BatchesState<T> {
    /// Identifier of the next batch to add to `batches`.
    ///
    /// Wraps around on overflow; `inject` skips over identifiers that are still occupied.
    next_batch_id: u64,
    /// For each batch, the individual batch's state and the user parameter.
    ///
    /// The identifier is linearly increasing and is never leaked on the wire or outside of this
    /// module. Therefore there is no risk of hash collision.
    batches: FnvHashMap<u64, (batch::BatchState, T)>,
}
/// Event generated by [`next_event`](BatchesState::next_event).
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
pub enum BatchesEvent<'a, T> {
/// A notification has been extracted from a batch.
Notification {
/// Notification in question.
notification: common::Notification,
/// User parameter passed when calling [`inject`](BatchesState::inject).
user_param: &'a mut T,
},
/// A request has been extracted from a batch.
Request(BatchesElem<'a, T>),
/// A batch has gotten all its requests answered and a response is ready to be sent out.
ReadyToSend {
/// Response to send out to the JSON-RPC client.
response: common::Response,
/// User parameter passed when calling [`inject`](BatchesState::inject).
user_param: T,
},
}
/// Request within the batches.
///
/// Mutably borrows the [`BatchesState`] it was extracted from, so at most one such element can
/// be alive at a time.
pub struct BatchesElem<'a, T> {
    /// Id of the batch that contains this element.
    batch_id: u64,
    /// Inner reference to a request within a batch.
    inner: batch::BatchElem<'a>,
    /// User parameter passed when calling `inject`.
    user_param: &'a mut T,
}
/// Identifier of a request within a [`BatchesState`].
///
/// Cheap to copy and to compare; valid only for the `BatchesState` it came from.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct BatchesElemId {
    /// Id of the batch within `BatchesState::batches`.
    outer: u64,
    /// Id of the request within the batch.
    inner: usize,
}
/// Minimal capacity for the `batches` container.
///
/// Pre-allocating avoids repeated rehashing while the server warms up; `inject` periodically
/// shrinks the map back down towards this size.
const BATCHES_MIN_CAPACITY: usize = 256;
impl<T> BatchesState<T> {
/// Creates a new empty `BatchesState`.
pub fn new() -> BatchesState<T> {
BatchesState {
next_batch_id: 0,
batches: FnvHashMap::with_capacity_and_hasher(BATCHES_MIN_CAPACITY, Default::default()),
}
}
/// Processes one step from a batch and returns an event. Returns `None` if there is nothing
/// to do. After you call `inject`, then this will return `Some` at least once.
pub fn next_event(&mut self) -> Option<BatchesEvent<T>> {
// Note that this function has a complexity of `O(n)`, as we iterate over every single
// batch every single time. This is however the most straight-forward way to implement it,
// and while better strategies might yield better complexities, it might not actually yield
// better performances in real-world situations. More brainstorming and benchmarking could
// get helpful here.
// Because of long-standing Rust lifetime issues
// (https://github.com/rust-lang/rust/issues/51526), we can't do this in an elegant way.
// If you're reading this code, know that it took several iterations and that I hated my
// life while trying to figure out how to make the compiler happy.
for batch_id in self.batches.keys().cloned().collect::<Vec<_>>() {
enum WhatCanWeDo {
Nothing,
ReadyToRespond,
Notification(common::Notification),
Request(usize),
let (batch, user_param) = self
.batches
.get_mut(&batch_id)
.expect("all keys are valid; qed");
let is_ready_to_respond = batch.is_ready_to_respond();
match batch.next() {
None if is_ready_to_respond => WhatCanWeDo::ReadyToRespond,
None => WhatCanWeDo::Nothing,
Some(batch::BatchInc::Notification(n)) => WhatCanWeDo::Notification(n),
Some(batch::BatchInc::Request(inner)) => WhatCanWeDo::Request(inner.id()),
}
};
match what_can_we_do {
let (batch, user_param) = self
.batches
.remove(&batch_id)
let response = batch
.into_response()
.unwrap_or_else(|_| panic!("is_ready_to_respond returned true; qed"));
if let Some(response) = response {
return Some(BatchesEvent::ReadyToSend {
response,
user_param,
});
}
WhatCanWeDo::Notification(notification) => {
return Some(BatchesEvent::Notification {
notification,
user_param: &mut self.batches.get_mut(&batch_id).unwrap().1,
});
WhatCanWeDo::Request(id) => {
let (batch, user_param) = self.batches.get_mut(&batch_id).unwrap();
}
}
None
}
/// Injects a newly-received batch into the list. You must then call
/// [`next_event`](BatchesState::next_event) in order to process it.
pub fn inject(&mut self, request: common::Request, user_param: T) {
let batch = batch::BatchState::from_request(request);
loop {
let id = self.next_batch_id;
self.next_batch_id = self.next_batch_id.wrapping_add(1);
// We shrink `self.batches` from time to time so that it doesn't grow too much.
if id % 256 == 0 {
self.batches.shrink_to_fit();
// TODO: self.batches.shrink_to(BATCHES_MIN_CAPACITY);
// ^ see https://github.com/rust-lang/rust/issues/56431
}
match self.batches.entry(id) {
Entry::Occupied(_) => continue,
Entry::Vacant(e) => {
e.insert((batch, user_param));
break;
}
}
}
}
/// Returns a request previously returned by [`next_event`](crate::Server::next_event) by its
/// id.
///
/// Note that previous notifications don't have an ID and can't be accessed with this method.
///
/// Returns `None` if the request ID is invalid or if the request has already been answered in
/// the past.
pub fn request_by_id<'a>(&'a mut self, id: BatchesElemId) -> Option<BatchesElem<'a, T>> {
if let Some((batch, user_param)) = self.batches.get_mut(&id.outer) {
Some(BatchesElem {
batch_id: id.outer,
inner: batch.request_by_id(id.inner)?,
user_param,
})
} else {
None
}
}
}
impl<T> Default for BatchesState<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> fmt::Debug for BatchesState<T>
where
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_list().entries(self.batches.values()).finish()
}
}
impl<'a, T> BatchesElem<'a, T> {
    /// Returns the id of the request within the [`BatchesState`].
    ///
    /// > **Note**: This is NOT the request id that the client passed.
    pub fn id(&self) -> BatchesElemId {
        let outer = self.batch_id;
        let inner = self.inner.id();
        BatchesElemId { outer, inner }
    }

    /// Returns the user parameter passed when calling [`inject`](BatchesState::inject).
    pub fn user_param(&mut self) -> &mut T {
        // Explicit reborrow of the `&'a mut T` field.
        &mut *self.user_param
    }

    /// Returns the id that the client sent out.
    pub fn request_id(&self) -> &common::Id {
        self.inner.request_id()
    }

    /// Returns the method of this request.
    pub fn method(&self) -> &str {
        self.inner.method()
    }

    /// Returns the parameters of the request, as a `common::Params`.
    pub fn params(&self) -> ServerRequestParams {
        self.inner.params()
    }

    /// Responds to the request. This destroys the request object, meaning you can no longer
    /// retrieve it with [`request_by_id`](BatchesState::request_by_id) later anymore.
    ///
    /// A [`ReadyToSend`](BatchesEvent::ReadyToSend) event containing this response might be
    /// generated the next time you call [`next_event`](BatchesState::next_event).
    pub fn set_response(self, response: Result<common::JsonValue, common::Error>) {
        self.inner.set_response(response)
    }
}
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BatchesElem")
.field("id", &self.id())
.field("user_param", &self.user_param)
.field("request_id", &self.request_id())
.field("method", &self.method())
.field("params", &self.params())
.finish()
}
}
#[cfg(test)]
mod tests {
use super::{BatchesEvent, BatchesState};
use crate::common;
#[test]
fn basic_notification() {
let notif = common::Notification {
jsonrpc: common::Version::V2,
method: "foo".to_string(),
params: common::Params::None,
};
let mut state = BatchesState::new();
assert!(state.next_event().is_none());
state.inject(
common::Request::Single(common::Call::Notification(notif.clone())),
(),
);
match state.next_event() {
Some(BatchesEvent::Notification { notification, .. }) if notification == notif => {}
_ => panic!(),
}
assert!(state.next_event().is_none());
}
#[test]
fn basic_request() {
let call = common::MethodCall {
jsonrpc: common::Version::V2,
method: "foo".to_string(),
params: common::Params::Map(serde_json::from_str("{\"test\":\"foo\"}").unwrap()),
id: common::Id::Num(123),
};
let mut state = BatchesState::new();
assert!(state.next_event().is_none());
state.inject(
common::Request::Single(common::Call::MethodCall(call)),
8889,
);
let rq_id = match state.next_event() {
Some(BatchesEvent::Request(rq)) => {
assert_eq!(rq.method(), "foo");
assert_eq!(
{
let v: String = rq.params().get("test").unwrap();
v
},
"foo"
);
assert_eq!(rq.request_id(), &common::Id::Num(123));
rq.id()
}
_ => panic!(),
};
assert!(state.next_event().is_none());
assert_eq!(state.request_by_id(rq_id).unwrap().method(), "foo");
state
.request_by_id(rq_id)
.unwrap()
.set_response(Err(common::Error::method_not_found()));
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
match state.next_event() {
Some(BatchesEvent::ReadyToSend {
response,
user_param,
}) => {
assert_eq!(user_param, 8889);
match response {
common::Response::Single(common::Output::Failure(f)) => {
assert_eq!(f.id, common::Id::Num(123));
}
_ => panic!(),
}
}
_ => panic!(),
};
}
#[test]
fn empty_batch() {
let mut state = BatchesState::new();
assert!(state.next_event().is_none());
state.inject(common::Request::Batch(Vec::new()), ());
assert!(state.next_event().is_none());
}
#[test]
fn batch_of_notifs() {
let notif1 = common::Notification {
jsonrpc: common::Version::V2,
method: "foo".to_string(),
params: common::Params::None,
};
let notif2 = common::Notification {
jsonrpc: common::Version::V2,
method: "bar".to_string(),
params: common::Params::None,
};
let mut state = BatchesState::new();
assert!(state.next_event().is_none());
state.inject(
common::Request::Batch(vec![
common::Call::Notification(notif1.clone()),
common::Call::Notification(notif2.clone()),
]),
2,
);
match state.next_event() {
Some(BatchesEvent::Notification {
notification,
user_param,
}) if notification == notif1 && *user_param == 2 => {}
_ => panic!(),
}
match state.next_event() {
Some(BatchesEvent::Notification {
notification,
user_param,
}) if notification == notif2 && *user_param == 2 => {}
_ => panic!(),
}
assert!(state.next_event().is_none());
}
}