Unverified Commit 1657e26b authored by Maciej Hirsz's avatar Maciej Hirsz Committed by GitHub
Browse files

Middleware for metrics (#576)



* Squashed MethodSink

* Middleware WIP

* Passing all the information through

* Unnecessary `false`

* Apply suggestions from code review

Co-authored-by: David's avatarDavid <dvdplm@gmail.com>

* Add a setter for middleware (#577)

* Fix try-build tests

* Add a middleware setter and an example

* Actually add the example

* Grumbles

* Use an atomic

* Set middleware with a constructor instead

* Resolve a todo

* Update ws-server/src/server.rs

Co-authored-by: default avatarMaciej Hirsz <1096222+maciejhirsz@users.noreply.github.com>

* Update ws-server/src/server.rs

Co-authored-by: default avatarMaciej Hirsz <1096222+maciejhirsz@users.noreply.github.com>

* Update ws-server/src/server.rs

Co-authored-by: default avatarMaciej Hirsz <1096222+maciejhirsz@users.noreply.github.com>

Co-authored-by: default avatarMaciej Hirsz <1096222+maciejhirsz@users.noreply.github.com>

* Middleware::on_response for batches

* Middleware in HTTP

* fmt

* Server builder for HTTP

* Use actual time in the example

* HTTP example

* Middleware to capture method not found calls

* An example of adding multiple middlewares. (#581)

* Add an example of adding multiple middlewares.

* Update examples/multi-middleware.rs

Co-authored-by: default avatarMaciej Hirsz <1096222+maciejhirsz@users.noreply.github.com>

* Update examples/Cargo.toml

Co-authored-by: default avatarMaciej Hirsz <1096222+maciejhirsz@users.noreply.github.com>

Co-authored-by: default avatarMaciej Hirsz <1096222+maciejhirsz@users.noreply.github.com>

* Move `Middleware` to jsonrpsee-types (#582)

* Move `Middleware` to jsonrpsee-types

* Move Middleware trait to jsonrpsee-types

* Add some docs.

* Link middleware to `with_middleware` methods in docs

* Doctests

* Doc comment fixed

* Clean up a TODO

* Switch back to `set_middleware`

* fmt

* Tests

* Add `on_connect` and `on_disconnect`

* Add note to future selves

Co-authored-by: David's avatarDavid <dvdplm@gmail.com>
parent 3c3f3ac9
......@@ -13,11 +13,24 @@ jsonrpsee = { path = "../jsonrpsee", features = ["full"] }
tracing = "0.1"
tracing-subscriber = "0.2"
tokio = { version = "1", features = ["full"] }
palaver = "0.2"
[[example]]
name = "http"
path = "http.rs"
[[example]]
name = "middleware_ws"
path = "middleware_ws.rs"
[[example]]
name = "middleware_http"
path = "middleware_http.rs"
[[example]]
name = "multi_middleware"
path = "multi_middleware.rs"
[[example]]
name = "ws"
path = "ws.rs"
......
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use jsonrpsee::{
http_client::HttpClientBuilder,
http_server::{HttpServerBuilder, HttpServerHandle, RpcModule},
types::{middleware, traits::Client},
};
use std::net::SocketAddr;
use std::time::Instant;
#[derive(Clone)]
struct Timings;
impl middleware::Middleware for Timings {
type Instant = Instant;
fn on_request(&self) -> Self::Instant {
Instant::now()
}
fn on_call(&self, name: &str) {
println!("[Middleware::on_call] '{}'", name);
}
fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant) {
println!("[Middleware::on_result] '{}', worked? {}, time elapsed {:?}", name, succeess, started_at.elapsed());
}
fn on_response(&self, started_at: Self::Instant) {
println!("[Middleware::on_response] time elapsed {:?}", started_at.elapsed());
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init()
.expect("setting default subscriber failed");
let (addr, _handle) = run_server().await?;
let url = format!("http://{}", addr);
let client = HttpClientBuilder::default().build(&url)?;
let response: String = client.request("say_hello", None).await?;
println!("response: {:?}", response);
let _response: Result<String, _> = client.request("unknown_method", None).await;
let _ = client.request::<String>("say_hello", None).await?;
Ok(())
}
async fn run_server() -> anyhow::Result<(SocketAddr, HttpServerHandle)> {
let server = HttpServerBuilder::new().set_middleware(Timings).build("127.0.0.1:0")?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo"))?;
let addr = server.local_addr()?;
let server_handle = server.start(module)?;
Ok((addr, server_handle))
}
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use jsonrpsee::{
types::{middleware, traits::Client},
ws_client::WsClientBuilder,
ws_server::{RpcModule, WsServerBuilder},
};
use std::net::SocketAddr;
use std::time::Instant;
#[derive(Clone)]
struct Timings;
impl middleware::Middleware for Timings {
type Instant = Instant;
fn on_request(&self) -> Self::Instant {
Instant::now()
}
fn on_call(&self, name: &str) {
println!("[Middleware::on_call] '{}'", name);
}
fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant) {
println!("[Middleware::on_result] '{}', worked? {}, time elapsed {:?}", name, succeess, started_at.elapsed());
}
fn on_response(&self, started_at: Self::Instant) {
println!("[Middleware::on_response] time elapsed {:?}", started_at.elapsed());
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init()
.expect("setting default subscriber failed");
let addr = run_server().await?;
let url = format!("ws://{}", addr);
let client = WsClientBuilder::default().build(&url).await?;
let response: String = client.request("say_hello", None).await?;
println!("response: {:?}", response);
let _response: Result<String, _> = client.request("unknown_method", None).await;
let _ = client.request::<String>("say_hello", None).await?;
Ok(())
}
async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::new().set_middleware(Timings).build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo"))?;
let addr = server.local_addr()?;
server.start(module)?;
Ok(addr)
}
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Example showing how to add multiple middlewares to the same server.
use jsonrpsee::{
rpc_params,
types::{middleware, traits::Client},
ws_client::WsClientBuilder,
ws_server::{RpcModule, WsServerBuilder},
};
use std::net::SocketAddr;
use std::time::Instant;
/// Example middleware to measure call execution time.
#[derive(Clone)]
struct Timings;
impl middleware::Middleware for Timings {
type Instant = Instant;
fn on_request(&self) -> Self::Instant {
Instant::now()
}
fn on_call(&self, name: &str) {
println!("[Timings] They called '{}'", name);
}
fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant) {
println!("[Timings] call={}, worked? {}, duration {:?}", name, succeess, started_at.elapsed());
}
fn on_response(&self, started_at: Self::Instant) {
println!("[Timings] Response duration {:?}", started_at.elapsed());
}
}
/// Example middleware to keep a watch on the number of total threads started in the system.
#[derive(Clone)]
struct ThreadWatcher;
impl middleware::Middleware for ThreadWatcher {
type Instant = isize;
fn on_request(&self) -> Self::Instant {
let threads = palaver::process::count_threads();
println!("[ThreadWatcher] Threads running on the machine at the start of a call: {}", threads);
threads as isize
}
fn on_response(&self, started_at: Self::Instant) {
let current_nr_threads = palaver::process::count_threads() as isize;
println!("[ThreadWatcher] Request started {} threads", current_nr_threads - started_at);
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init()
.expect("setting default subscriber failed");
let addr = run_server().await?;
let url = format!("ws://{}", addr);
let client = WsClientBuilder::default().build(&url).await?;
let response: String = client.request("say_hello", None).await?;
println!("response: {:?}", response);
let _response: Result<String, _> = client.request("unknown_method", None).await;
let _ = client.request::<String>("say_hello", None).await?;
let _ = client.request::<()>("thready", rpc_params![4]).await?;
Ok(())
}
async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::new().set_middleware((Timings, ThreadWatcher)).build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo"))?;
module.register_method("thready", |params, _| {
let thread_count: usize = params.one().unwrap();
for _ in 0..thread_count {
std::thread::spawn(|| std::thread::sleep(std::time::Duration::from_secs(1)));
}
Ok(())
})?;
let addr = server.local_addr()?;
server.start(module)?;
Ok(addr)
}
......@@ -34,14 +34,15 @@ use hyper::{
};
use jsonrpsee_types::{
error::{Error, GenericTransportError},
middleware::Middleware,
v2::{ErrorCode, Id, Notification, Request},
TEN_MB_SIZE_BYTES,
};
use jsonrpsee_utils::http_helpers::read_body;
use jsonrpsee_utils::server::{
helpers::{collect_batch_response, prepare_error, send_error},
helpers::{collect_batch_response, prepare_error, MethodSink},
resource_limiting::Resources,
rpc_module::Methods,
rpc_module::{MethodResult, Methods},
};
use serde_json::value::RawValue;
......@@ -56,16 +57,72 @@ use std::{
/// Builder to create JSON-RPC HTTP server.
#[derive(Debug)]
pub struct Builder {
pub struct Builder<M = ()> {
access_control: AccessControl,
resources: Resources,
max_request_body_size: u32,
keep_alive: bool,
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
middleware: M,
}
impl Default for Builder {
fn default() -> Self {
Self {
max_request_body_size: TEN_MB_SIZE_BYTES,
resources: Resources::default(),
access_control: AccessControl::default(),
keep_alive: true,
tokio_runtime: None,
middleware: (),
}
}
}
impl Builder {
/// Create a default server builder.
pub fn new() -> Self {
Self::default()
}
}
impl<M> Builder<M> {
/// Add a middleware to the builder [`Middleware`](../jsonrpsee_types/middleware/trait.Middleware.html).
///
/// ```
/// use jsonrpsee_types::middleware::Middleware;
/// use jsonrpsee_http_server::HttpServerBuilder;
/// use std::time::Instant;
///
/// #[derive(Clone)]
/// struct MyMiddleware;
///
/// impl Middleware for MyMiddleware {
/// type Instant = Instant;
///
/// fn on_request(&self) -> Instant {
/// Instant::now()
/// }
///
/// fn on_result(&self, name: &str, success: bool, started_at: Instant) {
/// println!("Call to '{}' took {:?}", name, started_at.elapsed());
/// }
/// }
///
/// let builder = HttpServerBuilder::new().set_middleware(MyMiddleware);
/// ```
pub fn set_middleware<T: Middleware>(self, middleware: T) -> Builder<T> {
Builder {
max_request_body_size: self.max_request_body_size,
resources: self.resources,
access_control: self.access_control,
keep_alive: self.keep_alive,
tokio_runtime: self.tokio_runtime,
middleware,
}
}
/// Sets the maximum size of a request body in bytes (default is 10 MiB).
pub fn max_request_body_size(mut self, size: u32) -> Self {
self.max_request_body_size = size;
......@@ -120,7 +177,7 @@ impl Builder {
/// assert!(jsonrpsee_http_server::HttpServerBuilder::default().build(addrs).is_ok());
/// }
/// ```
pub fn build(self, addrs: impl ToSocketAddrs) -> Result<Server, Error> {
pub fn build(self, addrs: impl ToSocketAddrs) -> Result<Server<M>, Error> {
let mut err: Option<Error> = None;
for addr in addrs.to_socket_addrs()? {
......@@ -139,6 +196,7 @@ impl Builder {
max_request_body_size: self.max_request_body_size,
resources: self.resources,
tokio_runtime: self.tokio_runtime,
middleware: self.middleware,
});
}
......@@ -167,18 +225,6 @@ impl Builder {
}
}
impl Default for Builder {
fn default() -> Self {
Self {
max_request_body_size: TEN_MB_SIZE_BYTES,
resources: Resources::default(),
access_control: AccessControl::default(),
keep_alive: true,
tokio_runtime: None,
}
}
}
/// Handle used to run or stop the server.
#[derive(Debug)]
pub struct ServerHandle {
......@@ -212,7 +258,7 @@ impl Future for ServerHandle {
/// An HTTP JSON RPC server.
#[derive(Debug)]
pub struct Server {
pub struct Server<M = ()> {
/// Hyper server.
listener: HyperBuilder<AddrIncoming>,
/// Local address
......@@ -225,9 +271,10 @@ pub struct Server {
resources: Resources,
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
middleware: M,
}
impl Server {
impl<M: Middleware> Server<M> {
/// Returns socket address to which the server is bound.
pub fn local_addr(&self) -> Result<SocketAddr, Error> {
self.local_addr.ok_or_else(|| Error::Custom("Local address not found".into()))
......@@ -240,18 +287,21 @@ impl Server {
let (tx, mut rx) = mpsc::channel(1);
let listener = self.listener;
let resources = self.resources;
let middleware = self.middleware;
let methods = methods.into().initialize_resources(&resources)?;
let make_service = make_service_fn(move |_| {
let methods = methods.clone();
let access_control = access_control.clone();
let resources = resources.clone();
let middleware = middleware.clone();
async move {
Ok::<_, HyperError>(service_fn(move |request| {
let methods = methods.clone();
let access_control = access_control.clone();
let resources = resources.clone();
let middleware = middleware.clone();
// Run some validation on the http request, then read the body and try to deserialize it into one of
// two cases: a single RPC request or a batch of RPC requests.
......@@ -276,32 +326,60 @@ impl Server {
}
};
let request_start = middleware.on_request();
// NOTE(niklasad1): it's a channel because it's needed for batch requests.
let (tx, mut rx) = mpsc::unbounded::<String>();
let sink = MethodSink::new_with_limit(tx, max_request_body_size);
type Notif<'a> = Notification<'a, Option<&'a RawValue>>;
// Single request or notification
if is_single {
if let Ok(req) = serde_json::from_slice::<Request>(&body) {
middleware.on_call(req.method.as_ref());
// NOTE: we don't need to track connection id on HTTP, so using hardcoded 0 here.
if let Some(fut) =
methods.execute_with_resources(&tx, req, 0, &resources, max_request_body_size)
{
fut.await;
match methods.execute_with_resources(&sink, req, 0, &resources) {
Ok((name, MethodResult::Sync(success))) => {
middleware.on_result(name, success, request_start);
}
Ok((name, MethodResult::Async(fut))) => {
let success = fut.await;
middleware.on_result(name, success, request_start);
}
Err(name) => {
middleware.on_result(name.as_ref(), false, request_start);
}
}
} else if let Ok(_req) = serde_json::from_slice::<Notif>(&body) {
return Ok::<_, HyperError>(response::ok_response("".into()));
} else {
let (id, code) = prepare_error(&body);
send_error(id, &tx, code.into());
sink.send_error(id, code.into());
}
// Batch of requests or notifications
} else if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&body) {
if !batch.is_empty() {
join_all(batch.into_iter().filter_map(|req| {
methods.execute_with_resources(&tx, req, 0, &resources, max_request_body_size)
let middleware = &middleware;
join_all(batch.into_iter().filter_map(move |req| {
match methods.execute_with_resources(&sink, req, 0, &resources) {
Ok((name, MethodResult::Sync(success))) => {
middleware.on_result(name, success, request_start);
None
}
Ok((name, MethodResult::Async(fut))) => Some(async move {
let success = fut.await;
middleware.on_result(name, success, request_start);
}),
Err(name) => {
middleware.on_result(name.as_ref(), false, request_start);
None
}
}
}))
.await;
} else {
......@@ -309,7 +387,7 @@ impl Server {
// Array with at least one value, the response from the Server MUST be a single
// Response object." – The Spec.
is_single = true;
send_error(Id::Null, &tx, ErrorCode::InvalidRequest.into());
sink.send_error(Id::Null, ErrorCode::InvalidRequest.into());
}
} else if let Ok(_batch) = serde_json::from_slice::<Vec<Notif>>(&body) {
return Ok::<_, HyperError>(response::ok_response("".into()));
......@@ -319,7 +397,7 @@ impl Server {
// Response object." – The Spec.
is_single = true;
let (id, code) = prepare_error(&body);
send_error(id, &tx, code.into());
sink.send_error(id, code.into());
}
// Closes the receiving half of a channel without dropping it. This prevents any further
......@@ -331,6 +409,7 @@ impl Server {
collect_batch_response(rx).await
};
tracing::debug!("[service_fn] sending back: {:?}", &response[..cmp::min(response.len(), 1024)]);
middleware.on_response(request_start);
Ok::<_, HyperError>(response::ok_response(response))
}
}))
......
......@@ -76,6 +76,9 @@ pub use jsonrpsee_types as types;
#[cfg(any(feature = "http-server", feature = "ws-server"))]
pub use jsonrpsee_utils::server::rpc_module::{RpcModule, SubscriptionSink};
#[cfg(any(feature = "http-server", feature = "ws-server"))]
pub use jsonrpsee_utils as utils;
#[cfg(feature = "http-server")]
pub use http_server::tracing;
......
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION