Unverified Commit 7eb5d47f authored by Alexandru Vasile's avatar Alexandru Vasile Committed by GitHub
Browse files

middleware: Implement proxy URI paths to RPC methods (#859)



* middleware: Proxy `GET /path` requests to internal methods

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* middleware: Modify the response for proxies

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* examples: Add `ProxyRequestLayer` example for URI redirection

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* http: Remove internal Health API

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* middleware: Replace `ResponseFuture` with pinning

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* middleware: Use `Uri::from_static` and `RequestSer` for body message

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* middleware: Use `Arc<str>` instead of `String`

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Rename `ProxyRequest` to `ProxyGetRequest` and rename mod to `middleware`

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* middleware: Improve docs

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* middleware: Fail if path does not start with `/`

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* http-server: Remove pin project dependency

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Rename `proxy_request.rs` to `proxy_get_request.rs`

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>
parent bd31557d
Pipeline #209958 passed with stages
in 5 minutes and 16 seconds
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! This example utilizes the `ProxyRequest` layer for redirecting
//! `GET /path` requests to internal RPC methods.
//!
//! The RPC server registers a method named `system_health` which
//! returns `serde_json::Value`. Redirect any `GET /health`
//! requests to the internal method, and return only the method's
//! response in the body (ie, without any jsonRPC 2.0 overhead).
//!
//! # Note
//!
//! This functionality is useful for services which would
//! like to query a certain `URI` path for statistics.
use hyper::{Body, Client, Request};
use std::net::SocketAddr;
use std::time::Duration;
use jsonrpsee::core::client::ClientT;
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::http_server::middleware::proxy_get_request::ProxyGetRequestLayer;
use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle, RpcModule};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init()
.expect("setting default subscriber failed");
let (addr, _handler) = run_server().await?;
let url = format!("http://{}", addr);
// Use RPC client to get the response of `say_hello` method.
let client = HttpClientBuilder::default().build(&url)?;
let response: String = client.request("say_hello", None).await?;
println!("[main]: response: {:?}", response);
// Use hyper client to manually submit a `GET /health` request.
let http_client = Client::new();
let uri = format!("http://{}/health", addr);
let req = Request::builder().method("GET").uri(&uri).body(Body::empty())?;
println!("[main]: Submit proxy request: {:?}", req);
let res = http_client.request(req).await?;
println!("[main]: Received proxy response: {:?}", res);
// Interpret the response as String.
let bytes = hyper::body::to_bytes(res.into_body()).await.unwrap();
let out = String::from_utf8(bytes.to_vec()).unwrap();
println!("[main]: Interpret proxy response: {:?}", out);
assert_eq!(out.as_str(), "{\"health\":true}");
Ok(())
}
async fn run_server() -> anyhow::Result<(SocketAddr, HttpServerHandle)> {
// Custom tower service to handle the RPC requests
let service_builder = tower::ServiceBuilder::new()
// Proxy `GET /health` requests to internal `system_health` method.
.layer(ProxyGetRequestLayer::new("/health", "system_health")?)
.timeout(Duration::from_secs(2));
let server =
HttpServerBuilder::new().set_middleware(service_builder).build("127.0.0.1:0".parse::<SocketAddr>()?).await?;
let addr = server.local_addr()?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo")).unwrap();
module.register_method("system_health", |_, _| Ok(serde_json::json!({ "health": true }))).unwrap();
let handler = server.start(module)?;
Ok((addr, handler))
}
......@@ -35,6 +35,9 @@ mod server;
/// Common builders for RPC responses.
pub mod response;
/// Common tower middleware exposed for RPC interaction.
pub mod middleware;
pub use jsonrpsee_core::server::access_control::{AccessControl, AccessControlBuilder};
pub use jsonrpsee_core::server::rpc_module::RpcModule;
pub use jsonrpsee_types as types;
......
//! Various middleware implementations for RPC specific purposes.
/// Proxy `GET /path` to internal RPC methods.
pub mod proxy_get_request;
//! Middleware that proxies requests at a specified URI to internal
//! RPC method calls.
use crate::response;
use hyper::header::{ACCEPT, CONTENT_TYPE};
use hyper::http::HeaderValue;
use hyper::{Body, Method, Request, Response, Uri};
use jsonrpsee_core::error::Error as RpcError;
use jsonrpsee_types::{Id, RequestSer};
use std::error::Error;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tower::{Layer, Service};
/// Layer that applies [`ProxyGetRequest`] which proxies the `GET /path` requests to
/// specific RPC method calls and that strips the response.
///
/// See [`ProxyGetRequest`] for more details.
#[derive(Debug, Clone)]
pub struct ProxyGetRequestLayer {
path: String,
method: String,
}
impl ProxyGetRequestLayer {
/// Creates a new [`ProxyGetRequestLayer`].
///
/// See [`ProxyGetRequest`] for more details.
pub fn new(path: impl Into<String>, method: impl Into<String>) -> Result<Self, RpcError> {
let path = path.into();
if !path.starts_with('/') {
return Err(RpcError::Custom("ProxyGetRequestLayer path must start with `/`".to_string()));
}
Ok(Self { path, method: method.into() })
}
}
impl<S> Layer<S> for ProxyGetRequestLayer {
type Service = ProxyGetRequest<S>;
fn layer(&self, inner: S) -> Self::Service {
ProxyGetRequest::new(inner, &self.path, &self.method)
.expect("Path already validated in ProxyGetRequestLayer; qed")
}
}
/// Proxy `GET /path` requests to the specified RPC method calls.
///
/// # Request
///
/// The `GET /path` requests are modified into valid `POST` requests for
/// calling the RPC method. This middleware adds appropriate headers to the
/// request, and completely modifies the request `BODY`.
///
/// # Response
///
/// The response of the RPC method is stripped down to contain only the method's
/// response, removing any RPC 2.0 spec logic regarding the response' body.
#[derive(Debug, Clone)]
pub struct ProxyGetRequest<S> {
inner: S,
path: Arc<str>,
method: Arc<str>,
}
impl<S> ProxyGetRequest<S> {
/// Creates a new [`ProxyGetRequest`].
///
/// The request `GET /path` is redirected to the provided method.
/// Fails if the path does not start with `/`.
pub fn new(inner: S, path: &str, method: &str) -> Result<Self, RpcError> {
if !path.starts_with('/') {
return Err(RpcError::Custom(format!("ProxyGetRequest path must start with `/`, got: {}", path)));
}
Ok(Self { inner, path: Arc::from(path), method: Arc::from(method) })
}
}
impl<S> Service<Request<Body>> for ProxyGetRequest<S>
where
S: Service<Request<Body>, Response = Response<Body>>,
S::Response: 'static,
S::Error: Into<Box<dyn Error + Send + Sync>> + 'static,
S::Future: Send + 'static,
{
type Response = S::Response;
type Error = Box<dyn Error + Send + Sync + 'static>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(Into::into)
}
fn call(&mut self, mut req: Request<Body>) -> Self::Future {
let modify = self.path.as_ref() == req.uri() && req.method() == Method::GET;
// Proxy the request to the appropriate method call.
if modify {
// RPC methods are accessed with `POST`.
*req.method_mut() = Method::POST;
// Precautionary remove the URI.
*req.uri_mut() = Uri::from_static("/");
// Requests must have the following headers:
req.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
req.headers_mut().insert(ACCEPT, HeaderValue::from_static("application/json"));
// Adjust the body to reflect the method call.
let body = Body::from(
serde_json::to_string(&RequestSer::new(&Id::Number(0), &self.method, None))
.expect("Valid request; qed"),
);
req = req.map(|_| body);
}
// Call the inner service and get a future that resolves to the response.
let fut = self.inner.call(req);
// Adjust the response if needed.
let res_fut = async move {
let res = fut.await.map_err(|err| err.into())?;
// Nothing to modify: return the response as is.
if !modify {
return Ok(res);
}
let body = res.into_body();
let bytes = hyper::body::to_bytes(body).await?;
#[derive(serde::Deserialize, Debug)]
struct RpcPayload<'a> {
#[serde(borrow)]
result: &'a serde_json::value::RawValue,
}
let response = if let Ok(payload) = serde_json::from_slice::<RpcPayload>(&bytes) {
response::ok_response(payload.result.to_string())
} else {
response::internal_error()
};
Ok(response)
};
Box::pin(res_fut)
}
}
......@@ -46,7 +46,7 @@ use jsonrpsee_core::server::helpers::{prepare_error, MethodResponse};
use jsonrpsee_core::server::helpers::{BatchResponse, BatchResponseBuilder};
use jsonrpsee_core::server::resource_limiting::Resources;
use jsonrpsee_core::server::rpc_module::{MethodKind, Methods};
use jsonrpsee_core::tracing::{rx_log_from_json, rx_log_from_str, tx_log_from_str, RpcTracing};
use jsonrpsee_core::tracing::{rx_log_from_json, tx_log_from_str, RpcTracing};
use jsonrpsee_core::TEN_MB_SIZE_BYTES;
use jsonrpsee_types::error::{ErrorCode, ErrorObject, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG};
use jsonrpsee_types::{Id, Notification, Params, Request};
......@@ -72,7 +72,6 @@ pub struct Builder<B = Identity, L = ()> {
tokio_runtime: Option<tokio::runtime::Handle>,
logger: L,
max_log_length: u32,
health_api: Option<HealthApi>,
service_builder: tower::ServiceBuilder<B>,
}
......@@ -87,7 +86,6 @@ impl Default for Builder {
tokio_runtime: None,
logger: (),
max_log_length: 4096,
health_api: None,
service_builder: tower::ServiceBuilder::new(),
}
}
......@@ -154,7 +152,6 @@ impl<B, L> Builder<B, L> {
tokio_runtime: self.tokio_runtime,
logger,
max_log_length: self.max_log_length,
health_api: self.health_api,
service_builder: self.service_builder,
}
}
......@@ -203,23 +200,6 @@ impl<B, L> Builder<B, L> {
self
}
/// Enable health endpoint.
/// Allows you to expose one of the methods under GET /<path> The method will be invoked with no parameters.
/// Error returned from the method will be converted to status 500 response.
/// Expects a tuple with (</path>, <rpc-method-name>).
///
/// Fails if the path is missing `/`.
pub fn health_api(mut self, path: impl Into<String>, method: impl Into<String>) -> Result<Self, Error> {
let path = path.into();
if !path.starts_with('/') {
return Err(Error::Custom(format!("Health endpoint path must start with `/` to work, got: {}", path)));
}
self.health_api = Some(HealthApi { path, method: method.into() });
Ok(self)
}
/// Configure a custom [`tower::ServiceBuilder`] middleware for composing layers to be applied to the RPC service.
///
/// Default: No tower layers are applied to the RPC service.
......@@ -254,7 +234,6 @@ impl<B, L> Builder<B, L> {
tokio_runtime: self.tokio_runtime,
logger: self.logger,
max_log_length: self.max_log_length,
health_api: self.health_api,
service_builder,
}
}
......@@ -309,7 +288,6 @@ impl<B, L> Builder<B, L> {
tokio_runtime: self.tokio_runtime,
logger: self.logger,
max_log_length: self.max_log_length,
health_api: self.health_api,
service_builder: self.service_builder,
})
}
......@@ -355,7 +333,6 @@ impl<B, L> Builder<B, L> {
tokio_runtime: self.tokio_runtime,
logger: self.logger,
max_log_length: self.max_log_length,
health_api: self.health_api,
service_builder: self.service_builder,
})
}
......@@ -392,18 +369,11 @@ impl<B, L> Builder<B, L> {
tokio_runtime: self.tokio_runtime,
logger: self.logger,
max_log_length: self.max_log_length,
health_api: self.health_api,
service_builder: self.service_builder,
})
}
}
#[derive(Debug, Clone)]
struct HealthApi {
path: String,
method: String,
}
/// Handle used to run or stop the server.
#[derive(Debug)]
pub struct ServerHandle {
......@@ -448,8 +418,6 @@ struct ServiceData<L> {
resources: Resources,
/// User provided logger.
logger: L,
/// Health API.
health_api: Option<HealthApi>,
/// Max request body size.
max_request_body_size: u32,
/// Max response body size.
......@@ -471,7 +439,6 @@ impl<L: Logger> ServiceData<L> {
acl,
resources,
logger,
health_api,
max_request_body_size,
max_response_body_size,
max_log_length,
......@@ -512,20 +479,6 @@ impl<L: Logger> ServiceData<L> {
})
.await
}
Method::GET => match health_api.as_ref() {
Some(health) if health.path.as_str() == request.uri().path() => {
process_health_request(
health,
logger,
methods,
max_response_body_size,
request_start,
max_log_length,
)
.await
}
_ => response::method_not_allowed(),
},
// Error scenarios:
Method::POST => response::unsupported_content_type(),
_ => response::method_not_allowed(),
......@@ -587,7 +540,6 @@ pub struct Server<B = Identity, L = ()> {
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
logger: L,
health_api: Option<HealthApi>,
service_builder: tower::ServiceBuilder<B>,
}
......@@ -626,7 +578,6 @@ where
let logger = self.logger;
let batch_requests_supported = self.batch_requests_supported;
let methods = methods.into().initialize_resources(&resources)?;
let health_api = self.health_api;
let make_service = make_service_fn(move |conn: &AddrStream| {
let service = TowerService {
......@@ -636,7 +587,6 @@ where
acl: acl.clone(),
resources: resources.clone(),
logger: logger.clone(),
health_api: health_api.clone(),
max_request_body_size,
max_response_body_size,
max_log_length,
......@@ -766,54 +716,6 @@ async fn process_validated_request<L: Logger>(input: ProcessValidatedRequest<L>)
}
}
async fn process_health_request<L: Logger>(
health_api: &HealthApi,
logger: L,
methods: Methods,
max_response_body_size: u32,
request_start: L::Instant,
max_log_length: u32,
) -> hyper::Response<hyper::Body> {
let trace = RpcTracing::method_call(&health_api.method);
async {
tx_log_from_str("HTTP health API", max_log_length);
let response = match methods.method_with_name(&health_api.method) {
None => MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::MethodNotFound)),
Some((_name, method_callback)) => match method_callback.inner() {
MethodKind::Sync(callback) => {
(callback)(Id::Number(0), Params::new(None), max_response_body_size as usize)
}
MethodKind::Async(callback) => {
(callback)(Id::Number(0), Params::new(None), 0, max_response_body_size as usize, None).await
}
MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => {
MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InternalError))
}
},
};
rx_log_from_str(&response.result, max_log_length);
logger.on_result(&health_api.method, response.success, request_start);
logger.on_response(&response.result, request_start);
if response.success {
#[derive(serde::Deserialize)]
struct RpcPayload<'a> {
#[serde(borrow)]
result: &'a serde_json::value::RawValue,
}
let payload: RpcPayload = serde_json::from_str(&response.result)
.expect("valid JSON-RPC response must have a result field and be valid JSON; qed");
response::ok_response(payload.result.to_string())
} else {
response::internal_error()
}
}
.instrument(trace.into_span())
.await
}
#[derive(Debug, Clone)]
struct Batch<'a, L: Logger> {
data: Vec<u8>,
......
......@@ -30,6 +30,7 @@ use std::time::Duration;
use futures::{SinkExt, StreamExt};
use jsonrpsee::core::error::SubscriptionClosed;
use jsonrpsee::core::server::access_control::{AccessControl, AccessControlBuilder};
use jsonrpsee::http_server::middleware::proxy_get_request::ProxyGetRequestLayer;
use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle};
use jsonrpsee::types::error::{ErrorObject, SUBSCRIPTION_CLOSED_WITH_ERROR};
use jsonrpsee::ws_server::{WsServerBuilder, WsServerHandle};
......@@ -223,13 +224,15 @@ pub async fn http_server() -> (SocketAddr, HttpServerHandle) {
}
pub async fn http_server_with_access_control(acl: AccessControl, cors: CorsLayer) -> (SocketAddr, HttpServerHandle) {
let middleware = tower::ServiceBuilder::new().layer(cors);
let middleware = tower::ServiceBuilder::new()
// Proxy `GET /health` requests to internal `system_health` method.
.layer(ProxyGetRequestLayer::new("/health", "system_health").unwrap())
// Add `CORS` layer.
.layer(cors);
let server = HttpServerBuilder::default()
.set_access_control(acl)
.set_middleware(middleware)
.health_api("/health", "system_health")
.unwrap()
.build("127.0.0.1:0")
.await
.unwrap();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment