Newer
Older
let trace = RpcTracing::batch();
let _enter = trace.span().enter();
let batch_response = batch_stream
.try_fold(
BatchResponseBuilder::new_with_limit(max_response_size as usize),
|batch_response, (req, call)| async move {
let params = Params::new(req.params.map(|params| params.get()));
let response = execute_call(Call { name: &req.method, params, id: req.id, call }).await;
.await;
return match batch_response {
Ok(batch) => batch.finish(),
Err(batch_err) => batch_err,
};
}
if let Ok(batch) = serde_json::from_slice::<Vec<Notif>>(&data) {
return if !batch.is_empty() {
BatchResponse { result: "".to_string(), success: true }
} else {
BatchResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest))
};
}
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
// "If the batch rpc call itself fails to be recognized as an valid JSON or as an
// Array with at least one value, the response from the Server MUST be a single
// Response object." – The Spec.
let (id, code) = prepare_error(&data);
BatchResponse::error(id, ErrorObject::from(code))
}
/// Processes a request body that is expected to hold a single JSON-RPC message (not a batch).
///
/// The bytes in `data` are tried, in order, as:
/// 1. a `Request`: it is logged and dispatched through `execute_call`, whose response is returned;
/// 2. a `Notif`: it is logged only, and an empty, successful `MethodResponse` is returned;
/// 3. anything else: an error response is built from the `(id, code)` pair that `prepare_error`
///    derives from the raw bytes.
///
/// `call` carries the per-request context that is forwarded to `execute_call`; its
/// `max_log_length` field is also used for the incoming-message log line.
async fn process_single_request<M: Middleware>(data: Vec<u8>, call: CallData<'_, M>) -> MethodResponse {
    if let Ok(req) = serde_json::from_slice::<Request>(&data) {
        // A `Span::enter` guard must not be kept alive across an `.await`: while the task is
        // suspended the span would stay entered on the executor thread and unrelated work would
        // be attributed to it. The span is attached to the future instead, so it is entered only
        // while this future is actually being polled.
        // NOTE(review): assumes `span()` hands out a `&tracing::Span` (cheap to clone) — confirm.
        let span = RpcTracing::method_call(&req.method).span().clone();

        async {
            rx_log_from_json(&req, call.max_log_length);

            let params = Params::new(req.params.map(|params| params.get()));
            let name = &req.method;
            let id = req.id;

            // No `in_current_span()` needed here: this call is polled inside the instrumented
            // block, so it already runs within `span`.
            execute_call(Call { name, params, id, call }).await
        }
        .instrument(span)
        .await
    } else if let Ok(req) = serde_json::from_slice::<Notif>(&data) {
        let trace = RpcTracing::notification(&req.method);
        // Holding the guard is fine in this branch: there is no await point before it is dropped.
        let _enter = trace.span().enter();

        rx_log_from_json(&req, call.max_log_length);

        // Notifications are answered with an empty result that is flagged as successful.
        MethodResponse { result: String::new(), success: true }
    } else {
        // Neither a request nor a notification: report whatever error `prepare_error` derives.
        let (id, code) = prepare_error(&data);
        MethodResponse::error(id, ErrorObject::from(code))
    }
}
async fn execute_call<M: Middleware>(c: Call<'_, M>) -> MethodResponse {
let Call { name, id, params, call } = c;
let CallData { resources, methods, middleware, max_response_body_size, max_log_length, conn_id, request_start } =
call;
let response = match methods.method_with_name(name) {
None => {
middleware.on_call(name, params.clone(), middleware::MethodKind::Unknown);
MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound))
}
Some((name, method)) => match &method.inner() {
MethodKind::Sync(callback) => {
middleware.on_call(name, params.clone(), middleware::MethodKind::MethodCall);
match method.claim(name, resources) {
Ok(guard) => {
let r = (callback)(id, params, max_response_body_size as usize);
drop(guard);
r
}
Err(err) => {
tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy))
}
}
MethodKind::Async(callback) => {
middleware.on_call(name, params.clone(), middleware::MethodKind::MethodCall);
match method.claim(name, resources) {
Ok(guard) => {
let id = id.into_owned();
let params = params.into_owned();
(callback)(id, params, conn_id, max_response_body_size as usize, Some(guard)).await
}
Err(err) => {
tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy))
}
MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => {
middleware.on_call(name, params.clone(), middleware::MethodKind::Unknown);
tracing::error!("Subscriptions not supported on HTTP");
MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError))
tx_log_from_str(&response.result, max_log_length);
middleware.on_result(name, response.success, request_start);
response