//! Tracking of the JSON-RPC batches received by the server that are still waiting for a response.
use crate::{common, server::batch, server::params::ServerRequestParams};
use fnv::FnvHashMap;
use futures::prelude::*;
use smallvec::SmallVec;
use std::{collections::hash_map::Entry, fmt, iter};
/// Collection of multiple batches.
///
/// This struct manages the state of the requests that have been received by the server and that
/// are waiting for a response. Due to the batching mechanism in the JSON-RPC protocol, one single
/// message can contain multiple requests and notifications that must all be answered at once.
///
/// The type parameter `T` is an opaque user parameter stored alongside each batch. It is supplied
/// to [`inject`](BatchesState::inject) and handed back through the generated events.
///
/// # Usage
///
/// - Create a new empty [`BatchesState`] with [`new`](BatchesState::new).
/// - Whenever the server receives a JSON message, call [`inject`](BatchesState::inject).
/// - Call [`next_event`](BatchesState::next_event) in a loop and process the events buffered
///   within the object.
///
/// The [`BatchesState`] also acts as a collection of pending requests, which you can query using
/// [`request_by_id`](BatchesState::request_by_id).
pub struct BatchesState<T> {
/// Identifier of the next batch to add to `batches`.
next_batch_id: u64,
/// For each batch, the individual batch's state and the user parameter.
///
/// The identifier is linearly increasing and is never leaked on the wire or outside of this
/// module. Therefore there is no risk of hash collision.
batches: FnvHashMap<u64, (batch::BatchState, T)>,
}
/// Event generated by [`next_event`](BatchesState::next_event).
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
pub enum BatchesEvent<'a, T> {
/// A notification has been extracted from a batch.
Notification {
/// Notification in question.
notification: common::Notification,
/// User parameter passed when calling [`inject`](BatchesState::inject).
user_param: &'a mut T,
},
/// A request has been extracted from a batch.
Request(BatchesElem<'a, T>),
/// A batch has gotten all its requests answered and a response is ready to be sent out.
ReadyToSend {
/// Response to send out to the JSON-RPC client.
response: common::Response,
/// User parameter passed when calling [`inject`](BatchesState::inject).
user_param: T,
},
}
/// Request within the batches.
///
/// Holds a mutable borrow of the [`BatchesState`] it belongs to for the lifetime `'a`. Values of
/// this type are produced by [`BatchesState::request_by_id`] and carried by
/// [`BatchesEvent::Request`].
pub struct BatchesElem<'a, T> {
/// Id of the batch that contains this element.
batch_id: u64,
/// Inner reference to a request within a batch.
inner: batch::BatchElem<'a>,
/// User parameter passed when calling `inject`.
user_param: &'a mut T,
}
/// Identifier of a request within a [`BatchesState`].
///
/// The identifier is cheap to copy and can be compared and hashed, so it can be stored (for
/// example as a map key) and later passed to [`BatchesState::request_by_id`].
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct BatchesElemId {
    /// Id of the batch within `BatchesState::batches`.
    outer: u64,
    /// Id of the request within the batch.
    inner: usize,
}
/// Minimal capacity for the `batches` container.
///
/// Used as the initial capacity of the map when a [`BatchesState`] is created.
const BATCHES_MIN_CAPACITY: usize = 256;
impl<T> BatchesState<T> {
/// Creates a new empty `BatchesState`.
pub fn new() -> BatchesState<T> {
BatchesState {
next_batch_id: 0,
batches: FnvHashMap::with_capacity_and_hasher(BATCHES_MIN_CAPACITY, Default::default()),
}
}
/// Processes one step from a batch and returns an event. Returns `None` if there is nothing
/// to do. After you call `inject`, then this will return `Some` at least once.
pub fn next_event(&mut self) -> Option<BatchesEvent<T>> {
// Note that this function has a complexity of `O(n)`, as we iterate over every single
// batch every single time. This is however the most straight-forward way to implement it,
// and while better strategies might yield better complexities, it might not actually yield
// better performances in real-world situations. More brainstorming and benchmarking could
// get helpful here.
// Because of long-standing Rust lifetime issues
// (https://github.com/rust-lang/rust/issues/51526), we can't do this in an elegant way.
// If you're reading this code, know that it took several iterations and that I hated my
// life while trying to figure out how to make the compiler happy.
for batch_id in self.batches.keys().cloned().collect::<Vec<_>>() {
enum WhatCanWeDo {
Nothing,
ReadyToRespond,
Notification(common::Notification),
Request(usize),
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
let what_can_we_do = {
let (batch, user_param) = self.batches.get_mut(&batch_id)
.expect("all keys are valid; qed");
let is_ready_to_respond = batch.is_ready_to_respond();
match batch.next() {
None if is_ready_to_respond => WhatCanWeDo::ReadyToRespond,
None => WhatCanWeDo::Nothing,
Some(batch::BatchInc::Notification(n)) => WhatCanWeDo::Notification(n),
Some(batch::BatchInc::Request(inner)) => WhatCanWeDo::Request(inner.id()),
}
};
match what_can_we_do {
WhatCanWeDo::Nothing => {},
WhatCanWeDo::ReadyToRespond => {
let (batch, user_param) = self.batches.remove(&batch_id)
.expect("key was grabbed from self.batches; qed");
let response = batch.into_response().unwrap_or_else(|_| {
panic!("is_ready_to_respond returned true; qed")
});
if let Some(response) = response {
return Some(BatchesEvent::ReadyToSend {
response,
user_param,
});
}
},
WhatCanWeDo::Notification(notification) => {
return Some(BatchesEvent::Notification {
notification,
user_param: &mut self.batches.get_mut(&batch_id).unwrap().1,
});
},
WhatCanWeDo::Request(id) => {
let (batch, user_param) = self.batches.get_mut(&batch_id).unwrap();
}
}
None
}
/// Injects a newly-received batch into the list. You must then call
/// [`next_event`](BatchesState::next_event) in order to process it.
pub fn inject(&mut self, request: common::Request, user_param: T) {
let batch = batch::BatchState::from_request(request);
loop {
let id = self.next_batch_id;
self.next_batch_id = self.next_batch_id.wrapping_add(1);
// We shrink `self.batches` from time to time so that it doesn't grow too much.
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
if id % 256 == 0 {
self.batches.shrink_to_fit();
// TODO: self.batches.shrink_to(BATCHES_MIN_CAPACITY);
// ^ see https://github.com/rust-lang/rust/issues/56431
}
match self.batches.entry(id) {
Entry::Occupied(_) => continue,
Entry::Vacant(e) => {
e.insert((batch, user_param));
break;
}
}
}
}
/// Returns a request previously returned by [`next_event`](crate::Server::next_event) by its
/// id.
///
/// Note that previous notifications don't have an ID and can't be accessed with this method.
///
/// Returns `None` if the request ID is invalid or if the request has already been answered in
/// the past.
pub fn request_by_id<'a>(
&'a mut self,
id: BatchesElemId,
) -> Option<BatchesElem<'a, T>> {
if let Some((batch, user_param)) = self.batches.get_mut(&id.outer) {
Some(BatchesElem {
batch_id: id.outer,
inner: batch.request_by_id(id.inner)?,
user_param,
})
} else {
None
}
}
}
impl<T> Default for BatchesState<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> fmt::Debug for BatchesState<T>
where
    T: fmt::Debug,
{
    /// Formats the collection as a list with one entry per stored batch; batch identifiers
    /// are not printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut list = f.debug_list();
        for batch_and_param in self.batches.values() {
            list.entry(batch_and_param);
        }
        list.finish()
    }
}
impl<'a, T> BatchesElem<'a, T> {
/// Returns the id of the request within the [`BatchesState`].
///
/// > **Note**: This is NOT the request id that the client passed.
pub fn id(&self) -> BatchesElemId {
BatchesElemId {
outer: self.batch_id,
inner: self.inner.id(),
}
}
/// Returns the user parameter passed when calling [`inject`](BatchesState::inject).
pub fn user_param(&mut self) -> &mut T {
&mut self.user_param
}
/// Returns the id that the client sent out.
pub fn request_id(&self) -> &common::Id {
self.inner.request_id()
}
/// Returns the method of this request.
pub fn method(&self) -> &str {
self.inner.method()
}
/// Returns the parameters of the request, as a `common::Params`.
pub fn params(&self) -> ServerRequestParams {
self.inner.params()
}
/// Responds to the request. This destroys the request object, meaning you can no longer
/// retrieve it with [`request_by_id`](BatchesState::request_by_id) later anymore.
///
/// A [`ReadyToSend`](BatchesEvent::ReadyToSend) event containing this response might be
/// generated the next time you call [`next_event`](BatchesState::next_event).
pub fn set_response(self, response: Result<common::JsonValue, common::Error>) {
self.inner.set_response(response)
}
}
impl<'a, T> fmt::Debug for BatchesElem<'a, T>
where
    T: fmt::Debug,
{
    /// Prints the internal identifier, the user parameter and the client-visible parts of the
    /// request (id, method and parameters).
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("BatchesElem");
        out.field("id", &self.id());
        out.field("user_param", &self.user_param);
        out.field("request_id", &self.request_id());
        out.field("method", &self.method());
        out.field("params", &self.params());
        out.finish()
    }
}