Unverified Commit acdef050 authored by Niklas Adolfsson's avatar Niklas Adolfsson
Browse files

fix nits

parent c2642e45
Pipeline #200321 passed with stages
in 5 minutes and 6 seconds
......@@ -259,7 +259,7 @@ impl MethodResponse {
}
/// Builder to build a `BatchResponse`.
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct BatchResponseBuilder {
/// Serialized JSON-RPC response,
result: String,
......
......@@ -74,12 +74,12 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, pending, _| {
let rx = BroadcastStream::new(tx.clone().subscribe());
tokio::spawn(async move {
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
tokio::spawn(async move {
match sink.pipe_from_try_stream(rx).await {
SubscriptionClosed::Success => {
sink.close(SubscriptionClosed::Success);
......
......@@ -106,13 +106,12 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
let interval = interval(Duration::from_millis(200));
let stream = IntervalStream::new(interval).map(move |_| item);
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
tokio::spawn(async move {
let sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
match sink.pipe_from_stream(stream).await {
// Send close notification when subscription stream failed.
SubscriptionClosed::Failed(err) => {
......
......@@ -425,7 +425,7 @@ impl<M: Middleware> Server<M> {
async move {
Ok::<_, HyperError>(service_fn(move |request| {
let request_start = middleware.on_request(remote_addr, &request.headers());
let request_start = middleware.on_request(remote_addr, request.headers());
let methods = methods.clone();
let acl = acl.clone();
......@@ -490,7 +490,7 @@ impl<M: Middleware> Server<M> {
// to be read in a browser.
Method::POST if content_type_is_json(&request) => {
let origin = return_origin_if_different_from_host(request.headers()).cloned();
let mut res = process_validated_request(
let mut res = process_validated_request(ProcessValidatedRequest {
request,
middleware,
methods,
......@@ -500,7 +500,7 @@ impl<M: Middleware> Server<M> {
max_log_length,
batch_requests_supported,
request_start,
)
})
.await?;
if let Some(origin) = origin {
......@@ -578,8 +578,7 @@ fn is_json(content_type: Option<&hyper::header::HeaderValue>) -> bool {
}
}
/// Process a verified request, it implies a POST request with content type JSON.
async fn process_validated_request<M: Middleware>(
struct ProcessValidatedRequest<M: Middleware> {
request: hyper::Request<hyper::Body>,
middleware: M,
methods: Methods,
......@@ -589,7 +588,24 @@ async fn process_validated_request<M: Middleware>(
max_log_length: u32,
batch_requests_supported: bool,
request_start: M::Instant,
}
/// Process a verified request, it implies a POST request with content type JSON.
async fn process_validated_request<M: Middleware>(
input: ProcessValidatedRequest<M>,
) -> Result<hyper::Response<hyper::Body>, HyperError> {
let ProcessValidatedRequest {
request,
middleware,
methods,
resources,
max_request_body_size,
max_response_body_size,
max_log_length,
batch_requests_supported,
request_start,
} = input;
let (parts, body) = request.into_parts();
let (body, is_single) = match read_body(&parts.headers, body, max_request_body_size).await {
......
......@@ -486,10 +486,10 @@ async fn background_task<M: Middleware>(input: BackgroundTask<'_, M>) -> Result<
let fut = async move {
let call = CallData {
conn_id,
resources: &resources,
resources,
max_response_body_size,
max_log_length,
methods: &methods,
methods,
bounded_subscriptions,
sink: &sink,
id_provider: &*id_provider,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment