Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
parity
Mirrored projects
jsonrpsee
Commits
9b1ffcd3
Unverified
Commit
9b1ffcd3
authored
Oct 04, 2022
by
Niklas Adolfsson
Committed by
GitHub
Oct 04, 2022
Browse files
server: add `transport protocol details` to the logger trait (#886)
* server: add protocol details to logger trait * fix tests
parent
b5d54142
Pipeline
#218903
passed with stages
in 4 minutes and 34 seconds
Changes
7
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
examples/examples/logger.rs
View file @
9b1ffcd3
...
...
@@ -28,7 +28,7 @@ use std::net::SocketAddr;
use
std
::
time
::
Instant
;
use
jsonrpsee
::
core
::
client
::
ClientT
;
use
jsonrpsee
::
server
::
logger
::{
self
,
HttpRequest
,
MethodKind
,
Params
};
use
jsonrpsee
::
server
::
logger
::{
self
,
HttpRequest
,
MethodKind
,
Params
,
TransportProtocol
};
use
jsonrpsee
::
server
::
ServerBuilder
;
use
jsonrpsee
::
ws_client
::
WsClientBuilder
;
use
jsonrpsee
::{
rpc_params
,
RpcModule
};
...
...
@@ -39,28 +39,28 @@ struct Timings;
impl
logger
::
Logger
for
Timings
{
type
Instant
=
Instant
;
fn
on_connect
(
&
self
,
remote_addr
:
SocketAddr
,
request
:
&
HttpRequest
)
{
fn
on_connect
(
&
self
,
remote_addr
:
SocketAddr
,
request
:
&
HttpRequest
,
_t
:
TransportProtocol
)
{
println!
(
"[Logger::on_connect] remote_addr {:?}, headers: {:?}"
,
remote_addr
,
request
);
}
fn
on_request
(
&
self
)
->
Self
::
Instant
{
fn
on_request
(
&
self
,
_t
:
TransportProtocol
)
->
Self
::
Instant
{
println!
(
"[Logger::on_request]"
);
Instant
::
now
()
}
fn
on_call
(
&
self
,
name
:
&
str
,
params
:
Params
,
kind
:
MethodKind
)
{
fn
on_call
(
&
self
,
name
:
&
str
,
params
:
Params
,
kind
:
MethodKind
,
_t
:
TransportProtocol
)
{
println!
(
"[Logger::on_call] method: '{}', params: {:?}, kind: {}"
,
name
,
params
,
kind
);
}
fn
on_result
(
&
self
,
name
:
&
str
,
succeess
:
bool
,
started_at
:
Self
::
Instant
)
{
fn
on_result
(
&
self
,
name
:
&
str
,
succeess
:
bool
,
started_at
:
Self
::
Instant
,
_t
:
TransportProtocol
)
{
println!
(
"[Logger::on_result] '{}', worked? {}, time elapsed {:?}"
,
name
,
succeess
,
started_at
.elapsed
());
}
fn
on_response
(
&
self
,
result
:
&
str
,
started_at
:
Self
::
Instant
)
{
fn
on_response
(
&
self
,
result
:
&
str
,
started_at
:
Self
::
Instant
,
_t
:
TransportProtocol
)
{
println!
(
"[Logger::on_response] result: {}, time elapsed {:?}"
,
result
,
started_at
.elapsed
());
}
fn
on_disconnect
(
&
self
,
remote_addr
:
SocketAddr
)
{
fn
on_disconnect
(
&
self
,
remote_addr
:
SocketAddr
,
_t
:
TransportProtocol
)
{
println!
(
"[Logger::on_disconnect] remote_addr: {:?}"
,
remote_addr
);
}
}
...
...
examples/examples/multi_logger.rs
View file @
9b1ffcd3
...
...
@@ -32,7 +32,7 @@ use std::time::Instant;
use
jsonrpsee
::
core
::
client
::
ClientT
;
use
jsonrpsee
::
rpc_params
;
use
jsonrpsee
::
server
::
logger
::{
HttpRequest
,
MethodKind
};
use
jsonrpsee
::
server
::
logger
::{
HttpRequest
,
MethodKind
,
TransportProtocol
};
use
jsonrpsee
::
server
::{
logger
,
RpcModule
,
ServerBuilder
};
use
jsonrpsee
::
types
::
Params
;
use
jsonrpsee
::
ws_client
::
WsClientBuilder
;
...
...
@@ -44,27 +44,27 @@ struct Timings;
impl
logger
::
Logger
for
Timings
{
type
Instant
=
Instant
;
fn
on_connect
(
&
self
,
remote_addr
:
SocketAddr
,
req
:
&
HttpRequest
)
{
fn
on_connect
(
&
self
,
remote_addr
:
SocketAddr
,
req
:
&
HttpRequest
,
_t
:
TransportProtocol
)
{
println!
(
"[Timings::on_connect] remote_addr {:?}, req: {:?}"
,
remote_addr
,
req
);
}
fn
on_request
(
&
self
)
->
Self
::
Instant
{
fn
on_request
(
&
self
,
_t
:
TransportProtocol
)
->
Self
::
Instant
{
Instant
::
now
()
}
fn
on_call
(
&
self
,
name
:
&
str
,
params
:
Params
,
kind
:
MethodKind
)
{
fn
on_call
(
&
self
,
name
:
&
str
,
params
:
Params
,
kind
:
MethodKind
,
_t
:
TransportProtocol
)
{
println!
(
"[Timings:on_call] method: '{}', params: {:?}, kind: {}"
,
name
,
params
,
kind
);
}
fn
on_result
(
&
self
,
name
:
&
str
,
success
:
bool
,
started_at
:
Self
::
Instant
)
{
fn
on_result
(
&
self
,
name
:
&
str
,
success
:
bool
,
started_at
:
Self
::
Instant
,
_t
:
TransportProtocol
)
{
println!
(
"[Timings] call={}, worked? {}, duration {:?}"
,
name
,
success
,
started_at
.elapsed
());
}
fn
on_response
(
&
self
,
_result
:
&
str
,
started_at
:
Self
::
Instant
)
{
fn
on_response
(
&
self
,
_result
:
&
str
,
started_at
:
Self
::
Instant
,
_t
:
TransportProtocol
)
{
println!
(
"[Timings] Response duration {:?}"
,
started_at
.elapsed
());
}
fn
on_disconnect
(
&
self
,
remote_addr
:
SocketAddr
)
{
fn
on_disconnect
(
&
self
,
remote_addr
:
SocketAddr
,
_t
:
TransportProtocol
)
{
println!
(
"[Timings::on_disconnect] remote_addr: {:?}"
,
remote_addr
);
}
}
...
...
@@ -91,32 +91,32 @@ impl ThreadWatcher {
impl
logger
::
Logger
for
ThreadWatcher
{
type
Instant
=
isize
;
fn
on_connect
(
&
self
,
remote_addr
:
SocketAddr
,
headers
:
&
HttpRequest
)
{
fn
on_connect
(
&
self
,
remote_addr
:
SocketAddr
,
headers
:
&
HttpRequest
,
_t
:
TransportProtocol
)
{
println!
(
"[ThreadWatcher::on_connect] remote_addr {:?}, headers: {:?}"
,
remote_addr
,
headers
);
}
fn
on_call
(
&
self
,
_method
:
&
str
,
_params
:
Params
,
_kind
:
MethodKind
)
{
fn
on_call
(
&
self
,
_method
:
&
str
,
_params
:
Params
,
_kind
:
MethodKind
,
_t
:
TransportProtocol
)
{
let
threads
=
Self
::
count_threads
();
println!
(
"[ThreadWatcher::on_call] Threads running on the machine at the start of a call: {}"
,
threads
);
}
fn
on_request
(
&
self
)
->
Self
::
Instant
{
fn
on_request
(
&
self
,
_t
:
TransportProtocol
)
->
Self
::
Instant
{
let
threads
=
Self
::
count_threads
();
println!
(
"[ThreadWatcher::on_request] Threads running on the machine at the start of a call: {}"
,
threads
);
threads
as
isize
}
fn
on_result
(
&
self
,
_name
:
&
str
,
_succees
:
bool
,
started_at
:
Self
::
Instant
)
{
fn
on_result
(
&
self
,
_name
:
&
str
,
_succees
:
bool
,
started_at
:
Self
::
Instant
,
_t
:
TransportProtocol
)
{
let
current_nr_threads
=
Self
::
count_threads
()
as
isize
;
println!
(
"[ThreadWatcher::on_result] {} threads"
,
current_nr_threads
-
started_at
);
}
fn
on_response
(
&
self
,
_result
:
&
str
,
started_at
:
Self
::
Instant
)
{
fn
on_response
(
&
self
,
_result
:
&
str
,
started_at
:
Self
::
Instant
,
_t
:
TransportProtocol
)
{
let
current_nr_threads
=
Self
::
count_threads
()
as
isize
;
println!
(
"[ThreadWatcher::on_response] {} threads"
,
current_nr_threads
-
started_at
);
}
fn
on_disconnect
(
&
self
,
remote_addr
:
SocketAddr
)
{
fn
on_disconnect
(
&
self
,
remote_addr
:
SocketAddr
,
_t
:
TransportProtocol
)
{
println!
(
"[ThreadWatcher::on_disconnect] remote_addr: {:?}"
,
remote_addr
);
}
}
...
...
server/src/logger.rs
View file @
9b1ffcd3
...
...
@@ -60,6 +60,26 @@ impl std::fmt::Display for MethodKind {
}
}
/// The transport protocol used to send or receive a call or request.
#[derive(Debug,
Copy,
Clone)]
pub
enum
TransportProtocol
{
/// HTTP transport.
Http
,
/// WebSocket transport.
WebSocket
,
}
impl
std
::
fmt
::
Display
for
TransportProtocol
{
fn
fmt
(
&
self
,
f
:
&
mut
std
::
fmt
::
Formatter
<
'_
>
)
->
std
::
fmt
::
Result
{
let
s
=
match
self
{
Self
::
Http
=>
"http"
,
Self
::
WebSocket
=>
"websocket"
,
};
write!
(
f
,
"{}"
,
s
)
}
}
/// Defines a logger specifically for WebSocket connections with callbacks during the RPC request life-cycle.
/// The primary use case for this is to collect timings for a larger metrics collection solution.
///
...
...
@@ -71,38 +91,38 @@ pub trait Logger: Send + Sync + Clone + 'static {
type
Instant
:
std
::
fmt
::
Debug
+
Send
+
Sync
+
Copy
;
/// Called when a new client connects
fn
on_connect
(
&
self
,
_remote_addr
:
SocketAddr
,
_request
:
&
HttpRequest
);
fn
on_connect
(
&
self
,
_remote_addr
:
SocketAddr
,
_request
:
&
HttpRequest
,
_t
:
TransportProtocol
);
/// Called when a new JSON-RPC request comes to the server.
fn
on_request
(
&
self
)
->
Self
::
Instant
;
fn
on_request
(
&
self
,
transport
:
TransportProtocol
)
->
Self
::
Instant
;
/// Called on each JSON-RPC method call, batch requests will trigger `on_call` multiple times.
fn
on_call
(
&
self
,
method_name
:
&
str
,
params
:
Params
,
kind
:
MethodKind
);
fn
on_call
(
&
self
,
method_name
:
&
str
,
params
:
Params
,
kind
:
MethodKind
,
transport
:
TransportProtocol
);
/// Called on each JSON-RPC method completion, batch requests will trigger `on_result` multiple times.
fn
on_result
(
&
self
,
method_name
:
&
str
,
success
:
bool
,
started_at
:
Self
::
Instant
);
fn
on_result
(
&
self
,
method_name
:
&
str
,
success
:
bool
,
started_at
:
Self
::
Instant
,
transport
:
TransportProtocol
);
/// Called once the JSON-RPC request is finished and response is sent to the output buffer.
fn
on_response
(
&
self
,
result
:
&
str
,
started_at
:
Self
::
Instant
);
fn
on_response
(
&
self
,
result
:
&
str
,
started_at
:
Self
::
Instant
,
transport
:
TransportProtocol
);
/// Called when a client disconnects
fn
on_disconnect
(
&
self
,
_remote_addr
:
SocketAddr
);
fn
on_disconnect
(
&
self
,
_remote_addr
:
SocketAddr
,
transport
:
TransportProtocol
);
}
impl
Logger
for
()
{
type
Instant
=
();
fn
on_connect
(
&
self
,
_
:
SocketAddr
,
_
:
&
HttpRequest
)
->
Self
::
Instant
{}
fn
on_connect
(
&
self
,
_
:
SocketAddr
,
_
:
&
HttpRequest
,
_p
:
TransportProtocol
)
->
Self
::
Instant
{}
fn
on_request
(
&
self
)
->
Self
::
Instant
{}
fn
on_request
(
&
self
,
_p
:
TransportProtocol
)
->
Self
::
Instant
{}
fn
on_call
(
&
self
,
_
:
&
str
,
_
:
Params
,
_
:
MethodKind
)
{}
fn
on_call
(
&
self
,
_
:
&
str
,
_
:
Params
,
_
:
MethodKind
,
_p
:
TransportProtocol
)
{}
fn
on_result
(
&
self
,
_
:
&
str
,
_
:
bool
,
_
:
Self
::
Instant
)
{}
fn
on_result
(
&
self
,
_
:
&
str
,
_
:
bool
,
_
:
Self
::
Instant
,
_p
:
TransportProtocol
)
{}
fn
on_response
(
&
self
,
_
:
&
str
,
_
:
Self
::
Instant
)
{}
fn
on_response
(
&
self
,
_
:
&
str
,
_
:
Self
::
Instant
,
_p
:
TransportProtocol
)
{}
fn
on_disconnect
(
&
self
,
_
:
SocketAddr
)
{}
fn
on_disconnect
(
&
self
,
_
:
SocketAddr
,
_p
:
TransportProtocol
)
{}
}
impl
<
A
,
B
>
Logger
for
(
A
,
B
)
...
...
@@ -112,32 +132,32 @@ where
{
type
Instant
=
(
A
::
Instant
,
B
::
Instant
);
fn
on_connect
(
&
self
,
remote_addr
:
std
::
net
::
SocketAddr
,
request
:
&
HttpRequest
)
{
self
.0
.on_connect
(
remote_addr
,
request
);
self
.1
.on_connect
(
remote_addr
,
request
);
fn
on_connect
(
&
self
,
remote_addr
:
std
::
net
::
SocketAddr
,
request
:
&
HttpRequest
,
transport
:
TransportProtocol
)
{
self
.0
.on_connect
(
remote_addr
,
request
,
transport
);
self
.1
.on_connect
(
remote_addr
,
request
,
transport
);
}
fn
on_request
(
&
self
)
->
Self
::
Instant
{
(
self
.0
.on_request
(),
self
.1
.on_request
())
fn
on_request
(
&
self
,
transport
:
TransportProtocol
)
->
Self
::
Instant
{
(
self
.0
.on_request
(
transport
),
self
.1
.on_request
(
transport
))
}
fn
on_call
(
&
self
,
method_name
:
&
str
,
params
:
Params
,
kind
:
MethodKind
)
{
self
.0
.on_call
(
method_name
,
params
.clone
(),
kind
);
self
.1
.on_call
(
method_name
,
params
,
kind
);
fn
on_call
(
&
self
,
method_name
:
&
str
,
params
:
Params
,
kind
:
MethodKind
,
transport
:
TransportProtocol
)
{
self
.0
.on_call
(
method_name
,
params
.clone
(),
kind
,
transport
);
self
.1
.on_call
(
method_name
,
params
,
kind
,
transport
);
}
fn
on_result
(
&
self
,
method_name
:
&
str
,
success
:
bool
,
started_at
:
Self
::
Instant
)
{
self
.0
.on_result
(
method_name
,
success
,
started_at
.0
);
self
.1
.on_result
(
method_name
,
success
,
started_at
.1
);
fn
on_result
(
&
self
,
method_name
:
&
str
,
success
:
bool
,
started_at
:
Self
::
Instant
,
transport
:
TransportProtocol
)
{
self
.0
.on_result
(
method_name
,
success
,
started_at
.0
,
transport
);
self
.1
.on_result
(
method_name
,
success
,
started_at
.1
,
transport
);
}
fn
on_response
(
&
self
,
result
:
&
str
,
started_at
:
Self
::
Instant
)
{
self
.0
.on_response
(
result
,
started_at
.0
);
self
.1
.on_response
(
result
,
started_at
.1
);
fn
on_response
(
&
self
,
result
:
&
str
,
started_at
:
Self
::
Instant
,
transport
:
TransportProtocol
)
{
self
.0
.on_response
(
result
,
started_at
.0
,
transport
);
self
.1
.on_response
(
result
,
started_at
.1
,
transport
);
}
fn
on_disconnect
(
&
self
,
remote_addr
:
SocketAddr
)
{
self
.0
.on_disconnect
(
remote_addr
);
self
.1
.on_disconnect
(
remote_addr
);
fn
on_disconnect
(
&
self
,
remote_addr
:
SocketAddr
,
transport
:
TransportProtocol
)
{
self
.0
.on_disconnect
(
remote_addr
,
transport
);
self
.1
.on_disconnect
(
remote_addr
,
transport
);
}
}
server/src/server.rs
View file @
9b1ffcd3
...
...
@@ -33,7 +33,7 @@ use std::task::{Context, Poll};
use
std
::
time
::
Duration
;
use
crate
::
future
::{
ConnectionGuard
,
FutureDriver
,
ServerHandle
,
StopHandle
};
use
crate
::
logger
::
Logger
;
use
crate
::
logger
::
{
Logger
,
TransportProtocol
}
;
use
crate
::
transport
::{
http
,
ws
};
use
futures_util
::
future
::
FutureExt
;
...
...
@@ -315,7 +315,7 @@ impl<B, L> Builder<B, L> {
/// ```
/// use std::{time::Instant, net::SocketAddr};
///
/// use jsonrpsee_server::logger::{Logger, HttpRequest, MethodKind, Params};
/// use jsonrpsee_server::logger::{Logger, HttpRequest, MethodKind, Params
, TransportProtocol
};
/// use jsonrpsee_server::ServerBuilder;
///
/// #[derive(Clone)]
...
...
@@ -324,28 +324,28 @@ impl<B, L> Builder<B, L> {
/// impl Logger for MyLogger {
/// type Instant = Instant;
///
/// fn on_connect(&self, remote_addr: SocketAddr, request: &HttpRequest) {
/// println!("[MyLogger::on_call] remote_addr: {:?}, headers: {:?}", remote_addr, request);
/// fn on_connect(&self, remote_addr: SocketAddr, request: &HttpRequest
, transport: TransportProtocol
) {
/// println!("[MyLogger::on_call] remote_addr: {:?}, headers: {:?}
, transport: {}
", remote_addr, request
, transport
);
/// }
///
/// fn on_request(&self) -> Self::Instant {
/// fn on_request(&self
, transport: TransportProtocol
) -> Self::Instant {
/// Instant::now()
/// }
///
/// fn on_call(&self, method_name: &str, params: Params, kind: MethodKind) {
/// println!("[MyLogger::on_call] method: '{}' params: {:?}, kind: {:?}", method_name, params, kind);
/// fn on_call(&self, method_name: &str, params: Params, kind: MethodKind
, transport: TransportProtocol
) {
/// println!("[MyLogger::on_call] method: '{}' params: {:?}, kind: {:?}
, transport: {}
", method_name, params, kind
, transport
);
/// }
///
/// fn on_result(&self, method_name: &str, success: bool, started_at: Self::Instant) {
/// println!("[MyLogger::on_result] '{}', worked? {}, time elapsed {:?}", method_name, success, started_at.elapsed());
/// fn on_result(&self, method_name: &str, success: bool, started_at: Self::Instant
, transport: TransportProtocol
) {
/// println!("[MyLogger::on_result] '{}', worked? {}, time elapsed {:?}
, transport: {}
", method_name, success, started_at.elapsed()
, transport
);
/// }
///
/// fn on_response(&self, result: &str, started_at: Self::Instant) {
/// println!("[MyLogger::on_response] result: {}, time elapsed {:?}", result, started_at.elapsed());
/// fn on_response(&self, result: &str, started_at: Self::Instant
, transport: TransportProtocol
) {
/// println!("[MyLogger::on_response] result: {}, time elapsed {:?}
, transport: {}
", result, started_at.elapsed()
, transport
);
/// }
///
/// fn on_disconnect(&self, remote_addr: SocketAddr) {
/// println!("[MyLogger::on_disconnect] remote_addr: {:?}", remote_addr);
/// fn on_disconnect(&self, remote_addr: SocketAddr
, transport: TransportProtocol
) {
/// println!("[MyLogger::on_disconnect] remote_addr: {:?}
, transport: {}
", remote_addr
, transport
);
/// }
/// }
///
...
...
@@ -615,7 +615,7 @@ impl<L: Logger> hyper::service::Service<hyper::Request<hyper::Body>> for TowerSe
let
response
=
match
server
.receive_request
(
&
request
)
{
Ok
(
response
)
=>
{
self
.inner.logger
.on_connect
(
self
.inner.remote_addr
,
&
request
);
self
.inner.logger
.on_connect
(
self
.inner.remote_addr
,
&
request
,
TransportProtocol
::
WebSocket
);
let
data
=
self
.inner
.clone
();
tokio
::
spawn
(
async
move
{
...
...
@@ -655,8 +655,11 @@ impl<L: Logger> hyper::service::Service<hyper::Request<hyper::Body>> for TowerSe
batch_requests_supported
:
self
.inner.batch_requests_supported
,
logger
:
self
.inner.logger
.clone
(),
conn
:
self
.inner.conn
.clone
(),
remote_addr
:
self
.inner.remote_addr
,
};
self
.inner.logger
.on_connect
(
self
.inner.remote_addr
,
&
request
,
TransportProtocol
::
Http
);
Box
::
pin
(
http
::
handle_request
(
request
,
data
)
.map
(
Ok
))
}
}
...
...
server/src/transport/http.rs
View file @
9b1ffcd3
use
std
::
convert
::
Infallible
;
use
std
::
net
::
SocketAddr
;
use
std
::
sync
::
Arc
;
use
crate
::
logger
::{
self
,
Logger
};
use
crate
::
logger
::{
self
,
Logger
,
TransportProtocol
};
use
futures_util
::
TryStreamExt
;
use
http
::
Method
;
...
...
@@ -45,9 +46,9 @@ pub(crate) async fn reject_connection(socket: tokio::net::TcpStream) {
}
#[derive(Debug)]
pub
(
crate
)
struct
ProcessValidatedRequest
<
L
:
Logger
>
{
pub
(
crate
)
struct
ProcessValidatedRequest
<
'a
,
L
:
Logger
>
{
pub
(
crate
)
request
:
hyper
::
Request
<
hyper
::
Body
>
,
pub
(
crate
)
logger
:
L
,
pub
(
crate
)
logger
:
&
'a
L
,
pub
(
crate
)
methods
:
Methods
,
pub
(
crate
)
resources
:
Resources
,
pub
(
crate
)
max_request_body_size
:
u32
,
...
...
@@ -59,7 +60,7 @@ pub(crate) struct ProcessValidatedRequest<L: Logger> {
/// Process a verified request, it implies a POST request with content type JSON.
pub
(
crate
)
async
fn
process_validated_request
<
L
:
Logger
>
(
input
:
ProcessValidatedRequest
<
L
>
,
input
:
ProcessValidatedRequest
<
'_
,
L
>
,
)
->
hyper
::
Response
<
hyper
::
Body
>
{
let
ProcessValidatedRequest
{
request
,
...
...
@@ -89,7 +90,7 @@ pub(crate) async fn process_validated_request<L: Logger>(
if
is_single
{
let
call
=
CallData
{
conn_id
:
0
,
logger
:
&
logger
,
logger
,
methods
:
&
methods
,
max_response_body_size
,
max_log_length
,
...
...
@@ -97,7 +98,7 @@ pub(crate) async fn process_validated_request<L: Logger>(
request_start
,
};
let
response
=
process_single_request
(
body
,
call
)
.await
;
logger
.on_response
(
&
response
.result
,
request_start
);
logger
.on_response
(
&
response
.result
,
request_start
,
TransportProtocol
::
Http
);
response
::
ok_response
(
response
.result
)
}
// Batch of requests or notifications
...
...
@@ -106,7 +107,7 @@ pub(crate) async fn process_validated_request<L: Logger>(
Id
::
Null
,
ErrorObject
::
borrowed
(
BATCHES_NOT_SUPPORTED_CODE
,
&
BATCHES_NOT_SUPPORTED_MSG
,
None
),
);
logger
.on_response
(
&
err
.result
,
request_start
);
logger
.on_response
(
&
err
.result
,
request_start
,
TransportProtocol
::
Http
);
response
::
ok_response
(
err
.result
)
}
// Batch of requests or notifications
...
...
@@ -115,7 +116,7 @@ pub(crate) async fn process_validated_request<L: Logger>(
data
:
body
,
call
:
CallData
{
conn_id
:
0
,
logger
:
&
logger
,
logger
,
methods
:
&
methods
,
max_response_body_size
,
max_log_length
,
...
...
@@ -124,7 +125,7 @@ pub(crate) async fn process_validated_request<L: Logger>(
},
})
.await
;
logger
.on_response
(
&
response
.result
,
request_start
);
logger
.on_response
(
&
response
.result
,
request_start
,
TransportProtocol
::
Http
);
response
::
ok_response
(
response
.result
)
}
}
...
...
@@ -223,12 +224,12 @@ pub(crate) async fn execute_call<L: Logger>(req: Request<'_>, call: CallData<'_,
let
response
=
match
methods
.method_with_name
(
name
)
{
None
=>
{
logger
.on_call
(
name
,
params
.clone
(),
logger
::
MethodKind
::
Unknown
);
logger
.on_call
(
name
,
params
.clone
(),
logger
::
MethodKind
::
Unknown
,
TransportProtocol
::
Http
);
MethodResponse
::
error
(
id
,
ErrorObject
::
from
(
ErrorCode
::
MethodNotFound
))
}
Some
((
name
,
method
))
=>
match
&
method
.inner
()
{
MethodKind
::
Sync
(
callback
)
=>
{
logger
.on_call
(
name
,
params
.clone
(),
logger
::
MethodKind
::
MethodCall
);
logger
.on_call
(
name
,
params
.clone
(),
logger
::
MethodKind
::
MethodCall
,
TransportProtocol
::
Http
);
match
method
.claim
(
name
,
resources
)
{
Ok
(
guard
)
=>
{
...
...
@@ -243,7 +244,7 @@ pub(crate) async fn execute_call<L: Logger>(req: Request<'_>, call: CallData<'_,
}
}
MethodKind
::
Async
(
callback
)
=>
{
logger
.on_call
(
name
,
params
.clone
(),
logger
::
MethodKind
::
MethodCall
);
logger
.on_call
(
name
,
params
.clone
(),
logger
::
MethodKind
::
MethodCall
,
TransportProtocol
::
Http
);
match
method
.claim
(
name
,
resources
)
{
Ok
(
guard
)
=>
{
let
id
=
id
.into_owned
();
...
...
@@ -258,7 +259,7 @@ pub(crate) async fn execute_call<L: Logger>(req: Request<'_>, call: CallData<'_,
}
}
MethodKind
::
Subscription
(
_
)
|
MethodKind
::
Unsubscription
(
_
)
=>
{
logger
.on_call
(
name
,
params
.clone
(),
logger
::
MethodKind
::
Unknown
);
logger
.on_call
(
name
,
params
.clone
(),
logger
::
MethodKind
::
Unknown
,
TransportProtocol
::
Http
);
tracing
::
error!
(
"Subscriptions not supported on HTTP"
);
MethodResponse
::
error
(
id
,
ErrorObject
::
from
(
ErrorCode
::
InternalError
))
}
...
...
@@ -266,7 +267,7 @@ pub(crate) async fn execute_call<L: Logger>(req: Request<'_>, call: CallData<'_,
};
tx_log_from_str
(
&
response
.result
,
max_log_length
);
logger
.on_result
(
name
,
response
.success
,
request_start
);
logger
.on_result
(
name
,
response
.success
,
request_start
,
TransportProtocol
::
Http
);
response
}
...
...
@@ -287,6 +288,7 @@ pub(crate) struct HandleRequest<L: Logger> {
pub
(
crate
)
batch_requests_supported
:
bool
,
pub
(
crate
)
logger
:
L
,
pub
(
crate
)
conn
:
Arc
<
OwnedSemaphorePermit
>
,
pub
(
crate
)
remote_addr
:
SocketAddr
,
}
pub
(
crate
)
async
fn
handle_request
<
L
:
Logger
>
(
...
...
@@ -302,9 +304,10 @@ pub(crate) async fn handle_request<L: Logger>(
batch_requests_supported
,
logger
,
conn
,
remote_addr
,
}
=
input
;
let
request_start
=
logger
.on_request
();
let
request_start
=
logger
.on_request
(
TransportProtocol
::
Http
);
// Only the `POST` method is allowed.
let
res
=
match
*
request
.method
()
{
...
...
@@ -317,7 +320,7 @@ pub(crate) async fn handle_request<L: Logger>(
max_response_body_size
,
max_log_length
,
batch_requests_supported
,
logger
,
logger
:
&
logger
,
request_start
,
})
.await
...
...
@@ -328,6 +331,7 @@ pub(crate) async fn handle_request<L: Logger>(
};
drop
(
conn
);
logger
.on_disconnect
(
remote_addr
,
TransportProtocol
::
Http
);
res
}
...
...
server/src/transport/ws.rs
View file @
9b1ffcd3
...
...
@@ -3,7 +3,7 @@ use std::task::{Context, Poll};
use
std
::
time
::
Duration
;
use
crate
::
future
::{
FutureDriver
,
StopHandle
};
use
crate
::
logger
::{
self
,
Logger
};
use
crate
::
logger
::{
self
,
Logger
,
TransportProtocol
};
use
crate
::
server
::{
MethodResult
,
ServiceData
};
use
futures_channel
::
mpsc
;
...
...
@@ -180,13 +180,13 @@ pub(crate) async fn execute_call<'a, L: Logger>(req: Request<'a>, call: CallData
let
response
=
match
methods
.method_with_name
(
name
)
{
None
=>
{
logger
.on_call
(
name
,
params
.clone
(),
logger
::
MethodKind
::
Unknown
);
logger
.on_call
(
name
,
params
.clone
(),
logger
::
MethodKind
::
Unknown
,
TransportProtocol
::
WebSocket
);
let
response
=
MethodResponse
::
error
(
id
,
ErrorObject
::
from
(
ErrorCode
::
MethodNotFound
));
MethodResult
::
SendAndLogger
(
response
)
}
Some
((
name
,
method
))
=>
match
&
method
.inner
()
{
MethodKind
::
Sync
(
callback
)
=>
{
logger
.on_call
(
name
,
params
.clone
(),
logger
::
MethodKind
::
MethodCall
);
logger
.on_call
(
name
,
params
.clone
(),
logger
::
MethodKind
::
MethodCall
,
TransportProtocol
::
WebSocket
);
match
method
.claim
(
name
,
resources
)
{
Ok
(
guard
)
=>
{
let
r
=
(
callback
)(
id
,
params
,
max_response_body_size
as
usize
);
...
...
@@ -201,7 +201,7 @@ pub(crate) async fn execute_call<'a, L: Logger>(req: Request<'a>, call: CallData
}
}
MethodKind
::
Async
(
callback
)
=>
{
logger
.on_call
(
name
,
params
.clone
(),
logger
::
MethodKind
::
MethodCall
);
logger
.on_call
(
name
,
params
.clone
(),
logger
::
MethodKind
::
MethodCall
,
TransportProtocol
::
WebSocket
);
match
method
.claim
(
name
,
resources
)
{
Ok
(
guard
)
=>
{
let
id
=
id
.into_owned
();
...
...
@@ -219,7 +219,7 @@ pub(crate) async fn execute_call<'a, L: Logger>(req: Request<'a>, call: CallData
}
}
MethodKind
::
Subscription
(
callback
)
=>
{
logger
.on_call
(
name
,
params
.clone
(),
logger
::
MethodKind
::
Subscription
);
logger
.on_call
(
name
,
params
.clone
(),
logger
::
MethodKind
::
Subscription
,
TransportProtocol
::
WebSocket
);
match
method
.claim
(
name
,
resources
)
{
Ok
(
guard
)
=>
{
if
let
Some
(
cn
)
=
bounded_subscriptions
.acquire
()
{
...
...
@@ -240,7 +240,7 @@ pub(crate) async fn execute_call<'a, L: Logger>(req: Request<'a>, call: CallData
}
}
MethodKind
::
Unsubscription
(
callback
)
=>
{
logger
.on_call
(
name
,
params
.clone
(),
logger
::
MethodKind
::
Unsubscription
);
logger
.on_call
(
name
,
params
.clone
(),
logger
::
MethodKind
::
Unsubscription
,
TransportProtocol
::
WebSocket
);
// Don't adhere to any resource or subscription limits; always let unsubscribing happen!
let
result
=
callback
(
id
,
params
,
conn_id
,
max_response_body_size
as
usize
);
...
...
@@ -252,7 +252,7 @@ pub(crate) async fn execute_call<'a, L: Logger>(req: Request<'a>, call: CallData
let
r
=
response
.as_inner
();
tx_log_from_str
(
&
r
.result
,
max_log_length
);
logger
.on_result
(
name
,
r
.success
,
request_start
);
logger
.on_result
(
name
,
r
.success
,
request_start
,
TransportProtocol
::
WebSocket
);
response
}
...
...
@@ -341,7 +341,7 @@ pub(crate) async fn background_task<L: Logger>(
};
};
let
request_start
=
logger
.on_request
();
let
request_start
=
logger
.on_request
(
TransportProtocol
::
WebSocket
);
let
first_non_whitespace
=
data
.iter
()
.find
(|
byte
|
!
byte
.is_ascii_whitespace
());
match
first_non_whitespace
{
...
...
@@ -369,10 +369,10 @@ pub(crate) async fn background_task<L: Logger>(
match
process_single_request
(
data
,
call
)
.await
{
MethodResult
::
JustLogger
(
r
)
=>
{
logger
.on_response
(
&
r
.result
,
request_start
);
logger
.on_response
(
&
r
.result
,
request_start
,
TransportProtocol
::
WebSocket
);
}
MethodResult
::
SendAndLogger
(
r
)
=>
{
logger
.on_response
(
&
r
.result
,
request_start
);
logger
.on_response
(
&
r
.result
,
request_start
,
TransportProtocol
::
WebSocket
);
let
_
=
sink
.send_raw
(
r
.result
);
}
};
...
...
@@ -386,7 +386,7 @@ pub(crate) async fn background_task<L: Logger>(
Id
::
Null
,
ErrorObject
::
borrowed
(
BATCHES_NOT_SUPPORTED_CODE
,
&
BATCHES_NOT_SUPPORTED_MSG
,
None
),
);
logger
.on_response
(
&
response
.result
,
request_start
);
logger
.on_response
(
&
response
.result
,
request_start
,
TransportProtocol
::
WebSocket
);
let
_
=
sink
.send_raw
(
response
.result
);
}
Some
(
b'['
)
=>
{
...
...
@@ -417,7 +417,7 @@ pub(crate) async fn background_task<L: Logger>(
.await
;
tx_log_from_str
(
&
response
.result
,
max_log_length
);
logger
.on_response
(
&
response
.result
,
request_start
);
logger
.on_response
(
&
response
.result
,
request_start
,
TransportProtocol
::
WebSocket
);
let
_
=
sink
.send_raw
(
response
.result
);
};
...
...
@@ -429,7 +429,7 @@ pub(crate) async fn background_task<L: Logger>(
}
};
logger
.on_disconnect
(
remote_addr
);
logger
.on_disconnect
(
remote_addr
,
TransportProtocol
::
WebSocket
);
// Drive all running methods to completion.
// **NOTE** Do not return early in this function. This `await` needs to run to guarantee
...
...
tests/tests/metrics.rs
View file @
9b1ffcd3
...
...
@@ -36,7 +36,7 @@ use jsonrpsee::core::{client::ClientT, Error};
use
jsonrpsee
::
http_client
::
HttpClientBuilder
;
use
jsonrpsee
::
proc_macros
::
rpc
;
use
jsonrpsee
::
rpc_params
;
use
jsonrpsee
::
server
::
logger
::{
HttpRequest
,
Logger
,
MethodKind
};
use
jsonrpsee
::
server
::
logger
::{
HttpRequest
,
Logger
,
MethodKind
,
TransportProtocol
};