Unverified Commit b51abeca authored by Niklas Adolfsson's avatar Niklas Adolfsson Committed by GitHub
Browse files

[servers] return error if context or params fails (#295)

* ret err if context/params fails

* address grumbles: specific error_code context fail

* address grumbles: make env_logger dev-dependency

* address grumbles: add tests

* chore(deps): remove unused deps

* address grumbles: rename types and docs

* address grumbles: more renaming.

* fix build
parent 2cae10b6
......@@ -27,8 +27,8 @@ impl HttpClientBuilder {
/// Build the HTTP client with target to connect to.
pub fn build(self, target: impl AsRef<str>) -> Result<HttpClient, Error> {
let transport = HttpTransportClient::new(target, self.max_request_body_size)
.map_err(|e| Error::TransportError(Box::new(e)))?;
let transport =
HttpTransportClient::new(target, self.max_request_body_size).map_err(|e| Error::Transport(Box::new(e)))?;
Ok(HttpClient { transport, request_id: AtomicU64::new(0) })
}
}
......@@ -55,7 +55,7 @@ impl Client for HttpClient {
self.transport
.send(serde_json::to_string(&notif).map_err(Error::ParseError)?)
.await
.map_err(|e| Error::TransportError(Box::new(e)))
.map_err(|e| Error::Transport(Box::new(e)))
}
/// Perform a request towards the server.
......@@ -71,7 +71,7 @@ impl Client for HttpClient {
.transport
.send_and_read_body(serde_json::to_string(&request).map_err(Error::ParseError)?)
.await
.map_err(|e| Error::TransportError(Box::new(e)))?;
.map_err(|e| Error::Transport(Box::new(e)))?;
let response: JsonRpcResponse<_> = match serde_json::from_slice(&body) {
Ok(response) => response,
......@@ -110,7 +110,7 @@ impl Client for HttpClient {
.transport
.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?)
.await
.map_err(|e| Error::TransportError(Box::new(e)))?;
.map_err(|e| Error::Transport(Box::new(e)))?;
let rps: Vec<JsonRpcResponse<_>> = match serde_json::from_slice(&body) {
Ok(response) => response,
......
use jsonrpsee_types::{traits::RpcMethod, v2::params::RpcParams, Error};
use jsonrpsee_utils::server::{send_response, Methods};
use jsonrpsee_types::v2::error::{JsonRpcErrorCode, JsonRpcErrorObject, CALL_EXECUTION_FAILED_CODE};
use jsonrpsee_types::{
error::{CallError, Error, InvalidParams},
traits::RpcMethod,
v2::params::RpcParams,
};
use jsonrpsee_utils::server::{send_error, send_response, Methods};
use serde::Serialize;
use std::sync::Arc;
......@@ -31,16 +36,17 @@ impl RpcModule {
pub fn register_method<F, R>(&mut self, method_name: &'static str, callback: F) -> Result<(), Error>
where
R: Serialize,
F: RpcMethod<R>,
F: RpcMethod<R, InvalidParams>,
{
self.verify_method_name(method_name)?;
self.methods.insert(
method_name,
Box::new(move |id, params, tx, _| {
let result = callback(params)?;
send_response(id, tx, result);
match callback(params) {
Ok(res) => send_response(id, tx, res),
Err(InvalidParams) => send_error(id, tx, JsonRpcErrorCode::InvalidParams.into()),
};
Ok(())
}),
......@@ -82,7 +88,7 @@ impl<Context> RpcContextModule<Context> {
where
Context: Send + Sync + 'static,
R: Serialize,
F: Fn(RpcParams, &Context) -> Result<R, Error> + Send + Sync + 'static,
F: Fn(RpcParams, &Context) -> Result<R, CallError> + Send + Sync + 'static,
{
self.module.verify_method_name(method_name)?;
......@@ -91,10 +97,18 @@ impl<Context> RpcContextModule<Context> {
self.module.methods.insert(
method_name,
Box::new(move |id, params, tx, _| {
let result = callback(params, &*ctx)?;
send_response(id, tx, result);
match callback(params, &*ctx) {
Ok(res) => send_response(id, tx, res),
Err(CallError::InvalidParams(_)) => send_error(id, tx, JsonRpcErrorCode::InvalidParams.into()),
Err(CallError::Failed(err)) => {
let err = JsonRpcErrorObject {
code: JsonRpcErrorCode::ServerError(CALL_EXECUTION_FAILED_CODE),
message: &err.to_string(),
data: None,
};
send_error(id, tx, err)
}
};
Ok(())
}),
);
......
......@@ -36,7 +36,7 @@ use hyper::{
service::{make_service_fn, service_fn},
Error as HyperError,
};
use jsonrpsee_types::error::{Error, GenericTransportError};
use jsonrpsee_types::error::{Error, GenericTransportError, InvalidParams};
use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest};
use jsonrpsee_types::v2::{error::JsonRpcErrorCode, params::RpcParams};
use jsonrpsee_utils::{
......@@ -129,7 +129,7 @@ impl Server {
pub fn register_method<F, R>(&mut self, method_name: &'static str, callback: F) -> Result<(), Error>
where
R: Serialize,
F: Fn(RpcParams) -> Result<R, Error> + Send + Sync + 'static,
F: Fn(RpcParams) -> Result<R, InvalidParams> + Send + Sync + 'static,
{
self.root.register_method(method_name, callback)
}
......
......@@ -2,9 +2,10 @@
use std::net::SocketAddr;
use crate::HttpServerBuilder;
use crate::{HttpServerBuilder, RpcContextModule};
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::types::{Id, StatusCode};
use jsonrpsee_test_utils::types::{Id, StatusCode, TestContext};
use jsonrpsee_types::error::CallError;
use serde_json::Value as JsonValue;
async fn server() -> SocketAddr {
......@@ -23,6 +24,35 @@ async fn server() -> SocketAddr {
addr
}
/// Run server with user provided context.
pub async fn server_with_context() -> SocketAddr {
let mut server = HttpServerBuilder::default().build("127.0.0.1:0".parse().unwrap()).unwrap();
let ctx = TestContext;
let mut rpc_ctx = RpcContextModule::new(ctx);
rpc_ctx
.register_method("should_err", |_p, ctx| {
let _ = ctx.err().map_err(|e| CallError::Failed(e.into()))?;
Ok("err")
})
.unwrap();
rpc_ctx
.register_method("should_ok", |_p, ctx| {
let _ = ctx.ok().map_err(|e| CallError::Failed(e.into()))?;
Ok("ok")
})
.unwrap();
let rpc_module = rpc_ctx.into_module();
server.register_module(rpc_module).unwrap();
let addr = server.local_addr().unwrap();
tokio::spawn(async { server.start().await });
addr
}
#[tokio::test]
async fn single_method_call_works() {
let _ = env_logger::try_init();
......@@ -54,14 +84,45 @@ async fn single_method_call_with_params() {
let addr = server().await;
let uri = to_http_uri(addr);
std::thread::sleep(std::time::Duration::from_secs(2));
let req = r#"{"jsonrpc":"2.0","method":"add", "params":[1, 2],"id":1}"#;
let response = http_request(req.into(), uri).await.unwrap();
assert_eq!(response.status, StatusCode::OK);
assert_eq!(response.body, ok_response(JsonValue::Number(3.into()), Id::Num(1)));
}
#[tokio::test]
async fn single_method_call_with_faulty_params_returns_err() {
let addr = server().await;
let uri = to_http_uri(addr);
let req = r#"{"jsonrpc":"2.0","method":"add", "params":["Invalid"],"id":1}"#;
let response = http_request(req.into(), uri).await.unwrap();
assert_eq!(response.status, StatusCode::OK);
assert_eq!(response.body, invalid_params(Id::Num(1)));
}
#[tokio::test]
async fn single_method_call_with_faulty_context() {
let addr = server_with_context().await;
let uri = to_http_uri(addr);
let req = r#"{"jsonrpc":"2.0","method":"should_err", "params":[],"id":1}"#;
let response = http_request(req.into(), uri).await.unwrap();
assert_eq!(response.status, StatusCode::OK);
assert_eq!(response.body, invalid_context("RPC context failed", Id::Num(1)));
}
#[tokio::test]
async fn single_method_call_with_ok_context() {
let addr = server_with_context().await;
let uri = to_http_uri(addr);
let req = r#"{"jsonrpc":"2.0","method":"should_ok", "params":[],"id":1}"#;
let response = http_request(req.into(), uri).await.unwrap();
assert_eq!(response.status, StatusCode::OK);
assert_eq!(response.body, ok_response("ok".into(), Id::Num(1)));
}
#[tokio::test]
async fn valid_batched_method_calls() {
let _ = env_logger::try_init();
......
......@@ -9,6 +9,7 @@ edition = "2018"
[dependencies]
async-std = "1.9"
anyhow = "1"
futures-channel = "0.3"
futures-util = "0.3"
hyper = { version = "0.14", features = ["full"] }
......
......@@ -57,6 +57,14 @@ pub fn invalid_params(id: Id) -> String {
)
}
pub fn invalid_context(msg: &str, id: Id) -> String {
format!(
r#"{{"jsonrpc":"2.0","error":{{"code":-32000,"message":"{}"}},"id":{}}}"#,
msg,
serde_json::to_string(&id).unwrap()
)
}
pub fn internal_error(id: Id) -> String {
format!(
r#"{{"jsonrpc":"2.0","error":{{"code":-32603,"message":"Internal error"}},"id":{}}}"#,
......
......@@ -18,6 +18,17 @@ pub use hyper::{Body, HeaderMap, StatusCode, Uri};
type Error = Box<dyn std::error::Error>;
pub struct TestContext;
impl TestContext {
pub fn ok(&self) -> Result<(), anyhow::Error> {
Ok(())
}
pub fn err(&self) -> Result<(), anyhow::Error> {
Err(anyhow::anyhow!("RPC context failed"))
}
}
/// Request Id
#[derive(Debug, PartialEq, Clone, Hash, Eq, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
......
......@@ -203,14 +203,14 @@ async fn wss_works() {
#[tokio::test]
async fn ws_with_non_ascii_url_doesnt_hang_or_panic() {
let err = WsClientBuilder::default().build("wss://♥♥♥♥♥♥∀∂").await;
assert!(matches!(err, Err(Error::TransportError(_))));
assert!(matches!(err, Err(Error::Transport(_))));
}
#[tokio::test]
async fn http_with_non_ascii_url_doesnt_hang_or_panic() {
let client = HttpClientBuilder::default().build("http://♥♥♥♥♥♥∀∂").unwrap();
let err: Result<(), Error> = client.request("system_chain", JsonRpcParams::NoParams).await;
assert!(matches!(err, Err(Error::TransportError(_))));
assert!(matches!(err, Err(Error::Transport(_))));
}
#[tokio::test]
......
......@@ -16,12 +16,36 @@ impl<T: fmt::Display> fmt::Display for Mismatch<T> {
}
}
/// Invalid params.
#[derive(Debug)]
pub struct InvalidParams;
/// Error that occurs when a call failed.
#[derive(Debug, thiserror::Error)]
pub enum CallError {
#[error("Invalid params in the RPC call")]
/// Invalid params in the call.
InvalidParams(InvalidParams),
#[error("RPC Call failed: {0}")]
/// The call failed.
Failed(#[source] Box<dyn std::error::Error + Send + Sync>),
}
impl From<InvalidParams> for CallError {
fn from(params: InvalidParams) -> Self {
Self::InvalidParams(params)
}
}
/// Error type.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Error that occurs when a call failed.
#[error("Server call failed: {0}")]
Call(CallError),
/// Networking error or error on the low-level protocol layer.
#[error("Networking or low-level protocol error: {0}")]
TransportError(#[source] Box<dyn std::error::Error + Send + Sync>),
Transport(#[source] Box<dyn std::error::Error + Send + Sync>),
/// JSON-RPC request error.
#[error("JSON-RPC request error: {0:?}")]
Request(#[source] JsonRpcErrorAlloc),
......@@ -34,7 +58,7 @@ pub enum Error {
/// The background task has been terminated.
#[error("The background task been terminated because: {0}; restart required")]
RestartNeeded(String),
/// Failed to parse the data that the server sent back to us.
/// Failed to parse the data.
#[error("Parse error: {0}")]
ParseError(#[source] serde_json::Error),
/// Invalid subscription ID.
......@@ -43,9 +67,6 @@ pub enum Error {
/// Invalid request ID.
#[error("Invalid request ID")]
InvalidRequestId,
/// Invalid params in the RPC call.
#[error("Invalid params in the RPC call")]
InvalidParams,
/// A request with the same request ID has already been registered.
#[error("A request with the same request ID has already been registered")]
DuplicateRequestId,
......
......@@ -47,6 +47,6 @@ pub trait SubscriptionClient: Client {
}
/// JSON-RPC server interface for managing method calls.
pub trait RpcMethod<R>: Fn(RpcParams) -> Result<R, Error> + Send + Sync + 'static {}
pub trait RpcMethod<R, E>: Fn(RpcParams) -> Result<R, E> + Send + Sync + 'static {}
impl<R, T> RpcMethod<R> for T where T: Fn(RpcParams) -> Result<R, Error> + Send + Sync + 'static {}
impl<R, T, E> RpcMethod<R, E> for T where T: Fn(RpcParams) -> Result<R, E> + Send + Sync + 'static {}
......@@ -80,6 +80,8 @@ pub const INVALID_PARAMS_CODE: i32 = -32602;
pub const INVALID_REQUEST_CODE: i32 = -32600;
/// Method not found error code.
pub const METHOD_NOT_FOUND_CODE: i32 = -32601;
/// Custom server error when a call failed.
pub const CALL_EXECUTION_FAILED_CODE: i32 = -32000;
/// Parse error message
pub const PARSE_ERROR_MSG: &str = "Parse error";
......
use crate::Error;
use crate::error::Error;
use serde::de::DeserializeOwned;
use serde_json::value::RawValue;
......@@ -12,11 +12,11 @@ pub mod request;
pub mod response;
/// Parse request ID from RawValue.
pub fn parse_request_id<T: DeserializeOwned>(raw: Option<&RawValue>) -> Result<T, crate::Error> {
pub fn parse_request_id<T: DeserializeOwned>(raw: Option<&RawValue>) -> Result<T, Error> {
match raw {
None => Err(Error::InvalidRequestId),
Some(v) => {
let val = serde_json::from_str(v.get()).map_err(Error::ParseError)?;
let val = serde_json::from_str(v.get()).map_err(|_| Error::InvalidRequestId)?;
Ok(val)
}
}
......
use crate::error::Error;
use crate::error::InvalidParams;
use alloc::collections::BTreeMap;
use serde::de::{self, Deserializer, Unexpected, Visitor};
use serde::ser::Serializer;
......@@ -78,18 +78,18 @@ impl<'a> RpcParams<'a> {
}
/// Attempt to parse all parameters as array or map into type T
pub fn parse<T>(self) -> Result<T, Error>
pub fn parse<T>(self) -> Result<T, InvalidParams>
where
T: Deserialize<'a>,
{
match self.0 {
None => Err(Error::InvalidParams),
Some(params) => serde_json::from_str(params).map_err(|_| Error::InvalidParams),
None => Err(InvalidParams),
Some(params) => serde_json::from_str(params).map_err(|_| InvalidParams),
}
}
/// Attempt to parse only the first parameter from an array into type T
pub fn one<T>(self) -> Result<T, Error>
pub fn one<T>(self) -> Result<T, InvalidParams>
where
T: Deserialize<'a>,
{
......
......@@ -253,7 +253,7 @@ impl<'a> WsClientBuilder<'a> {
let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests);
let (err_tx, err_rx) = oneshot::channel();
let (sockaddrs, host, mode) = parse_url(url).map_err(|e| Error::TransportError(Box::new(e)))?;
let (sockaddrs, host, mode) = parse_url(url).map_err(|e| Error::Transport(Box::new(e)))?;
let builder = WsTransportClientBuilder {
sockaddrs,
......@@ -265,7 +265,7 @@ impl<'a> WsClientBuilder<'a> {
max_request_body_size: self.max_request_body_size,
};
let (sender, receiver) = builder.build().await.map_err(|e| Error::TransportError(Box::new(e)))?;
let (sender, receiver) = builder.build().await.map_err(|e| Error::Transport(Box::new(e)))?;
async_std::task::spawn(async move {
background_task(sender, receiver, from_front, err_tx, max_capacity_per_subscription).await;
......@@ -518,7 +518,7 @@ async fn background_task(
.expect("ID unused checked above; qed"),
Err(e) => {
log::warn!("[backend]: client request failed: {:?}", e);
let _ = request.send_back.map(|s| s.send(Err(Error::TransportError(Box::new(e)))));
let _ = request.send_back.map(|s| s.send(Err(Error::Transport(Box::new(e)))));
}
}
}
......@@ -535,7 +535,7 @@ async fn background_task(
.expect("Request ID unused checked above; qed"),
Err(e) => {
log::warn!("[backend]: client subscription failed: {:?}", e);
let _ = sub.send_back.send(Err(Error::TransportError(Box::new(e))));
let _ = sub.send_back.send(Err(Error::Transport(Box::new(e))));
}
},
// User dropped a subscription.
......@@ -600,7 +600,7 @@ async fn background_task(
}
Either::Right((Some(Err(e)), _)) => {
log::error!("Error: {:?} terminating client", e);
let _ = front_error.send(Error::TransportError(Box::new(e)));
let _ = front_error.send(Error::Transport(Box::new(e)));
return;
}
Either::Right((None, _)) => {
......
......@@ -27,5 +27,6 @@ tokio-stream = { version = "0.1.1", features = ["net"] }
tokio-util = { version = "0.6", features = ["compat"] }
[dev-dependencies]
env_logger = "0.8"
jsonrpsee-test-utils = { path = "../test-utils" }
jsonrpsee-ws-client = { path = "../ws-client" }
......@@ -37,7 +37,7 @@ use tokio::net::{TcpListener, ToSocketAddrs};
use tokio_stream::{wrappers::TcpListenerStream, StreamExt};
use tokio_util::compat::TokioAsyncReadCompatExt;
use jsonrpsee_types::error::Error;
use jsonrpsee_types::error::{Error, InvalidParams};
use jsonrpsee_types::v2::error::JsonRpcErrorCode;
use jsonrpsee_types::v2::params::{JsonRpcNotificationParams, RpcParams, TwoPointZero};
use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcNotification, JsonRpcRequest};
......@@ -105,7 +105,7 @@ impl Server {
pub fn register_method<F, R>(&mut self, method_name: &'static str, callback: F) -> Result<(), Error>
where
R: Serialize,
F: Fn(RpcParams) -> Result<R, Error> + Send + Sync + 'static,
F: Fn(RpcParams) -> Result<R, InvalidParams> + Send + Sync + 'static,
{
self.root.register_method(method_name, callback)
}
......
use crate::server::{RpcParams, SubscriptionId, SubscriptionSink};
use jsonrpsee_types::error::Error;
use jsonrpsee_types::traits::RpcMethod;
use jsonrpsee_utils::server::{send_response, Methods};
use jsonrpsee_types::{error::InvalidParams, traits::RpcMethod, v2::error::CALL_EXECUTION_FAILED_CODE};
use jsonrpsee_types::{
error::{CallError, Error},
v2::error::{JsonRpcErrorCode, JsonRpcErrorObject},
};
use jsonrpsee_utils::server::{send_error, send_response, Methods};
use parking_lot::Mutex;
use rustc_hash::FxHashMap;
use serde::Serialize;
......@@ -35,16 +38,17 @@ impl RpcModule {
pub fn register_method<F, R>(&mut self, method_name: &'static str, callback: F) -> Result<(), Error>
where
R: Serialize,
F: RpcMethod<R>,
F: RpcMethod<R, InvalidParams>,
{
self.verify_method_name(method_name)?;
self.methods.insert(
method_name,
Box::new(move |id, params, tx, _| {
let result = callback(params)?;
send_response(id, tx, result);
match callback(params) {
Ok(res) => send_response(id, tx, res),
Err(InvalidParams) => send_error(id, tx, JsonRpcErrorCode::InvalidParams.into()),
};
Ok(())
}),
......@@ -95,7 +99,7 @@ impl RpcModule {
self.methods.insert(
unsubscribe_method_name,
Box::new(move |id, params, tx, conn| {
let sub_id = params.one()?;
let sub_id = params.one().map_err(|e| anyhow::anyhow!("{:?}", e))?;
subscribers.lock().remove(&(conn, sub_id));
......@@ -142,7 +146,7 @@ impl<Context> RpcContextModule<Context> {
where
Context: Send + Sync + 'static,
R: Serialize,
F: Fn(RpcParams, &Context) -> Result<R, Error> + Send + Sync + 'static,
F: Fn(RpcParams, &Context) -> Result<R, CallError> + Send + Sync + 'static,
{
self.module.verify_method_name(method_name)?;
......@@ -151,14 +155,22 @@ impl<Context> RpcContextModule<Context> {
self.module.methods.insert(
method_name,
Box::new(move |id, params, tx, _| {
let result = callback(params, &*ctx)?;
send_response(id, tx, result);
match callback(params, &*ctx) {
Ok(res) => send_response(id, tx, res),
Err(CallError::InvalidParams(_)) => send_error(id, tx, JsonRpcErrorCode::InvalidParams.into()),
Err(CallError::Failed(err)) => {
let err = JsonRpcErrorObject {
code: JsonRpcErrorCode::ServerError(CALL_EXECUTION_FAILED_CODE),
message: &err.to_string(),
data: None,
};
send_error(id, tx, err)
}
};
Ok(())
}),
);
Ok(())
}
......
#![cfg(test)]
use crate::WsServer;
use futures_channel::oneshot::{self, Sender};
use crate::{RpcContextModule, WsServer};
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::types::{Id, WebSocketTestClient};
use jsonrpsee_types::error::Error;
use jsonrpsee_test_utils::types::{Id, TestContext, WebSocketTestClient};
use jsonrpsee_types::error::{CallError, Error};
use serde_json::Value as JsonValue;
use std::net::SocketAddr;
/// Spawns a dummy `JSONRPC v2 WebSocket`
/// It has two hardcoded methods: "say_hello" and "add"
pub async fn server(server_started: Sender<SocketAddr>) {
pub async fn server() -> SocketAddr {
let mut server = WsServer::new("127.0.0.1:0").await.unwrap();
server
......@@ -26,17 +25,45 @@ pub async fn server(server_started: Sender<SocketAddr>) {
Ok(sum)
})
.unwrap();
server_started.send(server.local_addr().unwrap()).unwrap();
let addr = server.local_addr().unwrap();
server.start().await;
tokio::spawn(async { server.start().await });
addr
}
/// Run server with user provided context.
pub async fn server_with_context() -> SocketAddr {
let mut server = WsServer::new("127.0.0.1:0").await.unwrap();
let ctx = TestContext;
let mut rpc_ctx = RpcContextModule::new(ctx);
rpc_ctx
.register_method("should_err", |_p, ctx| {
let _ = ctx.err().map_err(|e| CallError::Failed(e.into()))?;