Unverified Commit eaf337d6 authored by Niklas Adolfsson's avatar Niklas Adolfsson Committed by GitHub
Browse files

add connection span (#922)

parent e649f38e
Pipeline #224299 passed with stages
in 6 minutes and 10 seconds
......@@ -55,6 +55,7 @@ use tokio::sync::{watch, OwnedSemaphorePermit};
use tokio_util::compat::TokioAsyncReadCompatExt;
use tower::layer::util::Identity;
use tower::{Layer, Service};
use tracing::{instrument, Instrument};
/// Default maximum connections allowed.
const MAX_CONNECTIONS: u32 = 100;
......@@ -183,6 +184,8 @@ where
stop_handle.clone(),
curr_conns,
max_conns,
remote_addr,
id,
)));
id = id.wrapping_add(1);
......@@ -625,22 +628,25 @@ impl<L: Logger> hyper::service::Service<hyper::Request<hyper::Body>> for TowerSe
self.inner.logger.on_connect(self.inner.remote_addr, &request, TransportProtocol::WebSocket);
let data = self.inner.clone();
tokio::spawn(async move {
let upgraded = match hyper::upgrade::on(request).await {
Ok(u) => u,
Err(e) => {
tracing::warn!("Could not upgrade connection: {}", e);
return;
}
};
let stream = BufReader::new(BufWriter::new(upgraded.compat()));
let mut ws_builder = server.into_builder(stream);
ws_builder.set_max_message_size(data.max_request_body_size as usize);
let (sender, receiver) = ws_builder.finish();
let _ = ws::background_task::<L>(sender, receiver, data).await;
});
tokio::spawn(
async move {
let upgraded = match hyper::upgrade::on(request).await {
Ok(u) => u,
Err(e) => {
tracing::warn!("Could not upgrade connection: {}", e);
return;
}
};
let stream = BufReader::new(BufWriter::new(upgraded.compat()));
let mut ws_builder = server.into_builder(stream);
ws_builder.set_max_message_size(data.max_request_body_size as usize);
let (sender, receiver) = ws_builder.finish();
let _ = ws::background_task::<L>(sender, receiver, data).await;
}
.in_current_span(),
);
response.map(|()| hyper::Body::empty())
}
......@@ -723,12 +729,15 @@ where
}
// Attempts to accept a new connection
#[instrument(name = "connection", skip(socket, service, stop_handle, curr_conns, max_conns), level = "INFO")]
async fn try_accept_connection<S, Bd>(
socket: TcpStream,
service: S,
mut stop_handle: StopHandle,
curr_conns: usize,
max_conns: usize,
remote_addr: SocketAddr,
conn_id: u32,
) where
S: Service<hyper::Request<hyper::Body>, Response = hyper::Response<Bd>> + Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment