server.rs 34.8 KiB
Newer Older
		let trace = RpcTracing::method_call(&req.method);
		let _enter = trace.span().enter();

		rx_log_from_json(&req, call.max_log_length);

		let params = Params::new(req.params.map(|params| params.get()));
		let name = &req.method;
		let id = req.id;

		execute_call(Call { name, params, id, call }).in_current_span().await
	} else if let Ok(req) = serde_json::from_slice::<Notif>(&data) {
		let trace = RpcTracing::notification(&req.method);
		let _enter = trace.span().enter();

		rx_log_from_json(&req, call.max_log_length);
		MethodResponse { result: String::new(), success: true }
	} else {
		let (id, code) = prepare_error(&data);
		MethodResponse::error(id, ErrorObject::from(code))
	}
/// Executes a single method call and returns its [`MethodResponse`].
///
/// The call is looked up by name in `methods`, the middleware is notified through `on_call`,
/// and the result depends on what the lookup yields:
///
/// - no such method: a `MethodNotFound` error response;
/// - sync method: the callback is run after `method.claim` succeeds;
/// - async method: the callback is awaited after `method.claim` succeeds, and receives the guard;
/// - subscription / unsubscription: an `InternalError` response (logged as not supported on HTTP).
///
/// If `method.claim` fails for a sync or async method, the error is logged and a `ServerIsBusy`
/// error response is returned instead of running the callback.
///
/// Before returning, the response payload is passed to `tx_log_from_str` (with `max_log_length`)
/// and the middleware is notified through `on_result` with the success flag and `request_start`.
async fn execute_call<M: Middleware>(c: Call<'_, M>) -> MethodResponse {
	// Unpack the call and its per-request context so the individual fields can be used directly.
	let Call { name, id, params, call } = c;
	let CallData { resources, methods, middleware, max_response_body_size, max_log_length, conn_id, request_start } =
		call;

	let response = match methods.method_with_name(name) {
		// The requested method is not registered.
		None => {
			middleware.on_call(name, params.clone(), middleware::MethodKind::Unknown);
			MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound))
		}
		// NOTE: `name` is shadowed here by the name returned from the lookup.
		Some((name, method)) => match &method.inner() {
			MethodKind::Sync(callback) => {
				middleware.on_call(name, params.clone(), middleware::MethodKind::MethodCall);

				// The callback only runs if the method's resources could be claimed.
				match method.claim(name, resources) {
					Ok(guard) => {
						let r = (callback)(id, params, max_response_body_size as usize);
						// Drop the guard only after the callback has finished
						// (presumably this releases the claimed resources — confirm against `claim`).
						drop(guard);
						r
					}
					Err(err) => {
						tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
						MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy))
					}
			}
			MethodKind::Async(callback) => {
				middleware.on_call(name, params.clone(), middleware::MethodKind::MethodCall);
				match method.claim(name, resources) {
					Ok(guard) => {
						// The async callback takes owned `id` and `params`
						// (presumably so the future does not borrow from the request — confirm).
						let id = id.into_owned();
						let params = params.into_owned();
						// Ownership of the guard is handed to the callback, unlike the sync path above.
						(callback)(id, params, conn_id, max_response_body_size as usize, Some(guard)).await
					}
					Err(err) => {
						tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
						MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy))
					}
			// Subscription-related methods are rejected here; the middleware sees them as `Unknown`.
			MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => {
				middleware.on_call(name, params.clone(), middleware::MethodKind::Unknown);
				tracing::error!("Subscriptions not supported on HTTP");
				MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError))
	// Log the outgoing payload and report the outcome for every path above, including errors.
	tx_log_from_str(&response.result, max_log_length);
	middleware.on_result(name, response.success, request_start);
	response