Unverified Commit d64d1af4 authored by Alexandru Vasile's avatar Alexandru Vasile
Browse files

examples: Adjust tower_http example



Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>
parent 602d4818
Pipeline #206958 passed with stages
in 4 minutes and 59 seconds
...@@ -25,10 +25,7 @@ ...@@ -25,10 +25,7 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use hyper::body::Bytes; use hyper::body::Bytes;
use hyper::server::conn::AddrStream; use hyper::Body;
use hyper::service::make_service_fn;
use hyper::{Body, Server};
use std::convert::Infallible;
use std::iter::once; use std::iter::once;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
...@@ -39,7 +36,7 @@ use tower_http::LatencyUnit; ...@@ -39,7 +36,7 @@ use tower_http::LatencyUnit;
use jsonrpsee::core::client::ClientT; use jsonrpsee::core::client::ClientT;
use jsonrpsee::core::logger::{self, Params, Request}; use jsonrpsee::core::logger::{self, Params, Request};
use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::http_server::{HttpServerBuilder, RpcModule}; use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle, RpcModule};
/// Define a custom logging mechanism to detect the time passed /// Define a custom logging mechanism to detect the time passed
/// between receiving the request and proving the response. /// between receiving the request and proving the response.
...@@ -76,7 +73,7 @@ async fn main() -> anyhow::Result<()> { ...@@ -76,7 +73,7 @@ async fn main() -> anyhow::Result<()> {
.try_init() .try_init()
.expect("setting default subscriber failed"); .expect("setting default subscriber failed");
let addr = run_server().await?; let (addr, _handler) = run_server().await?;
let url = format!("http://{}", addr); let url = format!("http://{}", addr);
let client = HttpClientBuilder::default().build(&url)?; let client = HttpClientBuilder::default().build(&url)?;
...@@ -88,43 +85,29 @@ async fn main() -> anyhow::Result<()> { ...@@ -88,43 +85,29 @@ async fn main() -> anyhow::Result<()> {
Ok(()) Ok(())
} }
async fn run_server() -> anyhow::Result<SocketAddr> { async fn run_server() -> anyhow::Result<(SocketAddr, HttpServerHandle)> {
// Construct a custom service for handling the RPC requests. // Custom tower service to handle the RPC requests
let make_service = make_service_fn(move |conn: &AddrStream| { let builder = tower::ServiceBuilder::new()
let remote_addr = conn.remote_addr(); // Add high level tracing/logging to all requests
async move { .layer(
let mut module = RpcModule::new(()); TraceLayer::new_for_http()
module.register_method("say_hello", |_, _| Ok("lo")).unwrap(); .on_body_chunk(|chunk: &Bytes, latency: Duration, _: &tracing::Span| {
tracing::trace!(size_bytes = chunk.len(), latency = ?latency, "sending body chunk")
// Obtain the tower service relying on the RPC implementation. })
// NOTE: RPC's logger can be chained with tower. .make_span_with(DefaultMakeSpan::new().include_headers(true))
let rpc_svc = HttpServerBuilder::new().set_logger(Timings).to_service(module, remote_addr).unwrap(); .on_response(DefaultOnResponse::new().include_headers(true).latency_unit(LatencyUnit::Micros)),
)
// Chain multiple tower compatible layers on top of the RPC's service. // Mark the `Authorization` request header as sensitive so it doesn't show in logs
let tower_svc = tower::ServiceBuilder::new() .layer(SetSensitiveRequestHeadersLayer::new(once(hyper::header::AUTHORIZATION)))
// Add high level tracing/logging to all requests .timeout(Duration::from_secs(2));
.layer(
TraceLayer::new_for_http() let server = HttpServerBuilder::new().set_logger(Timings).build("127.0.0.1:0".parse::<SocketAddr>()?).await?;
.on_body_chunk(|chunk: &Bytes, latency: Duration, _: &tracing::Span| { let addr = server.local_addr()?;
tracing::trace!(size_bytes = chunk.len(), latency = ?latency, "sending body chunk")
}) let mut module = RpcModule::new(());
.make_span_with(DefaultMakeSpan::new().include_headers(true)) module.register_method("say_hello", |_, _| Ok("lo")).unwrap();
.on_response(DefaultOnResponse::new().include_headers(true).latency_unit(LatencyUnit::Micros)),
) let handler = server.start_with_builder(module, builder)?;
// Mark the `Authorization` request header as sensitive so it doesn't show in logs
.layer(SetSensitiveRequestHeadersLayer::new(once(hyper::header::AUTHORIZATION))) Ok((addr, handler))
.timeout(Duration::from_secs(2))
.service(rpc_svc);
Ok::<_, Infallible>(tower_svc)
}
});
let addr = SocketAddr::from(([127, 0, 0, 1], 9935));
tokio::spawn(async move { Server::bind(&addr).serve(make_service).await });
// Race with server start / client connect present in all examples
tokio::time::sleep(Duration::from_secs(5)).await;
Ok(addr)
} }
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment