Unverified Commit 01577daf authored by Alexandru Vasile's avatar Alexandru Vasile Committed by GitHub
Browse files

Add resource limiting for `Subscriptions` (#786)



* ws-server: Fix copyright for tests
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* rpc_module: Return a resource builder when subscribing

Registering a subscription returns the subscription'
callback wrapped into a `MethodResourcesBuilder` for resource
limiting purposes.
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* tests: Fix `register_subscription` tests
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* server: Drop `ResourceGuard` with `SubscriptionSink` for resource limit
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* tests: Check resource limits for subscription
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* proc-macros: Render resource limits for subscription macro
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* tests: Extend subscription limiting test via macro generation
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* core: Check if the `unsubscribe` method was already inserted
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* tests: Fix unsupported fields for subscriptions
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* server: Verify subscription methods before registering them
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Update test comment for subscription limiting
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>

* Modify tests comments
Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>
parent 21189759
Pipeline #198583 passed with stages
in 5 minutes and 34 seconds
......@@ -60,7 +60,7 @@ pub type AsyncMethod<'a> = Arc<
dyn Send + Sync + Fn(Id<'a>, Params<'a>, MethodSink, ConnectionId, Option<ResourceGuard>) -> BoxFuture<'a, bool>,
>;
/// Method callback for subscriptions.
pub type SubscriptionMethod = Arc<dyn Send + Sync + Fn(Id, Params, MethodSink, ConnState) -> bool>;
pub type SubscriptionMethod = Arc<dyn Send + Sync + Fn(Id, Params, MethodSink, ConnState, Option<ResourceGuard>) -> bool>;
// Method callback to unsubscribe.
type UnsubscriptionMethod = Arc<dyn Send + Sync + Fn(Id, Params, &MethodSink, ConnectionId) -> bool>;
......@@ -417,7 +417,7 @@ impl Methods {
Some(MethodKind::Async(cb)) => (cb)(id.into_owned(), params.into_owned(), sink, 0, None).await,
Some(MethodKind::Subscription(cb)) => {
let conn_state = ConnState { conn_id: 0, close_notify, id_provider: &RandomIntegerIdProvider };
(cb)(id, params, sink, conn_state)
(cb)(id, params, sink, conn_state, None)
}
Some(MethodKind::Unsubscription(cb)) => (cb)(id, params, &sink, 0),
};
......@@ -682,7 +682,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
notif_method_name: &'static str,
unsubscribe_method_name: &'static str,
callback: F,
) -> Result<(), Error>
) -> Result<MethodResourcesBuilder, Error>
where
Context: Send + Sync + 'static,
F: Fn(Params, PendingSubscription, Arc<Context>) + Send + Sync + 'static,
......@@ -697,32 +697,9 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
let ctx = self.ctx.clone();
let subscribers = Subscribers::default();
// Subscribe
{
let subscribers = subscribers.clone();
self.methods.mut_callbacks().insert(
subscribe_method_name,
MethodCallback::new_subscription(Arc::new(move |id, params, method_sink, conn| {
let sub_id: RpcSubscriptionId = conn.id_provider.next_id();
let sink = PendingSubscription(Some(InnerPendingSubscription {
sink: method_sink.clone(),
close_notify: Some(conn.close_notify),
method: notif_method_name,
subscribers: subscribers.clone(),
uniq_sub: SubscriptionKey { conn_id: conn.conn_id, sub_id },
id: id.clone().into_owned(),
}));
callback(params, sink, ctx.clone());
true
})),
);
}
// Unsubscribe
{
let subscribers = subscribers.clone();
self.methods.mut_callbacks().insert(
unsubscribe_method_name,
MethodCallback::new_unsubscription(Arc::new(move |id, params, sink, conn_id| {
......@@ -756,7 +733,31 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
);
}
Ok(())
// Subscribe
let callback = {
self.methods.verify_and_insert(
subscribe_method_name,
MethodCallback::new_subscription(Arc::new(move |id, params, method_sink, conn, claimed| {
let sub_id: RpcSubscriptionId = conn.id_provider.next_id();
let sink = PendingSubscription(Some(InnerPendingSubscription {
sink: method_sink.clone(),
close_notify: Some(conn.close_notify),
method: notif_method_name,
subscribers: subscribers.clone(),
uniq_sub: SubscriptionKey { conn_id: conn.conn_id, sub_id },
id: id.clone().into_owned(),
claimed,
}));
callback(params, sink, ctx.clone());
true
})),
)?
};
Ok(MethodResourcesBuilder { build: ResourceVec::new(), callback })
}
/// Register an alias for an existing_method. Alias uniqueness is enforced.
......@@ -792,6 +793,8 @@ struct InnerPendingSubscription {
subscribers: Subscribers,
/// Request ID.
id: Id<'static>,
/// Claimed resources.
claimed: Option<ResourceGuard>,
}
/// Represent a pending subscription which waits until it's either accepted or rejected.
......@@ -817,12 +820,12 @@ impl PendingSubscription {
pub fn accept(mut self) -> Option<SubscriptionSink> {
let inner = self.0.take()?;
let InnerPendingSubscription { sink, close_notify, method, uniq_sub, subscribers, id } = inner;
let InnerPendingSubscription { sink, close_notify, method, uniq_sub, subscribers, id, claimed } = inner;
if sink.send_response(id, &uniq_sub.sub_id) {
let (tx, rx) = watch::channel(());
subscribers.lock().insert(uniq_sub.clone(), (sink.clone(), tx));
Some(SubscriptionSink { inner: sink, close_notify, method, uniq_sub, subscribers, unsubscribe: rx })
Some(SubscriptionSink { inner: sink, close_notify, method, uniq_sub, subscribers, unsubscribe: rx, _claimed: claimed })
} else {
None
}
......@@ -854,6 +857,8 @@ pub struct SubscriptionSink {
subscribers: Subscribers,
/// Future that returns when the unsubscribe method has been called.
unsubscribe: watch::Receiver<()>,
/// Claimed resources.
_claimed: Option<ResourceGuard>,
}
impl SubscriptionSink {
......
......@@ -31,6 +31,8 @@ use crate::attributes::Resource;
use crate::helpers::{generate_where_clause, is_option};
use proc_macro2::{Span, TokenStream as TokenStream2};
use quote::{quote, quote_spanned};
use syn::punctuated::Punctuated;
use syn::Token;
impl RpcDescription {
pub(super) fn render_server(&self) -> Result<TokenStream2, syn::Error> {
......@@ -112,6 +114,28 @@ impl RpcDescription {
}}
}
/// Helper that will parse the resources passed to the macro and call the appropriate resource
/// builder to register the resource limits.
fn handle_resource_limits(resources: &Punctuated<Resource, Token![,]>) -> TokenStream2 {
// Nothing to be done if no resources were set.
if resources.is_empty() {
return quote! {};
}
// Transform each resource into a call to `.resource(name, value)`.
let resources = resources.iter().map(|resource| {
let Resource { name, value, .. } = resource;
quote! { .resource(#name, #value)? }
});
quote! {
.and_then(|resource_builder| {
resource_builder #(#resources)*;
Ok(())
})
}
}
let methods = self
.methods
.iter()
......@@ -128,21 +152,7 @@ impl RpcDescription {
check_name(&rpc_method_name, rust_method_name.span());
let resources = method.resources.iter().map(|resource| {
let Resource { name, value, .. } = resource;
quote! { .resource(#name, #value)? }
});
let resources = if method.resources.is_empty() {
TokenStream2::new()
} else {
quote! {
.and_then(|resource_builder| {
resource_builder #(#resources)*;
Ok(())
})
}
};
let resources = handle_resource_limits(&method.resources);
if method.signature.sig.asyncness.is_some() {
handle_register_result(quote! {
......@@ -196,11 +206,14 @@ impl RpcDescription {
None => rpc_sub_name.clone(),
};
let resources = handle_resource_limits(&sub.resources);
handle_register_result(quote! {
rpc.register_subscription(#rpc_sub_name, #rpc_notif_name, #rpc_unsub_name, |params, subscription_sink, context| {
#parsing
context.as_ref().#rust_method_name(subscription_sink, #params_seq)
})
#resources
})
})
.collect::<Vec<_>>();
......
......@@ -133,12 +133,21 @@ pub struct RpcSubscription {
pub signature: syn::TraitItemMethod,
pub aliases: Vec<String>,
pub unsubscribe_aliases: Vec<String>,
pub resources: Punctuated<Resource, Token![,]>,
}
impl RpcSubscription {
pub fn from_item(attr: syn::Attribute, mut sub: syn::TraitItemMethod) -> syn::Result<Self> {
let [aliases, item, name, param_kind, unsubscribe, unsubscribe_aliases] = AttributeMeta::parse(attr)?
.retain(["aliases", "item", "name", "param_kind", "unsubscribe", "unsubscribe_aliases"])?;
let [aliases, item, name, param_kind, unsubscribe, unsubscribe_aliases, resources] =
AttributeMeta::parse(attr)?.retain([
"aliases",
"item",
"name",
"param_kind",
"unsubscribe",
"unsubscribe_aliases",
"resources",
])?;
let aliases = parse_aliases(aliases)?;
let map = name?.value::<NameMapping>()?;
......@@ -147,6 +156,7 @@ impl RpcSubscription {
let item = item?.value()?;
let param_kind = parse_param_kind(param_kind)?;
let unsubscribe_aliases = parse_aliases(unsubscribe_aliases)?;
let resources = optional(resources, Argument::group)?.unwrap_or_default();
let sig = sub.sig.clone();
let docs = extract_doc_comments(&sub.attrs);
......@@ -183,6 +193,7 @@ impl RpcSubscription {
signature: sub,
aliases,
docs,
resources,
})
}
}
......
error: Unknown argument `magic`, expected one of: `aliases`, `item`, `name`, `param_kind`, `unsubscribe`, `unsubscribe_aliases`
error: Unknown argument `magic`, expected one of: `aliases`, `item`, `name`, `param_kind`, `unsubscribe`, `unsubscribe_aliases`, `resources`
--> tests/ui/incorrect/sub/sub_unsupported_field.rs:6:65
|
6 | #[subscription(name = "sub", unsubscribe = "unsub", item = u8, magic = true)]
......
......@@ -27,7 +27,7 @@
use std::net::SocketAddr;
use std::time::Duration;
use jsonrpsee::core::client::ClientT;
use jsonrpsee::core::client::{ClientT, SubscriptionClientT};
use jsonrpsee::core::Error;
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle};
......@@ -35,7 +35,7 @@ use jsonrpsee::proc_macros::rpc;
use jsonrpsee::types::error::CallError;
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ws_server::{WsServerBuilder, WsServerHandle};
use jsonrpsee::RpcModule;
use jsonrpsee::{PendingSubscription, RpcModule};
use tokio::time::sleep;
fn module_manual() -> Result<RpcModule<()>, Error> {
......@@ -61,6 +61,35 @@ fn module_manual() -> Result<RpcModule<()>, Error> {
.resource("CPU", 0)?
.resource("MEM", 8)?;
// Drop the `SubscriptionSink` to cause the internal `ResourceGuard` allocated per subscription call
// to get dropped. This is the equivalent of not having any resource limits (ie, sink is never used).
module
.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, pending, _| {
let mut _sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
})?
.resource("SUB", 3)?;
// Keep the `SubscriptionSink` alive for a bit to validate that `ResourceGuard` is alive
// and the subscription method gets limited.
module
.register_subscription("subscribe_hello_limit", "s_hello", "unsubscribe_hello_limit", move |_, pending, _| {
let mut sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
tokio::spawn(async move {
for val in 0..10 {
sink.send(&val).unwrap();
sleep(Duration::from_secs(1)).await;
}
});
})?
.resource("SUB", 3)?;
Ok(module)
}
......@@ -84,9 +113,36 @@ fn module_macro() -> RpcModule<()> {
sleep(Duration::from_millis(50)).await;
Ok("hello memory hog")
}
#[subscription(name = "subscribe_hello", item = String, resources("SUB" = 3))]
fn sub_hello(&self);
#[subscription(name = "subscribe_hello_limit", item = String, resources("SUB" = 3))]
fn sub_hello_limit(&self);
}
impl RpcServer for () {}
impl RpcServer for () {
fn sub_hello(&self, pending: PendingSubscription) {
let mut _sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
}
fn sub_hello_limit(&self, pending: PendingSubscription) {
let mut sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
tokio::spawn(async move {
for val in 0..10 {
sink.send(&val).unwrap();
sleep(Duration::from_secs(1)).await;
}
});
}
}
().into_rpc()
}
......@@ -95,6 +151,7 @@ async fn websocket_server(module: RpcModule<()>) -> Result<(SocketAddr, WsServer
let server = WsServerBuilder::default()
.register_resource("CPU", 6, 2)?
.register_resource("MEM", 10, 1)?
.register_resource("SUB", 6, 1)?
.build("127.0.0.1:0")
.await?;
......@@ -108,6 +165,7 @@ async fn http_server(module: RpcModule<()>) -> Result<(SocketAddr, HttpServerHan
let server = HttpServerBuilder::default()
.register_resource("CPU", 6, 2)?
.register_resource("MEM", 10, 1)?
.register_resource("SUB", 6, 1)?
.build("127.0.0.1:0")
.await?;
......@@ -117,7 +175,7 @@ async fn http_server(module: RpcModule<()>) -> Result<(SocketAddr, HttpServerHan
Ok((addr, handle))
}
fn assert_server_busy(fail: Result<String, Error>) {
fn assert_server_busy<T: std::fmt::Debug>(fail: Result<T, Error>) {
match fail {
Err(Error::Call(CallError::Custom(err))) => {
assert_eq!(err.code(), -32604);
......@@ -159,6 +217,30 @@ async fn run_tests_on_ws_server(server_addr: SocketAddr, server_handle: WsServer
assert!(pass_mem.is_ok());
assert_server_busy(fail_mem);
// If we issue multiple subscription requests at the same time from the same client,
// but the subscriptions immediately drop their sinks, no resources will obviously be held,
// and so there is no limit to how many can be executed.
let (pass1, pass2, pass3) = tokio::join!(
client.subscribe::<i32>("subscribe_hello", None, "unsubscribe_hello"),
client.subscribe::<i32>("subscribe_hello", None, "unsubscribe_hello"),
client.subscribe::<i32>("subscribe_hello", None, "unsubscribe_hello"),
);
assert!(pass1.is_ok());
assert!(pass2.is_ok());
assert!(pass3.is_ok());
// 3 CPU units (manually set for subscriptions) per call, so 3th call exceeds cap
let (pass1, pass2, fail) = tokio::join!(
client.subscribe::<i32>("subscribe_hello_limit", None, "unsubscribe_hello_limit"),
client.subscribe::<i32>("subscribe_hello_limit", None, "unsubscribe_hello_limit"),
client.subscribe::<i32>("subscribe_hello_limit", None, "unsubscribe_hello_limit"),
);
assert!(pass1.is_ok());
assert!(pass2.is_ok());
assert_server_busy(fail);
server_handle.stop().unwrap().await;
}
......
......@@ -70,7 +70,7 @@ fn flatten_rpc_modules() {
fn rpc_context_modules_can_register_subscriptions() {
let cx = ();
let mut cxmodule = RpcModule::new(cx);
let _subscription = cxmodule.register_subscription("hi", "hi", "goodbye", |_, _, _| {});
cxmodule.register_subscription("hi", "hi", "goodbye", |_, _, _| {}).unwrap();
assert!(cxmodule.method("hi").is_some());
assert!(cxmodule.method("goodbye").is_some());
......
......@@ -495,7 +495,7 @@ async fn background_task(
let result = if let Some(cn) = bounded_subscriptions.acquire() {
let conn_state =
ConnState { conn_id, close_notify: cn, id_provider: &*id_provider };
callback(id, params, sink.clone(), conn_state)
callback(id, params, sink.clone(), conn_state, Some(guard))
} else {
sink.send_error(
req.id,
......@@ -505,7 +505,6 @@ async fn background_task(
};
middleware.on_result(name, result, request_start);
middleware.on_response(request_start);
drop(guard);
}
Err(err) => {
tracing::error!(
......@@ -617,7 +616,13 @@ async fn background_task(
close_notify: cn,
id_provider: &*id_provider,
};
callback(id, params, sink_batch.clone(), conn_state)
callback(
id,
params,
sink_batch.clone(),
conn_state,
Some(guard),
)
} else {
sink_batch.send_error(
req.id,
......@@ -626,7 +631,6 @@ async fn background_task(
false
};
middleware.on_result(&req.method, result, request_start);
drop(guard);
None
}
Err(err) => {
......
......@@ -21,7 +21,7 @@
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN background_task WITH THE SOFTWARE OR THE USE OR OTHER
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
#![cfg(test)]
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment