Unverified Commit 8846d177 authored by Niklas Adolfsson's avatar Niklas Adolfsson
Browse files

simplify code but one extra clone

parent eeef40ec
Pipeline #200283 failed with stages
in 4 minutes and 35 seconds
......@@ -65,16 +65,7 @@ pub type AsyncMethod<'a> = Arc<
>;
/// Method callback for subscriptions.
pub type SubscriptionMethod<'a> = Arc<
dyn Send
+ Sync
+ Fn(
Id,
Params,
MethodSink,
ConnState,
PendingSubscriptionCallRx,
Option<ResourceGuard>,
) -> BoxFuture<'a, MethodResponse>,
dyn Send + Sync + Fn(Id, Params, MethodSink, ConnState, Option<ResourceGuard>) -> BoxFuture<'a, MethodResponse>,
>;
// Method callback to unsubscribe.
type UnsubscriptionMethod = Arc<dyn Send + Sync + Fn(Id, Params, ConnectionId, MaxResponseSize) -> MethodResponse>;
......@@ -86,34 +77,6 @@ pub type ConnectionId = usize;
/// Max response size.
pub type MaxResponseSize = usize;
/// Represent a state until a subscription call has been answered by the server.
#[derive(Debug)]
pub struct PendingSubscriptionCallRx(oneshot::Receiver<()>);
impl PendingSubscriptionCallRx {
/// Wait until a subscription call is accepted.
pub async fn is_accepted(self) -> bool {
self.0.await.is_ok()
}
}
/// Represent a state until a subscription call has been answered by the server.
#[derive(Debug)]
pub struct PendingSubscriptionCallTx(oneshot::Sender<()>);
impl PendingSubscriptionCallTx {
/// Accept the subscription.
pub fn accept(self) {
let _ = self.0.send(());
}
}
/// Create a channel to wait for a subscription to be ready.
pub fn pending_subscription_channel() -> (PendingSubscriptionCallTx, PendingSubscriptionCallRx) {
let (tx, rx) = oneshot::channel();
(PendingSubscriptionCallTx(tx), PendingSubscriptionCallRx(rx))
}
/// Raw response from an RPC
/// A 3-tuple containing:
/// - Call result as a `String`,
......@@ -454,9 +417,7 @@ impl Methods {
/// Execute a callback.
async fn inner_call(&self, req: Request<'_>) -> RawRpcResponse {
let (pending_sub_tx, pending_sub_rx) = pending_subscription_channel();
let (tx_sink, rx_sink) = mpsc::unbounded();
let (tx_sink, mut rx_sink) = mpsc::unbounded();
let sink = MethodSink::new(tx_sink);
let id = req.id.clone();
let params = Params::new(req.params.map(|params| params.get()));
......@@ -470,16 +431,21 @@ impl Methods {
Some(MethodKind::Async(cb)) => (cb)(id.into_owned(), params.into_owned(), 0, usize::MAX, None).await,
Some(MethodKind::Subscription(cb)) => {
let conn_state = ConnState { conn_id: 0, close_notify, id_provider: &RandomIntegerIdProvider };
(cb)(id, params, sink.clone(), conn_state, pending_sub_rx, None).await
let res = (cb)(id, params, sink.clone(), conn_state, None).await;
// This message is not used because it's used for middleware so we discard in other to
// not read once this is used for subscriptions.
//
// The same information is part of `res` above.
let _ = rx_sink.next().await.expect("Every call must at least produce one reponse; qed");
res
}
Some(MethodKind::Unsubscription(cb)) => (cb)(id, params, 0, usize::MAX),
};
tracing::trace!("[Methods::inner_call]: method: `{}` result: {:?}", req.method, result);
// indicate that the subscription has been accepted.
pending_sub_tx.accept();
(result, rx_sink, notify)
}
......@@ -529,6 +495,7 @@ impl Methods {
let sub_id = subscription_response.result.into_owned();
let close_notify = Some(close_notify);
Ok(Subscription { sub_id, rx, close_notify })
}
......@@ -795,40 +762,37 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
let callback = {
self.methods.verify_and_insert(
subscribe_method_name,
MethodCallback::new_subscription(Arc::new(
move |id, params, method_sink, conn, pending_sub_rx, claimed| {
let uniq_sub = SubscriptionKey { conn_id: conn.conn_id, sub_id: conn.id_provider.next_id() };
// response to the subscription call.
let (subscribe_call_tx, subscribe_call_rx) = oneshot::channel();
let pending_subscription = PendingSubscription(Some(InnerPendingSubscription {
sink: method_sink,
subscribe_call: subscribe_call_tx,
close_notify: Some(conn.close_notify),
method: notif_method_name,
subscribers: subscribers.clone(),
uniq_sub,
id: id.clone().into_owned(),
pending_sub_rx,
claimed,
}));
// The end-user needs to accept/reject the `pending_subscription` to make any progress.
callback(params, pending_subscription, ctx.clone());
let id = id.clone().into_owned();
let result = async move {
match subscribe_call_rx.await {
Ok(result) => result,
Err(_) => MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError)),
}
};
Box::pin(result)
},
)),
MethodCallback::new_subscription(Arc::new(move |id, params, method_sink, conn, claimed| {
let uniq_sub = SubscriptionKey { conn_id: conn.conn_id, sub_id: conn.id_provider.next_id() };
// response to the subscription call.
let (tx, rx) = oneshot::channel();
let pending_subscription = PendingSubscription(Some(InnerPendingSubscription {
sink: method_sink,
subscribe_call: tx,
close_notify: Some(conn.close_notify),
method: notif_method_name,
subscribers: subscribers.clone(),
uniq_sub,
id: id.clone().into_owned(),
claimed,
}));
// The end-user needs to accept/reject the `pending_subscription` to make any progress.
callback(params, pending_subscription, ctx.clone());
let id = id.clone().into_owned();
let result = async move {
match rx.await {
Ok(result) => result,
Err(_) => MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError)),
}
};
Box::pin(result)
})),
)?
};
......@@ -870,10 +834,6 @@ struct InnerPendingSubscription {
subscribers: Subscribers,
/// Request ID.
id: Id<'static>,
/// Represents when the server has answered the subscription
/// such that it's allowed to start to send out notifications
/// on the subscription.
pending_sub_rx: PendingSubscriptionCallRx,
/// Claimed resources.
claimed: Option<ResourceGuard>,
}
......@@ -888,8 +848,14 @@ impl PendingSubscription {
/// Reject the subscription call from [`ErrorObject`].
pub fn reject(mut self, err: impl Into<ErrorObjectOwned>) -> bool {
if let Some(inner) = self.0.take() {
let InnerPendingSubscription { subscribe_call, id, .. } = inner;
subscribe_call.send(MethodResponse::error(id, err.into())).is_ok()
let InnerPendingSubscription { subscribe_call, id, sink, .. } = inner;
let err = MethodResponse::error(id, err.into());
let ws_send = sink.send_raw(err.result.clone()).is_ok();
let middleware_call = subscribe_call.send(err).is_ok();
ws_send && middleware_call
} else {
false
}
......@@ -901,46 +867,29 @@ impl PendingSubscription {
pub async fn accept(mut self) -> Option<SubscriptionSink> {
let inner = self.0.take()?;
let InnerPendingSubscription {
sink,
close_notify,
method,
uniq_sub,
subscribers,
id,
subscribe_call,
pending_sub_rx,
claimed,
} = inner;
let InnerPendingSubscription { sink, close_notify, method, uniq_sub, subscribers, id, subscribe_call, claimed } =
inner;
let response = MethodResponse::response(id, &uniq_sub.sub_id, sink.max_response_size() as usize);
let success = response.success;
if subscribe_call.send(response).is_ok() && success {
let ws_send = sink.send_raw(response.result.clone()).is_ok();
let middleware_call = subscribe_call.send(response).is_ok();
if ws_send && middleware_call && success {
let (unsubscribe_tx, unsubscribe_rx) = watch::channel(());
// Mark the subscription is "active"
// We perform this "here" to avoid races as the call might get answered
// and it might take some time until the `message_sent` future finishes
//
// Thus, the subscription must be removed below `message_sent` fails below.
subscribers.lock().insert(uniq_sub.clone(), (sink.clone(), unsubscribe_tx));
// The subscription call has been sent to `WS task`
// It's now allowed to start sending out notifications on the subscription.
if pending_sub_rx.is_accepted().await {
return Some(SubscriptionSink {
inner: sink,
close_notify,
method,
uniq_sub,
subscribers,
unsubscribe: unsubscribe_rx,
_claimed: claimed,
});
} else {
subscribers.lock().remove(&uniq_sub);
}
return Some(SubscriptionSink {
inner: sink,
close_notify,
method,
uniq_sub,
subscribers,
unsubscribe: unsubscribe_rx,
_claimed: claimed,
});
}
None
......@@ -951,8 +900,12 @@ impl PendingSubscription {
impl Drop for PendingSubscription {
fn drop(&mut self) {
if let Some(inner) = self.0.take() {
let InnerPendingSubscription { subscribe_call, id, .. } = inner;
let _ = subscribe_call.send(MethodResponse::error(id, ErrorObject::from(ErrorCode::InvalidParams)));
let InnerPendingSubscription { subscribe_call, id, sink, .. } = inner;
let err = MethodResponse::error(id, ErrorObject::from(ErrorCode::InvalidParams));
let _ws_send = sink.send_raw(err.result.clone()).is_ok();
let _middleware_call = subscribe_call.send(err).is_ok();
}
}
}
......@@ -1224,7 +1177,7 @@ impl Subscription {
}
let raw = self.rx.next().await?;
tracing::debug!("rx: {}", raw);
tracing::debug!("[Subscription::next]: rx {}", raw);
let res = match serde_json::from_str::<SubscriptionResponse<T>>(&raw) {
Ok(r) => Some(Ok((r.params.result, r.params.subscription.into_owned()))),
Err(e) => match serde_json::from_str::<SubscriptionError<serde_json::Value>>(&raw) {
......
......@@ -44,6 +44,12 @@ macro_rules! assert_type {
}};
}
fn init_logger() {
let _ = tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init();
}
#[test]
fn rpc_modules_with_different_contexts_can_be_merged() {
let cx = Vec::<u8>::new();
......@@ -201,6 +207,8 @@ async fn calling_method_without_server_using_proc_macro() {
#[tokio::test]
async fn subscribing_without_server() {
init_logger();
let mut module = RpcModule::new(());
module
.register_subscription("my_sub", "my_sub", "my_unsub", |_, pending, _| {
......
......@@ -47,10 +47,8 @@ use jsonrpsee_core::server::helpers::{
prepare_error, BatchResponse, BatchResponseBuilder, BoundedSubscriptions, MethodResponse, MethodSink,
};
use jsonrpsee_core::server::resource_limiting::Resources;
use jsonrpsee_core::server::rpc_module::{
pending_subscription_channel, ConnState, ConnectionId, MethodKind, Methods, PendingSubscriptionCallTx,
};
use jsonrpsee_core::tracing::{rx_log_from_json, tx_log_from_str, RpcTracing};
use jsonrpsee_core::server::rpc_module::{ConnState, ConnectionId, MethodKind, Methods};
use jsonrpsee_core::tracing::{rx_log_from_json, rx_log_from_str, tx_log_from_str, RpcTracing};
use jsonrpsee_core::traits::IdProvider;
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::error::{reject_too_big_request, reject_too_many_subscriptions};
......@@ -499,14 +497,15 @@ async fn background_task<M: Middleware>(input: BackgroundTask<'_, M>) -> Result<
request_start,
};
let (response, maybe_pending_sub_tx) = process_single_request(data, call).await;
middleware.on_response(&response.result, request_start);
let _ = sink.send_raw(response.result);
if let Some(pending_sub_tx) = maybe_pending_sub_tx {
pending_sub_tx.accept();
}
match process_single_request(data, call).await {
MethodResult::JustMiddleware(r) => {
middleware.on_response(&r.result, request_start);
}
MethodResult::SendAndMiddleware(r) => {
middleware.on_response(&r.result, request_start);
let _ = sink.send_raw(r.result);
}
};
}
.boxed();
......@@ -847,6 +846,20 @@ struct Call<'a, M: Middleware> {
id: Id<'a>,
}
enum MethodResult {
JustMiddleware(MethodResponse),
SendAndMiddleware(MethodResponse),
}
impl MethodResult {
fn as_inner(&self) -> &MethodResponse {
match &self {
Self::JustMiddleware(r) => r,
Self::SendAndMiddleware(r) => r,
}
}
}
// Batch responses must be sent back as a single message so we read the results from each
// request in the batch and read the results off of a new channel, `rx_batch`, and then send the
// complete batch response back to the client over `tx`.
......@@ -868,14 +881,10 @@ where
.fold(BatchResponseBuilder::new(), |mut batch_response, (req, call)| async move {
let params = Params::new(req.params.map(|params| params.get()));
let (response, maybe_pending_sub_tx) =
let response =
execute_call(Call { name: &req.method, params, id: req.id, call }).in_current_span().await;
if let Some(pending_sub_tx) = maybe_pending_sub_tx {
pending_sub_tx.accept();
}
batch_response.append(&response);
batch_response.append(response.as_inner());
batch_response
})
......@@ -891,10 +900,7 @@ where
BatchResponse::error(id, ErrorObject::from(code))
}
async fn process_single_request<M: Middleware>(
data: Vec<u8>,
call: CallData<'_, M>,
) -> (MethodResponse, Option<PendingSubscriptionCallTx>) {
async fn process_single_request<M: Middleware>(data: Vec<u8>, call: CallData<'_, M>) -> MethodResult {
if let Ok(req) = serde_json::from_slice::<Request>(&data) {
let trace = RpcTracing::method_call(&req.method);
let _enter = trace.span().enter();
......@@ -908,7 +914,7 @@ async fn process_single_request<M: Middleware>(
execute_call(Call { name, params, id, call }).in_current_span().await
} else {
let (id, code) = prepare_error(&data);
(MethodResponse::error(id, ErrorObject::from(code)), None)
MethodResult::SendAndMiddleware(MethodResponse::error(id, ErrorObject::from(code)))
}
}
......@@ -917,7 +923,7 @@ async fn process_single_request<M: Middleware>(
///
/// Returns `(MethodResponse, None)` on every call that isn't a subscription
/// Otherwise `(MethodResponse, Some(PendingSubscriptionCallTx)`.
async fn execute_call<M: Middleware>(c: Call<'_, M>) -> (MethodResponse, Option<PendingSubscriptionCallTx>) {
async fn execute_call<M: Middleware>(c: Call<'_, M>) -> MethodResult {
let Call { name, id, params, call } = c;
let CallData {
resources,
......@@ -935,17 +941,21 @@ async fn execute_call<M: Middleware>(c: Call<'_, M>) -> (MethodResponse, Option<
middleware.on_call(name, params.clone());
let response = match methods.method_with_name(name) {
None => (MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound)), None),
None => {
let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound));
MethodResult::SendAndMiddleware(response)
}
Some((name, method)) => match &method.inner() {
MethodKind::Sync(callback) => match method.claim(name, resources) {
Ok(guard) => {
let r = (callback)(id, params, max_response_body_size as usize);
drop(guard);
(r, None)
MethodResult::SendAndMiddleware(r)
}
Err(err) => {
tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
(MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy)), None)
let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy));
MethodResult::SendAndMiddleware(response)
}
},
MethodKind::Async(callback) => match method.claim(name, resources) {
......@@ -953,38 +963,44 @@ async fn execute_call<M: Middleware>(c: Call<'_, M>) -> (MethodResponse, Option<
let id = id.into_owned();
let params = params.into_owned();
((callback)(id, params, conn_id, max_response_body_size as usize, Some(guard)).await, None)
let response = (callback)(id, params, conn_id, max_response_body_size as usize, Some(guard)).await;
MethodResult::SendAndMiddleware(response)
}
Err(err) => {
tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
(MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy)), None)
let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy));
MethodResult::SendAndMiddleware(response)
}
},
MethodKind::Subscription(callback) => match method.claim(name, resources) {
Ok(guard) => {
if let Some(cn) = bounded_subscriptions.acquire() {
let conn_state = ConnState { conn_id, close_notify: cn, id_provider };
let (pending_sub_tx, pending_sub_rx) = pending_subscription_channel();
let result =
callback(id.clone(), params, sink.clone(), conn_state, pending_sub_rx, Some(guard)).await;
(result, Some(pending_sub_tx))
let response = callback(id.clone(), params, sink.clone(), conn_state, Some(guard)).await;
MethodResult::JustMiddleware(response)
} else {
(MethodResponse::error(id, reject_too_many_subscriptions(bounded_subscriptions.max())), None)
let response =
MethodResponse::error(id, reject_too_many_subscriptions(bounded_subscriptions.max()));
MethodResult::SendAndMiddleware(response)
}
}
Err(err) => {
tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
(MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy)), None)
let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy));
MethodResult::SendAndMiddleware(response)
}
},
MethodKind::Unsubscription(callback) => {
// Don't adhere to any resource or subscription limits; always let unsubscribing happen!
(callback(id, params, conn_id, max_response_body_size as usize), None)
let result = callback(id, params, conn_id, max_response_body_size as usize);
MethodResult::SendAndMiddleware(result)
}
},
};
tx_log_from_str(&response.0.result, max_log_length);
middleware.on_result(name, response.0.success, request_start);
let r = response.as_inner();
rx_log_from_str(&r.result, max_log_length);
middleware.on_result(name, r.success, request_start);
response
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment