Unverified Commit 9fa817d9 authored by Niklas Adolfsson's avatar Niklas Adolfsson Committed by GitHub
Browse files

fix(rpc module): fail subscription calls with bad params (#728)



* fix(rpc module): fail subscription with bad params

* draft; show my point

* fix tests

* fix build

* add tests for proc macros too

* add tests for bad params in proc macros

* fix nits

* commit all files

* add ugly fix for proc macro code

* add more user friendly API

* make SubscriptionSink::close take mut self

* fix grumbles

* show james some code

* Update core/src/server/rpc_module.rs

Co-authored-by: James Wilson's avatarJames Wilson <james@jsdw.me>

* remove needless clone

* fix build

* client fix docs + error type

* simplify code: merge connect reset and unsubscribe close reason

* remove unknown close reason

* refactor: remove Error::SubscriptionClosed

* add some nice APIs to ErrorObjectOwned

* unify api

* address grumbles

* remove redundant methods for close and reject

* proc macro: compile err when subscription -> Result

* rpc module: fix test subscription test

* Update core/src/server/rpc_module.rs

Co-authored-by: James Wilson's avatarJames Wilson <james@jsdw.me>

* Update core/src/server/rpc_module.rs

Co-authored-by: James Wilson's avatarJames Wilson <james@jsdw.me>

* Update core/src/server/rpc_module.rs

Co-authored-by: James Wilson's avatarJames Wilson <james@jsdw.me>

* Update core/src/server/rpc_module.rs

Co-authored-by: James Wilson's avatarJames Wilson <james@jsdw.me>

* Update core/src/server/rpc_module.rs

Co-authored-by: James Wilson's avatarJames Wilson <james@jsdw.me>

* Update proc-macros/src/lib.rs

Co-authored-by: James Wilson's avatarJames Wilson <james@jsdw.me>

* address grumbles

* remove faulty comment

* Update core/src/server/rpc_module.rs

Co-authored-by: David's avatarDavid <dvdplm@gmail.com>

* Update core/src/server/rpc_module.rs

Co-authored-by: David's avatarDavid <dvdplm@gmail.com>

* Update core/src/server/rpc_module.rs

Co-authored-by: David's avatarDavid <dvdplm@gmail.com>

* Update core/src/server/rpc_module.rs

Co-authored-by: David's avatarDavid <dvdplm@gmail.com>

* Update core/src/server/rpc_module.rs

Co-authored-by: David's avatarDavid <dvdplm@gmail.com>

* fix: don't send `RPC Call failed: error`.

* remove debug assert

Co-authored-by: James Wilson's avatarJames Wilson <james@jsdw.me>
Co-authored-by: David's avatarDavid <dvdplm@gmail.com>
parent b96a54bf
Pipeline #189301 passed with stages
in 4 minutes and 21 seconds
......@@ -141,10 +141,13 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::ws
let mut module = gen_rpc_module();
module
.register_subscription(SUB_METHOD_NAME, SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| {
.register_subscription(SUB_METHOD_NAME, SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, pending, _ctx| {
let mut sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
let x = "Hello";
tokio::spawn(async move { sink.send(&x) });
Ok(())
})
.unwrap();
......
......@@ -32,6 +32,7 @@ use crate::types::{ErrorResponse, Id, NotificationSer, ParamsSer, RequestSer, Re
use async_trait::async_trait;
use jsonrpsee_core::client::{CertificateStore, ClientT, IdKind, RequestIdManager, Subscription, SubscriptionClientT};
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::error::CallError;
use rustc_hash::FxHashMap;
use serde::de::DeserializeOwned;
......@@ -147,7 +148,7 @@ impl ClientT for HttpClient {
Ok(response) => response,
Err(_) => {
let err: ErrorResponse = serde_json::from_slice(&body).map_err(Error::ParseError)?;
return Err(Error::Call(err.error.to_call_error()));
return Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned())));
}
};
......@@ -186,7 +187,7 @@ impl ClientT for HttpClient {
let rps: Vec<Response<_>> =
serde_json::from_slice(&body).map_err(|_| match serde_json::from_slice::<ErrorResponse>(&body) {
Ok(e) => Error::Call(e.error.to_call_error()),
Ok(e) => Error::Call(CallError::Custom(e.error_object().clone().into_owned())),
Err(e) => Error::ParseError(e),
})?;
......
......@@ -33,7 +33,7 @@ use jsonrpsee_core::Error;
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::mocks::Id;
use jsonrpsee_test_utils::TimeoutFutureExt;
use jsonrpsee_types::error::CallError;
use jsonrpsee_types::error::{CallError, ErrorObjectOwned};
#[tokio::test]
async fn method_call_works() {
......@@ -98,34 +98,34 @@ async fn response_with_wrong_id() {
async fn response_method_not_found() {
let err =
run_request_with_response(method_not_found(Id::Num(0))).with_default_timeout().await.unwrap().unwrap_err();
assert_jsonrpc_error_response(err, ErrorObject::from(ErrorCode::MethodNotFound).to_call_error());
assert_jsonrpc_error_response(err, ErrorObject::from(ErrorCode::MethodNotFound).into_owned());
}
#[tokio::test]
async fn response_parse_error() {
let err = run_request_with_response(parse_error(Id::Num(0))).with_default_timeout().await.unwrap().unwrap_err();
assert_jsonrpc_error_response(err, ErrorObject::from(ErrorCode::ParseError).to_call_error());
assert_jsonrpc_error_response(err, ErrorObject::from(ErrorCode::ParseError).into_owned());
}
#[tokio::test]
async fn invalid_request_works() {
let err =
run_request_with_response(invalid_request(Id::Num(0_u64))).with_default_timeout().await.unwrap().unwrap_err();
assert_jsonrpc_error_response(err, ErrorObject::from(ErrorCode::InvalidRequest).to_call_error());
assert_jsonrpc_error_response(err, ErrorObject::from(ErrorCode::InvalidRequest).into_owned());
}
#[tokio::test]
async fn invalid_params_works() {
let err =
run_request_with_response(invalid_params(Id::Num(0_u64))).with_default_timeout().await.unwrap().unwrap_err();
assert_jsonrpc_error_response(err, ErrorObject::from(ErrorCode::InvalidParams).to_call_error());
assert_jsonrpc_error_response(err, ErrorObject::from(ErrorCode::InvalidParams).into_owned());
}
#[tokio::test]
async fn internal_error_works() {
let err =
run_request_with_response(internal_error(Id::Num(0_u64))).with_default_timeout().await.unwrap().unwrap_err();
assert_jsonrpc_error_response(err, ErrorObject::from(ErrorCode::InternalError).to_call_error());
assert_jsonrpc_error_response(err, ErrorObject::from(ErrorCode::InternalError).into_owned());
}
#[tokio::test]
......@@ -172,10 +172,11 @@ async fn run_request_with_response(response: String) -> Result<String, Error> {
client.request("say_hello", None).with_default_timeout().await.unwrap()
}
fn assert_jsonrpc_error_response(err: Error, exp: CallError) {
fn assert_jsonrpc_error_response(err: Error, exp: ErrorObjectOwned) {
let exp = CallError::Custom(exp);
match &err {
Error::Call(e) => {
assert_eq!(e.to_string(), exp.to_string());
Error::Call(err) => {
assert_eq!(err.to_string(), exp.to_string());
}
e => panic!("Expected error: \"{}\", got: {:?}", err, e),
};
......
......@@ -35,7 +35,7 @@ use jsonrpsee_core::Error;
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::mocks::{Id, WebSocketTestServer};
use jsonrpsee_test_utils::TimeoutFutureExt;
use jsonrpsee_types::error::CallError;
use jsonrpsee_types::error::{CallError, ErrorObjectOwned};
use serde_json::Value as JsonValue;
#[tokio::test]
......@@ -109,34 +109,34 @@ async fn response_with_wrong_id() {
async fn response_method_not_found() {
let err =
run_request_with_response(method_not_found(Id::Num(0))).with_default_timeout().await.unwrap().unwrap_err();
assert_error_response(err, ErrorObject::from(ErrorCode::MethodNotFound).to_call_error());
assert_error_response(err, ErrorObject::from(ErrorCode::MethodNotFound).into_owned());
}
#[tokio::test]
async fn parse_error_works() {
let err = run_request_with_response(parse_error(Id::Num(0))).with_default_timeout().await.unwrap().unwrap_err();
assert_error_response(err, ErrorObject::from(ErrorCode::ParseError).to_call_error());
assert_error_response(err, ErrorObject::from(ErrorCode::ParseError).into_owned());
}
#[tokio::test]
async fn invalid_request_works() {
let err =
run_request_with_response(invalid_request(Id::Num(0_u64))).with_default_timeout().await.unwrap().unwrap_err();
assert_error_response(err, ErrorObject::from(ErrorCode::InvalidRequest).to_call_error());
assert_error_response(err, ErrorObject::from(ErrorCode::InvalidRequest).into_owned());
}
#[tokio::test]
async fn invalid_params_works() {
let err =
run_request_with_response(invalid_params(Id::Num(0_u64))).with_default_timeout().await.unwrap().unwrap_err();
assert_error_response(err, ErrorObject::from(ErrorCode::InvalidParams).to_call_error());
assert_error_response(err, ErrorObject::from(ErrorCode::InvalidParams).into_owned());
}
#[tokio::test]
async fn internal_error_works() {
let err =
run_request_with_response(internal_error(Id::Num(0_u64))).with_default_timeout().await.unwrap().unwrap_err();
assert_error_response(err, ErrorObject::from(ErrorCode::InternalError).to_call_error());
assert_error_response(err, ErrorObject::from(ErrorCode::InternalError).into_owned());
}
#[tokio::test]
......@@ -283,7 +283,8 @@ async fn run_request_with_response(response: String) -> Result<String, Error> {
client.request("say_hello", None).with_default_timeout().await.unwrap()
}
fn assert_error_response(err: Error, exp: CallError) {
fn assert_error_response(err: Error, exp: ErrorObjectOwned) {
let exp = CallError::Custom(exp);
match &err {
Error::Call(e) => {
assert_eq!(e.to_string(), exp.to_string());
......
......@@ -28,10 +28,11 @@ use std::time::Duration;
use crate::client::async_client::manager::{RequestManager, RequestStatus};
use crate::client::{RequestMessage, TransportSenderT};
use crate::error::SubscriptionClosed;
use crate::Error;
use futures_channel::{mpsc, oneshot};
use jsonrpsee_types::error::CallError;
use jsonrpsee_types::response::SubscriptionError;
use jsonrpsee_types::{
ErrorResponse, Id, Notification, ParamsSer, RequestSer, Response, SubscriptionId, SubscriptionResponse,
};
......@@ -81,20 +82,14 @@ pub(crate) fn process_subscription_response(
let sub_id = response.params.subscription.into_owned();
let request_id = match manager.get_request_id_by_subscription_id(&sub_id) {
Some(request_id) => request_id,
None => return Err(None),
None => {
tracing::error!("Subscription ID: {:?} is not an active subscription", sub_id);
return Err(None);
}
};
match manager.as_subscription_mut(&request_id) {
Some(send_back_sink) => match send_back_sink.try_send(response.params.result.clone()) {
// The server sent a subscription closed notification, then close down the subscription.
Ok(()) if serde_json::from_value::<SubscriptionClosed>(response.params.result).is_ok() => {
if manager.remove_subscription(request_id, sub_id.clone()).is_some() {
Ok(())
} else {
tracing::error!("The server tried to close down an invalid subscription: {:?}", sub_id);
Err(None)
}
}
Some(send_back_sink) => match send_back_sink.try_send(response.params.result) {
Ok(()) => Ok(()),
Err(err) => {
tracing::error!("Dropping subscription {:?} error: {:?}", sub_id, err);
......@@ -110,6 +105,27 @@ pub(crate) fn process_subscription_response(
}
}
/// Attempts to close a subscription when a [`SubscriptionError`] is received.
///
/// Returns `Ok(())` if the subscription was removed
/// Return `Err(e)` if the subscription was not found.
pub(crate) fn process_subscription_close_response(
manager: &mut RequestManager,
response: SubscriptionError<JsonValue>,
) -> Result<(), Error> {
let sub_id = response.params.subscription.into_owned();
let request_id = match manager.get_request_id_by_subscription_id(&sub_id) {
Some(request_id) => request_id,
None => {
tracing::error!("The server tried to close down an invalid subscription: {:?}", sub_id);
return Err(Error::InvalidSubscriptionId);
}
};
manager.remove_subscription(request_id, sub_id).expect("Both request ID and sub ID in RequestManager; qed");
Ok(())
}
/// Attempts to process an incoming notification
///
/// Returns Ok() if the response was successfully handled
......@@ -217,17 +233,18 @@ pub(crate) fn build_unsubscribe_message(
/// Returns `Ok` if the response was successfully sent.
/// Returns `Err(_)` if the response ID was not found.
pub(crate) fn process_error_response(manager: &mut RequestManager, err: ErrorResponse) -> Result<(), Error> {
let id = err.id.clone().into_owned();
let id = err.id().clone().into_owned();
match manager.request_status(&id) {
RequestStatus::PendingMethodCall => {
let send_back = manager.complete_pending_call(id).expect("State checked above; qed");
let _ = send_back.map(|s| s.send(Err(Error::Call(err.error.to_call_error()))));
let _ =
send_back.map(|s| s.send(Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned())))));
Ok(())
}
RequestStatus::PendingSubscription => {
let (_, send_back, _) = manager.complete_pending_subscription(id).expect("State checked above; qed");
let _ = send_back.send(Err(Error::Call(err.error.to_call_error())));
let _ = send_back.send(Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned()))));
Ok(())
}
_ => Err(Error::InvalidRequestId),
......
......@@ -4,8 +4,9 @@ mod manager;
use std::time::Duration;
use crate::client::{
BatchMessage, ClientT, RegisterNotificationMessage, RequestMessage, Subscription, SubscriptionClientT,
SubscriptionKind, SubscriptionMessage, TransportReceiverT, TransportSenderT,
async_client::helpers::process_subscription_close_response, BatchMessage, ClientT, RegisterNotificationMessage,
RequestMessage, Subscription, SubscriptionClientT, SubscriptionKind, SubscriptionMessage, TransportReceiverT,
TransportSenderT,
};
use helpers::{
build_unsubscribe_message, call_with_timeout, process_batch_response, process_error_response, process_notification,
......@@ -20,7 +21,8 @@ use futures_util::future::Either;
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use jsonrpsee_types::{
ErrorResponse, Id, Notification, NotificationSer, ParamsSer, RequestSer, Response, SubscriptionResponse,
response::SubscriptionError, ErrorResponse, Id, Notification, NotificationSer, ParamsSer, RequestSer, Response,
SubscriptionResponse,
};
use serde::de::DeserializeOwned;
use tokio::sync::Mutex;
......@@ -489,6 +491,11 @@ async fn background_task<S: TransportSenderT, R: TransportReceiverT>(
let _ = stop_subscription(&mut sender, &mut manager, unsub).await;
}
}
// Subscription error response.
else if let Ok(response) = serde_json::from_str::<SubscriptionError<_>>(&raw) {
tracing::debug!("[backend]: recv subscription closed {:?}", response);
let _ = process_subscription_close_response(&mut manager, response);
}
// Incoming Notification
else if let Ok(notif) = serde_json::from_str::<Notification<_>>(&raw) {
tracing::debug!("[backend]: recv notification {:?}", notif);
......
......@@ -30,7 +30,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task;
use crate::error::{Error, SubscriptionClosed};
use crate::error::Error;
use async_trait::async_trait;
use core::marker::PhantomData;
use futures_channel::{mpsc, oneshot};
......@@ -38,7 +38,7 @@ use futures_util::future::FutureExt;
use futures_util::sink::SinkExt;
use futures_util::stream::{Stream, StreamExt};
use jsonrpsee_types::{Id, ParamsSer, SubscriptionId};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde::de::DeserializeOwned;
use serde_json::Value as JsonValue;
#[doc(hidden)]
......@@ -162,17 +162,6 @@ pub enum SubscriptionKind {
Method(String),
}
/// Internal type to detect whether a subscription response from
/// the server was a valid notification or should be treated as an error.
#[derive(Debug, Deserialize, Serialize)]
#[serde(untagged)]
pub enum NotifResponse<Notif> {
/// Successful response.
Ok(Notif),
/// Subscription was closed.
Err(SubscriptionClosed),
}
/// Active subscription on the client.
///
/// It will automatically unsubscribe in the [`Subscription::drop`] so no need to explicitly call
......@@ -301,9 +290,8 @@ where
type Item = Result<Notif, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Option<Self::Item>> {
let n = futures_util::ready!(self.notifs_rx.poll_next_unpin(cx));
let res = n.map(|n| match serde_json::from_value::<NotifResponse<Notif>>(n) {
Ok(NotifResponse::Ok(parsed)) => Ok(parsed),
Ok(NotifResponse::Err(e)) => Err(Error::SubscriptionClosed(e)),
let res = n.map(|n| match serde_json::from_value::<Notif>(n) {
Ok(parsed) => Ok(parsed),
Err(e) => Err(Error::ParseError(e)),
});
task::Poll::Ready(res)
......
......@@ -26,8 +26,10 @@
use std::fmt;
use jsonrpsee_types::error::CallError;
use serde::{Deserialize, Serialize};
use jsonrpsee_types::error::{
CallError, ErrorObject, ErrorObjectOwned, CALL_EXECUTION_FAILED_CODE, INVALID_PARAMS_CODE, SUBSCRIPTION_CLOSED,
UNKNOWN_ERROR_CODE,
};
/// Convenience type for displaying errors.
#[derive(Clone, Debug, PartialEq)]
......@@ -56,7 +58,7 @@ impl From<anyhow::Error> for Error {
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Error that occurs when a call failed.
#[error("JSON-RPC call failed: {0}")]
#[error("{0}")]
Call(#[from] CallError),
/// Networking error or error on the low-level protocol layer.
#[error("Networking or low-level protocol error: {0}")]
......@@ -94,9 +96,6 @@ pub enum Error {
/// Subscribe and unsubscribe method names are the same.
#[error("Cannot use the same method name for subscribe and unsubscribe, used: {0}")]
SubscriptionNameConflict(String),
/// Subscription got closed.
#[error("Subscription closed: {0:?}")]
SubscriptionClosed(SubscriptionClosed),
/// Request timeout
#[error("Request timeout")]
RequestTimeout,
......@@ -144,46 +143,47 @@ impl Error {
}
}
/// A type with a special `subscription_closed` field to detect that
/// a subscription has been closed to distinguish valid items produced
/// by the server on the subscription stream from an error.
///
/// This is included in the `result field` of the SubscriptionResponse
/// when an error is reported by the server.
#[derive(Deserialize, Serialize, Debug, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct SubscriptionClosed {
reason: SubscriptionClosedReason,
}
impl From<SubscriptionClosedReason> for SubscriptionClosed {
fn from(reason: SubscriptionClosedReason) -> Self {
Self::new(reason)
}
}
impl SubscriptionClosed {
/// Create a new [`SubscriptionClosed`].
pub fn new(reason: SubscriptionClosedReason) -> Self {
Self { reason }
}
/// Get the close reason.
pub fn close_reason(&self) -> &SubscriptionClosedReason {
&self.reason
impl Into<ErrorObjectOwned> for Error {
fn into(self) -> ErrorObjectOwned {
match self {
Error::Call(CallError::Custom(err)) => err,
Error::Call(CallError::InvalidParams(e)) => {
ErrorObject::owned(INVALID_PARAMS_CODE, e.to_string(), None::<()>)
}
Error::Call(CallError::Failed(e)) => {
ErrorObject::owned(CALL_EXECUTION_FAILED_CODE, e.to_string(), None::<()>)
}
_ => ErrorObject::owned(UNKNOWN_ERROR_CODE, self.to_string(), None::<()>),
}
}
}
/// A type to represent when a subscription gets closed
/// by either the server or client side.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
pub enum SubscriptionClosedReason {
/// The subscription was closed by calling the unsubscribe method.
Unsubscribed,
/// The client closed the connection.
ConnectionReset,
/// The server closed the subscription, providing a description of the reason as a `String`.
Server(String),
#[derive(Clone, Debug)]
pub enum SubscriptionClosed {
/// The remote peer closed the connection or called the unsubscribe method.
RemotePeerAborted,
/// The subscription was completed successfully by the server.
Success,
/// The subscription failed during execution by the server.
Failed(ErrorObject<'static>),
}
impl Into<ErrorObjectOwned> for SubscriptionClosed {
fn into(self) -> ErrorObjectOwned {
match self {
Self::RemotePeerAborted => {
ErrorObject::owned(SUBSCRIPTION_CLOSED, "Subscription was closed by the remote peer", None::<()>)
}
Self::Success => ErrorObject::owned(
SUBSCRIPTION_CLOSED,
"Subscription was completed by the server successfully",
None::<()>,
),
Self::Failed(err) => err,
}
}
}
/// Generic transport error.
......@@ -223,30 +223,3 @@ impl From<hyper::Error> for Error {
Error::Transport(hyper_err.into())
}
}
#[cfg(test)]
mod tests {
use super::{SubscriptionClosed, SubscriptionClosedReason};
#[test]
fn subscription_closed_ser_deser_works() {
let items: Vec<(&str, SubscriptionClosed)> = vec![
(r#"{"reason":"Unsubscribed"}"#, SubscriptionClosedReason::Unsubscribed.into()),
(r#"{"reason":"ConnectionReset"}"#, SubscriptionClosedReason::ConnectionReset.into()),
(r#"{"reason":{"Server":"hoho"}}"#, SubscriptionClosedReason::Server("hoho".into()).into()),
];
for (s, d) in items {
let dsr: SubscriptionClosed = serde_json::from_str(s).unwrap();
assert_eq!(dsr, d);
let ser = serde_json::to_string(&d).unwrap();
assert_eq!(ser, s);
}
}
#[test]
fn subscription_closed_deny_unknown_field() {
let ser = r#"{"reason":"Unsubscribed","deny":1}"#;
assert!(serde_json::from_str::<SubscriptionClosed>(ser).is_err());
}
}
......@@ -29,10 +29,7 @@ use std::io;
use crate::{to_json_raw_value, Error};
use futures_channel::mpsc;
use futures_util::StreamExt;
use jsonrpsee_types::error::{
CallError, ErrorCode, ErrorObject, ErrorResponse, CALL_EXECUTION_FAILED_CODE, OVERSIZED_RESPONSE_CODE,
OVERSIZED_RESPONSE_MSG, UNKNOWN_ERROR_CODE,
};
use jsonrpsee_types::error::{ErrorCode, ErrorObject, ErrorResponse, OVERSIZED_RESPONSE_CODE, OVERSIZED_RESPONSE_MSG};
use jsonrpsee_types::{Id, InvalidRequest, Response};
use serde::Serialize;
......@@ -121,11 +118,7 @@ impl MethodSink {
if err.is_io() {
let data = to_json_raw_value(&format!("Exceeded max limit {}", self.max_response_size)).ok();
let err = ErrorObject {
code: ErrorCode::ServerError(OVERSIZED_RESPONSE_CODE),
message: OVERSIZED_RESPONSE_MSG.into(),
data: data.as_deref(),
};
let err = ErrorObject::borrowed(OVERSIZED_RESPONSE_CODE, &OVERSIZED_RESPONSE_MSG, data.as_deref());
return self.send_error(id, err);
} else {
return self.send_error(id, ErrorCode::InternalError.into());
......@@ -143,7 +136,7 @@ impl MethodSink {
/// Send a JSON-RPC error to the client
pub fn send_error(&self, id: Id, error: ErrorObject) -> bool {
let json = match serde_json::to_string(&ErrorResponse::new(error, id)) {
let json = match serde_json::to_string(&ErrorResponse::borrowed(error, id)) {
Ok(json) => json,
Err(err) => {
tracing::error!("Error serializing error message: {:?}", err);
......@@ -161,20 +154,7 @@ impl MethodSink {
/// Helper for sending the general purpose `Error` as a JSON-RPC errors to the client
pub fn send_call_error(&self, id: Id, err: Error) -> bool {
let (code, message, data) = match err {
Error::Call(CallError::InvalidParams(e)) => (ErrorCode::InvalidParams, e.to_string(), None),
Error::Call(CallError::Failed(e)) => {
(ErrorCode::ServerError(CALL_EXECUTION_FAILED_CODE), e.to_string(), None)
}
Error::Call(CallError::Custom { code, message, data }) => (code.into(), message, data),
// This should normally not happen because the most common use case is to
// return `Error::Call` in `register_async_method`.
e => (ErrorCode::ServerError(UNKNOWN_ERROR_CODE), e.to_string(), None),
};
let err = ErrorObject { code, message: message.into(), data: data.as_deref() };
self.send_error(id, err)
self.send_error(id, err.into())
}
/// Send a raw JSON-RPC message to the client, `MethodSink` does not check verify the validity
......
This diff is collapsed.
......@@ -15,6 +15,7 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3.3", features = ["env-filter"] }
tokio = { version = "1.8", features = ["full"] }
tokio-stream = { version = "0.1", features = ["sync"] }
serde_json = { version = "1" }
[[example]]
name = "http"
......
......@@ -29,7 +29,7 @@ use std::net::SocketAddr;
use jsonrpsee::core::{async_trait, client::Subscription, Error};
use jsonrpsee::proc_macros::rpc;
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ws_server::{SubscriptionSink, WsServerBuilder, WsServerHandle};
use jsonrpsee::ws_server::{PendingSubscription, WsServerBuilder, WsServerHandle};
type ExampleHash = [u8; 32];
type ExampleStorageKey = Vec<u8>;
......@@ -45,7 +45,7 @@ where
/// Subscription that takes a `StorageKey` as input and produces a `Vec<Hash>`.
#[subscription(name = "subscribeStorage" => "override", item = Vec<Hash>)]
fn subscribe_storage(&self, keys: Option<Vec<StorageKey>>) -> Result<(), Error>;
fn subscribe_storage(&self, keys: Option<Vec<StorageKey>>);
}
pub struct RpcServerImpl;
......@@ -60,12 +60,10 @@ impl RpcServer<ExampleHash, ExampleStorageKey> for RpcServerImpl {
Ok(vec![storage_key])
}
fn subscribe_storage(
&self,
mut sink: SubscriptionSink,
_keys: Option<Vec<ExampleStorageKey>>,
) -> Result<(), Error> {
sink.send(&vec![[0; 32]])
fn subscribe_storage(&self, pending: PendingSubscription, _keys: Option<Vec<ExampleStorageKey>>) {
if let Some(mut sink) = pending.accept() {
let _ = sink.send(&vec![[0; 32]]);
}
}
}
......
......@@ -31,6 +31,7 @@ use std::net::SocketAddr;
use futures::future;
use futures::StreamExt;
use jsonrpsee::core::client::{Subscription, SubscriptionClientT};
use jsonrpsee::core::error::SubscriptionClosed;
use jsonrpsee::rpc_params;
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ws_server::{RpcModule, WsServerBuilder};
......@@ -70,13 +71,24 @@ async fn run_server() -> anyhow::Result<SocketAddr> {