Unverified Commit 6083c82a authored by Niklas Adolfsson's avatar Niklas Adolfsson
Browse files

Merge remote-tracking branch 'origin/master' into na-middleware

parents cbc189e2 01577daf
Pipeline #199242 canceled with stages
in 15 seconds
......@@ -65,7 +65,7 @@ pub type AsyncMethod<'a> = Arc<
>;
/// Method callback for subscriptions.
pub type SubscriptionMethod<'a> = Arc<
dyn Send + Sync + Fn(Id, Params, MethodSink, ConnState, oneshot::Receiver<()>) -> BoxFuture<'a, MethodResponse>,
dyn Send + Sync + Fn(Id, Params, MethodSink, ConnState, oneshot::Receiver<()>, Option<ResourceGuard>) -> BoxFuture<'a, MethodResponse>,
>;
// Method callback to unsubscribe.
type UnsubscriptionMethod = Arc<dyn Send + Sync + Fn(Id, Params, ConnectionId, MaxResponseSize) -> MethodResponse>;
......@@ -434,7 +434,7 @@ impl Methods {
Some(MethodKind::Async(cb)) => (cb)(id.into_owned(), params.into_owned(), 0, usize::MAX, None).await,
Some(MethodKind::Subscription(cb)) => {
let conn_state = ConnState { conn_id: 0, close_notify, id_provider: &RandomIntegerIdProvider };
(cb)(id, params, sink.clone(), conn_state, rx).await
(cb)(id, params, sink.clone(), conn_state, rx, None).await
}
Some(MethodKind::Unsubscription(cb)) => (cb)(id, params, 0, usize::MAX),
};
......@@ -703,7 +703,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
notif_method_name: &'static str,
unsubscribe_method_name: &'static str,
callback: F,
) -> Result<(), Error>
) -> Result<MethodResourcesBuilder, Error>
where
Context: Send + Sync + 'static,
F: Fn(Params, PendingSubscription, Arc<Context>) + Send + Sync + 'static,
......@@ -718,12 +718,48 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
let ctx = self.ctx.clone();
let subscribers = Subscribers::default();
// Subscribe
// Unsubscribe
{
let subscribers = subscribers.clone();
self.methods.mut_callbacks().insert(
unsubscribe_method_name,
MethodCallback::new_unsubscription(Arc::new(move |id, params, conn_id, max_response_size| {
let sub_id = match params.one::<RpcSubscriptionId>() {
Ok(sub_id) => sub_id,
Err(_) => {
tracing::warn!(
"unsubscribe call '{}' failed: couldn't parse subscription id={:?} request id={:?}",
unsubscribe_method_name,
params,
id
);
return MethodResponse::response(id, false, max_response_size);
}
};
let key = SubscriptionKey { conn_id, sub_id: sub_id.into_owned() };
let result = subscribers.lock().remove(&key).is_some();
if !result {
tracing::warn!(
"unsubscribe call `{}` subscription key={:?} not an active subscription",
unsubscribe_method_name,
key,
);
}
// TODO: register as failed in !result.
MethodResponse::response(id, result, max_response_size)
})),
);
}
// Subscribe
let callback = {
self.methods.verify_and_insert(
subscribe_method_name,
MethodCallback::new_subscription(Arc::new(move |id, params, method_sink, conn, message_sent| {
MethodCallback::new_subscription(Arc::new(move |id, params, method_sink, conn, message_sent, claimed| {
let uniq_sub = SubscriptionKey { conn_id: conn.conn_id, sub_id: conn.id_provider.next_id() };
// response to the subscription call.
......@@ -738,6 +774,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
uniq_sub: uniq_sub.clone(),
id: id.clone().into_owned(),
message_sent,
claimed,
}));
// The end-user needs to accept/reject the `pending_subscription` to make any progress.
......@@ -754,49 +791,11 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
Box::pin(result)
})),
);
}
// Unsubscribe
{
self.methods.mut_callbacks().insert(
unsubscribe_method_name,
MethodCallback::new_unsubscription(Arc::new(move |id, params, conn_id, max_response_size| {
let sub_id = match params.one::<RpcSubscriptionId>() {
Ok(sub_id) => sub_id,
Err(_) => {
tracing::warn!(
"unsubscribe call '{}' failed: couldn't parse subscription id={:?} request id={:?}",
unsubscribe_method_name,
params,
id
);
return MethodResponse::response(id, false, max_response_size);
}
};
let key = SubscriptionKey { conn_id, sub_id: sub_id.into_owned() };
let result = {
let mut s = subscribers.lock();
tracing::trace!("{:?}", s);
s.remove(&key).is_some()
};
if !result {
tracing::warn!(
"unsubscribe call `{}` subscription key={:?} not an active subscription",
unsubscribe_method_name,
key,
);
}
)?
};
MethodResponse::response(id, result, max_response_size)
})),
);
}
Ok(())
Ok(MethodResourcesBuilder { build: ResourceVec::new(), callback })
}
/// Register an alias for an existing_method. Alias uniqueness is enforced.
......@@ -834,8 +833,10 @@ struct InnerPendingSubscription {
subscribers: Subscribers,
/// Request ID.
id: Id<'static>,
/// message sent
/// Subscription answered.
message_sent: oneshot::Receiver<()>,
/// Claimed resources.
claimed: Option<ResourceGuard>,
}
/// Represent a pending subscription which waits until it's either accepted or rejected.
......@@ -871,6 +872,7 @@ impl PendingSubscription {
id,
subscribe_call,
message_sent,
claimed,
} = inner;
let response = MethodResponse::response(id, &uniq_sub.sub_id, sink.max_response_size() as usize);
......@@ -897,6 +899,7 @@ impl PendingSubscription {
uniq_sub,
subscribers,
unsubscribe: rx,
_claimed: claimed
});
}
......@@ -930,6 +933,8 @@ pub struct SubscriptionSink {
subscribers: Subscribers,
/// Future that returns when the unsubscribe method has been called.
unsubscribe: watch::Receiver<()>,
/// Claimed resources.
_claimed: Option<ResourceGuard>,
}
impl SubscriptionSink {
......@@ -1061,7 +1066,7 @@ impl SubscriptionSink {
/// m.register_subscription("sub", "_", "unsub", |params, pending, _| {
/// tokio::spawn(async move {
/// let mut sink = pending.accept().await.unwrap();
/// let stream = futures_util::stream::iter(vec![1_usize, 2, 3]);
/// let stream = futures_util::stream::iter(vec![1_usize, 2, 3]);
/// sink.pipe_from_stream(stream).await;
/// });
/// });
......
......@@ -31,6 +31,8 @@ use crate::attributes::Resource;
use crate::helpers::{generate_where_clause, is_option};
use proc_macro2::{Span, TokenStream as TokenStream2};
use quote::{quote, quote_spanned};
use syn::punctuated::Punctuated;
use syn::Token;
impl RpcDescription {
pub(super) fn render_server(&self) -> Result<TokenStream2, syn::Error> {
......@@ -112,6 +114,28 @@ impl RpcDescription {
}}
}
/// Helper that will parse the resources passed to the macro and call the appropriate resource
/// builder to register the resource limits.
fn handle_resource_limits(resources: &Punctuated<Resource, Token![,]>) -> TokenStream2 {
// Nothing to be done if no resources were set.
if resources.is_empty() {
return quote! {};
}
// Transform each resource into a call to `.resource(name, value)`.
let resources = resources.iter().map(|resource| {
let Resource { name, value, .. } = resource;
quote! { .resource(#name, #value)? }
});
quote! {
.and_then(|resource_builder| {
resource_builder #(#resources)*;
Ok(())
})
}
}
let methods = self
.methods
.iter()
......@@ -128,21 +152,7 @@ impl RpcDescription {
check_name(&rpc_method_name, rust_method_name.span());
let resources = method.resources.iter().map(|resource| {
let Resource { name, value, .. } = resource;
quote! { .resource(#name, #value)? }
});
let resources = if method.resources.is_empty() {
TokenStream2::new()
} else {
quote! {
.and_then(|resource_builder| {
resource_builder #(#resources)*;
Ok(())
})
}
};
let resources = handle_resource_limits(&method.resources);
if method.signature.sig.asyncness.is_some() {
handle_register_result(quote! {
......@@ -196,11 +206,14 @@ impl RpcDescription {
None => rpc_sub_name.clone(),
};
let resources = handle_resource_limits(&sub.resources);
handle_register_result(quote! {
rpc.register_subscription(#rpc_sub_name, #rpc_notif_name, #rpc_unsub_name, |params, subscription_sink, context| {
#parsing
context.as_ref().#rust_method_name(subscription_sink, #params_seq)
})
#resources
})
})
.collect::<Vec<_>>();
......
......@@ -133,12 +133,21 @@ pub struct RpcSubscription {
pub signature: syn::TraitItemMethod,
pub aliases: Vec<String>,
pub unsubscribe_aliases: Vec<String>,
pub resources: Punctuated<Resource, Token![,]>,
}
impl RpcSubscription {
pub fn from_item(attr: syn::Attribute, mut sub: syn::TraitItemMethod) -> syn::Result<Self> {
let [aliases, item, name, param_kind, unsubscribe, unsubscribe_aliases] = AttributeMeta::parse(attr)?
.retain(["aliases", "item", "name", "param_kind", "unsubscribe", "unsubscribe_aliases"])?;
let [aliases, item, name, param_kind, unsubscribe, unsubscribe_aliases, resources] =
AttributeMeta::parse(attr)?.retain([
"aliases",
"item",
"name",
"param_kind",
"unsubscribe",
"unsubscribe_aliases",
"resources",
])?;
let aliases = parse_aliases(aliases)?;
let map = name?.value::<NameMapping>()?;
......@@ -147,6 +156,7 @@ impl RpcSubscription {
let item = item?.value()?;
let param_kind = parse_param_kind(param_kind)?;
let unsubscribe_aliases = parse_aliases(unsubscribe_aliases)?;
let resources = optional(resources, Argument::group)?.unwrap_or_default();
let sig = sub.sig.clone();
let docs = extract_doc_comments(&sub.attrs);
......@@ -183,6 +193,7 @@ impl RpcSubscription {
signature: sub,
aliases,
docs,
resources,
})
}
}
......
error: Unknown argument `magic`, expected one of: `aliases`, `item`, `name`, `param_kind`, `unsubscribe`, `unsubscribe_aliases`
error: Unknown argument `magic`, expected one of: `aliases`, `item`, `name`, `param_kind`, `unsubscribe`, `unsubscribe_aliases`, `resources`
--> tests/ui/incorrect/sub/sub_unsupported_field.rs:6:65
|
6 | #[subscription(name = "sub", unsubscribe = "unsub", item = u8, magic = true)]
......
......@@ -27,7 +27,7 @@
use std::net::SocketAddr;
use std::time::Duration;
use jsonrpsee::core::client::ClientT;
use jsonrpsee::core::client::{ClientT, SubscriptionClientT};
use jsonrpsee::core::Error;
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle};
......@@ -35,7 +35,7 @@ use jsonrpsee::proc_macros::rpc;
use jsonrpsee::types::error::CallError;
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ws_server::{WsServerBuilder, WsServerHandle};
use jsonrpsee::RpcModule;
use jsonrpsee::{PendingSubscription, RpcModule};
use tokio::time::sleep;
fn module_manual() -> Result<RpcModule<()>, Error> {
......@@ -61,6 +61,35 @@ fn module_manual() -> Result<RpcModule<()>, Error> {
.resource("CPU", 0)?
.resource("MEM", 8)?;
// Drop the `SubscriptionSink` to cause the internal `ResourceGuard` allocated per subscription call
// to get dropped. This is the equivalent of not having any resource limits (ie, sink is never used).
module
.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, pending, _| {
let mut _sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
})?
.resource("SUB", 3)?;
// Keep the `SubscriptionSink` alive for a bit to validate that `ResourceGuard` is alive
// and the subscription method gets limited.
module
.register_subscription("subscribe_hello_limit", "s_hello", "unsubscribe_hello_limit", move |_, pending, _| {
let mut sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
tokio::spawn(async move {
for val in 0..10 {
sink.send(&val).unwrap();
sleep(Duration::from_secs(1)).await;
}
});
})?
.resource("SUB", 3)?;
Ok(module)
}
......@@ -84,9 +113,36 @@ fn module_macro() -> RpcModule<()> {
sleep(Duration::from_millis(50)).await;
Ok("hello memory hog")
}
#[subscription(name = "subscribe_hello", item = String, resources("SUB" = 3))]
fn sub_hello(&self);
#[subscription(name = "subscribe_hello_limit", item = String, resources("SUB" = 3))]
fn sub_hello_limit(&self);
}
impl RpcServer for () {}
impl RpcServer for () {
fn sub_hello(&self, pending: PendingSubscription) {
let mut _sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
}
fn sub_hello_limit(&self, pending: PendingSubscription) {
let mut sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
tokio::spawn(async move {
for val in 0..10 {
sink.send(&val).unwrap();
sleep(Duration::from_secs(1)).await;
}
});
}
}
().into_rpc()
}
......@@ -95,6 +151,7 @@ async fn websocket_server(module: RpcModule<()>) -> Result<(SocketAddr, WsServer
let server = WsServerBuilder::default()
.register_resource("CPU", 6, 2)?
.register_resource("MEM", 10, 1)?
.register_resource("SUB", 6, 1)?
.build("127.0.0.1:0")
.await?;
......@@ -108,6 +165,7 @@ async fn http_server(module: RpcModule<()>) -> Result<(SocketAddr, HttpServerHan
let server = HttpServerBuilder::default()
.register_resource("CPU", 6, 2)?
.register_resource("MEM", 10, 1)?
.register_resource("SUB", 6, 1)?
.build("127.0.0.1:0")
.await?;
......@@ -117,7 +175,7 @@ async fn http_server(module: RpcModule<()>) -> Result<(SocketAddr, HttpServerHan
Ok((addr, handle))
}
fn assert_server_busy(fail: Result<String, Error>) {
fn assert_server_busy<T: std::fmt::Debug>(fail: Result<T, Error>) {
match fail {
Err(Error::Call(CallError::Custom(err))) => {
assert_eq!(err.code(), -32604);
......@@ -159,6 +217,30 @@ async fn run_tests_on_ws_server(server_addr: SocketAddr, server_handle: WsServer
assert!(pass_mem.is_ok());
assert_server_busy(fail_mem);
// If we issue multiple subscription requests at the same time from the same client,
// but the subscriptions immediately drop their sinks, no resources will obviously be held,
// and so there is no limit to how many can be executed.
let (pass1, pass2, pass3) = tokio::join!(
client.subscribe::<i32>("subscribe_hello", None, "unsubscribe_hello"),
client.subscribe::<i32>("subscribe_hello", None, "unsubscribe_hello"),
client.subscribe::<i32>("subscribe_hello", None, "unsubscribe_hello"),
);
assert!(pass1.is_ok());
assert!(pass2.is_ok());
assert!(pass3.is_ok());
// 3 CPU units (manually set for subscriptions) per call, so 3th call exceeds cap
let (pass1, pass2, fail) = tokio::join!(
client.subscribe::<i32>("subscribe_hello_limit", None, "unsubscribe_hello_limit"),
client.subscribe::<i32>("subscribe_hello_limit", None, "unsubscribe_hello_limit"),
client.subscribe::<i32>("subscribe_hello_limit", None, "unsubscribe_hello_limit"),
);
assert!(pass1.is_ok());
assert!(pass2.is_ok());
assert_server_busy(fail);
server_handle.stop().unwrap().await;
}
......
......@@ -70,7 +70,7 @@ fn flatten_rpc_modules() {
fn rpc_context_modules_can_register_subscriptions() {
let cx = ();
let mut cxmodule = RpcModule::new(cx);
let _subscription = cxmodule.register_subscription("hi", "hi", "goodbye", |_, _, _| {});
cxmodule.register_subscription("hi", "hi", "goodbye", |_, _, _| {}).unwrap();
assert!(cxmodule.method("hi").is_some());
assert!(cxmodule.method("goodbye").is_some());
......
......@@ -909,16 +909,15 @@ async fn execute_call<'a, M: Middleware>(c: Call<'a, M>) -> (MethodResponse, Opt
},
MethodKind::Subscription(callback) => match method.claim(name, resources) {
Ok(guard) => {
let r = if let Some(cn) = bounded_subscriptions.acquire() {
if let Some(cn) = bounded_subscriptions.acquire() {
let conn_state = ConnState { conn_id, close_notify: cn, id_provider };
let (subscribe_tx, subscribe_rx) = oneshot::channel();
let result = callback(id.clone(), params, sink.clone(), conn_state, subscribe_rx).await;
let result =
callback(id.clone(), params, sink.clone(), conn_state, subscribe_rx, Some(guard)).await;
(result, Some(subscribe_tx))
} else {
(MethodResponse::error(id, reject_too_many_subscriptions(bounded_subscriptions.max())), None)
};
drop(guard);
r
}
}
Err(err) => {
tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
......
......@@ -21,7 +21,7 @@
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN background_task WITH THE SOFTWARE OR THE USE OR OTHER
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
#![cfg(test)]
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment