Unverified Commit dbd30e28 authored by Niklas Adolfsson's avatar Niklas Adolfsson
Browse files

Merge remote-tracking branch 'origin/master' into na-middleware

parents c2e686cb 98c23fc1
Pipeline #200896 passed with stages
in 6 minutes and 19 seconds
......@@ -92,7 +92,7 @@ jobs:
run: cargo hack check --workspace --each-feature --all-targets
tests_ubuntu:
name: Run nextests on Ubuntu
name: Run tests Ubuntu
runs-on: ubuntu-latest
steps:
- name: Checkout sources
......
......@@ -141,13 +141,10 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::ws
let mut module = gen_rpc_module();
module
.register_subscription(SUB_METHOD_NAME, SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, pending, _ctx| {
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
.register_subscription(SUB_METHOD_NAME, SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| {
let x = "Hello";
tokio::spawn(async move { sink.send(&x) });
Ok(())
})
.unwrap();
......
......@@ -122,9 +122,10 @@ impl MethodSink {
if let Err(err) = self.send_raw(json) {
tracing::warn!("Error sending response {:?}", err);
false
} else {
true
}
false
}
/// Helper for sending the general purpose `Error` as a JSON-RPC errors to the client
......
......@@ -39,11 +39,14 @@ use futures_channel::{mpsc, oneshot};
use futures_util::future::Either;
use futures_util::pin_mut;
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt, TryStream, TryStreamExt};
use jsonrpsee_types::error::{CallError, ErrorCode, ErrorObject, ErrorObjectOwned, SUBSCRIPTION_CLOSED_WITH_ERROR};
use jsonrpsee_types::error::{
CallError, ErrorCode, ErrorObject, ErrorObjectOwned, SubscriptionAcceptRejectError, INTERNAL_ERROR_CODE,
SUBSCRIPTION_CLOSED_WITH_ERROR,
};
use jsonrpsee_types::response::{SubscriptionError, SubscriptionPayloadError};
use jsonrpsee_types::{
ErrorResponse, Id, Params, Request, Response, SubscriptionId as RpcSubscriptionId, SubscriptionPayload,
SubscriptionResponse,
SubscriptionResponse, SubscriptionResult,
};
use parking_lot::Mutex;
use rustc_hash::FxHashMap;
......@@ -96,7 +99,7 @@ pub struct ConnState<'a> {
/// Outcome of a successful terminated subscription.
#[derive(Debug)]
pub enum SubscriptionResult {
pub enum InnerSubscriptionResult {
/// The subscription stream was executed successfully.
Success,
/// The subscription was aborted by the remote peer.
......@@ -391,8 +394,9 @@ impl Methods {
/// use futures_util::StreamExt;
///
/// let mut module = RpcModule::new(());
/// module.register_subscription("hi", "hi", "goodbye", |_, pending, _| {
/// pending.accept().unwrap().send(&"one answer").unwrap();
/// module.register_subscription("hi", "hi", "goodbye", |_, mut sink, _| {
/// sink.send(&"one answer").unwrap();
/// Ok(())
/// }).unwrap();
/// let (resp, mut stream) = module.raw_json_request(r#"{"jsonrpc":"2.0","method":"hi","id":0}"#).await.unwrap();
/// let resp = serde_json::from_str::<Response<u64>>(&resp.result).unwrap();
......@@ -461,8 +465,9 @@ impl Methods {
/// use jsonrpsee::{RpcModule, types::EmptyParams};
///
/// let mut module = RpcModule::new(());
/// module.register_subscription("hi", "hi", "goodbye", |_, pending, _| {
/// pending.accept().unwrap().send(&"one answer").unwrap();
/// module.register_subscription("hi", "hi", "goodbye", |_, mut sink, _| {
/// sink.send(&"one answer").unwrap();
/// Ok(())
/// }).unwrap();
///
/// let mut sub = module.subscribe("hi", EmptyParams::new()).await.unwrap();
......@@ -677,21 +682,22 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
/// use jsonrpsee_core::Error;
///
/// let mut ctx = RpcModule::new(99_usize);
/// ctx.register_subscription("sub", "notif_name", "unsub", |params, pending, ctx| {
/// ctx.register_subscription("sub", "notif_name", "unsub", |params, mut sink, ctx| {
/// let x = match params.one::<usize>() {
/// Ok(x) => x,
/// Err(e) => {
/// let err: Error = e.into();
/// pending.reject(err);
/// return;
/// sink.reject(err);
/// return Ok(());
/// }
/// };
/// // Sink is accepted on the first `send` call.
/// std::thread::spawn(move || {
/// let sum = x + (*ctx);
/// let _ = sink.send(&sum);
/// });
///
/// // Only fails in the connection is closed.
/// let sink = pending.accept().unwrap();
///
/// let sum = x + (*ctx);
/// let _ = sink.send(&sum);
/// Ok(())
/// });
/// ```
pub fn register_subscription<F>(
......@@ -703,7 +709,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
) -> Result<MethodResourcesBuilder, Error>
where
Context: Send + Sync + 'static,
F: Fn(Params, PendingSubscription, Arc<Context>) + Send + Sync + 'static,
F: Fn(Params, SubscriptionSink, Arc<Context>) -> SubscriptionResult + Send + Sync + 'static,
{
if subscribe_method_name == unsubscribe_method_name {
return Err(Error::SubscriptionNameConflict(subscribe_method_name.into()));
......@@ -762,19 +768,21 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
// response to the subscription call.
let (tx, rx) = oneshot::channel();
let pending_subscription = PendingSubscription(Some(InnerPendingSubscription {
sink: method_sink,
subscribe_call: tx,
let sink = SubscriptionSink {
inner: method_sink.clone(),
close_notify: Some(conn.close_notify),
method: notif_method_name,
subscribers: subscribers.clone(),
uniq_sub,
id: id.clone().into_owned(),
claimed,
}));
id: Some((id.clone().into_owned(), tx)),
unsubscribe: None,
_claimed: claimed,
};
// The end-user needs to accept/reject the `pending_subscription` to make any progress.
callback(params, pending_subscription, ctx.clone());
// The callback returns a `SubscriptionResult` for better ergonomics and is not propagated further.
if let Err(_) = callback(params, sink, ctx.clone()) {
tracing::warn!("subscribe call `{}` failed", subscribe_method_name);
}
let id = id.clone().into_owned();
......@@ -808,130 +816,86 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
}
}
/// Represent a pending subscription which waits to be accepted or rejected.
///
/// Note: you need to call either `PendingSubscription::accept` or `PendingSubscription::reject` otherwise
/// the subscription will be dropped with an `InvalidParams` error.
/// Returns once the unsubscribe method has been called.
type UnsubscribeCall = Option<watch::Receiver<()>>;
/// Represents a single subscription.
#[derive(Debug)]
struct InnerPendingSubscription {
pub struct SubscriptionSink {
/// Sink.
sink: MethodSink,
/// Response to the subscription call.
subscribe_call: oneshot::Sender<MethodResponse>,
inner: MethodSink,
/// Get notified when subscribers leave so we can exit
close_notify: Option<SubscriptionPermit>,
/// MethodCallback.
method: &'static str,
/// Shared Mutex of subscriptions for this method.
subscribers: Subscribers,
/// Unique subscription.
uniq_sub: SubscriptionKey,
/// Shared Mutex of subscriptions
subscribers: Subscribers,
/// Request ID.
id: Id<'static>,
/// Id of the `subscription call` (i.e. not the same as subscription id) which is used
/// to reply to subscription method call and must only be used once.
///
/// *Note*: Having some value means the subscription was not accepted or rejected yet.
id: Option<(Id<'static>, oneshot::Sender<MethodResponse>)>,
/// Having some value means the subscription was accepted.
unsubscribe: UnsubscribeCall,
/// Claimed resources.
claimed: Option<ResourceGuard>,
_claimed: Option<ResourceGuard>,
}
/// Represent a pending subscription which waits until it's either accepted or rejected.
///
/// This type implements `Drop` for ease of use, e.g. when dropped in error short circuiting via `map_err()?`.
#[derive(Debug)]
pub struct PendingSubscription(Option<InnerPendingSubscription>);
impl PendingSubscription {
impl SubscriptionSink {
/// Reject the subscription call from [`ErrorObject`].
pub fn reject(mut self, err: impl Into<ErrorObjectOwned>) -> bool {
if let Some(inner) = self.0.take() {
let InnerPendingSubscription { subscribe_call, id, sink, .. } = inner;
pub fn reject(&mut self, err: impl Into<ErrorObjectOwned>) -> Result<(), SubscriptionAcceptRejectError> {
let (id, subscribe_call) = self.id.take().ok_or(SubscriptionAcceptRejectError::AlreadyCalled)?;
let err = MethodResponse::error(id, err.into());
let err = MethodResponse::error(id, err.into());
let ws_send = sink.send_raw(err.result.clone()).is_ok();
let middleware_call = subscribe_call.send(err).is_ok();
let ws_send = self.inner.send_raw(err.result.clone()).is_ok();
let middleware_call = subscribe_call.send(err).is_ok();
ws_send && middleware_call
if ws_send && middleware_call {
Ok(())
} else {
false
Err(SubscriptionAcceptRejectError::RemotePeerAborted)
}
}
/// Attempt to accept the subscription and respond the subscription method call.
///
/// Fails if the connection was closed
pub fn accept(mut self) -> Option<SubscriptionSink> {
let inner = self.0.take()?;
/// Fails if the connection was closed, or if called multiple times.
pub fn accept(&mut self) -> Result<(), SubscriptionAcceptRejectError> {
let (id, subscribe_call) = self.id.take().ok_or(SubscriptionAcceptRejectError::AlreadyCalled)?;
let InnerPendingSubscription { sink, close_notify, method, uniq_sub, subscribers, id, subscribe_call, claimed } =
inner;
let response = MethodResponse::response(id, &uniq_sub.sub_id, sink.max_response_size() as usize);
let response = MethodResponse::response(id, &self.uniq_sub.sub_id, self.inner.max_response_size() as usize);
let success = response.success;
let ws_send = sink.send_raw(response.result.clone()).is_ok();
let ws_send = self.inner.send_raw(response.result.clone()).is_ok();
let middleware_call = subscribe_call.send(response).is_ok();
if ws_send && middleware_call && success {
let (unsubscribe_tx, unsubscribe_rx) = watch::channel(());
subscribers.lock().insert(uniq_sub.clone(), (sink.clone(), unsubscribe_tx));
return Some(SubscriptionSink {
inner: sink,
close_notify,
method,
uniq_sub,
subscribers,
unsubscribe: unsubscribe_rx,
_claimed: claimed,
});
}
None
}
}
// When dropped it returns an [`InvalidParams`] error to the subscriber
impl Drop for PendingSubscription {
fn drop(&mut self) {
if let Some(inner) = self.0.take() {
let InnerPendingSubscription { subscribe_call, id, sink, .. } = inner;
let err = MethodResponse::error(id, ErrorObject::from(ErrorCode::InvalidParams));
let _ws_send = sink.send_raw(err.result.clone()).is_ok();
let _middleware_call = subscribe_call.send(err).is_ok();
let (tx, rx) = watch::channel(());
self.subscribers.lock().insert(self.uniq_sub.clone(), (self.inner.clone(), tx));
self.unsubscribe = Some(rx);
Ok(())
} else {
Err(SubscriptionAcceptRejectError::RemotePeerAborted)
}
}
}
/// Represents a single subscription.
#[derive(Debug)]
pub struct SubscriptionSink {
/// Sink.
inner: MethodSink,
/// Get notified when subscribers leave so we can exit
close_notify: Option<SubscriptionPermit>,
/// MethodCallback.
method: &'static str,
/// Unique subscription.
uniq_sub: SubscriptionKey,
/// Shared Mutex of subscriptions for this method.
subscribers: Subscribers,
/// Future that returns when the unsubscribe method has been called.
unsubscribe: watch::Receiver<()>,
/// Claimed resources.
_claimed: Option<ResourceGuard>,
}
impl SubscriptionSink {
/// Send a message back to subscribers.
///
/// Returns `Ok(true)` if the message could be send
/// Returns `Ok(false)` if the sink was closed (either because the subscription was closed or the connection was terminated)
/// Return `Err(err)` if the message could not be serialized.
///
pub fn send<T: Serialize>(&self, result: &T) -> Result<bool, serde_json::Error> {
// only possible to trigger when the connection is dropped.
/// Returns
/// - `Ok(true)` if the message could be send.
/// - `Ok(false)` if the sink was closed (either because the subscription was closed or the connection was terminated),
/// or the subscription could not be accepted.
/// - `Err(err)` if the message could not be serialized.
pub fn send<T: Serialize>(&mut self, result: &T) -> Result<bool, serde_json::Error> {
// Cannot accept the subscription.
if let Err(SubscriptionAcceptRejectError::RemotePeerAborted) = self.accept() {
return Ok(false);
}
// Only possible to trigger when the connection is dropped.
if self.is_closed() {
return Ok(false);
}
......@@ -951,18 +915,17 @@ impl SubscriptionSink {
///
/// ```no_run
///
/// use jsonrpsee_core::server::rpc_module::{RpcModule, SubscriptionResult};
/// use jsonrpsee_core::server::rpc_module::RpcModule;
/// use jsonrpsee_core::error::{Error, SubscriptionClosed};
/// use jsonrpsee_types::ErrorObjectOwned;
/// use anyhow::anyhow;
///
/// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, pending, _| {
///
/// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| {
/// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]);
/// // This will return send `[Ok(1_u32), Ok(2_u32), Err(Error::SubscriptionClosed))]` to the subscriber
/// // because after the `Err(_)` the stream is terminated.
/// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]);
/// let sink = pending.accept().unwrap();
///
/// tokio::spawn(async move {
///
......@@ -980,22 +943,35 @@ impl SubscriptionSink {
/// }
/// }
/// });
/// Ok(())
/// });
/// ```
pub async fn pipe_from_try_stream<S, T, E>(&self, mut stream: S) -> SubscriptionClosed
pub async fn pipe_from_try_stream<S, T, E>(&mut self, mut stream: S) -> SubscriptionClosed
where
S: TryStream<Ok = T, Error = E> + Unpin,
T: Serialize,
E: std::fmt::Display,
{
if let Err(SubscriptionAcceptRejectError::RemotePeerAborted) = self.accept() {
return SubscriptionClosed::RemotePeerAborted;
}
let conn_closed = match self.close_notify.as_ref().map(|cn| cn.handle()) {
Some(cn) => cn,
None => {
return SubscriptionClosed::RemotePeerAborted;
None => return SubscriptionClosed::RemotePeerAborted,
};
let mut sub_closed = match self.unsubscribe.as_ref() {
Some(rx) => rx.clone(),
_ => {
return SubscriptionClosed::Failed(ErrorObject::owned(
INTERNAL_ERROR_CODE,
"Unsubscribe watcher not set after accepting the subscription".to_string(),
None::<()>,
))
}
};
let mut sub_closed = self.unsubscribe.clone();
let sub_closed_fut = sub_closed.changed();
let conn_closed_fut = conn_closed.notified();
......@@ -1048,16 +1024,13 @@ impl SubscriptionSink {
/// use jsonrpsee_core::server::rpc_module::RpcModule;
///
/// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, pending, _| {
/// let sink = pending.accept().unwrap();
/// m.register_subscription("sub", "_", "unsub", |params, mut sink, _| {
/// let stream = futures_util::stream::iter(vec![1_usize, 2, 3]);
///
/// tokio::spawn(async move {
/// sink.pipe_from_stream(stream).await;
/// });
/// tokio::spawn(async move { sink.pipe_from_stream(stream).await; });
/// Ok(())
/// });
/// ```
pub async fn pipe_from_stream<S, T>(&self, stream: S) -> SubscriptionClosed
pub async fn pipe_from_stream<S, T>(&mut self, stream: S) -> SubscriptionClosed
where
S: Stream<Item = T> + Unpin,
T: Serialize,
......@@ -1071,7 +1044,10 @@ impl SubscriptionSink {
}
fn is_active_subscription(&self) -> bool {
!self.unsubscribe.has_changed().is_err()
match self.unsubscribe.as_ref() {
Some(unsubscribe) => !unsubscribe.has_changed().is_err(),
_ => false,
}
}
fn build_message<T: Serialize>(&self, result: &T) -> Result<String, serde_json::Error> {
......@@ -1125,7 +1101,16 @@ impl SubscriptionSink {
impl Drop for SubscriptionSink {
fn drop(&mut self) {
if self.is_active_subscription() {
if let Some((id, subscribe_call)) = self.id.take() {
// Subscription was never accepted / rejected. As such,
// we default to assuming that the params were invalid,
// because that's how the previous PendingSubscription logic
// worked.
let err = MethodResponse::error(id, ErrorObject::from(ErrorCode::InvalidParams));
let _ws_send = self.inner.send_raw(err.result.clone()).is_ok();
let _middleware_call = subscribe_call.send(err).is_ok();
} else if self.is_active_subscription() {
self.subscribers.lock().remove(&self.uniq_sub);
}
}
......
......@@ -28,8 +28,9 @@ use std::net::SocketAddr;
use jsonrpsee::core::{async_trait, client::Subscription, Error};
use jsonrpsee::proc_macros::rpc;
use jsonrpsee::types::SubscriptionResult;
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ws_server::{PendingSubscription, WsServerBuilder, WsServerHandle};
use jsonrpsee::ws_server::{SubscriptionSink, WsServerBuilder, WsServerHandle};
type ExampleHash = [u8; 32];
type ExampleStorageKey = Vec<u8>;
......@@ -60,10 +61,14 @@ impl RpcServer<ExampleHash, ExampleStorageKey> for RpcServerImpl {
Ok(vec![storage_key])
}
fn subscribe_storage(&self, pending: PendingSubscription, _keys: Option<Vec<ExampleStorageKey>>) {
if let Some(sink) = pending.accept() {
let _ = sink.send(&vec![[0; 32]]);
}
// Note that the server's subscription method must return `SubscriptionResult`.
fn subscribe_storage(
&self,
mut sink: SubscriptionSink,
_keys: Option<Vec<ExampleStorageKey>>,
) -> SubscriptionResult {
let _ = sink.send(&vec![[0; 32]]);
Ok(())
}
}
......
......@@ -71,14 +71,9 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
std::thread::spawn(move || produce_items(tx2));
module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, pending, _| {
module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, mut sink, _| {
let rx = BroadcastStream::new(tx.clone().subscribe());
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
tokio::spawn(async move {
match sink.pipe_from_try_stream(rx).await {
SubscriptionClosed::Success => {
......@@ -90,6 +85,7 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
}
};
});
Ok(())
})?;
let addr = server.local_addr()?;
server.start(module)?;
......
......@@ -66,63 +66,43 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module
.register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, pending, _| {
let idx = match params.one() {
Ok(idx) => idx,
_ => return,
};
.register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, mut sink, _| {
let idx = params.one()?;
let item = LETTERS.chars().nth(idx);
let interval = interval(Duration::from_millis(200));
let stream = IntervalStream::new(interval).map(move |_| item);
tokio::spawn(async move {
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
match sink.pipe_from_stream(stream).await {
// Send close notification when subscription stream failed.
SubscriptionClosed::Failed(err) => {
sink.close(err);
}
// Don't send close notification because the stream should run forever.
SubscriptionClosed::Success => (),
// Don't send close because the client has already disconnected.
SubscriptionClosed::RemotePeerAborted => (),
_ => (),
};
});
Ok(())
})
.unwrap();
module
.register_subscription("sub_params_two", "params_two", "unsub_params_two", |params, pending, _| {
let (one, two) = match params.parse::<(usize, usize)>() {
Ok(res) => res,
_ => return,
};
.register_subscription("sub_params_two", "params_two", "unsub_params_two", |params, mut sink, _| {
let (one, two) = params.parse::<(usize, usize)>()?;
let item = &LETTERS[one..two];
let interval = interval(Duration::from_millis(200));
let stream = IntervalStream::new(interval).map(move |_| item);
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
tokio::spawn(async move {
match sink.pipe_from_stream(stream).await {
// Send close notification when subscription stream failed.
SubscriptionClosed::Failed(err) => {
sink.close(err);
}
// Don't send close notification because the stream should run forever.
SubscriptionClosed::Success => (),
// Don't send close because the client has already disconnected.
SubscriptionClosed::RemotePeerAborted => (),
_ => (),
};
});
Ok(())
})
.unwrap();
......
......@@ -92,7 +92,7 @@ cfg_types! {
}
cfg_server! {
pub use jsonrpsee_core::server::rpc_module::{PendingSubscription, RpcModule, SubscriptionSink};
pub use jsonrpsee_core::server::rpc_module::{RpcModule, SubscriptionSink};
}
cfg_client_or_server! {
......
......@@ -49,13 +49,14 @@ pub(crate) mod visitor;
/// type that implements `Client` or `SubscriptionClient` (depending on whether trait has
/// subscriptions methods or not), namely `HttpClient` and `WsClient`.
///
/// For servers, it will generate a trait mostly equivalent to the input, with two main
/// differences:
/// For servers, it will generate a trait mostly equivalent to the input, with the following differences:
///
/// - The trait will have one additional (already implemented) method, `into_rpc`, which turns any object that
/// implements the server trait into an `RpcModule`.
/// - For subscription methods, there will be one additional argument inserted right after `&self`: `subscription_sink:
/// PendingSubscription`. It should be used accept or reject a pending subscription.
/// - For subscription methods:
/// - There will be one additional argument inserted right after `&self`: `subscription_sink: SubscriptionSink`.
/// It should be used to accept or reject a subscription and send data back to subscribers.
/// - The return type of the subscription method is `SubscriptionResult` for improved ergonomics.
///