Unverified Commit 172b6f79 authored by Alexandru Vasile's avatar Alexandru Vasile
Browse files

http: Rely on internal TowerService to handle requests


Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>
parent 52ef967f
......@@ -37,7 +37,7 @@ use futures_util::stream::{StreamExt, TryStreamExt};
use hyper::header::{HeaderMap, HeaderValue};
use hyper::server::conn::AddrStream;
use hyper::server::{conn::AddrIncoming, Builder as HyperBuilder};
use hyper::service::{make_service_fn, service_fn};
use hyper::service::make_service_fn;
use hyper::{Error as HyperError, Method};
use jsonrpsee_core::error::{Error, GenericTransportError};
use jsonrpsee_core::http_helpers::{self, read_body};
......@@ -623,119 +623,24 @@ impl<M: Middleware> Server<M> {
let health_api = self.health_api;
let make_service = make_service_fn(move |conn: &AddrStream| {
let remote_addr = conn.remote_addr();
let methods = methods.clone();
let acl = acl.clone();
let resources = resources.clone();
let middleware = middleware.clone();
let health_api = health_api.clone();
async move {
Ok::<_, HyperError>(service_fn(move |request| {
let request_start = middleware.on_request(remote_addr, request.headers());
let methods = methods.clone();
let acl = acl.clone();
let resources = resources.clone();
let middleware = middleware.clone();
let health_api = health_api.clone();
// Run some validation on the http request, then read the body and try to deserialize it into one of
// two cases: a single RPC request or a batch of RPC requests.
async move {
let keys = request.headers().keys().map(|k| k.as_str());
let cors_request_headers = http_helpers::get_cors_request_headers(request.headers());
let host = match http_helpers::read_header_value(request.headers(), "host") {
Some(origin) => origin,
None => return Ok(malformed()),
};
let maybe_origin = http_helpers::read_header_value(request.headers(), "origin");
if let Err(e) = acl.verify_host(host) {
tracing::warn!("Denied request: {:?}", e);
return Ok(response::host_not_allowed());
}
if let Err(e) = acl.verify_origin(maybe_origin, host) {
tracing::warn!("Denied request: {:?}", e);
return Ok(response::invalid_allow_origin());
}
if let Err(e) = acl.verify_headers(keys, cors_request_headers) {
tracing::warn!("Denied request: {:?}", e);
return Ok(response::invalid_allow_headers());
}
// Only `POST` and `OPTIONS` methods are allowed.
match *request.method() {
// An OPTIONS request is a CORS preflight request. We've done our access check
// above so we just need to tell the browser that the request is OK.
Method::OPTIONS => {
let origin = match maybe_origin {
Some(origin) => origin,
None => return Ok(malformed()),
};
let allowed_headers = acl.allowed_headers().to_cors_header_value();
let allowed_header_bytes = allowed_headers.as_bytes();
let res = hyper::Response::builder()
.header("access-control-allow-origin", origin)
.header("access-control-allow-methods", "POST")
.header("access-control-allow-headers", allowed_header_bytes)
.body(hyper::Body::empty())
.unwrap_or_else(|e| {
tracing::error!("Error forming preflight response: {}", e);
internal_error()
});
Ok(res)
}
// The actual request. If it's a CORS request we need to remember to add
// the access-control-allow-origin header (despite preflight) to allow it
// to be read in a browser.
Method::POST if content_type_is_json(&request) => {
let origin = return_origin_if_different_from_host(request.headers()).cloned();
let mut res = process_validated_request(ProcessValidatedRequest {
request,
middleware,
methods,
resources,
max_request_body_size,
max_response_body_size,
max_log_length,
batch_requests_supported,
request_start,
})
.await?;
if let Some(origin) = origin {
res.headers_mut().insert("access-control-allow-origin", origin);
}
Ok(res)
}
Method::GET => match health_api.as_ref() {
Some(health) if health.path.as_str() == request.uri().path() => {
process_health_request(
health,
middleware,
methods,
max_response_body_size,
request_start,
max_log_length,
)
.await
}
_ => Ok(response::method_not_allowed()),
},
// Error scenarios:
Method::POST => Ok(response::unsupported_content_type()),
_ => Ok(response::method_not_allowed()),
}
}
}))
}
let service = TowerService {
inner: TowerServiceData {
remote_addr: Some(conn.remote_addr()),
methods: methods.clone(),
acl: acl.clone(),
resources: resources.clone(),
middleware: middleware.clone(),
health_api: health_api.clone(),
max_request_body_size,
max_response_body_size,
max_log_length,
batch_requests_supported,
},
};
// For every request the `TowerService` is calling into `TowerServiceData::handle_request`
// where the RPSee bare implementation resides.
async move { Ok::<_, HyperError>(service) }
});
let rt = match self.tokio_runtime.take() {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment