Unverified Commit 49899740 authored by Niklas Adolfsson's avatar Niklas Adolfsson Committed by GitHub
Browse files

HTTP server refactor (#253)

* refactor benches

* start

* fix build: enable `raw value` feature serde_json

* start

* port it

* make tests compile

* fix bench

* fix bench

* introduce builder pattern

* tweaks

* remove unused code

* cleanup

* [http server]: configure tcp socket manually.

The major reason is to provide a uniform API with the WebSocket server to return the local address.

* remove unused deps

* [examples]: remove needless sleep

* chore: add docs and refactor noise.

* Update types/src/jsonrpc/error.rs

* http server use constants
parent 7028dc41
use criterion::*;
use jsonrpsee_http_client::{HttpClient, HttpConfig};
use jsonrpsee_http_client::HttpClientBuilder;
use jsonrpsee_types::{jsonrpc::Params, traits::Client};
use jsonrpsee_ws_client::{WsClient, WsConfig};
use std::sync::Arc;
......@@ -13,7 +13,7 @@ criterion_main!(benches);
pub fn http_requests(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let url = rt.block_on(helpers::http_server());
let client = Arc::new(HttpClient::new(url, HttpConfig::default()).unwrap());
let client = Arc::new(HttpClientBuilder::default().build(&url).unwrap());
run_round_trip(&rt, crit, client.clone(), "http_round_trip");
run_concurrent_round_trip(&rt, crit, client.clone(), "http_concurrent_round_trip");
}
......
use futures::channel::oneshot;
use jsonrpsee_http_server::{HttpConfig, HttpServer};
use jsonrpsee_http_server::HttpServerBuilder;
use jsonrpsee_ws_server::WsServer;
/// Run jsonrpsee HTTP server for benchmarks.
pub async fn http_server() -> String {
let (server_started_tx, server_started_rx) = oneshot::channel();
tokio::spawn(async move {
let server = HttpServer::new("127.0.0.1:0", HttpConfig { max_request_body_size: u32::MAX }).await.unwrap();
let mut say_hello = server.register_method("say_hello".to_string()).unwrap();
server_started_tx.send(*server.local_addr()).unwrap();
loop {
let r = say_hello.next().await;
r.respond(Ok("lo".into())).await.unwrap();
}
let mut server =
HttpServerBuilder::default().max_request_body_size(u32::MAX).build("127.0.0.1:0".parse().unwrap()).unwrap();
server.register_method("say_hello", |_| Ok("lo")).unwrap();
server_started_tx.send(server.local_addr().unwrap()).unwrap();
server.start().await
});
format!("http://{}", server_started_rx.await.unwrap())
}
......
......@@ -8,7 +8,6 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-std = "1.9"
env_logger = "0.8"
futures = "0.3"
log = "0.4"
......
......@@ -24,42 +24,30 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use async_std::task;
use futures::channel::oneshot::{self, Sender};
use jsonrpsee_http_client::{HttpClient, HttpConfig};
use jsonrpsee_http_server::HttpServer;
use jsonrpsee_types::{
jsonrpc::{JsonValue, Params},
traits::Client,
};
use std::net::SocketAddr;
const SOCK_ADDR: &str = "127.0.0.1:9933";
const SERVER_URI: &str = "http://localhost:9933";
use jsonrpsee_http_client::HttpClientBuilder;
use jsonrpsee_http_server::HttpServerBuilder;
use jsonrpsee_types::{jsonrpc::Params, traits::Client};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let (server_started_tx, server_started_rx) = oneshot::channel::<()>();
let _server = task::spawn(async move {
run_server(server_started_tx, SOCK_ADDR).await;
});
let server_addr = run_server().await;
let url = format!("http://{}", server_addr);
server_started_rx.await?;
let client = HttpClient::new(SERVER_URI, HttpConfig::default())?;
let response: Result<JsonValue, _> = client.request("say_hello", Params::None).await;
let client = HttpClientBuilder::default().build(url)?;
let response: Result<String, _> = client.request("say_hello", Params::None).await;
println!("r: {:?}", response);
Ok(())
}
async fn run_server(server_started_tx: Sender<()>, url: &str) {
let server = HttpServer::new(url, HttpConfig::default()).await.unwrap();
let mut say_hello = server.register_method("say_hello".to_string()).unwrap();
server_started_tx.send(()).unwrap();
loop {
let r = say_hello.next().await;
r.respond(Ok(JsonValue::String("lo".to_owned()))).await.unwrap();
}
async fn run_server() -> SocketAddr {
let mut server = HttpServerBuilder::default().build("127.0.0.1:0".parse().unwrap()).unwrap();
server.register_method("say_hello", |_| Ok("lo")).unwrap();
let addr = server.local_addr().unwrap();
tokio::spawn(async move { server.start().await.unwrap() });
addr
}
......@@ -53,8 +53,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
async fn run_server(server_started_tx: Sender<()>, url: &str) {
let mut server = WsServer::new(url).await.unwrap();
async fn run_server(server_started_tx: Sender<()>, addr: &str) {
let mut server = WsServer::new(addr).await.unwrap();
server.register_method("say_hello", |_| Ok("lo")).unwrap();
......
......@@ -4,13 +4,39 @@ use fnv::FnvHashMap;
use jsonrpc::DeserializeOwned;
use jsonrpsee_types::{
error::{Error, Mismatch},
http::HttpConfig,
jsonrpc,
traits::Client,
};
use std::convert::TryInto;
use std::sync::atomic::{AtomicU64, Ordering};
/// Http Client Builder.
#[derive(Debug)]
pub struct HttpClientBuilder {
max_request_body_size: u32,
}
impl HttpClientBuilder {
/// Sets the maximum size of a request body in bytes (default is 10 MiB).
pub fn max_request_body_size(mut self, size: u32) -> Self {
self.max_request_body_size = size;
self
}
/// Build the HTTP client with target to connect to.
pub fn build(self, target: impl AsRef<str>) -> Result<HttpClient, Error> {
let transport = HttpTransportClient::new(target, self.max_request_body_size)
.map_err(|e| Error::TransportError(Box::new(e)))?;
Ok(HttpClient { transport, request_id: AtomicU64::new(0) })
}
}
impl Default for HttpClientBuilder {
fn default() -> Self {
Self { max_request_body_size: 10 * 1024 * 1024 }
}
}
/// JSON-RPC HTTP Client that provides functionality to perform method calls and notifications.
#[derive(Debug)]
pub struct HttpClient {
......@@ -20,16 +46,6 @@ pub struct HttpClient {
request_id: AtomicU64,
}
impl HttpClient {
/// Initializes a new HTTP client.
///
/// Fails when the URL is invalid.
pub fn new(target: impl AsRef<str>, config: HttpConfig) -> Result<Self, Error> {
let transport = HttpTransportClient::new(target, config).map_err(|e| Error::TransportError(Box::new(e)))?;
Ok(Self { transport, request_id: AtomicU64::new(0) })
}
}
#[async_trait]
impl Client for HttpClient {
async fn notification<M, P>(&self, method: M, params: P) -> Result<(), Error>
......
......@@ -42,6 +42,5 @@ mod transport;
#[cfg(test)]
mod tests;
pub use client::HttpClient;
pub use jsonrpsee_types::http::HttpConfig;
pub use client::{HttpClient, HttpClientBuilder};
pub use transport::HttpTransportClient;
use crate::client::HttpClient;
use crate::client::HttpClientBuilder;
use jsonrpsee_types::{
error::Error,
http::HttpConfig,
jsonrpc::{self, ErrorCode, JsonValue, Params},
traits::Client,
};
......@@ -19,7 +18,7 @@ async fn method_call_works() {
async fn notification_works() {
let server_addr = http_server_with_hardcoded_response(String::new()).await;
let uri = format!("http://{}", server_addr);
let client = HttpClient::new(&uri, HttpConfig::default()).unwrap();
let client = HttpClientBuilder::default().build(&uri).unwrap();
client
.notification("i_dont_care_about_the_response_because_the_server_should_not_respond", Params::None)
.await
......@@ -96,14 +95,14 @@ async fn batch_request_out_of_order_response() {
async fn run_batch_request_with_response(batch: Vec<(String, Params)>, response: String) -> Result<Vec<String>, Error> {
let server_addr = http_server_with_hardcoded_response(response).await;
let uri = format!("http://{}", server_addr);
let client = HttpClient::new(&uri, HttpConfig::default()).unwrap();
let client = HttpClientBuilder::default().build(&uri).unwrap();
client.batch_request(batch).await
}
async fn run_request_with_response(response: String) -> Result<JsonValue, Error> {
let server_addr = http_server_with_hardcoded_response(response).await;
let uri = format!("http://{}", server_addr);
let client = HttpClient::new(&uri, HttpConfig::default())?;
let client = HttpClientBuilder::default().build(&uri).unwrap();
client.request("say_hello", Params::None).await
}
......
......@@ -8,7 +8,7 @@
use hyper::client::{Client, HttpConnector};
use hyper_rustls::HttpsConnector;
use jsonrpsee_types::{error::GenericTransportError, http::HttpConfig, jsonrpc};
use jsonrpsee_types::{error::GenericTransportError, jsonrpc};
use jsonrpsee_utils::http::hyper_helpers;
use thiserror::Error;
......@@ -22,12 +22,12 @@ pub struct HttpTransportClient {
/// HTTP client
client: Client<HttpsConnector<HttpConnector>>,
/// Configurable max request body size
config: HttpConfig,
max_request_body_size: u32,
}
impl HttpTransportClient {
/// Initializes a new HTTP client.
pub fn new(target: impl AsRef<str>, config: HttpConfig) -> Result<Self, Error> {
pub fn new(target: impl AsRef<str>, max_request_body_size: u32) -> Result<Self, Error> {
let target = url::Url::parse(target.as_ref()).map_err(|e| Error::Url(format!("Invalid URL: {}", e)))?;
if target.scheme() == "http" || target.scheme() == "https" {
#[cfg(feature = "tokio1")]
......@@ -35,7 +35,7 @@ impl HttpTransportClient {
#[cfg(feature = "tokio02")]
let connector = HttpsConnector::new();
let client = Client::builder().build::<_, hyper::Body>(connector);
Ok(HttpTransportClient { client, target, config })
Ok(HttpTransportClient { client, target, max_request_body_size })
} else {
Err(Error::Url("URL scheme not supported, expects 'http' or 'https'".into()))
}
......@@ -46,7 +46,7 @@ impl HttpTransportClient {
let body = jsonrpc::to_vec(&request).map_err(Error::Serialization)?;
log::debug!("send: {}", request);
if body.len() > self.config.max_request_body_size as usize {
if body.len() > self.max_request_body_size as usize {
return Err(Error::RequestTooLarge);
}
......@@ -77,7 +77,7 @@ impl HttpTransportClient {
) -> Result<jsonrpc::Response, Error> {
let response = self.send_request(request).await?;
let (parts, body) = response.into_parts();
let body = hyper_helpers::read_response_to_body(&parts.headers, body, self.config).await?;
let body = hyper_helpers::read_response_to_body(&parts.headers, body, self.max_request_body_size).await?;
// Note that we don't check the Content-Type of the request. This is deemed
// unnecessary, as a parsing error while happen anyway.
......@@ -138,23 +138,19 @@ where
#[cfg(test)]
mod tests {
use super::{Error, HttpTransportClient};
use jsonrpsee_types::{
http::HttpConfig,
jsonrpc::{Call, Id, MethodCall, Params, Request, Version},
};
use jsonrpsee_types::jsonrpc::{Call, Id, MethodCall, Params, Request, Version};
#[test]
fn invalid_http_url_rejected() {
let err = HttpTransportClient::new("ws://localhost:9933", HttpConfig::default()).unwrap_err();
let err = HttpTransportClient::new("ws://localhost:9933", 80).unwrap_err();
assert!(matches!(err, Error::Url(_)));
}
#[tokio::test]
async fn request_limit_works() {
let eighty_bytes_limit = 80;
let client =
HttpTransportClient::new("http://localhost:9933", HttpConfig { max_request_body_size: 80 }).unwrap();
assert_eq!(client.config.max_request_body_size, eighty_bytes_limit);
let client = HttpTransportClient::new("http://localhost:9933", 80).unwrap();
assert_eq!(client.max_request_body_size, eighty_bytes_limit);
let request = Request::Single(Call::MethodCall(MethodCall {
jsonrpc: Version::V2,
......
......@@ -7,22 +7,17 @@ edition = "2018"
license = "MIT"
[dependencies]
async-std = "1.8"
futures = "0.3"
fnv = "1"
hyper = { version = "0.14", features = ["stream", "client", "server", "http1", "http2", "tcp"] }
anyhow = "1"
hyper = { version = "0.14", features = ["stream", "server", "http1", "http2", "tcp"] }
jsonrpsee-types = { path = "../types", version = "0.2.0-alpha.3" }
jsonrpsee-utils = { path = "../utils", version = "0.2.0-alpha.3" }
log = "0.4"
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = "1"
parking_lot = "0.11"
thiserror = "1"
# `macros feature` is only used for tests but enabled globally because `dev-dependencies`
# are leaked.
tokio = { version = "1", features = ["net", "rt-multi-thread", "macros"] }
unicase = "2"
socket2 = "0.4"
tokio = { version = "1", features = ["full"] }
[dev-dependencies]
env_logger = "0.8"
jsonrpsee-test-utils = { path = "../test-utils" }
jsonrpsee-http-client = { path = "../http-client" }
......@@ -24,16 +24,13 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
mod raw;
mod module;
mod response;
mod server;
mod transport;
pub use jsonrpsee_utils::http::access_control::{AccessControl, AccessControlBuilder};
pub use module::{RpcContextModule, RpcModule};
pub use server::{Builder as HttpServerBuilder, Server as HttpServer};
#[cfg(test)]
mod tests;
pub use jsonrpsee_types::http::HttpConfig;
pub use raw::RawServer as HttpRawServer;
pub use raw::RawServerEvent as HttpRawServerEvent;
pub use raw::TypedResponder as HttpTypedResponder;
pub use server::{RegisteredMethod, RegisteredNotification, Server as HttpServer};
pub use transport::HttpTransportServer;
use jsonrpsee_types::error::Error;
use jsonrpsee_types::v2::{traits::RpcMethod, RpcError, RpcParams};
use jsonrpsee_utils::server_utils::{send_response, Methods};
use serde::Serialize;
use std::sync::Arc;
#[derive(Default)]
pub struct RpcModule {
methods: Methods,
}
impl RpcModule {
/// Instantiate a new `RpcModule`.
pub fn new() -> Self {
RpcModule { methods: Methods::default() }
}
/// Add context for this module, turning it into an `RpcContextModule`.
pub fn with_context<Context>(self, ctx: Context) -> RpcContextModule<Context> {
RpcContextModule { ctx: Arc::new(ctx), module: self }
}
fn verify_method_name(&mut self, name: &str) -> Result<(), Error> {
if self.methods.get(name).is_some() {
return Err(Error::MethodAlreadyRegistered(name.into()));
}
Ok(())
}
/// Register a new RPC method, which responds with a given callback.
pub fn register_method<F, R>(&mut self, method_name: &'static str, callback: F) -> Result<(), Error>
where
R: Serialize,
F: RpcMethod<R>,
{
self.verify_method_name(method_name)?;
self.methods.insert(
method_name,
Box::new(move |id, params, tx, _| {
let result = callback(params)?;
send_response(id, tx, result);
Ok(())
}),
);
Ok(())
}
pub(crate) fn into_methods(self) -> Methods {
self.methods
}
pub(crate) fn merge(&mut self, other: RpcModule) -> Result<(), Error> {
for name in other.methods.keys() {
self.verify_method_name(name)?;
}
for (name, callback) in other.methods {
self.methods.insert(name, callback);
}
Ok(())
}
}
pub struct RpcContextModule<Context> {
ctx: Arc<Context>,
module: RpcModule,
}
impl<Context> RpcContextModule<Context> {
/// Create a new module with a given shared `Context`.
pub fn new(ctx: Context) -> Self {
RpcContextModule { ctx: Arc::new(ctx), module: RpcModule::new() }
}
/// Register a new RPC method, which responds with a given callback.
pub fn register_method<F, R>(&mut self, method_name: &'static str, callback: F) -> Result<(), Error>
where
Context: Send + Sync + 'static,
R: Serialize,
F: Fn(RpcParams, &Context) -> Result<R, RpcError> + Send + Sync + 'static,
{
self.module.verify_method_name(method_name)?;
let ctx = self.ctx.clone();
self.module.methods.insert(
method_name,
Box::new(move |id, params, tx, _| {
let result = callback(params, &*ctx)?;
send_response(id, tx, result);
Ok(())
}),
);
Ok(())
}
/// Convert this `RpcContextModule` into a regular `RpcModule` that can be registered on the `Server`.
pub fn into_module(self) -> RpcModule {
self.module
}
}
// Copyright (c) 2019 Parity Technologies Limited
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::transport::{HttpTransportServer, TransportServerEvent};
use jsonrpsee_types::jsonrpc::{
self,
wrapped::{batches, Notification, Params},
};
use core::{fmt, hash::Hash};
pub type RequestId = u64;
/// Wraps around a "raw server" and adds capabilities.
///
/// See the module-level documentation for more information.
pub struct RawServer {
/// Internal "raw" server.
raw: HttpTransportServer,
/// List of requests that are in the progress of being answered. Each batch is associated with
/// the raw request ID, or with `None` if this raw request has been closed.
///
/// See the documentation of [`BatchesState`][batches::BatchesState] for more information.
batches: batches::BatchesState<Option<RequestId>>,
}
/// Identifier of a request within a `RawServer`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct RawServerRequestId {
inner: batches::BatchesRequestId,
}
/// Identifier of a subscription within a [`RawServer`](crate::server::RawServer).
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct RawServerSubscriptionId([u8; 32]);
/// Event generated by a [`RawServer`](crate::server::RawServer).
///
/// > **Note**: Holds a borrow of the `RawServer`. Therefore, must be dropped before the `RawServer` can
/// > be dropped.
#[derive(Debug)]
pub enum RawServerEvent<'a> {
/// Request is a notification.
Notification(Notification),
/// Request is a method call.
Request(RawServerRequest<'a>),
}
/// Request received by a [`RawServer`](crate::raw::RawServer).
pub struct RawServerRequest<'a> {
/// Reference to the request within `self.batches`.
inner: batches::BatchesRequest<'a, Option<RequestId>>,
}
impl RawServer {
/// Starts a [`RawServer`](crate::raw::RawServer) using the given raw server internally.
pub fn new(raw: HttpTransportServer) -> RawServer {
RawServer { raw, batches: batches::BatchesState::new() }
}
}
impl RawServer {
/// Returns a `Future` resolving to the next event that this server generates.
pub async fn next_event<'a>(&'a mut self) -> RawServerEvent<'a> {
let request_id = loop {
match self.batches.next_event() {
None => {}
Some(batches::BatchesEvent::Notification { notification, .. }) => {
return RawServerEvent::Notification(notification)
}
Some(batches::BatchesEvent::Request(inner)) => {
break RawServerRequestId { inner: inner.id() };
}
Some(batches::BatchesEvent::ReadyToSend { response, user_param: Some(raw_request_id) }) => {
let _ = self.raw.finish(&raw_request_id, Some(&response)).await;
continue;
}
Some(batches::BatchesEvent::ReadyToSend { response: _, user_param: None }) => {
// This situation happens if the connection has been closed by the client.
// Clients who close their connection.
continue;
}
};
match self.raw.next_request().await {
TransportServerEvent::Request { id, request } => self.batches.inject(request, Some(id)),
TransportServerEvent::Closed(raw_id) => {
// The client has a closed their connection. We eliminate all traces of the
// raw request ID from our state.
// TODO: this has an O(n) complexity; make sure that this is not attackable
for ud in self.batches.batches() {
if ud.as_ref() == Some(&raw_id) {
*ud = None;
}
}
}
};
};
RawServerEvent::Request(self.request_by_id(&request_id).unwrap())
}
/// Returns a request previously returned by [`next_event`](crate::raw::RawServer::next_event)
/// by its id.
///
/// Note that previous notifications don't have an ID and can't be accessed with this method.
///
/// Returns `None` if the request ID is invalid or if the request has already been answered in
/// the past.
pub fn request_by_id<'a>(&'a mut self, id: &RawServerRequestId) -> Option<RawServerRequest<'a>> {
Some(<