Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
parity
Mirrored projects
jsonrpsee
Commits
1dafb1e6
Unverified
Commit
1dafb1e6
authored
Jun 07, 2022
by
Niklas Adolfsson
Browse files
WIP: refactoring
parent
64215300
Pipeline
#198275
failed with stages
in 61 minutes and 53 seconds
Changes
15
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
TODO.txt
0 → 100644
View file @
1dafb1e6
test proc_macros_generic_ws_client_api has been running for over 60 seconds
core/Cargo.toml
View file @
1dafb1e6
...
...
@@ -15,6 +15,7 @@ jsonrpsee-types = { path = "../types", version = "0.13.1" }
thiserror
=
"1"
serde
=
{
version
=
"1.0"
,
default-features
=
false
,
features
=
["derive"]
}
serde_json
=
{
version
=
"1"
,
features
=
["raw_value"]
}
http
=
"0.2.7"
# optional deps
arrayvec
=
{
version
=
"0.7.1"
,
optional
=
true
}
...
...
core/src/client/async_client/mod.rs
View file @
1dafb1e6
...
...
@@ -407,13 +407,12 @@ async fn handle_backend_messages<S: TransportSenderT, R: TransportReceiverT>(
sender
:
&
mut
S
,
max_notifs_per_subscription
:
usize
,
)
->
Result
<
(),
Error
>
{
// Handle raw messages of form `ReceivedMessage::Bytes` (Vec<u8>) or ReceivedMessage::Data` (String).
async
fn
handle_recv_message
<
S
:
TransportSenderT
>
(
raw
:
&
[
u8
],
manager
:
&
mut
RequestManager
,
sender
:
&
mut
S
,
max_notifs_per_subscription
:
usize
max_notifs_per_subscription
:
usize
,
)
->
Result
<
(),
Error
>
{
// Single response to a request.
if
let
Ok
(
single
)
=
serde_json
::
from_slice
::
<
Response
<
_
>>
(
&
raw
)
{
...
...
@@ -460,9 +459,9 @@ async fn handle_backend_messages<S: TransportSenderT, R: TransportReceiverT>(
// Unparsable response
else
{
tracing
::
debug!
(
"[backend]: recv unparseable message: {:?}"
,
serde_json
::
from_slice
::
<
serde_json
::
Value
>
(
&
raw
)
);
"[backend]: recv unparseable message: {:?}"
,
serde_json
::
from_slice
::
<
serde_json
::
Value
>
(
&
raw
)
);
return
Err
(
Error
::
Custom
(
"Unparsable response"
.into
()));
}
Ok
(())
...
...
@@ -628,7 +627,10 @@ async fn background_task<S, R>(
match
future
::
select
(
message_fut
,
submit_ping
)
.await
{
// Message received from the frontend.
Either
::
Left
((
Either
::
Left
((
frontend_value
,
backend
)),
_
))
=>
{
if
let
Err
(
err
)
=
handle_frontend_messages
(
frontend_value
,
&
mut
manager
,
&
mut
sender
,
max_notifs_per_subscription
)
.await
{
if
let
Err
(
err
)
=
handle_frontend_messages
(
frontend_value
,
&
mut
manager
,
&
mut
sender
,
max_notifs_per_subscription
)
.await
{
tracing
::
warn!
(
"{:?}"
,
err
);
let
_
=
front_error
.send
(
err
);
break
;
...
...
@@ -637,10 +639,15 @@ async fn background_task<S, R>(
message_fut
=
future
::
select
(
frontend
.next
(),
backend
);
}
// Message received from the backend.
Either
::
Left
((
Either
::
Right
((
backend_value
,
frontend
)),
_
))
=>
{
Either
::
Left
((
Either
::
Right
((
backend_value
,
frontend
)),
_
))
=>
{
if
let
Err
(
err
)
=
handle_backend_messages
::
<
S
,
R
>
(
backend_value
,
&
mut
manager
,
&
mut
sender
,
max_notifs_per_subscription
)
.await
{
backend_value
,
&
mut
manager
,
&
mut
sender
,
max_notifs_per_subscription
,
)
.await
{
tracing
::
warn!
(
"{:?}"
,
err
);
let
_
=
front_error
.send
(
err
);
break
;
...
...
core/src/lib.rs
View file @
1dafb1e6
...
...
@@ -57,6 +57,7 @@ cfg_client! {
pub
use
async_trait
::
async_trait
;
pub
use
error
::
Error
;
pub
use
http
::
HeaderMap
;
/// JSON-RPC result.
pub
type
RpcResult
<
T
>
=
std
::
result
::
Result
<
T
,
Error
>
;
...
...
core/src/middleware.rs
View file @
1dafb1e6
...
...
@@ -26,6 +26,8 @@
//! Middleware for `jsonrpsee` servers.
use
jsonrpsee_types
::
Params
;
/// Defines a middleware with callbacks during the RPC request life-cycle. The primary use case for
/// this is to collect timings for a larger metrics collection solution but the only constraints on
/// the associated type is that it be [`Send`] and [`Copy`], giving users some freedom to do what
...
...
@@ -42,17 +44,17 @@ pub trait Middleware: Send + Sync + Clone + 'static {
/// Called when a new client connects (WebSocket only)
fn
on_connect
(
&
self
)
{}
/// Called when a new JSON-RPC comes to the server.
fn
on_request
(
&
self
)
->
Self
::
Instant
;
/// Called when a new JSON-RPC
request
comes to the server.
fn
on_request
(
&
self
,
remote_addr
:
std
::
net
::
SocketAddr
,
headers
:
&
http
::
HeaderMap
)
->
Self
::
Instant
;
/// Called on each JSON-RPC method call, batch requests will trigger `on_call` multiple times.
fn
on_call
(
&
self
,
_name
:
&
str
)
{}
fn
on_call
(
&
self
,
_name
:
&
str
,
_params
:
Params
)
{}
/// Called on each JSON-RPC method completion, batch requests will trigger `on_result` multiple times.
fn
on_result
(
&
self
,
_name
:
&
str
,
_success
:
bool
,
_started_at
:
Self
::
Instant
)
{}
/// Called once the JSON-RPC request is finished and response is sent to the output buffer.
fn
on_response
(
&
self
,
_started_at
:
Self
::
Instant
)
{}
fn
on_response
(
&
self
,
_result
:
&
str
,
_started_at
:
Self
::
Instant
)
{}
/// Called when a client disconnects (WebSocket only)
fn
on_disconnect
(
&
self
)
{}
...
...
@@ -61,7 +63,7 @@ pub trait Middleware: Send + Sync + Clone + 'static {
impl
Middleware
for
()
{
type
Instant
=
();
fn
on_request
(
&
self
)
->
Self
::
Instant
{}
fn
on_request
(
&
self
,
_ip_addr
:
std
::
net
::
SocketAddr
,
_headers
:
&
http
::
HeaderMap
)
->
Self
::
Instant
{}
}
impl
<
A
,
B
>
Middleware
for
(
A
,
B
)
...
...
@@ -71,13 +73,13 @@ where
{
type
Instant
=
(
A
::
Instant
,
B
::
Instant
);
fn
on_request
(
&
self
)
->
Self
::
Instant
{
(
self
.0
.on_request
(),
self
.1
.on_request
())
fn
on_request
(
&
self
,
ip_addr
:
std
::
net
::
SocketAddr
,
headers
:
&
http
::
HeaderMap
)
->
Self
::
Instant
{
(
self
.0
.on_request
(
ip_addr
,
headers
),
self
.1
.on_request
(
ip_addr
,
headers
))
}
fn
on_call
(
&
self
,
name
:
&
str
)
{
self
.0
.on_call
(
name
);
self
.1
.on_call
(
name
);
fn
on_call
(
&
self
,
name
:
&
str
,
params
:
Params
)
{
self
.0
.on_call
(
name
,
params
.clone
()
);
self
.1
.on_call
(
name
,
params
);
}
fn
on_result
(
&
self
,
name
:
&
str
,
success
:
bool
,
started_at
:
Self
::
Instant
)
{
...
...
@@ -85,8 +87,8 @@ where
self
.1
.on_result
(
name
,
success
,
started_at
.1
);
}
fn
on_response
(
&
self
,
started_at
:
Self
::
Instant
)
{
self
.0
.on_response
(
started_at
.0
);
self
.1
.on_response
(
started_at
.1
);
fn
on_response
(
&
self
,
result
:
&
str
,
started_at
:
Self
::
Instant
)
{
self
.0
.on_response
(
result
,
started_at
.0
);
self
.1
.on_response
(
result
,
started_at
.1
);
}
}
core/src/server/helpers.rs
View file @
1dafb1e6
...
...
@@ -241,6 +241,51 @@ impl BoundedSubscriptions {
}
}
/// Represent the response to method call.
#[derive(Debug)]
pub
struct
MethodResponse
{
/// Serialized response,
pub
result
:
String
,
/// Status indicates whether the call was successful or or.
pub
success
:
bool
,
}
impl
MethodResponse
{
/// Send a JSON-RPC response to the client. If the serialization of `result` exceeds `max_response_size`,
/// an error will be sent instead.
pub
fn
response
(
id
:
Id
,
result
:
impl
Serialize
,
max_response_size
:
usize
)
->
Self
{
let
mut
writer
=
BoundedWriter
::
new
(
max_response_size
);
match
serde_json
::
to_writer
(
&
mut
writer
,
&
Response
::
new
(
result
,
id
.clone
()))
{
Ok
(
_
)
=>
{
// Safety - serde_json does not emit invalid UTF-8.
let
result
=
unsafe
{
String
::
from_utf8_unchecked
(
writer
.into_bytes
())
};
Self
{
result
,
success
:
true
}
}
Err
(
err
)
=>
{
tracing
::
error!
(
"Error serializing response: {:?}"
,
err
);
if
err
.is_io
()
{
let
data
=
to_json_raw_value
(
&
format!
(
"Exceeded max limit {}"
,
max_response_size
))
.ok
();
let
err
=
ErrorObject
::
borrowed
(
OVERSIZED_RESPONSE_CODE
,
&
OVERSIZED_RESPONSE_MSG
,
data
.as_deref
());
let
result
=
serde_json
::
to_string
(
&
ErrorResponse
::
borrowed
(
err
,
id
))
.unwrap
();
Self
{
result
,
success
:
false
}
}
else
{
let
result
=
serde_json
::
to_string
(
&
ErrorResponse
::
borrowed
(
ErrorCode
::
InternalError
.into
(),
id
))
.unwrap
();
Self
{
result
,
success
:
false
}
}
}
}
}
pub
fn
error
(
id
:
Id
,
err
:
impl
Into
<
ErrorObject
<
'static
>>
)
->
Self
{
let
result
=
serde_json
::
to_string
(
&
ErrorResponse
::
borrowed
(
err
.into
(),
id
))
.unwrap
();
Self
{
result
,
success
:
false
}
}
}
#[cfg(test)]
mod
tests
{
use
crate
::
server
::
helpers
::
BoundedSubscriptions
;
...
...
core/src/server/rpc_module.rs
View file @
1dafb1e6
...
...
@@ -35,7 +35,7 @@ use crate::id_providers::RandomIntegerIdProvider;
use
crate
::
server
::
helpers
::{
BoundedSubscriptions
,
MethodSink
,
SubscriptionPermit
};
use
crate
::
server
::
resource_limiting
::{
ResourceGuard
,
ResourceTable
,
ResourceVec
,
Resources
};
use
crate
::
traits
::{
IdProvider
,
ToRpcParams
};
use
futures_channel
::
mpsc
;
use
futures_channel
::
{
mpsc
,
oneshot
}
;
use
futures_util
::
future
::
Either
;
use
futures_util
::
pin_mut
;
use
futures_util
::{
future
::
BoxFuture
,
FutureExt
,
Stream
,
StreamExt
,
TryStream
,
TryStreamExt
};
...
...
@@ -50,30 +50,38 @@ use rustc_hash::FxHashMap;
use
serde
::{
de
::
DeserializeOwned
,
Serialize
};
use
tokio
::
sync
::
watch
;
use
super
::
helpers
::
MethodResponse
;
/// A `MethodCallback` is an RPC endpoint, callable with a standard JSON-RPC request,
/// implemented as a function pointer to a `Fn` function taking four arguments:
/// the `id`, `params`, a channel the function uses to communicate the result (or error)
/// back to `jsonrpsee`, and the connection ID (useful for the websocket transport).
pub
type
SyncMethod
=
Arc
<
dyn
Send
+
Sync
+
Fn
(
Id
,
Params
,
&
MethodSink
)
->
bool
>
;
pub
type
SyncMethod
=
Arc
<
dyn
Send
+
Sync
+
Fn
(
Id
,
Params
,
MaxResponseSize
)
->
MethodResponse
>
;
/// Similar to [`SyncMethod`], but represents an asynchronous handler and takes an additional argument containing a [`ResourceGuard`] if configured.
pub
type
AsyncMethod
<
'a
>
=
Arc
<
dyn
Send
+
Sync
+
Fn
(
Id
<
'a
>
,
Params
<
'a
>
,
MethodSink
,
ConnectionId
,
Option
<
ResourceGuard
>
)
->
BoxFuture
<
'a
,
bool
>
,
dyn
Send
+
Sync
+
Fn
(
Id
<
'a
>
,
Params
<
'a
>
,
ConnectionId
,
MaxResponseSize
,
Option
<
ResourceGuard
>
)
->
BoxFuture
<
'a
,
MethodResponse
>
,
>
;
/// Method callback for subscriptions.
pub
type
SubscriptionMethod
=
Arc
<
dyn
Send
+
Sync
+
Fn
(
Id
,
Params
,
&
MethodSink
,
ConnState
)
->
bool
>
;
pub
type
SubscriptionMethod
<
'a
>
=
Arc
<
dyn
Send
+
Sync
+
Fn
(
Id
,
Params
,
&
MethodSink
,
ConnState
)
->
BoxFuture
<
'a
,
MethodResponse
>>
;
// Method callback to unsubscribe.
type
UnsubscriptionMethod
=
Arc
<
dyn
Send
+
Sync
+
Fn
(
Id
,
Params
,
&
MethodSink
,
ConnectionId
)
->
bool
>
;
type
UnsubscriptionMethod
=
Arc
<
dyn
Send
+
Sync
+
Fn
(
Id
,
Params
,
ConnectionId
)
->
MethodResponse
>
;
/// Connection ID, used for stateful protocol such as WebSockets.
/// For stateless protocols such as http it's unused, so feel free to set it some hardcoded value.
pub
type
ConnectionId
=
usize
;
/// Max response size.
pub
type
MaxResponseSize
=
usize
;
/// Raw response from an RPC
/// A 3-tuple containing:
/// - Call result as a `String`,
/// - a [`mpsc::UnboundedReceiver<String>`] to receive future subscription results
/// - a [`crate::server::helpers::SubscriptionPermit`] to allow subscribers to notify their [`SubscriptionSink`] when they disconnect.
pub
type
RawRpcResponse
=
(
String
,
mpsc
::
UnboundedReceiver
<
String
>
,
SubscriptionPermit
);
pub
type
RawRpcResponse
=
(
MethodResponse
,
mpsc
::
UnboundedReceiver
<
String
>
,
SubscriptionPermit
);
/// Helper struct to manage subscriptions.
pub
struct
ConnState
<
'a
>
{
...
...
@@ -117,7 +125,7 @@ pub enum MethodKind {
/// Asynchronous method handler.
Async
(
AsyncMethod
<
'static
>
),
/// Subscription method handler.
Subscription
(
SubscriptionMethod
),
Subscription
(
SubscriptionMethod
<
'static
>
),
/// Unsubscription method handler.
Unsubscription
(
UnsubscriptionMethod
),
}
...
...
@@ -186,7 +194,7 @@ impl MethodCallback {
MethodCallback
{
callback
:
MethodKind
::
Async
(
callback
),
resources
:
MethodResources
::
Uninitialized
([]
.into
())
}
}
fn
new_subscription
(
callback
:
SubscriptionMethod
)
->
Self
{
fn
new_subscription
(
callback
:
SubscriptionMethod
<
'static
>
)
->
Self
{
MethodCallback
{
callback
:
MethodKind
::
Subscription
(
callback
),
resources
:
MethodResources
::
Uninitialized
([]
.into
()),
...
...
@@ -355,10 +363,10 @@ impl Methods {
tracing
::
trace!
(
"[Methods::call] Calling method: {:?}, params: {:?}"
,
method
,
params
);
let
(
resp
,
_
,
_
)
=
self
.inner_call
(
req
)
.await
;
let
res
=
match
serde_json
::
from_str
::
<
Response
<
T
>>
(
&
resp
)
{
let
res
=
match
serde_json
::
from_str
::
<
Response
<
T
>>
(
&
resp
.result
)
{
Ok
(
res
)
=>
Ok
(
res
.result
),
Err
(
e
)
=>
{
if
let
Ok
(
err
)
=
serde_json
::
from_str
::
<
ErrorResponse
>
(
&
resp
)
{
if
let
Ok
(
err
)
=
serde_json
::
from_str
::
<
ErrorResponse
>
(
&
resp
.result
)
{
Err
(
Error
::
Call
(
CallError
::
Custom
(
err
.error_object
()
.clone
()
.into_owned
())))
}
else
{
Err
(
e
.into
())
...
...
@@ -394,7 +402,10 @@ impl Methods {
/// );
/// }
/// ```
pub
async
fn
raw_json_request
(
&
self
,
call
:
&
str
)
->
Result
<
(
String
,
mpsc
::
UnboundedReceiver
<
String
>
),
Error
>
{
pub
async
fn
raw_json_request
(
&
self
,
call
:
&
str
,
)
->
Result
<
(
MethodResponse
,
mpsc
::
UnboundedReceiver
<
String
>
),
Error
>
{
tracing
::
trace!
(
"[Methods::raw_json_request] {:?}"
,
call
);
let
req
:
Request
=
serde_json
::
from_str
(
call
)
?
;
let
(
resp
,
rx
,
_
)
=
self
.inner_call
(
req
)
.await
;
...
...
@@ -403,7 +414,7 @@ impl Methods {
/// Execute a callback.
async
fn
inner_call
(
&
self
,
req
:
Request
<
'_
>
)
->
RawRpcResponse
{
let
(
tx_sink
,
mut
rx_sink
)
=
mpsc
::
unbounded
();
let
(
tx_sink
,
rx_sink
)
=
mpsc
::
unbounded
();
let
sink
=
MethodSink
::
new
(
tx_sink
);
let
id
=
req
.id
.clone
();
let
params
=
Params
::
new
(
req
.params
.map
(|
params
|
params
.get
()));
...
...
@@ -411,20 +422,18 @@ impl Methods {
let
close_notify
=
bounded_subs
.acquire
()
.expect
(
"u32::MAX permits is sufficient; qed"
);
let
notify
=
bounded_subs
.acquire
()
.expect
(
"u32::MAX permits is sufficient; qed"
);
let
_
result
=
match
self
.method
(
&
req
.method
)
.map
(|
c
|
&
c
.callback
)
{
None
=>
sink
.send_
error
(
req
.id
,
ErrorCode
::
MethodNotFound
.into
(
)),
Some
(
MethodKind
::
Sync
(
cb
))
=>
(
cb
)(
id
,
params
,
&
si
nk
),
Some
(
MethodKind
::
Async
(
cb
))
=>
(
cb
)(
id
.into_owned
(),
params
.into_owned
(),
sink
,
0
,
None
)
.await
,
let
result
=
match
self
.method
(
&
req
.method
)
.map
(|
c
|
&
c
.callback
)
{
None
=>
MethodResponse
::
error
(
req
.id
,
ErrorObject
::
from
(
ErrorCode
::
MethodNotFound
)),
Some
(
MethodKind
::
Sync
(
cb
))
=>
(
cb
)(
id
,
params
,
u
si
ze
::
MAX
),
Some
(
MethodKind
::
Async
(
cb
))
=>
(
cb
)(
id
.into_owned
(),
params
.into_owned
(),
0
,
usize
::
MAX
,
None
)
.await
,
Some
(
MethodKind
::
Subscription
(
cb
))
=>
{
let
conn_state
=
ConnState
{
conn_id
:
0
,
close_notify
,
id_provider
:
&
RandomIntegerIdProvider
};
(
cb
)(
id
,
params
,
&
sink
,
conn_state
)
(
cb
)(
id
,
params
,
&
sink
,
conn_state
)
.await
}
Some
(
MethodKind
::
Unsubscription
(
cb
))
=>
(
cb
)(
id
,
params
,
&
sink
,
0
),
Some
(
MethodKind
::
Unsubscription
(
cb
))
=>
(
cb
)(
id
,
params
,
0
),
};
let
resp
=
rx_sink
.next
()
.await
.expect
(
"tx and rx still alive; qed"
);
(
resp
,
rx_sink
,
notify
)
(
result
,
rx_sink
,
notify
)
}
/// Helper to create a subscription on the `RPC module` without having to spin up a server.
...
...
@@ -457,9 +466,9 @@ impl Methods {
tracing
::
trace!
(
"[Methods::subscribe] Calling subscription method: {:?}, params: {:?}"
,
sub_method
,
params
);
let
(
response
,
rx
,
close_notify
)
=
self
.inner_call
(
req
)
.await
;
tracing
::
trace!
(
"[Methods::subscribe] response {:?}"
,
response
);
let
subscription_response
=
match
serde_json
::
from_str
::
<
Response
<
RpcSubscriptionId
>>
(
&
response
)
{
let
subscription_response
=
match
serde_json
::
from_str
::
<
Response
<
RpcSubscriptionId
>>
(
&
response
.result
)
{
Ok
(
r
)
=>
r
,
Err
(
_
)
=>
match
serde_json
::
from_str
::
<
ErrorResponse
>
(
&
response
)
{
Err
(
_
)
=>
match
serde_json
::
from_str
::
<
ErrorResponse
>
(
&
response
.result
)
{
Ok
(
err
)
=>
return
Err
(
Error
::
Call
(
CallError
::
Custom
(
err
.error_object
()
.clone
()
.into_owned
()))),
Err
(
err
)
=>
return
Err
(
err
.into
()),
},
...
...
@@ -533,9 +542,9 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
let
ctx
=
self
.ctx
.clone
();
let
callback
=
self
.methods
.verify_and_insert
(
method_name
,
MethodCallback
::
new_sync
(
Arc
::
new
(
move
|
id
,
params
,
sink
|
match
callback
(
params
,
&*
ctx
)
{
Ok
(
res
)
=>
sink
.send_response
(
id
,
res
),
Err
(
err
)
=>
sink
.send_call_
error
(
id
,
err
),
MethodCallback
::
new_sync
(
Arc
::
new
(
move
|
id
,
params
,
max_response_size
|
match
callback
(
params
,
&*
ctx
)
{
Ok
(
res
)
=>
MethodResponse
::
response
(
id
,
res
,
max_response_size
),
Err
(
err
)
=>
MethodResponse
::
error
(
id
,
err
),
})),
)
?
;
...
...
@@ -556,12 +565,12 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
let
ctx
=
self
.ctx
.clone
();
let
callback
=
self
.methods
.verify_and_insert
(
method_name
,
MethodCallback
::
new_async
(
Arc
::
new
(
move
|
id
,
params
,
sink
,
_
,
claimed
|
{
MethodCallback
::
new_async
(
Arc
::
new
(
move
|
id
,
params
,
_
,
max_response_size
,
claimed
|
{
let
ctx
=
ctx
.clone
();
let
future
=
async
move
{
let
result
=
match
callback
(
params
,
ctx
)
.await
{
Ok
(
res
)
=>
sink
.send_response
(
id
,
res
),
Err
(
err
)
=>
sink
.send_call_
error
(
id
,
err
),
Ok
(
res
)
=>
MethodResponse
::
response
(
id
,
res
,
max_response_size
),
Err
(
err
)
=>
MethodResponse
::
error
(
id
,
err
),
};
// Release claimed resources
...
...
@@ -591,13 +600,13 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
let
ctx
=
self
.ctx
.clone
();
let
callback
=
self
.methods
.verify_and_insert
(
method_name
,
MethodCallback
::
new_async
(
Arc
::
new
(
move
|
id
,
params
,
sink
,
_
,
claimed
|
{
MethodCallback
::
new_async
(
Arc
::
new
(
move
|
id
,
params
,
_
,
max_response_size
,
claimed
|
{
let
ctx
=
ctx
.clone
();
tokio
::
task
::
spawn_blocking
(
move
||
{
let
result
=
match
callback
(
params
,
ctx
)
{
Ok
(
res
)
=>
sink
.send_response
(
id
,
res
),
Err
(
err
)
=>
sink
.send_call_
error
(
id
,
err
),
Ok
(
res
ult
)
=>
MethodResponse
::
response
(
id
,
result
,
max_response_size
),
Err
(
err
)
=>
MethodResponse
::
error
(
id
,
err
),
};
// Release claimed resources
...
...
@@ -609,7 +618,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
Ok
(
r
)
=>
r
,
Err
(
err
)
=>
{
tracing
::
error!
(
"Join error for blocking RPC method: {:?}"
,
err
);
false
todo!
();
}
})
.boxed
()
...
...
@@ -703,8 +712,11 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
MethodCallback
::
new_subscription
(
Arc
::
new
(
move
|
id
,
params
,
method_sink
,
conn
|
{
let
sub_id
:
RpcSubscriptionId
=
conn
.id_provider
.next_id
();
let
(
tx
,
rx
)
=
oneshot
::
channel
();
let
sink
=
PendingSubscription
(
Some
(
InnerPendingSubscription
{
sink
:
method_sink
.clone
(),
result
:
tx
,
close_notify
:
Some
(
conn
.close_notify
),
method
:
notif_method_name
,
subscribers
:
subscribers
.clone
(),
...
...
@@ -714,7 +726,15 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
callback
(
params
,
sink
,
ctx
.clone
());
true
let
id
=
id
.clone
()
.into_owned
();
let
result
=
rx
.then
(|
r
|
async
move
{
match
r
{
Ok
(
r
)
=>
r
,
Err
(
_
)
=>
MethodResponse
::
error
(
id
,
ErrorObject
::
from
(
ErrorCode
::
InternalError
)),
}
});
Box
::
pin
(
result
)
})),
);
}
...
...
@@ -723,7 +743,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
{
self
.methods
.mut_callbacks
()
.insert
(
unsubscribe_method_name
,
MethodCallback
::
new_unsubscription
(
Arc
::
new
(
move
|
id
,
params
,
sink
,
conn_id
|
{
MethodCallback
::
new_unsubscription
(
Arc
::
new
(
move
|
id
,
params
,
conn_id
|
{
let
sub_id
=
match
params
.one
::
<
RpcSubscriptionId
>
()
{
Ok
(
sub_id
)
=>
sub_id
,
Err
(
_
)
=>
{
...
...
@@ -733,14 +753,15 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
params
,
id
);
return
sink
.send_response
(
id
,
false
);
return
MethodResponse
::
response
(
id
,
false
,
999
);
}
};
let
sub_id
=
sub_id
.into_owned
();
let
result
=
subscribers
.lock
()
.remove
(
&
SubscriptionKey
{
conn_id
,
sub_id
})
.is_some
();
sink
.send_
response
(
id
,
result
)
MethodResponse
::
response
(
id
,
result
,
999
)
})),
);
}
...
...
@@ -771,6 +792,8 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
struct
InnerPendingSubscription
{
/// Sink.
sink
:
MethodSink
,
/// Oneshot which sends response to the subscription call.
result
:
oneshot
::
Sender
<
MethodResponse
>
,
/// Get notified when subscribers leave so we can exit
close_notify
:
Option
<
SubscriptionPermit
>
,
/// MethodCallback.
...
...
@@ -806,9 +829,15 @@ impl PendingSubscription {
pub
fn
accept
(
mut
self
)
->
Option
<
SubscriptionSink
>
{
let
inner
=
self
.0
.take
()
?
;
let
InnerPendingSubscription
{
sink
,
close_notify
,
method
,
uniq_sub
,
subscribers
,
id
}
=
inner
;
let
InnerPendingSubscription
{
sink
,
result
,
close_notify
,
method
,
uniq_sub
,
subscribers
,
id
}
=
inner
;
let
response
=
MethodResponse
::
response
(
id
,
&
uniq_sub
.sub_id
,
usize
::
MAX
);
let
success
=
response
.success
;
if
sink
.send_response
(
id
,
&
uniq_sub
.sub_id
)
{
if
result
.send
(
response
)
.is_ok
()
&&
success
{
// TODO: It might be possible that the actual `WebSocket message` might not have been sent yet
// So we must be super careful that sink doesn't start sending stuff before that actual
// subscription call has been answered.
let
(
tx
,
rx
)
=
watch
::
channel
(());
subscribers
.lock
()
.insert
(
uniq_sub
.clone
(),
(
sink
.clone
(),
tx
));
Some
(
SubscriptionSink
{
inner
:
sink
,
close_notify
,
method
,
uniq_sub
,
subscribers
,
unsubscribe
:
rx
})
...
...
examples/examples/middleware_http.rs
View file @
1dafb1e6
...
...
@@ -27,9 +27,11 @@
use
std
::
net
::
SocketAddr
;
use
std
::
time
::
Instant
;
use
jsonrpsee
::
core
::
HeaderMap
;
use
jsonrpsee
::
core
::{
client
::
ClientT
,
middleware
};
use
jsonrpsee
::
http_client
::
HttpClientBuilder
;
use
jsonrpsee
::
http_server
::{
HttpServerBuilder
,
HttpServerHandle
,
RpcModule
};
use
jsonrpsee
::
types
::
Params
;
#[derive(Clone)]
struct
Timings
;
...
...
@@ -37,11 +39,11 @@ struct Timings;
impl
middleware
::
Middleware
for
Timings
{
type
Instant
=
Instant
;
fn
on_request
(
&
self
)
->
Self
::
Instant
{
fn
on_request
(
&
self
,
_remote_addr
:
SocketAddr
,
_headers
:
&
HeaderMap
)
->
Self
::
Instant
{
Instant
::
now
()
}
fn
on_call
(
&
self
,
name
:
&
str
)
{
fn
on_call
(
&
self
,
name
:
&
str
,
_params
:
Params
)
{
println!
(
"[Middleware::on_call] '{}'"
,
name
);
}
...
...
@@ -49,7 +51,7 @@ impl middleware::Middleware for Timings {
println!
(
"[Middleware::on_result] '{}', worked? {}, time elapsed {:?}"
,
name
,
succeess
,
started_at
.elapsed
());
}
fn
on_response
(
&
self
,
started_at
:
Self
::
Instant
)
{
fn
on_response
(
&
self
,
_result
:
&
str
,
started_at
:
Self
::
Instant
)
{
println!
(
"[Middleware::on_response] time elapsed {:?}"
,
started_at
.elapsed
());
}
}
...
...
examples/examples/middleware_ws.rs
View file @
1dafb1e6
...
...
@@ -27,7 +27,9 @@
use
std
::
net
::
SocketAddr
;
use
std
::
time
::
Instant
;
use
jsonrpsee
::
core
::
HeaderMap
;
use
jsonrpsee
::
core
::{
client
::
ClientT
,
middleware
};
use
jsonrpsee
::
types
::
Params
;
use
jsonrpsee
::
ws_client
::
WsClientBuilder
;
use
jsonrpsee
::
ws_server
::{
RpcModule
,
WsServerBuilder
};
...
...
@@ -37,11 +39,11 @@ struct Timings;
impl
middleware
::
Middleware
for
Timings
{
type
Instant
=
Instant
;
fn
on_request
(
&
self
)
->
Self
::
Instant
{
fn
on_request
(
&
self
,
_remote_addr
:
SocketAddr
,
_headers
:
&
HeaderMap
)
->
Self
::
Instant
{
Instant
::
now
()
}
fn
on_call
(
&
self
,
name
:
&
str
)
{
fn
on_call
(
&
self
,
name
:
&
str
,
_params
:
Params
)
{
println!
(
"[Middleware::on_call] '{}'"
,
name
);
}
...
...
@@ -49,7 +51,7 @@ impl middleware::Middleware for Timings {
println!
(
"[Middleware::on_result] '{}', worked? {}, time elapsed {:?}"
,
name
,
succeess
,
started_at
.elapsed
());
}
fn
on_response
(
&
self
,
started_at
:
Self
::
Instant
)
{
fn
on_response
(
&
self
,
_result
:
&
str
,
started_at
:
Self
::
Instant
)
{
println!
(
"[Middleware::on_response] time elapsed {:?}"
,
started_at
.elapsed
());
}
}
...
...
examples/examples/multi_middleware.rs
View file @
1dafb1e6
...
...
@@ -30,8 +30,9 @@ use std::net::SocketAddr;
use
std
::
process
::
Command
;
use
std
::
time
::
Instant
;
use
jsonrpsee
::
core
::{
client
::
ClientT
,
middleware
};
use
jsonrpsee
::
core
::{
client
::
ClientT
,
middleware
,
HeaderMap
};
use
jsonrpsee
::
rpc_params
;
use
jsonrpsee
::
types
::
Params
;
use
jsonrpsee
::
ws_client
::
WsClientBuilder
;
use
jsonrpsee
::
ws_server
::{
RpcModule
,
WsServerBuilder
};
...
...
@@ -42,11 +43,11 @@ struct Timings;
impl
middleware
::
Middleware
for
Timings
{
type
Instant
=
Instant
;
fn
on_request
(
&
self
)
->
Self
::
Instant
{
fn
on_request
(
&
self
,
_remote_addr
:
SocketAddr
,
_headers
:
&
HeaderMap
)
->
Self
::
Instant
{
Instant
::
now
()
}
fn
on_call
(
&
self
,
name
:
&
str
)
{
fn
on_call
(
&
self
,
name
:
&
str
,
_params
:
Params
)
{
println!
(
"[Timings] They called '{}'"
,
name
);
}
...
...
@@ -54,7 +55,7 @@ impl middleware::Middleware for Timings {
println!
(
"[Timings] call={}, worked? {}, duration {:?}"
,
name
,
succeess
,
started_at
.elapsed
());
}
fn
on_response
(
&
self
,
started_at
:
Self
::
Instant
)
{
fn
on_response
(
&
self
,
_result
:
&
str
,
started_at
:
Self
::
Instant
)
{
println!
(
"[Timings] Response duration {:?}"
,
started_at
.elapsed
());
}
}
...
...
@@ -81,13 +82,13 @@ impl ThreadWatcher {
impl
middleware
::
Middleware
for
ThreadWatcher
{
type
Instant
=
isize
;
fn
on_request
(
&
self
)
->
Self
::
Instant
{
fn
on_request
(
&
self
,
_remote_addr
:
SocketAddr
,
_headers
:
&
HeaderMap
)
->
Self
::
Instant
{
let
threads
=
Self
::
count_threads
();
println!
(
"[ThreadWatcher] Threads running on the machine at the start of a call: {}"