Unverified Commit 1f6368d2 authored by Niklas Adolfsson's avatar Niklas Adolfsson Committed by GitHub
Browse files

fix(server): read accepted conns properly (#929)

This PR fixes that the connection count is read after a connection has been accepted and
not before which cause this log to be out of order.
parent bc11e9de
Pipeline #224884 canceled with stages
in 24 seconds
......@@ -196,6 +196,7 @@ impl ServerHandle {
}
/// Limits the number of connections.
#[derive(Debug)]
pub(crate) struct ConnectionGuard(Arc<Semaphore>);
impl ConnectionGuard {
......
......@@ -36,7 +36,7 @@ use crate::future::{ConnectionGuard, FutureDriver, ServerHandle, StopHandle};
use crate::logger::{Logger, TransportProtocol};
use crate::transport::{http, ws};
use futures_util::future::FutureExt;
use futures_util::future::{BoxFuture, FutureExt};
use futures_util::io::{BufReader, BufWriter};
use hyper::body::HttpBody;
......@@ -139,55 +139,24 @@ where
loop {
match connections.select_with(&mut incoming).await {
Ok((socket, remote_addr)) => {
if let Err(e) = socket.set_nodelay(true) {
tracing::warn!("Could not set NODELAY on socket: {:?}", e);
continue;
}
let conn = match connection_guard.try_acquire() {
Some(conn) => conn,
None => {
tracing::warn!("Too many connections. Please try again later.");
connections.add(http::reject_connection(socket).boxed());
continue;
}
};
let tower_service = TowerService {
inner: ServiceData {
remote_addr,
methods: methods.clone(),
allow_hosts: allow_hosts.clone(),
resources: resources.clone(),
max_request_body_size,
max_response_body_size,
max_log_length,
batch_requests_supported,
id_provider: id_provider.clone(),
ping_interval: self.cfg.ping_interval,
stop_handle: stop_handle.clone(),
max_subscriptions_per_connection,
conn_id: id,
logger: logger.clone(),
conn: Arc::new(conn),
},
};
let service = self.service_builder.service(tower_service);
let max_conns = self.cfg.max_connections as usize;
let curr_conns = max_conns - connection_guard.available_connections();
connections.add(Box::pin(try_accept_connection(
socket,
service,
stop_handle.clone(),
curr_conns,
max_conns,
let data = ProcessConnection {
remote_addr,
id,
)));
methods: methods.clone(),
allow_hosts: allow_hosts.clone(),
resources: resources.clone(),
max_request_body_size,
max_response_body_size,
max_log_length,
batch_requests_supported,
id_provider: id_provider.clone(),
ping_interval: self.cfg.ping_interval,
stop_handle: stop_handle.clone(),
max_subscriptions_per_connection,
conn_id: id,
logger: logger.clone(),
max_connections: self.cfg.max_connections,
};
process_connection(&self.service_builder, &connection_guard, data, socket, &mut connections);
id = id.wrapping_add(1);
}
Err(MonitoredError::Selector(err)) => {
......@@ -728,17 +697,107 @@ where
}
}
// Attempts to accept a new connection
#[instrument(name = "connection", skip(socket, service, stop_handle, curr_conns, max_conns), level = "INFO")]
async fn try_accept_connection<S, Bd>(
socket: TcpStream,
service: S,
mut stop_handle: StopHandle,
curr_conns: usize,
max_conns: usize,
struct ProcessConnection<L> {
/// Remote server address.
remote_addr: SocketAddr,
/// Registered server methods.
methods: Methods,
/// Access control.
allow_hosts: AllowHosts,
/// Tracker for currently used resources on the server.
resources: Resources,
/// Max request body size.
max_request_body_size: u32,
/// Max response body size.
max_response_body_size: u32,
/// Max length for logging for request and response
///
/// Logs bigger than this limit will be truncated.
max_log_length: u32,
/// Whether batch requests are supported by this server or not.
batch_requests_supported: bool,
/// Subscription ID provider.
id_provider: Arc<dyn IdProvider>,
/// Ping interval
ping_interval: Duration,
/// Stop handle.
stop_handle: StopHandle,
/// Max subscriptions per connection.
max_subscriptions_per_connection: u32,
/// Max connections,
max_connections: u32,
/// Connection ID
conn_id: u32,
/// Logger.
logger: L,
}
#[instrument(name = "connection", skip_all, fields(remote_addr = %cfg.remote_addr, conn_id = %cfg.conn_id), level = "INFO")]
fn process_connection<'a, L: Logger, B, U>(
service_builder: &tower::ServiceBuilder<B>,
connection_guard: &ConnectionGuard,
cfg: ProcessConnection<L>,
socket: TcpStream,
connections: &mut FutureDriver<BoxFuture<'a, ()>>,
) where
B: Layer<TowerService<L>> + Send + 'static,
<B as Layer<TowerService<L>>>::Service: Send
+ Service<
hyper::Request<hyper::Body>,
Response = hyper::Response<U>,
Error = Box<(dyn StdError + Send + Sync + 'static)>,
>,
<<B as Layer<TowerService<L>>>::Service as Service<hyper::Request<hyper::Body>>>::Future: Send,
U: HttpBody + Send + 'static,
<U as HttpBody>::Error: Send + Sync + StdError,
<U as HttpBody>::Data: Send,
{
if let Err(e) = socket.set_nodelay(true) {
tracing::warn!("Could not set NODELAY on socket: {:?}", e);
return;
}
let conn = match connection_guard.try_acquire() {
Some(conn) => conn,
None => {
tracing::warn!("Too many connections. Please try again later.");
connections.add(http::reject_connection(socket).in_current_span().boxed());
return;
}
};
let max_conns = cfg.max_connections as usize;
let curr_conns = max_conns - connection_guard.available_connections();
tracing::info!("Accepting new connection {}/{}", curr_conns, max_conns);
let tower_service = TowerService {
inner: ServiceData {
remote_addr: cfg.remote_addr,
methods: cfg.methods,
allow_hosts: cfg.allow_hosts,
resources: cfg.resources,
max_request_body_size: cfg.max_request_body_size,
max_response_body_size: cfg.max_response_body_size,
max_log_length: cfg.max_log_length,
batch_requests_supported: cfg.batch_requests_supported,
id_provider: cfg.id_provider,
ping_interval: cfg.ping_interval,
stop_handle: cfg.stop_handle.clone(),
max_subscriptions_per_connection: cfg.max_subscriptions_per_connection,
conn_id: cfg.conn_id,
logger: cfg.logger,
conn: Arc::new(conn),
},
};
let service = service_builder.service(tower_service);
connections.add(Box::pin(try_accept_connection(socket, service, cfg.stop_handle).in_current_span()));
}
// Attempts to create a HTTP connection from a socket.
async fn try_accept_connection<S, Bd>(socket: TcpStream, service: S, mut stop_handle: StopHandle)
where
S: Service<hyper::Request<hyper::Body>, Response = hyper::Response<Bd>> + Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
S::Future: Send,
......@@ -752,9 +811,8 @@ async fn try_accept_connection<S, Bd>(
tokio::select! {
res = &mut conn => {
match res {
Ok(_) => tracing::info!("Accepting new connection {}/{}", curr_conns, max_conns),
Err(e) => tracing::warn!("Connection failed: {:?}", e),
if let Err(e) = res {
tracing::warn!("HTTP serve connection failed {:?}", e);
}
}
_ = stop_handle.shutdown() => {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment