Unverified Commit 80bcef02 authored by Alexandru Vasile's avatar Alexandru Vasile Committed by GitHub
Browse files

HTTP server with `tower` interaction (#831)



* http: Add inner server data structure

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* http: Handle RPC messages

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* http: Implement equivalent of `service_fn`

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* http: Implement equivalent of `make_service_fn`

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* http: Expose `tower` compatible service

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* http: Prebuild http server with optional listener

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* examples: WIP tower service

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* http: Fix warnings

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* tower_http: Fix warnings

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* http: Ensure service works with tower

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* http: Remove `RPSeeServerMakeSvc` to allow further flexibility

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* tower_http: Fix warnings

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* tower_http: Resubmit the same request for testing

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* http: Transform builder into service directly

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* http: Rename `RPSeeServerSvc` into user friendly `TowerService`

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* http: Rely on internal TowerService to handle requests

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Fix middleware typo

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* http-server: Improve API builder for tower service

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Rename the inner service data and check comments

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* examples: Add comments

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* http-server: Receive tower service builder as param

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* examples: Adjust tower_http example

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* http-server: Add tower middleware on the HttpBuilder

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* http-server: Do not expose the internal `TowerService` for now

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Update http-server/src/server.rs

Co-authored-by: Niklas Adolfsson's avatarNiklas Adolfsson <niklasadolfsson1@gmail.com>

* http-server: Use `std::error::Error`

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Fix fmt

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* address some grumbles

* fix more grumbles: no more Infallible

* make clippy happy

* Rename tower http example

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Update http-server/src/server.rs

* Update http-server/src/server.rs

* Update http-server/src/server.rs

* Update http-server/src/server.rs

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Niklas Adolfsson's avatarNiklas Adolfsson <niklasadolfsson1@gmail.com>
parent 4c6207a1
Pipeline #207819 passed with stages
in 5 minutes and 29 seconds
......@@ -15,3 +15,6 @@ tracing-subscriber = { version = "0.3.3", features = ["env-filter"] }
tokio = { version = "1.16", features = ["full"] }
tokio-stream = { version = "0.1", features = ["sync"] }
serde_json = { version = "1" }
tower-http = { version = "0.3.4", features = ["full"] }
tower = { version = "0.4.13", features = ["full"] }
hyper = "0.14.20"
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! This example sets a custom tower service middleware to the RPC implementation.
use hyper::body::Bytes;
use std::iter::once;
use std::net::SocketAddr;
use std::time::Duration;
use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer;
use tower_http::trace::{DefaultMakeSpan, DefaultOnResponse, TraceLayer};
use tower_http::LatencyUnit;
use jsonrpsee::core::client::ClientT;
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle, RpcModule};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init()
.expect("setting default subscriber failed");
let (addr, _handler) = run_server().await?;
let url = format!("http://{}", addr);
let client = HttpClientBuilder::default().build(&url)?;
let response: String = client.request("say_hello", None).await?;
println!("[main]: response: {:?}", response);
let _response: Result<String, _> = client.request("unknown_method", None).await;
let _ = client.request::<String>("say_hello", None).await?;
Ok(())
}
async fn run_server() -> anyhow::Result<(SocketAddr, HttpServerHandle)> {
// Custom tower service to handle the RPC requests
let service_builder = tower::ServiceBuilder::new()
// Add high level tracing/logging to all requests
.layer(
TraceLayer::new_for_http()
.on_body_chunk(|chunk: &Bytes, latency: Duration, _: &tracing::Span| {
tracing::trace!(size_bytes = chunk.len(), latency = ?latency, "sending body chunk")
})
.make_span_with(DefaultMakeSpan::new().include_headers(true))
.on_response(DefaultOnResponse::new().include_headers(true).latency_unit(LatencyUnit::Micros)),
)
// Mark the `Authorization` request header as sensitive so it doesn't show in logs
.layer(SetSensitiveRequestHeadersLayer::new(once(hyper::header::AUTHORIZATION)))
.timeout(Duration::from_secs(2));
let server =
HttpServerBuilder::new().set_middleware(service_builder).build("127.0.0.1:0".parse::<SocketAddr>()?).await?;
let addr = server.local_addr()?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo")).unwrap();
let handler = server.start(module)?;
Ok((addr, handler))
}
......@@ -56,8 +56,8 @@ impl logger::WsLogger for Timings {
println!("[Timings:on_call] method: '{}', params: {:?}, kind: {}", name, params, kind);
}
fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant) {
println!("[Timings] call={}, worked? {}, duration {:?}", name, succeess, started_at.elapsed());
fn on_result(&self, name: &str, success: bool, started_at: Self::Instant) {
println!("[Timings] call={}, worked? {}, duration {:?}", name, success, started_at.elapsed());
}
fn on_response(&self, _result: &str, started_at: Self::Instant) {
......
......@@ -20,6 +20,7 @@ tracing-futures = "0.2.5"
serde_json = { version = "1.0", features = ["raw_value"] }
serde = "1"
tokio = { version = "1.16", features = ["rt-multi-thread", "macros"] }
tower = "0.4.13"
[dev-dependencies]
tracing-subscriber = { version = "0.3.3", features = ["env-filter"] }
......
......@@ -34,11 +34,12 @@ use crate::response::{internal_error, malformed};
use futures_channel::mpsc;
use futures_util::future::FutureExt;
use futures_util::stream::{StreamExt, TryStreamExt};
use hyper::body::HttpBody;
use hyper::header::{HeaderMap, HeaderValue};
use hyper::server::conn::AddrStream;
use hyper::server::{conn::AddrIncoming, Builder as HyperBuilder};
use hyper::service::{make_service_fn, service_fn};
use hyper::{Error as HyperError, Method};
use hyper::service::{make_service_fn, Service};
use hyper::{Body, Error as HyperError, Method};
use jsonrpsee_core::error::{Error, GenericTransportError};
use jsonrpsee_core::http_helpers::{self, read_body};
use jsonrpsee_core::logger::{self, HttpLogger as Logger};
......@@ -52,14 +53,17 @@ use jsonrpsee_core::TEN_MB_SIZE_BYTES;
use jsonrpsee_types::error::{ErrorCode, ErrorObject, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG};
use jsonrpsee_types::{Id, Notification, Params, Request};
use serde_json::value::RawValue;
use std::error::Error as StdError;
use tokio::net::{TcpListener, ToSocketAddrs};
use tower::layer::util::Identity;
use tower::Layer;
use tracing_futures::Instrument;
type Notif<'a> = Notification<'a, Option<&'a RawValue>>;
/// Builder to create JSON-RPC HTTP server.
#[derive(Debug)]
pub struct Builder<L = ()> {
pub struct Builder<B = Identity, L = ()> {
/// Access control based on HTTP headers.
access_control: AccessControl,
resources: Resources,
......@@ -71,6 +75,7 @@ pub struct Builder<L = ()> {
logger: L,
max_log_length: u32,
health_api: Option<HealthApi>,
service_builder: tower::ServiceBuilder<B>,
}
impl Default for Builder {
......@@ -85,6 +90,7 @@ impl Default for Builder {
logger: (),
max_log_length: 4096,
health_api: None,
service_builder: tower::ServiceBuilder::new(),
}
}
}
......@@ -96,9 +102,11 @@ impl Builder {
}
}
impl<L> Builder<L> {
impl<B, L> Builder<B, L> {
/// Add a logger to the builder [`Logger`](../jsonrpsee_core/logger/trait.Logger.html).
///
/// # Examples
///
/// ```
/// use std::{time::Instant, net::SocketAddr};
/// use hyper::Request;
......@@ -138,7 +146,7 @@ impl<L> Builder<L> {
///
/// let builder = HttpServerBuilder::new().set_logger(MyLogger);
/// ```
pub fn set_logger<T: Logger>(self, logger: T) -> Builder<T> {
pub fn set_logger<T: Logger>(self, logger: T) -> Builder<B, T> {
Builder {
access_control: self.access_control,
max_request_body_size: self.max_request_body_size,
......@@ -149,6 +157,7 @@ impl<L> Builder<L> {
logger,
max_log_length: self.max_log_length,
health_api: self.health_api,
service_builder: self.service_builder,
}
}
......@@ -213,8 +222,49 @@ impl<L> Builder<L> {
Ok(self)
}
/// Configure a custom [`tower::ServiceBuilder`] middleware for composing layers to be applied to the RPC service.
///
/// Default: No tower layers are applied to the RPC service.
///
/// # Examples
///
/// ```rust
///
/// use std::time::Duration;
/// use std::net::SocketAddr;
/// use jsonrpsee_http_server::HttpServerBuilder;
///
/// #[tokio::main]
/// async fn main() {
/// let builder = tower::ServiceBuilder::new()
/// .timeout(Duration::from_secs(2));
///
/// let server = HttpServerBuilder::new()
/// .set_middleware(builder)
/// .build("127.0.0.1:0".parse::<SocketAddr>().unwrap())
/// .await
/// .unwrap();
/// }
/// ```
pub fn set_middleware<T>(self, service_builder: tower::ServiceBuilder<T>) -> Builder<T, L> {
Builder {
access_control: self.access_control,
max_request_body_size: self.max_request_body_size,
max_response_body_size: self.max_response_body_size,
batch_requests_supported: self.batch_requests_supported,
resources: self.resources,
tokio_runtime: self.tokio_runtime,
logger: self.logger,
max_log_length: self.max_log_length,
health_api: self.health_api,
service_builder,
}
}
/// Finalizes the configuration of the server with customized TCP settings on the socket and on hyper.
///
/// # Examples
///
/// ```rust
/// use jsonrpsee_http_server::HttpServerBuilder;
/// use socket2::{Domain, Socket, Type};
......@@ -249,7 +299,7 @@ impl<L> Builder<L> {
self,
listener: hyper::server::Builder<AddrIncoming>,
local_addr: SocketAddr,
) -> Result<Server<L>, Error> {
) -> Result<Server<B, L>, Error> {
Ok(Server {
access_control: self.access_control,
listener,
......@@ -262,6 +312,7 @@ impl<L> Builder<L> {
logger: self.logger,
max_log_length: self.max_log_length,
health_api: self.health_api,
service_builder: self.service_builder,
})
}
......@@ -289,7 +340,7 @@ impl<L> Builder<L> {
/// let server = HttpServerBuilder::new().build_from_tcp(socket).unwrap();
/// }
/// ```
pub fn build_from_tcp(self, listener: impl Into<StdTcpListener>) -> Result<Server<L>, Error> {
pub fn build_from_tcp(self, listener: impl Into<StdTcpListener>) -> Result<Server<B, L>, Error> {
let listener = listener.into();
let local_addr = listener.local_addr().ok();
......@@ -307,6 +358,7 @@ impl<L> Builder<L> {
logger: self.logger,
max_log_length: self.max_log_length,
health_api: self.health_api,
service_builder: self.service_builder,
})
}
......@@ -325,7 +377,7 @@ impl<L> Builder<L> {
/// assert!(jsonrpsee_http_server::HttpServerBuilder::default().build(addrs).await.is_ok());
/// }
/// ```
pub async fn build(self, addrs: impl ToSocketAddrs) -> Result<Server<L>, Error> {
pub async fn build(self, addrs: impl ToSocketAddrs) -> Result<Server<B, L>, Error> {
let listener = TcpListener::bind(addrs).await?.into_std()?;
let local_addr = listener.local_addr().ok();
......@@ -343,6 +395,7 @@ impl<L> Builder<L> {
logger: self.logger,
max_log_length: self.max_log_length,
health_api: self.health_api,
service_builder: self.service_builder,
})
}
}
......@@ -384,13 +437,21 @@ impl Future for ServerHandle {
}
}
/// An HTTP JSON RPC server.
#[derive(Debug)]
pub struct Server<L = ()> {
/// Hyper server.
listener: HyperBuilder<AddrIncoming>,
/// Local address
local_addr: Option<SocketAddr>,
/// Data required by the server to handle requests.
#[derive(Debug, Clone)]
struct ServiceData<L> {
/// Remote server address.
remote_addr: SocketAddr,
/// Registered server methods.
methods: Methods,
/// Access control.
acl: AccessControl,
/// Tracker for currently used resources on the server.
resources: Resources,
/// User provided logger.
logger: L,
/// Health API.
health_api: Option<HealthApi>,
/// Max request body size.
max_request_body_size: u32,
/// Max response body size.
......@@ -401,79 +462,48 @@ pub struct Server<L = ()> {
max_log_length: u32,
/// Whether batch requests are supported by this server or not.
batch_requests_supported: bool,
/// Access control.
access_control: AccessControl,
/// Tracker for currently used resources on the server.
resources: Resources,
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
logger: L,
health_api: Option<HealthApi>,
}
impl<L: Logger> Server<L> {
/// Returns socket address to which the server is bound.
pub fn local_addr(&self) -> Result<SocketAddr, Error> {
self.local_addr.ok_or_else(|| Error::Custom("Local address not found".into()))
}
/// Start the server.
pub fn start(mut self, methods: impl Into<Methods>) -> Result<ServerHandle, Error> {
let max_request_body_size = self.max_request_body_size;
let max_response_body_size = self.max_response_body_size;
let max_log_length = self.max_log_length;
let acl = self.access_control;
let (tx, mut rx) = mpsc::channel(1);
let listener = self.listener;
let resources = self.resources;
let logger = self.logger;
let batch_requests_supported = self.batch_requests_supported;
let methods = methods.into().initialize_resources(&resources)?;
let health_api = self.health_api;
impl<L: Logger> ServiceData<L> {
/// Default behavior for handling the RPC requests.
async fn handle_request(self, request: hyper::Request<hyper::Body>) -> hyper::Response<hyper::Body> {
let ServiceData {
remote_addr,
methods,
acl,
resources,
logger,
health_api,
max_request_body_size,
max_response_body_size,
max_log_length,
batch_requests_supported,
} = self;
let make_service = make_service_fn(move |conn: &AddrStream| {
let remote_addr = conn.remote_addr();
let methods = methods.clone();
let acl = acl.clone();
let resources = resources.clone();
let logger = logger.clone();
let health_api = health_api.clone();
async move {
Ok::<_, HyperError>(service_fn(move |request| {
let request_start = logger.on_request(remote_addr, &request);
let methods = methods.clone();
let acl = acl.clone();
let resources = resources.clone();
let logger = logger.clone();
let health_api = health_api.clone();
// Run some validation on the http request, then read the body and try to deserialize it into one of
// two cases: a single RPC request or a batch of RPC requests.
async move {
let keys = request.headers().keys().map(|k| k.as_str());
let cors_request_headers = http_helpers::get_cors_request_headers(request.headers());
let host = match http_helpers::read_header_value(request.headers(), "host") {
Some(origin) => origin,
None => return Ok(malformed()),
None => return malformed(),
};
let maybe_origin = http_helpers::read_header_value(request.headers(), "origin");
if let Err(e) = acl.verify_host(host) {
tracing::warn!("Denied request: {:?}", e);
return Ok(response::host_not_allowed());
return response::host_not_allowed();
}
if let Err(e) = acl.verify_origin(maybe_origin, host) {
tracing::warn!("Denied request: {:?}", e);
return Ok(response::invalid_allow_origin());
return response::invalid_allow_origin();
}
if let Err(e) = acl.verify_headers(keys, cors_request_headers) {
tracing::warn!("Denied request: {:?}", e);
return Ok(response::invalid_allow_headers());
return response::invalid_allow_headers();
}
// Only `POST` and `OPTIONS` methods are allowed.
......@@ -483,13 +513,13 @@ impl<L: Logger> Server<L> {
Method::OPTIONS => {
let origin = match maybe_origin {
Some(origin) => origin,
None => return Ok(malformed()),
None => return malformed(),
};
let allowed_headers = acl.allowed_headers().to_cors_header_value();
let allowed_header_bytes = allowed_headers.as_bytes();
let res = hyper::Response::builder()
hyper::Response::builder()
.header("access-control-allow-origin", origin)
.header("access-control-allow-methods", "POST")
.header("access-control-allow-headers", allowed_header_bytes)
......@@ -497,9 +527,7 @@ impl<L: Logger> Server<L> {
.unwrap_or_else(|e| {
tracing::error!("Error forming preflight response: {}", e);
internal_error()
});
Ok(res)
})
}
// The actual request. If it's a CORS request we need to remember to add
// the access-control-allow-origin header (despite preflight) to allow it
......@@ -517,12 +545,12 @@ impl<L: Logger> Server<L> {
batch_requests_supported,
request_start,
})
.await?;
.await;
if let Some(origin) = origin {
res.headers_mut().insert("access-control-allow-origin", origin);
}
Ok(res)
res
}
Method::GET => match health_api.as_ref() {
Some(health) if health.path.as_str() == request.uri().path() => {
......@@ -536,15 +564,130 @@ impl<L: Logger> Server<L> {
)
.await
}
_ => Ok(response::method_not_allowed()),
_ => response::method_not_allowed(),
},
// Error scenarios:
Method::POST => Ok(response::unsupported_content_type()),
_ => Ok(response::method_not_allowed()),
Method::POST => response::unsupported_content_type(),
_ => response::method_not_allowed(),
}
}
}))
}
/// JsonRPSee service compatible with `tower`.
///
/// # Note
/// This is similar to [`hyper::service::service_fn`].
#[derive(Debug)]
pub struct TowerService<L> {
inner: ServiceData<L>,
}
impl<L: Logger> hyper::service::Service<hyper::Request<hyper::Body>> for TowerService<L> {
type Response = hyper::Response<hyper::Body>;
// The following associated type is required by the `impl<B, U, L: Logger> Server<B, L>` bounds.
// It satisfies the server's bounds when the `tower::ServiceBuilder<B>` is not set (ie `B: Identity`).
type Error = Box<dyn StdError + Send + Sync + 'static>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
/// Opens door for back pressure implementation.
fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, request: hyper::Request<hyper::Body>) -> Self::Future {
let data = self.inner.clone();
Box::pin(data.handle_request(request).map(Ok))
}
}
/// An HTTP JSON RPC server.
#[derive(Debug)]
pub struct Server<B = Identity, L = ()> {
/// Hyper server.
listener: HyperBuilder<AddrIncoming>,
/// Local address
local_addr: Option<SocketAddr>,
/// Max request body size.
max_request_body_size: u32,
/// Max response body size.
max_response_body_size: u32,
/// Max length for logging for request and response
///
/// Logs bigger than this limit will be truncated.
max_log_length: u32,
/// Whether batch requests are supported by this server or not.
batch_requests_supported: bool,
/// Access control.
access_control: AccessControl,
/// Tracker for currently used resources on the server.
resources: Resources,
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
logger: L,
health_api: Option<HealthApi>,
service_builder: tower::ServiceBuilder<B>,
}
impl<B, L> Server<B, L> {
/// Returns socket address to which the server is bound.
pub fn local_addr(&self) -> Result<SocketAddr, Error> {
self.local_addr.ok_or_else(|| Error::Custom("Local address not found".into()))
}
}
// Required trait bounds for the middleware service.
impl<B, U, L> Server<B, L>
where
L: Logger,
B: Layer<TowerService<L>> + Send + 'static,
<B as Layer<TowerService<L>>>::Service: Send
+ Service<
hyper::Request<Body>,
Response = hyper::Response<U>,
Error = Box<(dyn StdError + Send + Sync + 'static)>,
>,
<<B as Layer<TowerService<L>>>::Service as Service<hyper::Request<Body>>>::Future: Send,
U: HttpBody + Send + 'static,
<U as HttpBody>::Error: Send + Sync + StdError,
<U as HttpBody>::Data: Send,
{
/// Start the server.
pub fn start(mut self, methods: impl Into<Methods>) -> Result<ServerHandle, Error> {
let max_request_body_size = self.max_request_body_size;
let max_response_body_size = self.max_response_body_size;
let max_log_length = self.max_log_length;
let acl = self.access_control;
let (tx, mut rx) = mpsc::channel(1);
let listener = self.listener;
let resources = self.resources;
let logger = self.logger;
let batch_requests_supported = self.batch_requests_supported;
let methods = methods.into().initialize_resources(&resources)?;
let health_api = self.health_api;
let make_service = make_service_fn(move |conn: &AddrStream| {
let service = TowerService {
inner: ServiceData {
remote_addr: conn.remote_addr(),
methods: methods.clone(),
acl: acl.clone(),
resources: resources.clone(),
logger: logger.clone(),
health_api: health_api.clone(),
max_request_body_size,
max_response_body_size,
max_log_length,
batch_requests_supported,
},
};
let server = self.service_builder.service(service);
// For every request the `TowerService` is calling into `ServiceData::handle_request`
// where the RPSee bare implementation resides.
async move { Ok::<_, HyperError>(server) }
});
let rt = match self.tokio_runtime.take() {
......@@ -607,9 +750,7 @@ struct ProcessValidatedRequest<L: Logger> {
}