Unverified Commit cbc189e2 authored by Niklas Adolfsson's avatar Niklas Adolfsson
Browse files

fix tests

parent 000492cb
Pipeline #199239 passed with stages
in 5 minutes and 32 seconds
......@@ -142,12 +142,11 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::ws
module
.register_subscription(SUB_METHOD_NAME, SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, pending, _ctx| {
let mut sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
let x = "Hello";
tokio::spawn(async move { sink.send(&x) });
tokio::spawn(async move {
let mut sink = pending.accept().await.unwrap();
let _ = sink.send(&x);
});
})
.unwrap();
......
......@@ -15,7 +15,7 @@ jsonrpsee-client-transport = { path = "../transport", version = "0.14.0", featur
jsonrpsee-core = { path = "../../core", version = "0.14.0", features = ["async-wasm-client"] }
[dev-dependencies]
env_logger = "0.9"
tracing-subscriber = { version = "0.3.3", features = ["env-filter"] }
jsonrpsee-test-utils = { path = "../../test-utils" }
tokio = { version = "1", features = ["macros"] }
serde_json = "1"
......@@ -15,7 +15,7 @@ jsonrpsee-client-transport = { path = "../transport", version = "0.14.0", featur
jsonrpsee-core = { path = "../../core", version = "0.14.0", features = ["async-client"] }
[dev-dependencies]
env_logger = "0.9"
tracing-subscriber = { version = "0.3.3", features = ["env-filter"] }
jsonrpsee-test-utils = { path = "../../test-utils" }
tokio = { version = "1", features = ["macros"] }
serde_json = "1"
......
......@@ -205,7 +205,7 @@ async fn notification_without_polling_doesnt_make_client_unuseable() {
client.subscribe_to_method("test").with_default_timeout().await.unwrap().unwrap();
// don't poll the notification stream for 2 seconds, should be full now.
std::thread::sleep(std::time::Duration::from_secs(2));
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
// Capacity is `num_sender` + `capacity`
for _ in 0..5 {
......@@ -244,6 +244,11 @@ async fn batch_request_out_of_order_response() {
#[tokio::test]
async fn is_connected_works() {
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init()
.expect("setting default subscriber failed");
let server = WebSocketTestServer::with_hardcoded_response(
"127.0.0.1:0".parse().unwrap(),
ok_response(JsonValue::String("foo".into()), Id::Num(99_u64)),
......@@ -254,9 +259,11 @@ async fn is_connected_works() {
let uri = to_ws_uri_string(server.local_addr());
let client = WsClientBuilder::default().build(&uri).with_default_timeout().await.unwrap().unwrap();
assert!(client.is_connected());
client.request::<String>("say_hello", None).with_default_timeout().await.unwrap().unwrap_err();
// give the background thread some time to terminate.
std::thread::sleep(std::time::Duration::from_millis(100));
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
assert!(!client.is_connected())
}
......@@ -295,7 +302,6 @@ fn assert_error_response(err: Error, exp: ErrorObjectOwned) {
#[tokio::test]
async fn redirections() {
let _ = env_logger::try_init();
let expected = "abc 123";
let server = WebSocketTestServer::with_hardcoded_response(
"127.0.0.1:0".parse().unwrap(),
......
......@@ -29,7 +29,6 @@ use std::sync::Arc;
use crate::Error;
use futures_channel::mpsc;
use futures_util::StreamExt;
use jsonrpsee_types::error::{ErrorCode, ErrorObject, ErrorResponse, OVERSIZED_RESPONSE_CODE, OVERSIZED_RESPONSE_MSG};
use jsonrpsee_types::{Id, InvalidRequest, Response};
use serde::Serialize;
......@@ -186,24 +185,6 @@ pub fn prepare_error(data: &[u8]) -> (Id<'_>, ErrorCode) {
}
}
/// Read all the results of all method calls in a batch request from the ['Stream']. Format the result into a single
/// `String` appropriately wrapped in `[`/`]`.
pub async fn collect_batch_response(rx: mpsc::UnboundedReceiver<String>) -> String {
let mut buf = String::with_capacity(2048);
buf.push('[');
let mut buf = rx
.fold(buf, |mut acc, response| async move {
acc.push_str(&response);
acc.push(',');
acc
})
.await;
// Remove trailing comma
buf.pop();
buf.push(']');
buf
}
/// A permitted subscription.
#[derive(Debug)]
pub struct SubscriptionPermit {
......@@ -296,17 +277,19 @@ impl MethodResponse {
}
}
pub fn error(id: Id, err: impl Into<ErrorObject<'static>>) -> Self {
/// Create a `MethodResponse` from an error.
pub fn error<'a>(id: Id, err: impl Into<ErrorObject<'a>>) -> Self {
let result = serde_json::to_string(&ErrorResponse::borrowed(err.into(), id)).unwrap();
Self { result, success: false }
}
}
/// Builder to build a `BatchResponse`.
#[derive(Debug)]
pub struct BatchResponseBuilder {
/// Serialized response,
/// Formatted JSON-RPC response.
result: String,
/// Status indicates whether the call was successful or or.
/// Indicates whether the call was successful or not.
success: bool,
}
......@@ -338,13 +321,17 @@ impl BatchResponseBuilder {
}
}
/// Response to a batch request.
#[derive(Debug)]
pub struct BatchResponse {
/// Formatted JSON-RPC response.
pub result: String,
/// Indicates whether the call was successful or not.
pub success: bool,
}
impl BatchResponse {
/// Create a `BatchResponse` from an error.
pub fn error(id: Id, err: impl Into<ErrorObject<'static>>) -> Self {
let result = serde_json::to_string(&ErrorResponse::borrowed(err.into(), id)).unwrap();
Self { result, success: false }
......
......@@ -64,8 +64,9 @@ pub type AsyncMethod<'a> = Arc<
+ Fn(Id<'a>, Params<'a>, ConnectionId, MaxResponseSize, Option<ResourceGuard>) -> BoxFuture<'a, MethodResponse>,
>;
/// Method callback for subscriptions.
pub type SubscriptionMethod<'a> =
Arc<dyn Send + Sync + Fn(Id, Params, MethodSink, ConnState) -> BoxFuture<'a, MethodResponse>>;
pub type SubscriptionMethod<'a> = Arc<
dyn Send + Sync + Fn(Id, Params, MethodSink, ConnState, oneshot::Receiver<()>) -> BoxFuture<'a, MethodResponse>,
>;
// Method callback to unsubscribe.
type UnsubscriptionMethod = Arc<dyn Send + Sync + Fn(Id, Params, ConnectionId, MaxResponseSize) -> MethodResponse>;
......@@ -391,10 +392,12 @@ impl Methods {
///
/// let mut module = RpcModule::new(());
/// module.register_subscription("hi", "hi", "goodbye", |_, pending, _| {
/// pending.accept().unwrap().send(&"one answer").unwrap();
/// tokio::spawn(async move {
/// pending.accept().await.unwrap().send(&"one answer").unwrap();
/// });
/// }).unwrap();
/// let (resp, mut stream) = module.raw_json_request(r#"{"jsonrpc":"2.0","method":"hi","id":0}"#).await.unwrap();
/// let resp = serde_json::from_str::<Response<u64>>(&resp).unwrap();
/// let resp = serde_json::from_str::<Response<u64>>(&resp.result).unwrap();
/// let sub_resp = stream.next().await.unwrap();
/// assert_eq!(
/// format!(r#"{{"jsonrpc":"2.0","method":"hi","params":{{"subscription":{},"result":"one answer"}}}}"#, resp.result),
......@@ -414,6 +417,9 @@ impl Methods {
/// Execute a callback.
async fn inner_call(&self, req: Request<'_>) -> RawRpcResponse {
// the subscription call was dispatched.
let (tx, rx) = oneshot::channel();
let (tx_sink, rx_sink) = mpsc::unbounded();
let sink = MethodSink::new(tx_sink);
let id = req.id.clone();
......@@ -428,13 +434,16 @@ impl Methods {
Some(MethodKind::Async(cb)) => (cb)(id.into_owned(), params.into_owned(), 0, usize::MAX, None).await,
Some(MethodKind::Subscription(cb)) => {
let conn_state = ConnState { conn_id: 0, close_notify, id_provider: &RandomIntegerIdProvider };
(cb)(id, params, sink.clone(), conn_state).await
(cb)(id, params, sink.clone(), conn_state, rx).await
}
Some(MethodKind::Unsubscription(cb)) => (cb)(id, params, 0, usize::MAX),
};
tracing::trace!("[Methods::inner_call]: method: `{}` result: {:?}", req.method, result);
// indicate that the subscription has been accepted.
let _ = tx.send(());
(result, rx_sink, notify)
}
......@@ -453,7 +462,9 @@ impl Methods {
///
/// let mut module = RpcModule::new(());
/// module.register_subscription("hi", "hi", "goodbye", |_, pending, _| {
/// pending.accept().unwrap().send(&"one answer").unwrap();
/// tokio::spawn(async move {
/// pending.accept().await.unwrap().send(&"one answer").unwrap();
/// });
/// }).unwrap();
///
/// let mut sub = module.subscribe("hi", EmptyParams::new()).await.unwrap();
......@@ -465,9 +476,13 @@ impl Methods {
pub async fn subscribe(&self, sub_method: &str, params: impl ToRpcParams) -> Result<Subscription, Error> {
let params = params.to_rpc_params()?;
let req = Request::new(sub_method.into(), Some(&params), Id::Number(0));
tracing::trace!("[Methods::subscribe] Calling subscription method: {:?}, params: {:?}", sub_method, params);
let (response, rx, close_notify) = self.inner_call(req).await;
tracing::trace!("[Methods::subscribe] response {:?}", response);
let subscription_response = match serde_json::from_str::<Response<RpcSubscriptionId>>(&response.result) {
Ok(r) => r,
Err(_) => match serde_json::from_str::<ErrorResponse>(&response.result) {
......@@ -475,6 +490,7 @@ impl Methods {
Err(err) => return Err(err.into()),
},
};
let sub_id = subscription_response.result.into_owned();
let close_notify = Some(close_notify);
Ok(Subscription { sub_id, rx, close_notify })
......@@ -620,7 +636,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
Ok(r) => r,
Err(err) => {
tracing::error!("Join error for blocking RPC method: {:?}", err);
todo!();
MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InternalError))
}
})
.boxed()
......@@ -672,14 +688,10 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
/// }
/// };
///
/// let mut sink = match pending.accept() {
/// Some(sink) => sink,
/// _ => {
/// return;
/// }
/// };
/// tokio::spawn(async move {
/// // Only fails in the connection is closed.
/// let mut sink = pending.accept().await.unwrap();
///
/// std::thread::spawn(move || {
/// let sum = x + (*ctx);
/// let _ = sink.send(&sum);
/// });
......@@ -711,30 +723,34 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
let subscribers = subscribers.clone();
self.methods.mut_callbacks().insert(
subscribe_method_name,
MethodCallback::new_subscription(Arc::new(move |id, params, method_sink, conn| {
let sub_id: RpcSubscriptionId = conn.id_provider.next_id();
MethodCallback::new_subscription(Arc::new(move |id, params, method_sink, conn, message_sent| {
let uniq_sub = SubscriptionKey { conn_id: conn.conn_id, sub_id: conn.id_provider.next_id() };
let (tx, rx) = oneshot::channel();
// response to the subscription call.
let (subscribe_call_tx, subscribe_call_rx) = oneshot::channel();
let sink = PendingSubscription(Some(InnerPendingSubscription {
let pending_subscription = PendingSubscription(Some(InnerPendingSubscription {
sink: method_sink.clone(),
result: tx,
subscribe_call: subscribe_call_tx,
close_notify: Some(conn.close_notify),
method: notif_method_name,
subscribers: subscribers.clone(),
uniq_sub: SubscriptionKey { conn_id: conn.conn_id, sub_id },
uniq_sub: uniq_sub.clone(),
id: id.clone().into_owned(),
message_sent,
}));
callback(params, sink, ctx.clone());
// The end-user needs to accept/reject the `pending_subscription` to make any progress.
callback(params, pending_subscription, ctx.clone());
let id = id.clone().into_owned();
let result = rx.then(|r| async move {
match r {
Ok(r) => r,
Err(_) => MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError)),
let result = async move {
match subscribe_call_rx.await {
Ok(result) => result,
Err(_) => return MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError)),
}
});
};
Box::pin(result)
})),
......@@ -756,12 +772,16 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
id
);
return MethodResponse::response(id, false, 999);
return MethodResponse::response(id, false, max_response_size);
}
};
let key = SubscriptionKey { conn_id, sub_id: sub_id.into_owned() };
let result = subscribers.lock().remove(&key).is_some();
let result = {
let mut s = subscribers.lock();
tracing::trace!("{:?}", s);
s.remove(&key).is_some()
};
if !result {
tracing::warn!(
......@@ -802,8 +822,8 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
struct InnerPendingSubscription {
/// Sink.
sink: MethodSink,
/// Oneshot which sends response to the subscription call.
result: oneshot::Sender<MethodResponse>,
/// Response to the subscription call.
subscribe_call: oneshot::Sender<MethodResponse>,
/// Get notified when subscribers leave so we can exit
close_notify: Option<SubscriptionPermit>,
/// MethodCallback.
......@@ -814,6 +834,8 @@ struct InnerPendingSubscription {
subscribers: Subscribers,
/// Request ID.
id: Id<'static>,
/// message sent
message_sent: oneshot::Receiver<()>,
}
/// Represent a pending subscription which waits until it's either accepted or rejected.
......@@ -826,8 +848,8 @@ impl PendingSubscription {
/// Reject the subscription call from [`ErrorObject`].
pub fn reject(mut self, err: impl Into<ErrorObjectOwned>) -> bool {
if let Some(inner) = self.0.take() {
let InnerPendingSubscription { sink, id, .. } = inner;
sink.send_error(id, err.into())
let InnerPendingSubscription { subscribe_call, id, .. } = inner;
subscribe_call.send(MethodResponse::error(id, err.into())).is_ok()
} else {
false
}
......@@ -836,24 +858,50 @@ impl PendingSubscription {
/// Attempt to accept the subscription and respond the subscription method call.
///
/// Fails if the connection was closed
pub fn accept(mut self) -> Option<SubscriptionSink> {
pub async fn accept(mut self) -> Option<SubscriptionSink> {
tracing::trace!("[PendingSubscription] accept");
let inner = self.0.take()?;
let InnerPendingSubscription { sink, result, close_notify, method, uniq_sub, subscribers, id } = inner;
let response = MethodResponse::response(id, &uniq_sub.sub_id, usize::MAX);
let InnerPendingSubscription {
sink,
close_notify,
method,
uniq_sub,
subscribers,
id,
subscribe_call,
message_sent,
} = inner;
let response = MethodResponse::response(id, &uniq_sub.sub_id, sink.max_response_size() as usize);
let success = response.success;
if result.send(response).is_ok() && success {
// TODO: It might be possible that the actual `WebSocket message` might not have been sent yet
// So we must be super careful that sink doesn't start sending stuff before that actual
// subscription call has been answered.
if subscribe_call.send(response).is_ok() && success {
tracing::trace!("[PendingSubscription] waiting for server to send the message");
let (tx, rx) = watch::channel(());
subscribers.lock().insert(uniq_sub.clone(), (sink.clone(), tx));
Some(SubscriptionSink { inner: sink, close_notify, method, uniq_sub, subscribers, unsubscribe: rx })
} else {
None
match message_sent.await {
Ok(_) => (),
// the connection was closed.
Err(_canceled) => {
subscribers.lock().remove(&uniq_sub);
return None;
}
}
return Some(SubscriptionSink {
inner: sink,
close_notify,
method,
uniq_sub,
subscribers,
unsubscribe: rx,
});
}
tracing::trace!("[PendingSubscription] call failed");
None
}
}
......@@ -861,8 +909,8 @@ impl PendingSubscription {
impl Drop for PendingSubscription {
fn drop(&mut self) {
if let Some(inner) = self.0.take() {
let InnerPendingSubscription { sink, id, .. } = inner;
sink.send_error(id, ErrorCode::InvalidParams.into());
let InnerPendingSubscription { subscribe_call, id, .. } = inner;
let _ = subscribe_call.send(MethodResponse::error(id, ErrorObject::from(ErrorCode::InvalidParams)));
}
}
}
......@@ -919,11 +967,15 @@ impl SubscriptionSink {
///
/// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, pending, _| {
/// let mut sink = pending.accept().unwrap();
/// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]);
///
///
/// // This will return send `[Ok(1_u32), Ok(2_u32), Err(Error::SubscriptionClosed))]` to the subscriber
/// // because after the `Err(_)` the stream is terminated.
/// let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]);
///
/// tokio::spawn(async move {
/// let mut sink = pending.accept().await.unwrap();
///
/// // jsonrpsee doesn't send an error notification unless `close` is explicitly called.
/// // If we pipe messages to the sink, we can inspect why it ended:
/// match sink.pipe_from_try_stream(stream).await {
......@@ -1007,9 +1059,11 @@ impl SubscriptionSink {
///
/// let mut m = RpcModule::new(());
/// m.register_subscription("sub", "_", "unsub", |params, pending, _| {
/// let mut sink = pending.accept().unwrap();
/// let stream = futures_util::stream::iter(vec![1_usize, 2, 3]);
/// tokio::spawn(async move { sink.pipe_from_stream(stream).await; });
/// tokio::spawn(async move {
/// let mut sink = pending.accept().await.unwrap();
/// let stream = futures_util::stream::iter(vec![1_usize, 2, 3]);
/// sink.pipe_from_stream(stream).await;
/// });
/// });
/// ```
pub async fn pipe_from_stream<S, T>(&mut self, stream: S) -> SubscriptionClosed
......
......@@ -61,9 +61,11 @@ impl RpcServer<ExampleHash, ExampleStorageKey> for RpcServerImpl {
}
fn subscribe_storage(&self, pending: PendingSubscription, _keys: Option<Vec<ExampleStorageKey>>) {
if let Some(mut sink) = pending.accept() {
let _ = sink.send(&vec![[0; 32]]);
}
tokio::spawn(async move {
if let Some(mut sink) = pending.accept().await {
let _ = sink.send(&vec![[0; 32]]);
}
});
}
}
......
......@@ -73,12 +73,13 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, pending, _| {
let rx = BroadcastStream::new(tx.clone().subscribe());
let mut sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
tokio::spawn(async move {
let mut sink = match pending.accept().await {
Some(sink) => sink,
_ => return,
};
match sink.pipe_from_try_stream(rx).await {
SubscriptionClosed::Success => {
sink.close(SubscriptionClosed::Success);
......
......@@ -67,8 +67,8 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
let mut module = RpcModule::new(());
module
.register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, pending, _| {
let (idx, mut sink) = match (params.one(), pending.accept()) {
(Ok(idx), Some(sink)) => (idx, sink),
let idx = match params.one() {
Ok(idx) => idx,
_ => return,
};
let item = LETTERS.chars().nth(idx);
......@@ -77,6 +77,11 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
let stream = IntervalStream::new(interval).map(move |_| item);
tokio::spawn(async move {
let mut sink = match pending.accept().await {
Some(sink) => sink,
_ => return,
};
match sink.pipe_from_stream(stream).await {
// Send close notification when subscription stream failed.
SubscriptionClosed::Failed(err) => {
......@@ -92,8 +97,8 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
.unwrap();
module
.register_subscription("sub_params_two", "params_two", "unsub_params_two", |params, pending, _| {
let (one, two, mut sink) = match (params.parse::<(usize, usize)>(), pending.accept()) {
(Ok((one, two)), Some(sink)) => (one, two, sink),
let (one, two) = match params.parse::<(usize, usize)>() {
Ok(res) => res,
_ => return,
};
......@@ -103,6 +108,11 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
let stream = IntervalStream::new(interval).map(move |_| item);
tokio::spawn(async move {
let mut sink = match pending.accept().await {
Some(sink) => sink,
_ => return,
};
match sink.pipe_from_stream(stream).await {
// Send close notification when subscription stream failed.
SubscriptionClosed::Failed(err) => {
......
......@@ -95,9 +95,10 @@ impl<M> Builder<M> {
/// Add a middleware to the builder [`Middleware`](../jsonrpsee_core/middleware/trait.Middleware.html).
///
/// ```
/// use std::time::Instant;
/// use std::{time::Instant, net::SocketAddr};
///
/// use jsonrpsee_core::middleware::Middleware;
/// use jsonrpsee_core::HeaderMap;
/// use jsonrpsee_http_server::HttpServerBuilder;
///
/// #[derive(Clone)]
......@@ -106,7 +107,7 @@ impl<M> Builder<M> {
/// impl Middleware for MyMiddleware {
/// type Instant = Instant;
///
/// fn on_request(&self) -> Instant {
/// fn on_request(&self, _remote_addr: SocketAddr, _headers: &HeaderMap) -> Instant {
/// Instant::now()
/// }
///
......
......@@ -287,8 +287,8 @@ pub(crate) mod visitor;
/// // The stream API can be used to pipe items from the underlying stream
/// // as subscription responses.
/// fn sub_override_notif_method(&self, pending: PendingSubscription) {
/// let mut sink = pending.accept().unwrap();
/// tokio::spawn(async move {
/// let mut sink = pending.accept().await.unwrap();
/// let stream = futures_util::stream::iter(["one", "two", "three"]);
/// sink.pipe_from_stream(stream).await;
/// });
......@@ -298,9 +298,11 @@ pub(crate) mod visitor;
/// // but for simplicity of the example we will only send two values and then close
/// // the subscription.
/// fn sub(&self, pending: PendingSubscription) {
/// let mut sink = pending.accept().unwrap();
/// let _ = sink.send(&"Response_A");
/// let _ = sink.send(&"Response_B");
/// tokio::spawn(async move {
/// let mut sink = pending.accept().await.unwrap();
/// let _ = sink.send(&"Response_A");
/// let _ = sink.send(&"Response_B");
/// });
/// }
/// }
/// }
......
......@@ -64,27 +64,33 @@ impl RpcServer for RpcServerImpl {
}
fn sub(&self, pending: PendingSubscription) {
let mut sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
let _ = sink.send(&"Response_A");
let _ = sink.send(&"Response_B");