server.rs 36 KiB
Newer Older
		let trace = RpcTracing::batch();
		let _enter = trace.span().enter();

		let batch_response = batch_stream
			.try_fold(
				BatchResponseBuilder::new_with_limit(max_response_size as usize),
				|batch_response, (req, call)| async move {
					let params = Params::new(req.params.map(|params| params.get()));

					let response = execute_call(Call { name: &req.method, params, id: req.id, call }).await;

Niklas Adolfsson's avatar
Niklas Adolfsson committed
					batch_response.append(&response)
				},
			)
			.in_current_span()

		return match batch_response {
			Ok(batch) => batch.finish(),
			Err(batch_err) => batch_err,
		};
	}

	if let Ok(batch) = serde_json::from_slice::<Vec<Notif>>(&data) {
		return if !batch.is_empty() {
			BatchResponse { result: "".to_string(), success: true }
			BatchResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest))
		};
	}
	// "If the batch rpc call itself fails to be recognized as an valid JSON or as an
	// Array with at least one value, the response from the Server MUST be a single
	// Response object." – The Spec.
	let (id, code) = prepare_error(&data);
	BatchResponse::error(id, ErrorObject::from(code))
}

async fn process_single_request<M: Middleware>(data: Vec<u8>, call: CallData<'_, M>) -> MethodResponse {
	if let Ok(req) = serde_json::from_slice::<Request>(&data) {
		let trace = RpcTracing::method_call(&req.method);
		let _enter = trace.span().enter();

		rx_log_from_json(&req, call.max_log_length);

		let params = Params::new(req.params.map(|params| params.get()));
		let name = &req.method;
		let id = req.id;

		execute_call(Call { name, params, id, call }).in_current_span().await
	} else if let Ok(req) = serde_json::from_slice::<Notif>(&data) {
		let trace = RpcTracing::notification(&req.method);
		let _enter = trace.span().enter();

		rx_log_from_json(&req, call.max_log_length);
		MethodResponse { result: String::new(), success: true }
	} else {
		let (id, code) = prepare_error(&data);
		MethodResponse::error(id, ErrorObject::from(code))
	}
async fn execute_call<M: Middleware>(c: Call<'_, M>) -> MethodResponse {
	let Call { name, id, params, call } = c;
	let CallData { resources, methods, middleware, max_response_body_size, max_log_length, conn_id, request_start } =
		call;

	let response = match methods.method_with_name(name) {
		None => {
			middleware.on_call(name, params.clone(), middleware::MethodKind::Unknown);
			MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound))
		}
		Some((name, method)) => match &method.inner() {
			MethodKind::Sync(callback) => {
				middleware.on_call(name, params.clone(), middleware::MethodKind::MethodCall);

				match method.claim(name, resources) {
					Ok(guard) => {
						let r = (callback)(id, params, max_response_body_size as usize);
						drop(guard);
						r
					}
					Err(err) => {
						tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
						MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy))
					}
			}
			MethodKind::Async(callback) => {
				middleware.on_call(name, params.clone(), middleware::MethodKind::MethodCall);
				match method.claim(name, resources) {
					Ok(guard) => {
						let id = id.into_owned();
						let params = params.into_owned();
						(callback)(id, params, conn_id, max_response_body_size as usize, Some(guard)).await
					}
					Err(err) => {
						tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
						MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy))
					}
			MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => {
				middleware.on_call(name, params.clone(), middleware::MethodKind::Unknown);
				tracing::error!("Subscriptions not supported on HTTP");
				MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError))
	tx_log_from_str(&response.result, max_log_length);
	middleware.on_result(name, response.success, request_start);
	response