Unverified Commit 5ad1192e authored by Niklas Adolfsson's avatar Niklas Adolfsson
Browse files

make the code more readable

parent 108794b5
Pipeline #199780 passed with stages
in 5 minutes and 56 seconds
......@@ -72,7 +72,7 @@ pub type SubscriptionMethod<'a> = Arc<
Params,
MethodSink,
ConnState,
oneshot::Receiver<()>,
PendingSubscriptionCallRx,
Option<ResourceGuard>,
) -> BoxFuture<'a, MethodResponse>,
>;
......@@ -86,6 +86,34 @@ pub type ConnectionId = usize;
/// Max response size.
pub type MaxResponseSize = usize;
/// Represent a state until a subscription call has been answered by the server.
#[derive(Debug)]
pub struct PendingSubscriptionCallRx(oneshot::Receiver<()>);
impl PendingSubscriptionCallRx {
/// Wait until a subscription call is accepted.
pub async fn is_accepted(self) -> bool {
self.0.await.is_ok()
}
}
/// Represent a state until a subscription call has been answered by the server.
#[derive(Debug)]
pub struct PendingSubscriptionCallTx(oneshot::Sender<()>);
impl PendingSubscriptionCallTx {
/// Accept the subscription.
pub fn accept(self) {
let _ = self.0.send(());
}
}
/// Create a channel to wait for a subscription to be ready.
pub fn pending_subscription_channel() -> (PendingSubscriptionCallTx, PendingSubscriptionCallRx) {
let (tx, rx) = oneshot::channel();
(PendingSubscriptionCallTx(tx), PendingSubscriptionCallRx(rx))
}
/// Raw response from an RPC
/// A 3-tuple containing:
/// - Call result as a `String`,
......@@ -426,8 +454,7 @@ impl Methods {
/// Execute a callback.
async fn inner_call(&self, req: Request<'_>) -> RawRpcResponse {
// the subscription call was dispatched.
let (tx, rx) = oneshot::channel();
let (pending_sub_tx, pending_sub_rx) = pending_subscription_channel();
let (tx_sink, rx_sink) = mpsc::unbounded();
let sink = MethodSink::new(tx_sink);
......@@ -443,7 +470,7 @@ impl Methods {
Some(MethodKind::Async(cb)) => (cb)(id.into_owned(), params.into_owned(), 0, usize::MAX, None).await,
Some(MethodKind::Subscription(cb)) => {
let conn_state = ConnState { conn_id: 0, close_notify, id_provider: &RandomIntegerIdProvider };
(cb)(id, params, sink.clone(), conn_state, rx, None).await
(cb)(id, params, sink.clone(), conn_state, pending_sub_rx, None).await
}
Some(MethodKind::Unsubscription(cb)) => (cb)(id, params, 0, usize::MAX),
};
......@@ -451,7 +478,7 @@ impl Methods {
tracing::trace!("[Methods::inner_call]: method: `{}` result: {:?}", req.method, result);
// indicate that the subscription has been accepted.
let _ = tx.send(());
pending_sub_tx.accept();
(result, rx_sink, notify)
}
......@@ -769,7 +796,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
self.methods.verify_and_insert(
subscribe_method_name,
MethodCallback::new_subscription(Arc::new(
move |id, params, method_sink, conn, message_sent, claimed| {
move |id, params, method_sink, conn, pending_sub_rx, claimed| {
let uniq_sub = SubscriptionKey { conn_id: conn.conn_id, sub_id: conn.id_provider.next_id() };
// response to the subscription call.
......@@ -783,7 +810,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
subscribers: subscribers.clone(),
uniq_sub,
id: id.clone().into_owned(),
message_sent,
pending_sub_rx,
claimed,
}));
......@@ -843,8 +870,10 @@ struct InnerPendingSubscription {
subscribers: Subscribers,
/// Request ID.
id: Id<'static>,
/// Subscription answered.
message_sent: oneshot::Receiver<()>,
/// Represents when the server has answered the subscription
/// such that it's allowed to start to send out notifications
/// on the subscription.
pending_sub_rx: PendingSubscriptionCallRx,
/// Claimed resources.
claimed: Option<ResourceGuard>,
}
......@@ -880,7 +909,7 @@ impl PendingSubscription {
subscribers,
id,
subscribe_call,
message_sent,
pending_sub_rx,
claimed,
} = inner;
......@@ -888,19 +917,25 @@ impl PendingSubscription {
let success = response.success;
if subscribe_call.send(response).is_ok() && success {
let (tx, rx) = watch::channel(());
// mark the subscription is "active"
subscribers.lock().insert(uniq_sub.clone(), (sink.clone(), tx));
if message_sent.await.is_ok() {
let (unsubscribe_tx, unsubscribe_rx) = watch::channel(());
// Mark the subscription is "active"
// We perform this "here" to avoid races as the call might get answered
// and it might take some time until the `message_sent` future finishes
//
// Thus, the subscription must be removed below `message_sent` fails below.
subscribers.lock().insert(uniq_sub.clone(), (sink.clone(), unsubscribe_tx));
// The subscription call has been sent to `WS task`
// It's now allowed to start sending out notifications on the subscription.
if pending_sub_rx.is_accepted().await {
return Some(SubscriptionSink {
inner: sink,
close_notify,
method,
uniq_sub,
subscribers,
unsubscribe: rx,
unsubscribe: unsubscribe_rx,
_claimed: claimed,
});
} else {
......
......@@ -34,7 +34,7 @@ use std::time::Duration;
use crate::future::{FutureDriver, ServerHandle, StopMonitor};
use crate::types::error::{ErrorCode, ErrorObject, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG};
use crate::types::{Id, Request};
use futures_channel::{mpsc, oneshot};
use futures_channel::mpsc;
use futures_util::future::{Either, FutureExt};
use futures_util::io::{BufReader, BufWriter};
use futures_util::stream::StreamExt;
......@@ -47,7 +47,9 @@ use jsonrpsee_core::server::helpers::{
prepare_error, BatchResponse, BatchResponseBuilder, BoundedSubscriptions, MethodResponse, MethodSink,
};
use jsonrpsee_core::server::resource_limiting::Resources;
use jsonrpsee_core::server::rpc_module::{ConnState, ConnectionId, MethodKind, Methods};
use jsonrpsee_core::server::rpc_module::{
pending_subscription_channel, ConnState, ConnectionId, MethodKind, Methods, PendingSubscriptionCallTx,
};
use jsonrpsee_core::tracing::{rx_log_from_json, tx_log_from_str, RpcTracing};
use jsonrpsee_core::traits::IdProvider;
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
......@@ -497,12 +499,12 @@ async fn background_task<M: Middleware>(input: BackgroundTask<'_, M>) -> Result<
request_start,
};
let (response, maybe_sub) = process_single_request(data, call).await;
let (response, maybe_pending_sub_tx) = process_single_request(data, call).await;
middleware.on_response(&response.result, request_start);
let _ = sink.send_raw(response.result);
if let Some(sub) = maybe_sub {
let _ = sub.send(());
if let Some(pending_sub_tx) = maybe_pending_sub_tx {
pending_sub_tx.accept();
}
}
.boxed();
......@@ -863,11 +865,11 @@ where
.fold(BatchResponseBuilder::new(), |mut batch_response, (req, call)| async move {
let params = Params::new(req.params.map(|params| params.get()));
let (response, maybe_sub) =
let (response, maybe_pending_sub_tx) =
execute_call(Call { name: &req.method, params, id: req.id, call }).await;
if let Some(sub) = maybe_sub {
let _ = sub.send(());
if let Some(pending_sub_tx) = maybe_pending_sub_tx {
pending_sub_tx.accept();
}
batch_response.append(&response);
......@@ -889,7 +891,7 @@ where
async fn process_single_request<M: Middleware>(
data: Vec<u8>,
call: CallData<'_, M>,
) -> (MethodResponse, Option<oneshot::Sender<()>>) {
) -> (MethodResponse, Option<PendingSubscriptionCallTx>) {
if let Ok(req) = serde_json::from_slice::<Request>(&data) {
let params = Params::new(req.params.map(|params| params.get()));
let name = &req.method;
......@@ -902,15 +904,12 @@ async fn process_single_request<M: Middleware>(
}
}
/// This a workaround the see whether it works
///
/// Essentially, the sender is used to indicate to the other side that call has been answered
/// such that the subscription notifications are not allowed to start until `the sender` has ACK:ed
/// that.
/// Execute a call which returns result of the call with a additional sink
/// to fire a signal once the subscription call has been answered.
///
/// Otherwise it's possible that the subscription notifications could start before that the actual subscription
/// response has been sent.
async fn execute_call<M: Middleware>(c: Call<'_, M>) -> (MethodResponse, Option<oneshot::Sender<()>>) {
/// Returns `(MethodResponse, None)` on every call that isn't a subscription
/// Otherwise `(MethodResponse, Some(PendingSubscriptionCallTx)`.
async fn execute_call<M: Middleware>(c: Call<'_, M>) -> (MethodResponse, Option<PendingSubscriptionCallTx>) {
let Call { name, id, params, call } = c;
let CallData {
resources,
......@@ -957,10 +956,10 @@ async fn execute_call<M: Middleware>(c: Call<'_, M>) -> (MethodResponse, Option<
Ok(guard) => {
if let Some(cn) = bounded_subscriptions.acquire() {
let conn_state = ConnState { conn_id, close_notify: cn, id_provider };
let (subscribe_tx, subscribe_rx) = oneshot::channel();
let (pending_sub_tx, pending_sub_rx) = pending_subscription_channel();
let result =
callback(id.clone(), params, sink.clone(), conn_state, subscribe_rx, Some(guard)).await;
(result, Some(subscribe_tx))
callback(id.clone(), params, sink.clone(), conn_state, pending_sub_rx, Some(guard)).await;
(result, Some(pending_sub_tx))
} else {
(MethodResponse::error(id, reject_too_many_subscriptions(bounded_subscriptions.max())), None)
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment