Skip to content
Snippets Groups Projects
Commit c9f3d16f authored by Pierre Krieger's avatar Pierre Krieger Committed by GitHub
Browse files

Improve warning about notifications queue and remove spurious triggers (#5512)

* Better logging for notifications and buffer size increase

* Address review

* Improve warning about notifications queue and remove spurious triggers
parent 0426e171
No related merge requests found
......@@ -79,13 +79,14 @@ impl IntoProtocolsHandler for NotifsOutHandlerProto {
DeniedUpgrade
}
fn into_handler(self, _: &PeerId, _: &ConnectedPoint) -> Self::Handler {
fn into_handler(self, peer_id: &PeerId, _: &ConnectedPoint) -> Self::Handler {
NotifsOutHandler {
protocol_name: self.protocol_name,
when_connection_open: Instant::now(),
queue_size_report: self.queue_size_report,
state: State::Disabled,
events_queue: SmallVec::new(),
peer_id: peer_id.clone(),
}
}
}
......@@ -116,6 +117,9 @@ pub struct NotifsOutHandler {
/// This queue must only ever be modified to insert elements at the back, or remove the first
/// element.
events_queue: SmallVec<[ProtocolsHandlerEvent<NotificationsOut, (), NotifsOutHandlerOut, void::Void>; 16]>,
/// Who we are connected to.
peer_id: PeerId,
}
/// Our relationship with the node we're connected to.
......@@ -308,16 +312,17 @@ impl ProtocolsHandler for NotifsOutHandler {
NotifsOutHandlerIn::Send(msg) =>
if let State::Open { substream, .. } = &mut self.state {
if let Some(Ok(_)) = substream.send(msg).now_or_never() {
if let Some(metric) = &self.queue_size_report {
metric.observe(substream.queue_len() as f64);
}
} else {
if substream.push_message(msg).is_err() {
log::warn!(
target: "sub-libp2p",
"📞 Failed to push message to queue, dropped it"
"📞 Notifications queue with peer {} is full, dropped message (protocol: {:?})",
self.peer_id,
self.protocol_name,
);
}
if let Some(metric) = &self.queue_size_report {
metric.observe(substream.queue_len() as f64);
}
} else {
// This is an API misuse.
log::warn!(
......
......@@ -43,8 +43,7 @@ use unsigned_varint::codec::UviBytes;
/// Maximum allowed size of the two handshake messages, in bytes.
const MAX_HANDSHAKE_SIZE: usize = 1024;
/// Maximum number of buffered messages before we consider the remote unresponsive and kill the
/// substream.
/// Maximum number of buffered messages before we refuse to accept more.
const MAX_PENDING_MESSAGES: usize = 256;
/// Upgrade that accepts a substream, sends back a status message, then becomes a unidirectional
......@@ -285,6 +284,18 @@ impl<TSubstream> NotificationsOutSubstream<TSubstream> {
pub fn queue_len(&self) -> u32 {
u32::try_from(self.messages_queue.len()).unwrap_or(u32::max_value())
}
/// Push a message to the queue of messages.
///
/// This has the same effect as the `Sink::start_send` implementation.
pub fn push_message(&mut self, item: Vec<u8>) -> Result<(), NotificationsOutError> {
if self.messages_queue.len() >= MAX_PENDING_MESSAGES {
return Err(NotificationsOutError::Clogged);
}
self.messages_queue.push_back(item);
Ok(())
}
}
impl<TSubstream> Sink<Vec<u8>> for NotificationsOutSubstream<TSubstream>
......@@ -297,12 +308,7 @@ impl<TSubstream> Sink<Vec<u8>> for NotificationsOutSubstream<TSubstream>
}
fn start_send(mut self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
if self.messages_queue.len() >= MAX_PENDING_MESSAGES {
return Err(NotificationsOutError::Clogged);
}
self.messages_queue.push_back(item);
Ok(())
self.push_message(item)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment