Unverified Commit 602d4818 authored by Alexandru Vasile's avatar Alexandru Vasile
Browse files

http-server: Receive tower service builder as param


Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>
parent c328f863
......@@ -20,6 +20,7 @@ tracing-futures = "0.2.5"
serde_json = { version = "1.0", features = ["raw_value"] }
serde = "1"
tokio = { version = "1.16", features = ["rt-multi-thread", "macros"] }
tower = "0.4.13"
[dev-dependencies]
tracing-subscriber = { version = "0.3.3", features = ["env-filter"] }
......
......@@ -34,11 +34,12 @@ use crate::response::{internal_error, malformed};
use futures_channel::mpsc;
use futures_util::future::FutureExt;
use futures_util::stream::{StreamExt, TryStreamExt};
use hyper::body::HttpBody;
use hyper::header::{HeaderMap, HeaderValue};
use hyper::server::conn::AddrStream;
use hyper::server::{conn::AddrIncoming, Builder as HyperBuilder};
use hyper::service::make_service_fn;
use hyper::{Error as HyperError, Method};
use hyper::service::{make_service_fn, Service};
use hyper::{Body, Error as HyperError, Method};
use jsonrpsee_core::error::{Error, GenericTransportError};
use jsonrpsee_core::http_helpers::{self, read_body};
use jsonrpsee_core::logger::{self, HttpLogger as Logger};
......@@ -51,8 +52,10 @@ use jsonrpsee_core::tracing::{rx_log_from_json, rx_log_from_str, tx_log_from_str
use jsonrpsee_core::TEN_MB_SIZE_BYTES;
use jsonrpsee_types::error::{ErrorCode, ErrorObject, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG};
use jsonrpsee_types::{Id, Notification, Params, Request};
use serde::de::StdError;
use serde_json::value::RawValue;
use tokio::net::{TcpListener, ToSocketAddrs};
use tower::Layer;
use tracing_futures::Instrument;
type Notif<'a> = Notification<'a, Option<&'a RawValue>>;
......@@ -96,7 +99,7 @@ impl Builder {
}
}
impl<L: Clone> Builder<L> {
impl<L> Builder<L> {
/// Add a logger to the builder [`Logger`](../jsonrpsee_core/logger/trait.Logger.html).
///
/// ```
......@@ -345,26 +348,6 @@ impl<L: Clone> Builder<L> {
health_api: self.health_api,
})
}
/// Returns a service that can be utilised with `tower` compatible crates.
pub fn to_service(self, methods: impl Into<Methods>, addr: SocketAddr) -> Result<TowerService<L>, Error> {
let methods = methods.into().initialize_resources(&self.resources)?;
Ok(TowerService {
inner: ServiceData {
remote_addr: addr,
methods,
acl: self.access_control.clone(),
resources: self.resources.clone(),
logger: self.logger.clone(),
health_api: self.health_api.clone(),
max_request_body_size: self.max_response_body_size,
max_response_body_size: self.max_response_body_size,
max_log_length: self.max_log_length,
batch_requests_supported: self.batch_requests_supported,
},
})
}
}
#[derive(Debug, Clone)]
......@@ -603,6 +586,72 @@ impl<L: Logger> Server<L> {
self.local_addr.ok_or_else(|| Error::Custom("Local address not found".into()))
}
/// Start the server with a custom tower builder.
pub fn start_with_builder<T, U>(
mut self,
methods: impl Into<Methods>,
builder: tower::ServiceBuilder<T>,
) -> Result<ServerHandle, Error>
where
T: Layer<TowerService<L>> + Send + 'static,
<T as Layer<TowerService<L>>>::Service: Send
+ Service<
hyper::Request<Body>,
Response = hyper::Response<U>,
Error = Box<(dyn StdError + Send + Sync + 'static)>,
>,
<<T as Layer<TowerService<L>>>::Service as Service<hyper::Request<Body>>>::Future: Send,
U: HttpBody + Send + 'static,
<U as HttpBody>::Error: Send + Sync + StdError,
<U as HttpBody>::Data: Send,
{
let max_request_body_size = self.max_request_body_size;
let max_response_body_size = self.max_response_body_size;
let max_log_length = self.max_log_length;
let acl = self.access_control;
let (tx, mut rx) = mpsc::channel(1);
let listener = self.listener;
let resources = self.resources;
let logger = self.logger;
let batch_requests_supported = self.batch_requests_supported;
let methods = methods.into().initialize_resources(&resources)?;
let health_api = self.health_api;
let make_service = make_service_fn(move |conn: &AddrStream| {
let service = TowerService {
inner: ServiceData {
remote_addr: conn.remote_addr(),
methods: methods.clone(),
acl: acl.clone(),
resources: resources.clone(),
logger: logger.clone(),
health_api: health_api.clone(),
max_request_body_size,
max_response_body_size,
max_log_length,
batch_requests_supported,
},
};
let server = builder.service(service);
// For every request the `TowerService` is calling into `ServiceData::handle_request`
// where the RPSee bare implementation resides.
async move { Ok::<_, HyperError>(server) }
});
let rt = match self.tokio_runtime.take() {
Some(rt) => rt,
None => tokio::runtime::Handle::current(),
};
let handle = rt.spawn(async move {
let server = listener.serve(make_service);
let _ = server.with_graceful_shutdown(async move { rx.next().await.map_or((), |_| ()) }).await;
});
Ok(ServerHandle { handle: Some(handle), stop_sender: tx })
}
/// Start the server.
pub fn start(mut self, methods: impl Into<Methods>) -> Result<ServerHandle, Error> {
let max_request_body_size = self.max_request_body_size;
......@@ -633,7 +682,7 @@ impl<L: Logger> Server<L> {
},
};
// For every request the `TowerService` is calling into `TowerServiceData::handle_request`
// For every request the `TowerService` is calling into `ServiceData::handle_request`
// where the RPSee bare implementation resides.
async move { Ok::<_, HyperError>(service) }
});
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment