Unverified Commit e159c449 authored by Maciej Hirsz's avatar Maciej Hirsz Committed by GitHub
Browse files

Crate restructuring (#590)



* Nuke V2

* fmt

* Formatting and imports

* Updated benches

* Fix doc comment link

* Brace imports in ws-server

* Reworking imports

* std first

* fmt

* std on top

* Update to match changed line numbers

* Rename jsonrpsee_utils -> jsonrpsee_core

* Migrating things types -> core

* RpcError -> ErrorResponse

* Merge types::client into core::client

* Continued move types -> core

* Removing features to make checks pass

* Move rpc_module tests to tests crate

* Fixed doc comment links

* Add futures-util dependency for client

* Remove dead code

Co-authored-by: Niklas Adolfsson's avatarNiklas Adolfsson <niklasadolfsson1@gmail.com>

* fmt

* Feature gate the Client trait

* Move `Client` traits to `client` module

Co-authored-by: Niklas Adolfsson's avatarNiklas Adolfsson <niklasadolfsson1@gmail.com>
parent 1a5f8a81
......@@ -8,7 +8,7 @@ members = [
"jsonrpsee",
"tests",
"types",
"utils",
"core",
"ws-client",
"ws-server",
"proc-macros",
......
use std::sync::Arc;
use criterion::*;
use futures_util::future::join_all;
use helpers::{SUB_METHOD_NAME, UNSUB_METHOD_NAME};
use jsonrpsee::{
http_client::HttpClientBuilder,
types::traits::SubscriptionClient,
types::{
traits::Client,
v2::{Id, ParamsSer, RequestSer},
},
ws_client::WsClientBuilder,
};
use std::sync::Arc;
use jsonrpsee::core::client::{Client, SubscriptionClient};
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::types::{Id, ParamsSer, RequestSer};
use jsonrpsee::ws_client::WsClientBuilder;
use tokio::runtime::Runtime as TokioRuntime;
mod helpers;
......
......@@ -26,10 +26,11 @@ pub async fn http_server(handle: tokio::runtime::Handle) -> (String, jsonrpc_htt
/// Run jsonrpc WebSocket server for benchmarks.
#[cfg(feature = "jsonrpc-crate")]
pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpc_ws_server::Server) {
use std::sync::atomic::{AtomicU64, Ordering};
use jsonrpc_pubsub::{PubSubHandler, Session, Subscriber, SubscriptionId};
use jsonrpc_ws_server::jsonrpc_core::*;
use jsonrpc_ws_server::*;
use std::sync::atomic::{AtomicU64, Ordering};
const ID: AtomicU64 = AtomicU64::new(0);
......
[package]
name = "jsonrpsee-utils"
name = "jsonrpsee-core"
version = "0.6.0"
authors = ["Parity Technologies <admin@parity.io>"]
description = "Utilities for jsonrpsee"
......@@ -7,38 +7,36 @@ edition = "2018"
license = "MIT"
[dependencies]
anyhow = "1"
arrayvec = "0.7.1"
async-trait = "0.1"
beef = { version = "0.5.1", features = ["impl_serde"] }
thiserror = { version = "1", optional = true }
futures-channel = { version = "0.3.14", default-features = false, optional = true }
thiserror = "1"
futures-channel = { version = "0.3.14", default-features = false }
futures-util = { version = "0.3.14", default-features = false, optional = true }
hyper = { version = "0.14.10", default-features = false, features = ["stream"], optional = true }
jsonrpsee-types = { path = "../types", version = "0.6.0", optional = true }
hyper = { version = "0.14.10", default-features = false, features = ["stream"] }
jsonrpsee-types = { path = "../types", version = "0.6.0" }
tracing = { version = "0.1", optional = true }
rustc-hash = { version = "1", optional = true }
rand = { version = "0.8", optional = true }
serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
serde_json = { version = "1", features = ["raw_value"], optional = true }
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = { version = "1", features = ["raw_value"] }
soketto = "0.7.1"
parking_lot = { version = "0.11", optional = true }
tokio = { version = "1.8", features = ["rt"], optional = true }
[features]
default = []
http-helpers = ["hyper", "futures-util", "jsonrpsee-types"]
http-helpers = ["futures-util"]
server = [
"thiserror",
"futures-channel",
"futures-util",
"jsonrpsee-types",
"rustc-hash",
"serde",
"serde_json",
"tracing",
"parking_lot",
"rand",
"tokio",
]
client = ["jsonrpsee-types"]
client = ["futures-util"]
[dev-dependencies]
serde_json = "1.0"
......
......@@ -24,21 +24,101 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::{error::SubscriptionClosed, v2::SubscriptionId, Error};
use core::marker::PhantomData;
use futures_channel::{mpsc, oneshot};
use futures_util::{
future::FutureExt,
sink::SinkExt,
stream::{Stream, StreamExt},
};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::Value as JsonValue;
//! Shared utilities for `jsonrpsee` clients.
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task;
use crate::error::{Error, SubscriptionClosed};
use async_trait::async_trait;
use core::marker::PhantomData;
use futures_channel::{mpsc, oneshot};
use futures_util::future::FutureExt;
use futures_util::sink::SinkExt;
use futures_util::stream::{Stream, StreamExt};
use jsonrpsee_types::{ParamsSer, SubscriptionId};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::Value as JsonValue;
#[doc(hidden)]
pub mod __reexports {
pub use crate::to_json_value;
pub use jsonrpsee_types::ParamsSer;
}
/// [JSON-RPC](https://www.jsonrpc.org/specification) client interface that can make requests and notifications.
#[async_trait]
pub trait Client {
/// Send a [notification request](https://www.jsonrpc.org/specification#notification)
async fn notification<'a>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<(), Error>;
/// Send a [method call request](https://www.jsonrpc.org/specification#request_object).
async fn request<'a, R>(&self, method: &'a str, params: Option<ParamsSer<'a>>) -> Result<R, Error>
where
R: DeserializeOwned;
/// Send a [batch request](https://www.jsonrpc.org/specification#batch).
///
/// The response to batch are returned in the same order as it was inserted in the batch.
///
/// Returns `Ok` if all requests in the batch were answered successfully.
/// Returns `Error` if any of the requests in batch fails.
async fn batch_request<'a, R>(&self, batch: Vec<(&'a str, Option<ParamsSer<'a>>)>) -> Result<Vec<R>, Error>
where
R: DeserializeOwned + Default + Clone;
}
/// [JSON-RPC](https://www.jsonrpc.org/specification) client interface that can make requests, notifications and subscriptions.
#[async_trait]
pub trait SubscriptionClient: Client {
/// Initiate a subscription by performing a JSON-RPC method call where the server responds with
/// a `Subscription ID` that is used to fetch messages on that subscription,
///
/// The `subscribe_method` and `params` are used to ask for the subscription towards the
/// server.
///
/// The params may be used as input for the subscription for the server to process.
///
/// The `unsubscribe_method` is used to close the subscription
///
/// The `Notif` param is a generic type to receive generic subscriptions, see [`Subscription`] for further
/// documentation.
async fn subscribe<'a, Notif>(
&self,
subscribe_method: &'a str,
params: Option<ParamsSer<'a>>,
unsubscribe_method: &'a str,
) -> Result<Subscription<Notif>, Error>
where
Notif: DeserializeOwned;
/// Register a method subscription, this is used to filter only server notifications that a user is interested in.
///
/// The `Notif` param is a generic type to receive generic subscriptions, see [`Subscription`] for further
/// documentation.
async fn subscribe_to_method<'a, Notif>(&self, method: &'a str) -> Result<Subscription<Notif>, Error>
where
Notif: DeserializeOwned;
}
#[macro_export]
/// Convert the given values to a [`jsonrpsee_types::ParamsSer`] as expected by a jsonrpsee Client (http or websocket).
macro_rules! rpc_params {
($($param:expr),*) => {
{
let mut __params = vec![];
$(
__params.push($crate::client::__reexports::to_json_value($param).expect("json serialization is infallible; qed."));
)*
Some($crate::client::__reexports::ParamsSer::Array(__params))
}
};
() => {
None
}
}
/// Subscription kind
#[derive(Debug)]
#[non_exhaustive]
......
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use std::fmt;
use jsonrpsee_types::error::CallError;
use serde::{Deserialize, Serialize};
/// Convenience type for displaying errors.
#[derive(Clone, Debug, PartialEq)]
pub struct Mismatch<T> {
/// Expected value.
pub expected: T,
/// Actual value.
pub got: T,
}
impl<T: fmt::Display> fmt::Display for Mismatch<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_fmt(format_args!("Expected: {}, Got: {}", self.expected, self.got))
}
}
// NOTE(niklasad1): this `From` impl is a bit opinionated to regard all generic errors as `CallError`.
// In practice this should be the most common use case for users of this library.
impl From<anyhow::Error> for Error {
fn from(err: anyhow::Error) -> Self {
Error::Call(CallError::Failed(err))
}
}
/// Error type.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Error that occurs when a call failed.
#[error("Server call failed: {0}")]
Call(#[from] CallError),
/// Networking error or error on the low-level protocol layer.
#[error("Networking or low-level protocol error: {0}")]
Transport(#[source] anyhow::Error),
/// JSON-RPC request error.
#[error("JSON-RPC request error: {0:?}")]
Request(String),
/// Frontend/backend channel error.
#[error("Frontend/backend channel error: {0}")]
Internal(#[from] futures_channel::mpsc::SendError),
/// Invalid response,
#[error("Invalid response: {0}")]
InvalidResponse(Mismatch<String>),
/// The background task has been terminated.
#[error("The background task been terminated because: {0}; restart required")]
RestartNeeded(String),
/// Failed to parse the data.
#[error("Parse error: {0}")]
ParseError(#[from] serde_json::Error),
/// Invalid subscription ID.
#[error("Invalid subscription ID")]
InvalidSubscriptionId,
/// Invalid request ID.
#[error("Invalid request ID")]
InvalidRequestId,
/// Client received a notification with an unregistered method
#[error("Unregistered notification method")]
UnregisteredNotification(String),
/// A request with the same request ID has already been registered.
#[error("A request with the same request ID has already been registered")]
DuplicateRequestId,
/// Method was already registered.
#[error("Method: {0} was already registered")]
MethodAlreadyRegistered(String),
/// Method with that name has not yet been registered.
#[error("Method: {0} has not yet been registered")]
MethodNotFound(String),
/// Subscribe and unsubscribe method names are the same.
#[error("Cannot use the same method name for subscribe and unsubscribe, used: {0}")]
SubscriptionNameConflict(String),
/// Subscription got closed.
#[error("Subscription closed: {0:?}")]
SubscriptionClosed(SubscriptionClosed),
/// Request timeout
#[error("Request timeout")]
RequestTimeout,
/// Configured max number of request slots exceeded.
#[error("Configured max number of request slots exceeded")]
MaxSlotsExceeded,
/// Attempted to stop server that is already stopped.
#[error("Attempted to stop server that is already stopped")]
AlreadyStopped,
/// List passed into `set_allowed_origins` was empty
#[error("Must set at least one allowed value for the {0} header")]
EmptyAllowList(&'static str),
/// Failed to execute a method because a resource was already at capacity
#[error("Resource at capacity: {0}")]
ResourceAtCapacity(&'static str),
/// Failed to register a resource due to a name conflict
#[error("Resource name already taken: {0}")]
ResourceNameAlreadyTaken(&'static str),
/// Failed to initialize resources for a method at startup
#[error("Resource name `{0}` not found for method `{1}`")]
ResourceNameNotFoundForMethod(&'static str, &'static str),
/// Trying to claim resources for a method execution, but the method resources have not been initialized
#[error("Method `{0}` has uninitialized resources")]
UninitializedMethod(Box<str>),
/// Failed to register a resource due to a maximum number of resources already registered
#[error("Maximum number of resources reached")]
MaxResourcesReached,
/// Custom error.
#[error("Custom error: {0}")]
Custom(String),
/// Not implemented for HTTP clients.
#[error("Not implemented")]
HttpNotImplemented,
}
impl Error {
/// Create `Error::CallError` from a generic error.
/// Useful if you don't care about specific JSON-RPC error code and
/// just wants to return your custom error type.
pub fn to_call_error<E>(err: E) -> Self
where
E: std::error::Error + Send + Sync + 'static,
{
Error::Call(CallError::from_std_error(err))
}
}
/// A type with a special `subscription_closed` field to detect that
/// a subscription has been closed to distinguish valid items produced
/// by the server on the subscription stream from an error.
///
/// This is included in the `result field` of the SubscriptionResponse
/// when an error is reported by the server.
#[derive(Deserialize, Serialize, Debug, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct SubscriptionClosed {
reason: SubscriptionClosedReason,
}
impl From<SubscriptionClosedReason> for SubscriptionClosed {
fn from(reason: SubscriptionClosedReason) -> Self {
Self::new(reason)
}
}
impl SubscriptionClosed {
/// Create a new [`SubscriptionClosed`].
pub fn new(reason: SubscriptionClosedReason) -> Self {
Self { reason }
}
/// Get the close reason.
pub fn close_reason(&self) -> &SubscriptionClosedReason {
&self.reason
}
}
/// A type to represent when a subscription gets closed
/// by either the server or client side.
#[derive(Deserialize, Serialize, Debug, PartialEq)]
pub enum SubscriptionClosedReason {
/// The subscription was closed by calling the unsubscribe method.
Unsubscribed,
/// The client closed the connection.
ConnectionReset,
/// The server closed the subscription, providing a description of the reason as a `String`.
Server(String),
}
/// Generic transport error.
#[derive(Debug, thiserror::Error)]
pub enum GenericTransportError<T: std::error::Error + Send + Sync> {
/// Request was too large.
#[error("The request was too big")]
TooLarge,
/// Malformed request
#[error("Malformed request")]
Malformed,
/// Concrete transport error.
#[error("Transport error: {0}")]
Inner(T),
}
impl From<std::io::Error> for Error {
fn from(io_err: std::io::Error) -> Error {
Error::Transport(io_err.into())
}
}
impl From<soketto::handshake::Error> for Error {
fn from(handshake_err: soketto::handshake::Error) -> Error {
Error::Transport(handshake_err.into())
}
}
impl From<soketto::connection::Error> for Error {
fn from(conn_err: soketto::connection::Error) -> Error {
Error::Transport(conn_err.into())
}
}
impl From<hyper::Error> for Error {
fn from(hyper_err: hyper::Error) -> Error {
Error::Transport(hyper_err.into())
}
}
#[cfg(test)]
mod tests {
use super::{SubscriptionClosed, SubscriptionClosedReason};
#[test]
fn subscription_closed_ser_deser_works() {
let items: Vec<(&str, SubscriptionClosed)> = vec![
(r#"{"reason":"Unsubscribed"}"#, SubscriptionClosedReason::Unsubscribed.into()),
(r#"{"reason":"ConnectionReset"}"#, SubscriptionClosedReason::ConnectionReset.into()),
(r#"{"reason":{"Server":"hoho"}}"#, SubscriptionClosedReason::Server("hoho".into()).into()),
];
for (s, d) in items {
let dsr: SubscriptionClosed = serde_json::from_str(s).unwrap();
assert_eq!(dsr, d);
let ser = serde_json::to_string(&d).unwrap();
assert_eq!(ser, s);
}
}
#[test]
fn subscription_closed_deny_unknown_field() {
let ser = r#"{"reason":"Unsubscribed","deny":1}"#;
assert!(serde_json::from_str::<SubscriptionClosed>(ser).is_err());
}
}
......@@ -26,8 +26,8 @@
//! Utility methods relying on hyper
use crate::error::GenericTransportError;
use futures_util::stream::StreamExt;
use jsonrpsee_types::error::GenericTransportError;
/// Read a data from a [`hyper::Body`] and return the data if it is valid and within the allowed size range.
///
......
......@@ -28,6 +28,15 @@
#![warn(missing_docs, missing_debug_implementations, unreachable_pub)]
/// Error type.
pub mod error;
/// Traits
pub mod traits;
/// Middleware trait and implementation.
pub mod middleware;
/// Shared hyper helpers.
#[cfg(feature = "http-helpers")]
pub mod http_helpers;
......@@ -39,3 +48,25 @@ pub mod server;
/// Shared code for JSON-RPC clients.
#[cfg(feature = "client")]
pub mod client;
pub use async_trait::async_trait;
pub use error::Error;
/// JSON-RPC result.
pub type RpcResult<T> = std::result::Result<T, Error>;
/// Re-exports for proc-macro library to not require any additional
/// dependencies to be explicitly added on the client side.
#[doc(hidden)]
pub mod __reexports {
pub use async_trait::async_trait;
pub use serde;
pub use serde_json;
}
pub use beef::Cow;
pub use serde::{de::DeserializeOwned, Serialize};
pub use serde_json::{
to_value as to_json_value, value::to_raw_value as to_json_raw_value, value::RawValue as JsonRawValue,
Value as JsonValue,
};
......@@ -24,25 +24,25 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use std::io;
use crate::{to_json_raw_value, Error};
use futures_channel::mpsc;
use futures_util::stream::StreamExt;
use jsonrpsee_types::error::{CallError, Error};
use jsonrpsee_types::to_json_raw_value;
use jsonrpsee_types::v2::error::{OVERSIZED_RESPONSE_CODE, OVERSIZED_RESPONSE_MSG};
use jsonrpsee_types::v2::{
error::{CALL_EXECUTION_FAILED_CODE, UNKNOWN_ERROR_CODE},
ErrorCode, ErrorObject, Id, InvalidRequest, Response, RpcError,
use jsonrpsee_types::error::{
CallError, ErrorCode, ErrorObject, ErrorResponse, CALL_EXECUTION_FAILED_CODE, OVERSIZED_RESPONSE_CODE,
OVERSIZED_RESPONSE_MSG, UNKNOWN_ERROR_CODE,
};
use jsonrpsee_types::{Id, InvalidRequest, Response};
use serde::Serialize;
use std::io;
/// Bounded writer that allows writing at most `max_len` bytes.
///
/// ```
/// use jsonrpsee_utils::server::helpers::BoundedWriter;
/// use std::io::Write;
///
/// use jsonrpsee_core::server::helpers::BoundedWriter;
///
/// let mut writer = BoundedWriter::new(10);
/// (&mut writer).write("hello".as_bytes()).unwrap();
/// assert_eq!(std::str::from_utf8(&writer.into_bytes()).unwrap(), "hello");
......@@ -143,7 +143,7 @@ impl MethodSink {
/// Send a JSON-RPC error to the client
pub fn send_error(&self, id: Id, error: ErrorObject) -> bool {
let json = match serde_json::to_string(&RpcError::new(error, id)) {
let json = match serde_json::to_string(&ErrorResponse::new(error, id)) {
Ok(json) => json,
Err(err) => {
tracing::error!("Error serializing error message: {:?}", err);
......
......@@ -52,7 +52,7 @@
//! `#[method]` attribute:
//!
//! ```
//! # use jsonrpsee::{types::RpcResult, proc_macros::rpc};
//! # use jsonrpsee::{core::RpcResult, proc_macros::rpc};
//! #
//! #[rpc(server)]
//! pub trait Rpc {
......@@ -67,7 +67,7 @@
//! Alternatively, you can use the `resource` method when creating a module manually without the help of the macro:
//!
//! ```
//! # use jsonrpsee::{RpcModule, types::RpcResult};
//! # use jsonrpsee::{RpcModule, core::RpcResult};
//! #
//! # fn main() -> RpcResult<()> {
//! #
......@@ -91,8 +91,8 @@
use std::sync::Arc;
use crate::Error;
use arrayvec::ArrayVec;
use jsonrpsee_types::error::Error;
use parking_lot::Mutex;
// The number of kinds of resources that can be used for limiting.
......
......@@ -24,32 +24,27 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use std::collections::hash_map::Entry;
use std::fmt::{self, Debug};
use std::future::Future;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use crate::error::{Error, SubscriptionClosed, SubscriptionClosedReason};
use crate::server::helpers::MethodSink;