Unverified Commit 42759bbb authored by Niklas Adolfsson's avatar Niklas Adolfsson Committed by GitHub
Browse files

feat(middleware): expose type of the method call (#820)

parent 538854bc
Pipeline #203838 passed with stages
in 4 minutes and 23 seconds
...@@ -31,6 +31,32 @@ use std::net::SocketAddr; ...@@ -31,6 +31,32 @@ use std::net::SocketAddr;
pub use http::HeaderMap as Headers; pub use http::HeaderMap as Headers;
pub use jsonrpsee_types::Params; pub use jsonrpsee_types::Params;
/// The type JSON-RPC v2 call, it can be a subscription, method call or unknown.
#[derive(Debug, Copy, Clone)]
pub enum MethodKind {
/// Subscription Call.
Subscription,
/// Unsubscription Call.
Unsubscription,
/// Method call.
MethodCall,
/// Unknown method.
Unknown,
}
impl std::fmt::Display for MethodKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
Self::Subscription => "subscription",
Self::MethodCall => "method call",
Self::Unknown => "unknown",
Self::Unsubscription => "unsubscription",
};
write!(f, "{}", s)
}
}
/// Defines a middleware specifically for HTTP requests with callbacks during the RPC request life-cycle. /// Defines a middleware specifically for HTTP requests with callbacks during the RPC request life-cycle.
/// The primary use case for this is to collect timings for a larger metrics collection solution. /// The primary use case for this is to collect timings for a larger metrics collection solution.
/// ///
...@@ -45,7 +71,7 @@ pub trait HttpMiddleware: Send + Sync + Clone + 'static { ...@@ -45,7 +71,7 @@ pub trait HttpMiddleware: Send + Sync + Clone + 'static {
fn on_request(&self, remote_addr: SocketAddr, headers: &Headers) -> Self::Instant; fn on_request(&self, remote_addr: SocketAddr, headers: &Headers) -> Self::Instant;
/// Called on each JSON-RPC method call, batch requests will trigger `on_call` multiple times. /// Called on each JSON-RPC method call, batch requests will trigger `on_call` multiple times.
fn on_call(&self, method_name: &str, params: Params); fn on_call(&self, method_name: &str, params: Params, kind: MethodKind);
/// Called on each JSON-RPC method completion, batch requests will trigger `on_result` multiple times. /// Called on each JSON-RPC method completion, batch requests will trigger `on_result` multiple times.
fn on_result(&self, method_name: &str, success: bool, started_at: Self::Instant); fn on_result(&self, method_name: &str, success: bool, started_at: Self::Instant);
...@@ -71,7 +97,7 @@ pub trait WsMiddleware: Send + Sync + Clone + 'static { ...@@ -71,7 +97,7 @@ pub trait WsMiddleware: Send + Sync + Clone + 'static {
fn on_request(&self) -> Self::Instant; fn on_request(&self) -> Self::Instant;
/// Called on each JSON-RPC method call, batch requests will trigger `on_call` multiple times. /// Called on each JSON-RPC method call, batch requests will trigger `on_call` multiple times.
fn on_call(&self, method_name: &str, params: Params); fn on_call(&self, method_name: &str, params: Params, kind: MethodKind);
/// Called on each JSON-RPC method completion, batch requests will trigger `on_result` multiple times. /// Called on each JSON-RPC method completion, batch requests will trigger `on_result` multiple times.
fn on_result(&self, method_name: &str, success: bool, started_at: Self::Instant); fn on_result(&self, method_name: &str, success: bool, started_at: Self::Instant);
...@@ -88,7 +114,7 @@ impl HttpMiddleware for () { ...@@ -88,7 +114,7 @@ impl HttpMiddleware for () {
fn on_request(&self, _: std::net::SocketAddr, _: &Headers) -> Self::Instant {} fn on_request(&self, _: std::net::SocketAddr, _: &Headers) -> Self::Instant {}
fn on_call(&self, _: &str, _: Params) {} fn on_call(&self, _: &str, _: Params, _: MethodKind) {}
fn on_result(&self, _: &str, _: bool, _: Self::Instant) {} fn on_result(&self, _: &str, _: bool, _: Self::Instant) {}
...@@ -102,7 +128,7 @@ impl WsMiddleware for () { ...@@ -102,7 +128,7 @@ impl WsMiddleware for () {
fn on_request(&self) -> Self::Instant {} fn on_request(&self) -> Self::Instant {}
fn on_call(&self, _: &str, _: Params) {} fn on_call(&self, _: &str, _: Params, _: MethodKind) {}
fn on_result(&self, _: &str, _: bool, _: Self::Instant) {} fn on_result(&self, _: &str, _: bool, _: Self::Instant) {}
...@@ -126,9 +152,9 @@ where ...@@ -126,9 +152,9 @@ where
(self.0.on_request(), self.1.on_request()) (self.0.on_request(), self.1.on_request())
} }
fn on_call(&self, method_name: &str, params: Params) { fn on_call(&self, method_name: &str, params: Params, kind: MethodKind) {
self.0.on_call(method_name, params.clone()); self.0.on_call(method_name, params.clone(), kind);
self.1.on_call(method_name, params); self.1.on_call(method_name, params, kind);
} }
fn on_result(&self, method_name: &str, success: bool, started_at: Self::Instant) { fn on_result(&self, method_name: &str, success: bool, started_at: Self::Instant) {
...@@ -157,9 +183,9 @@ where ...@@ -157,9 +183,9 @@ where
(self.0.on_request(remote_addr, headers), self.1.on_request(remote_addr, headers)) (self.0.on_request(remote_addr, headers), self.1.on_request(remote_addr, headers))
} }
fn on_call(&self, method_name: &str, params: Params) { fn on_call(&self, method_name: &str, params: Params, kind: MethodKind) {
self.0.on_call(method_name, params.clone()); self.0.on_call(method_name, params.clone(), kind);
self.1.on_call(method_name, params); self.1.on_call(method_name, params, kind);
} }
fn on_result(&self, method_name: &str, success: bool, started_at: Self::Instant) { fn on_result(&self, method_name: &str, success: bool, started_at: Self::Instant) {
......
...@@ -28,7 +28,7 @@ use std::net::SocketAddr; ...@@ -28,7 +28,7 @@ use std::net::SocketAddr;
use std::time::Instant; use std::time::Instant;
use jsonrpsee::core::client::ClientT; use jsonrpsee::core::client::ClientT;
use jsonrpsee::core::middleware::{self, Headers, Params}; use jsonrpsee::core::middleware::{self, Headers, MethodKind, Params};
use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle, RpcModule}; use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle, RpcModule};
...@@ -43,8 +43,8 @@ impl middleware::HttpMiddleware for Timings { ...@@ -43,8 +43,8 @@ impl middleware::HttpMiddleware for Timings {
Instant::now() Instant::now()
} }
fn on_call(&self, name: &str, params: Params) { fn on_call(&self, name: &str, params: Params, kind: MethodKind) {
println!("[Middleware::on_call] method: '{}', params: {:?}", name, params); println!("[Middleware::on_call] method: '{}', params: {:?}, kind: {}", name, params, kind);
} }
fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant) { fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant) {
......
...@@ -28,7 +28,7 @@ use std::net::SocketAddr; ...@@ -28,7 +28,7 @@ use std::net::SocketAddr;
use std::time::Instant; use std::time::Instant;
use jsonrpsee::core::client::ClientT; use jsonrpsee::core::client::ClientT;
use jsonrpsee::core::middleware::{self, Headers, Params}; use jsonrpsee::core::middleware::{self, Headers, MethodKind, Params};
use jsonrpsee::ws_client::WsClientBuilder; use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::ws_server::{RpcModule, WsServerBuilder}; use jsonrpsee::ws_server::{RpcModule, WsServerBuilder};
...@@ -47,8 +47,8 @@ impl middleware::WsMiddleware for Timings { ...@@ -47,8 +47,8 @@ impl middleware::WsMiddleware for Timings {
Instant::now() Instant::now()
} }
fn on_call(&self, name: &str, params: Params) { fn on_call(&self, name: &str, params: Params, kind: MethodKind) {
println!("[Middleware::on_call] method: '{}' params: {:?}", name, params); println!("[Middleware::on_call] method: '{}', params: {:?}, kind: {}", name, params, kind);
} }
fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant) { fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant) {
......
...@@ -30,6 +30,7 @@ use std::net::SocketAddr; ...@@ -30,6 +30,7 @@ use std::net::SocketAddr;
use std::process::Command; use std::process::Command;
use std::time::Instant; use std::time::Instant;
use jsonrpsee::core::middleware::MethodKind;
use jsonrpsee::core::{client::ClientT, middleware, middleware::Headers}; use jsonrpsee::core::{client::ClientT, middleware, middleware::Headers};
use jsonrpsee::rpc_params; use jsonrpsee::rpc_params;
use jsonrpsee::types::Params; use jsonrpsee::types::Params;
...@@ -51,8 +52,8 @@ impl middleware::WsMiddleware for Timings { ...@@ -51,8 +52,8 @@ impl middleware::WsMiddleware for Timings {
Instant::now() Instant::now()
} }
fn on_call(&self, name: &str, _params: Params) { fn on_call(&self, name: &str, params: Params, kind: MethodKind) {
println!("[Timings] They called '{}'", name); println!("[Timings:on_call] method: '{}', params: {:?}, kind: {}", name, params, kind);
} }
fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant) { fn on_result(&self, name: &str, succeess: bool, started_at: Self::Instant) {
...@@ -94,7 +95,7 @@ impl middleware::WsMiddleware for ThreadWatcher { ...@@ -94,7 +95,7 @@ impl middleware::WsMiddleware for ThreadWatcher {
println!("[ThreadWatcher::on_connect] remote_addr {}, headers: {:?}", remote_addr, headers); println!("[ThreadWatcher::on_connect] remote_addr {}, headers: {:?}", remote_addr, headers);
} }
fn on_call(&self, _method: &str, _params: Params) { fn on_call(&self, _method: &str, _params: Params, _kind: MethodKind) {
let threads = Self::count_threads(); let threads = Self::count_threads();
println!("[ThreadWatcher::on_call] Threads running on the machine at the start of a call: {}", threads); println!("[ThreadWatcher::on_call] Threads running on the machine at the start of a call: {}", threads);
} }
......
...@@ -41,7 +41,7 @@ use hyper::service::{make_service_fn, service_fn}; ...@@ -41,7 +41,7 @@ use hyper::service::{make_service_fn, service_fn};
use hyper::{Error as HyperError, Method}; use hyper::{Error as HyperError, Method};
use jsonrpsee_core::error::{Error, GenericTransportError}; use jsonrpsee_core::error::{Error, GenericTransportError};
use jsonrpsee_core::http_helpers::{self, read_body}; use jsonrpsee_core::http_helpers::{self, read_body};
use jsonrpsee_core::middleware::HttpMiddleware as Middleware; use jsonrpsee_core::middleware::{self, HttpMiddleware as Middleware};
use jsonrpsee_core::server::access_control::AccessControl; use jsonrpsee_core::server::access_control::AccessControl;
use jsonrpsee_core::server::helpers::{prepare_error, MethodResponse}; use jsonrpsee_core::server::helpers::{prepare_error, MethodResponse};
use jsonrpsee_core::server::helpers::{BatchResponse, BatchResponseBuilder}; use jsonrpsee_core::server::helpers::{BatchResponse, BatchResponseBuilder};
...@@ -102,7 +102,7 @@ impl<M> Builder<M> { ...@@ -102,7 +102,7 @@ impl<M> Builder<M> {
/// ``` /// ```
/// use std::{time::Instant, net::SocketAddr}; /// use std::{time::Instant, net::SocketAddr};
/// ///
/// use jsonrpsee_core::middleware::{HttpMiddleware, Headers, Params}; /// use jsonrpsee_core::middleware::{HttpMiddleware, Headers, MethodKind, Params};
/// use jsonrpsee_http_server::HttpServerBuilder; /// use jsonrpsee_http_server::HttpServerBuilder;
/// ///
/// #[derive(Clone)] /// #[derive(Clone)]
...@@ -119,8 +119,8 @@ impl<M> Builder<M> { ...@@ -119,8 +119,8 @@ impl<M> Builder<M> {
/// ///
/// // Called once a single JSON-RPC method call is processed, it may be called multiple times /// // Called once a single JSON-RPC method call is processed, it may be called multiple times
/// // on batches. /// // on batches.
/// fn on_call(&self, method_name: &str, params: Params) { /// fn on_call(&self, method_name: &str, params: Params, kind: MethodKind) {
/// println!("Call to method: '{}' params: {:?}", method_name, params); /// println!("Call to method: '{}' params: {:?}, kind: {}", method_name, params, kind);
/// } /// }
/// ///
/// // Called once a single JSON-RPC call is completed, it may be called multiple times /// // Called once a single JSON-RPC call is completed, it may be called multiple times
...@@ -833,35 +833,44 @@ async fn execute_call<M: Middleware>(c: Call<'_, M>) -> MethodResponse { ...@@ -833,35 +833,44 @@ async fn execute_call<M: Middleware>(c: Call<'_, M>) -> MethodResponse {
let CallData { resources, methods, middleware, max_response_body_size, max_log_length, conn_id, request_start } = let CallData { resources, methods, middleware, max_response_body_size, max_log_length, conn_id, request_start } =
call; call;
middleware.on_call(name, params.clone());
let response = match methods.method_with_name(name) { let response = match methods.method_with_name(name) {
None => MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound)), None => {
middleware.on_call(name, params.clone(), middleware::MethodKind::Unknown);
MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound))
}
Some((name, method)) => match &method.inner() { Some((name, method)) => match &method.inner() {
MethodKind::Sync(callback) => match method.claim(name, resources) { MethodKind::Sync(callback) => {
Ok(guard) => { middleware.on_call(name, params.clone(), middleware::MethodKind::MethodCall);
let r = (callback)(id, params, max_response_body_size as usize);
drop(guard); match method.claim(name, resources) {
r Ok(guard) => {
} let r = (callback)(id, params, max_response_body_size as usize);
Err(err) => { drop(guard);
tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err); r
MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy)) }
Err(err) => {
tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy))
}
} }
}, }
MethodKind::Async(callback) => match method.claim(name, resources) { MethodKind::Async(callback) => {
Ok(guard) => { middleware.on_call(name, params.clone(), middleware::MethodKind::MethodCall);
let id = id.into_owned(); match method.claim(name, resources) {
let params = params.into_owned(); Ok(guard) => {
let id = id.into_owned();
let params = params.into_owned();
(callback)(id, params, conn_id, max_response_body_size as usize, Some(guard)).await (callback)(id, params, conn_id, max_response_body_size as usize, Some(guard)).await
} }
Err(err) => { Err(err) => {
tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err); tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy)) MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy))
}
} }
}, }
MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => { MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => {
middleware.on_call(name, params.clone(), middleware::MethodKind::Unknown);
tracing::error!("Subscriptions not supported on HTTP"); tracing::error!("Subscriptions not supported on HTTP");
MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError)) MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError))
} }
......
...@@ -30,7 +30,7 @@ use std::sync::{Arc, Mutex}; ...@@ -30,7 +30,7 @@ use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use hyper::HeaderMap; use hyper::HeaderMap;
use jsonrpsee::core::middleware::{HttpMiddleware, WsMiddleware}; use jsonrpsee::core::middleware::{HttpMiddleware, MethodKind, WsMiddleware};
use jsonrpsee::core::{client::ClientT, Error}; use jsonrpsee::core::{client::ClientT, Error};
use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle}; use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle};
...@@ -73,7 +73,7 @@ impl WsMiddleware for Counter { ...@@ -73,7 +73,7 @@ impl WsMiddleware for Counter {
n n
} }
fn on_call(&self, name: &str, _params: Params) { fn on_call(&self, name: &str, _params: Params, _kind: MethodKind) {
let mut inner = self.inner.lock().unwrap(); let mut inner = self.inner.lock().unwrap();
let entry = inner.calls.entry(name.into()).or_insert((0, Vec::new())); let entry = inner.calls.entry(name.into()).or_insert((0, Vec::new()));
...@@ -108,7 +108,7 @@ impl HttpMiddleware for Counter { ...@@ -108,7 +108,7 @@ impl HttpMiddleware for Counter {
n n
} }
fn on_call(&self, name: &str, _params: Params) { fn on_call(&self, name: &str, _params: Params, _kind: MethodKind) {
let mut inner = self.inner.lock().unwrap(); let mut inner = self.inner.lock().unwrap();
let entry = inner.calls.entry(name.into()).or_insert((0, Vec::new())); let entry = inner.calls.entry(name.into()).or_insert((0, Vec::new()));
......
...@@ -42,7 +42,7 @@ use futures_util::TryStreamExt; ...@@ -42,7 +42,7 @@ use futures_util::TryStreamExt;
use http::header::{HOST, ORIGIN}; use http::header::{HOST, ORIGIN};
use http::{HeaderMap, HeaderValue}; use http::{HeaderMap, HeaderValue};
use jsonrpsee_core::id_providers::RandomIntegerIdProvider; use jsonrpsee_core::id_providers::RandomIntegerIdProvider;
use jsonrpsee_core::middleware::WsMiddleware as Middleware; use jsonrpsee_core::middleware::{self, WsMiddleware as Middleware};
use jsonrpsee_core::server::access_control::AccessControl; use jsonrpsee_core::server::access_control::AccessControl;
use jsonrpsee_core::server::helpers::{ use jsonrpsee_core::server::helpers::{
prepare_error, BatchResponse, BatchResponseBuilder, BoundedSubscriptions, MethodResponse, MethodSink, prepare_error, BatchResponse, BatchResponseBuilder, BoundedSubscriptions, MethodResponse, MethodSink,
...@@ -655,7 +655,7 @@ impl<M> Builder<M> { ...@@ -655,7 +655,7 @@ impl<M> Builder<M> {
/// ``` /// ```
/// use std::{time::Instant, net::SocketAddr}; /// use std::{time::Instant, net::SocketAddr};
/// ///
/// use jsonrpsee_core::middleware::{WsMiddleware, Headers, Params}; /// use jsonrpsee_core::middleware::{WsMiddleware, Headers, MethodKind, Params};
/// use jsonrpsee_ws_server::WsServerBuilder; /// use jsonrpsee_ws_server::WsServerBuilder;
/// ///
/// #[derive(Clone)] /// #[derive(Clone)]
...@@ -672,8 +672,8 @@ impl<M> Builder<M> { ...@@ -672,8 +672,8 @@ impl<M> Builder<M> {
/// Instant::now() /// Instant::now()
/// } /// }
/// ///
/// fn on_call(&self, method_name: &str, params: Params) { /// fn on_call(&self, method_name: &str, params: Params, kind: MethodKind) {
/// println!("[MyMiddleware::on_call] method: '{}' params: {:?}", method_name, params); /// println!("[MyMiddleware::on_call] method: '{}' params: {:?}, kind: {:?}", method_name, params, kind);
/// } /// }
/// ///
/// fn on_result(&self, method_name: &str, success: bool, started_at: Self::Instant) { /// fn on_result(&self, method_name: &str, success: bool, started_at: Self::Instant) {
...@@ -929,59 +929,73 @@ async fn execute_call<M: Middleware>(c: Call<'_, M>) -> MethodResult { ...@@ -929,59 +929,73 @@ async fn execute_call<M: Middleware>(c: Call<'_, M>) -> MethodResult {
request_start, request_start,
} = call; } = call;
middleware.on_call(name, params.clone());
let response = match methods.method_with_name(name) { let response = match methods.method_with_name(name) {
None => { None => {
middleware.on_call(name, params.clone(), middleware::MethodKind::Unknown);
let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound)); let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound));
MethodResult::SendAndMiddleware(response) MethodResult::SendAndMiddleware(response)
} }
Some((name, method)) => match &method.inner() { Some((name, method)) => match &method.inner() {
MethodKind::Sync(callback) => match method.claim(name, resources) { MethodKind::Sync(callback) => {
Ok(guard) => { middleware.on_call(name, params.clone(), middleware::MethodKind::MethodCall);
let r = (callback)(id, params, max_response_body_size as usize);
drop(guard); match method.claim(name, resources) {
MethodResult::SendAndMiddleware(r) Ok(guard) => {
} let r = (callback)(id, params, max_response_body_size as usize);
Err(err) => { drop(guard);
tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err); MethodResult::SendAndMiddleware(r)
let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy)); }
MethodResult::SendAndMiddleware(response) Err(err) => {
} tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
}, let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy));
MethodKind::Async(callback) => match method.claim(name, resources) { MethodResult::SendAndMiddleware(response)
Ok(guard) => { }
let id = id.into_owned();
let params = params.into_owned();
let response = (callback)(id, params, conn_id, max_response_body_size as usize, Some(guard)).await;
MethodResult::SendAndMiddleware(response)
}
Err(err) => {
tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy));
MethodResult::SendAndMiddleware(response)
} }
}, }
MethodKind::Subscription(callback) => match method.claim(name, resources) { MethodKind::Async(callback) => {
Ok(guard) => { middleware.on_call(name, params.clone(), middleware::MethodKind::MethodCall);
if let Some(cn) = bounded_subscriptions.acquire() {
let conn_state = ConnState { conn_id, close_notify: cn, id_provider }; match method.claim(name, resources) {
let response = callback(id.clone(), params, sink.clone(), conn_state, Some(guard)).await; Ok(guard) => {
MethodResult::JustMiddleware(response) let id = id.into_owned();
} else { let params = params.into_owned();
let response = let response =
MethodResponse::error(id, reject_too_many_subscriptions(bounded_subscriptions.max())); (callback)(id, params, conn_id, max_response_body_size as usize, Some(guard)).await;
MethodResult::SendAndMiddleware(response)
}
Err(err) => {
tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy));
MethodResult::SendAndMiddleware(response) MethodResult::SendAndMiddleware(response)
} }
} }
Err(err) => { }
tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err); MethodKind::Subscription(callback) => {
let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy)); middleware.on_call(name, params.clone(), middleware::MethodKind::Subscription);
MethodResult::SendAndMiddleware(response)
match method.claim(name, resources) {
Ok(guard) => {
if let Some(cn) = bounded_subscriptions.acquire() {
let conn_state = ConnState { conn_id, close_notify: cn, id_provider };
let response = callback(id.clone(), params, sink.clone(), conn_state, Some(guard)).await;
MethodResult::JustMiddleware(response)
} else {
let response =
MethodResponse::error(id, reject_too_many_subscriptions(bounded_subscriptions.max()));
MethodResult::SendAndMiddleware(response)
}
}
Err(err) => {
tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy));
MethodResult::SendAndMiddleware(response)
}
} }
}, }
MethodKind::Unsubscription(callback) => { MethodKind::Unsubscription(callback) => {
middleware.on_call(name, params.clone(), middleware::MethodKind::Unsubscription);
// Don't adhere to any resource or subscription limits; always let unsubscribing happen! // Don't adhere to any resource or subscription limits; always let unsubscribing happen!
let result = callback(id, params, conn_id, max_response_body_size as usize); let result = callback(id, params, conn_id, max_response_body_size as usize);
MethodResult::SendAndMiddleware(result) MethodResult::SendAndMiddleware(result)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment