Unverified Commit bd0e6871 authored by Alexandru Vasile's avatar Alexandru Vasile
Browse files

http-server: Improve API builder for tower service


Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>
parent 45a81059
......@@ -31,6 +31,7 @@ use std::convert::Infallible;
use std::iter::once;
use std::net::SocketAddr;
use std::time::{Duration, Instant};
use hyper::server::conn::AddrStream;
use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer;
use tower_http::trace::{DefaultMakeSpan, DefaultOnResponse, TraceLayer};
use tower_http::LatencyUnit;
......@@ -90,13 +91,15 @@ async fn main() -> anyhow::Result<()> {
async fn run_server() -> anyhow::Result<SocketAddr> {
let addr = SocketAddr::from(([127, 0, 0, 1], 9935));
let make_service = make_service_fn(move |_| {
let make_service = make_service_fn(move |conn: &AddrStream| {
let remote_addr = conn.remote_addr();
async move {
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo")).unwrap();
println!("[run_server]: Creating RPC service");
let rpc_svc = HttpServerBuilder::new().set_middleware(Timings).to_service(module).unwrap();
let rpc_svc = HttpServerBuilder::new().set_middleware(Timings).to_service(module, remote_addr).unwrap();
println!("[run_server]: Tower builder");
let tower_svc = tower::ServiceBuilder::new()
......
......@@ -96,7 +96,7 @@ impl Builder {
}
}
impl<M> Builder<M> {
impl<M: Clone> Builder<M> {
/// Add a middleware to the builder [`Middleware`](../jsonrpsee_core/middleware/trait.Middleware.html).
///
/// ```
......@@ -251,7 +251,7 @@ impl<M> Builder<M> {
) -> Result<Server<M>, Error> {
Ok(Server {
access_control: self.access_control,
listener: Some(listener),
listener,
local_addr: Some(local_addr),
max_request_body_size: self.max_request_body_size,
max_response_body_size: self.max_response_body_size,
......@@ -295,7 +295,7 @@ impl<M> Builder<M> {
let listener = hyper::Server::from_tcp(listener)?;
Ok(Server {
listener: Some(listener),
listener,
local_addr,
access_control: self.access_control,
max_request_body_size: self.max_request_body_size,
......@@ -331,7 +331,7 @@ impl<M> Builder<M> {
let listener = hyper::Server::from_tcp(listener)?.tcp_nodelay(true);
Ok(Server {
listener: Some(listener),
listener,
local_addr,
access_control: self.access_control,
max_request_body_size: self.max_request_body_size,
......@@ -346,17 +346,17 @@ impl<M> Builder<M> {
}
/// Returns a service that can be utilised with `tower` compatible crates.
pub fn to_service(self, methods: impl Into<Methods>) -> Result<TowerService<M>, Error> {
pub fn to_service(self, methods: impl Into<Methods>, addr: SocketAddr) -> Result<TowerService<M>, Error> {
let methods = methods.into().initialize_resources(&self.resources)?;
Ok(TowerService {
inner: TowerServiceData {
remote_addr: None,
remote_addr: addr,
methods,
acl: self.access_control,
resources: self.resources,
middleware: self.middleware,
health_api: self.health_api,
acl: self.access_control.clone(),
resources: self.resources.clone(),
middleware: self.middleware.clone(),
health_api: self.health_api.clone(),
max_request_body_size: self.max_response_body_size,
max_response_body_size: self.max_response_body_size,
max_log_length: self.max_log_length,
......@@ -407,7 +407,7 @@ impl Future for ServerHandle {
#[derive(Debug, Clone)]
struct TowerServiceData<M> {
/// Remote server address.
remote_addr: Option<SocketAddr>,
remote_addr: SocketAddr,
/// Registered server methods.
methods: Methods,
/// Access control.
......@@ -449,12 +449,6 @@ impl<M: Middleware> TowerServiceData<M> {
batch_requests_supported,
} = self;
let remote_addr = match remote_addr {
Some(addr) => addr,
// Default RPC port
None => SocketAddr::from(([127, 0, 0, 1], 9944)),
};
let request_start = middleware.on_request(remote_addr, request.headers());
let keys = request.headers().keys().map(|k| k.as_str());
......@@ -579,7 +573,7 @@ impl<M: Middleware> hyper::service::Service<hyper::Request<hyper::Body>> for Tow
#[derive(Debug)]
pub struct Server<M = ()> {
/// Hyper server.
listener: Option<HyperBuilder<AddrIncoming>>,
listener: HyperBuilder<AddrIncoming>,
/// Local address
local_addr: Option<SocketAddr>,
/// Max request body size.
......@@ -625,7 +619,7 @@ impl<M: Middleware> Server<M> {
let make_service = make_service_fn(move |conn: &AddrStream| {
let service = TowerService {
inner: TowerServiceData {
remote_addr: Some(conn.remote_addr()),
remote_addr: conn.remote_addr(),
methods: methods.clone(),
acl: acl.clone(),
resources: resources.clone(),
......@@ -649,8 +643,7 @@ impl<M: Middleware> Server<M> {
};
let handle = rt.spawn(async move {
// TODO: Hande unwrap.
let server = listener.unwrap().serve(make_service);
let server = listener.serve(make_service);
let _ = server.with_graceful_shutdown(async move { rx.next().await.map_or((), |_| ()) }).await;
});
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment