Skip to content
Snippets Groups Projects
Commit fb922e27 authored by Pierre Krieger's avatar Pierre Krieger Committed by GitHub
Browse files

Refactor NotifsHandler::poll (#8422)

* Refactor a bit NotifsHandler::poll

* Avoid some spurious wake-ups
parent 3edfdead
No related merge requests found
......@@ -188,10 +188,10 @@ enum State {
/// We use two different channels in order to have two different channel sizes, but from
/// the receiving point of view, the two channels are the same.
/// The receivers are fused in case the user drops the [`NotificationsSink`] entirely.
notifications_sink_rx: stream::Select<
notifications_sink_rx: stream::Peekable<stream::Select<
stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>
>,
>>,
/// Outbound substream that has been accepted by the remote.
///
......@@ -552,7 +552,7 @@ impl ProtocolsHandler for NotifsHandler {
};
self.protocols[protocol_index].state = State::Open {
notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()),
notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
out_substream: Some(substream),
in_substream: in_substream.take(),
};
......@@ -716,8 +716,80 @@ impl ProtocolsHandler for NotifsHandler {
return Poll::Ready(ev);
}
// For each open substream, try send messages from `notifications_sink_rx` to the
// substream.
for protocol_index in 0..self.protocols.len() {
if let State::Open { notifications_sink_rx, out_substream: Some(out_substream), .. }
= &mut self.protocols[protocol_index].state
{
loop {
// Only proceed with `out_substream.poll_ready_unpin` if there is an element
// available in `notifications_sink_rx`. This avoids waking up the task when
// a substream is ready to send if there isn't actually something to send.
match Pin::new(&mut *notifications_sink_rx).as_mut().poll_peek(cx) {
Poll::Ready(Some(&NotificationsSinkMessage::ForceClose)) => {
return Poll::Ready(
ProtocolsHandlerEvent::Close(NotifsHandlerError::SyncNotificationsClogged)
);
},
Poll::Ready(Some(&NotificationsSinkMessage::Notification { .. })) => {},
Poll::Ready(None) | Poll::Pending => break,
}
// Before we extract the element from `notifications_sink_rx`, check that the
// substream is ready to accept a message.
match out_substream.poll_ready_unpin(cx) {
Poll::Ready(_) => {},
Poll::Pending => break
}
// Now that the substream is ready for a message, grab what to send.
let message = match notifications_sink_rx.poll_next_unpin(cx) {
Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) => message,
Poll::Ready(Some(NotificationsSinkMessage::ForceClose))
| Poll::Ready(None)
| Poll::Pending => {
// Should never be reached, as per `poll_peek` above.
debug_assert!(false);
break;
}
};
let _ = out_substream.start_send_unpin(message);
// Note that flushing is performed later down this function.
}
}
}
// Flush all outbound substreams.
// When `poll` returns `Poll::Ready`, the libp2p `Swarm` may decide to no longer call
// `poll` again before it is ready to accept more events.
// In order to make sure that substreams are flushed as soon as possible, the flush is
// performed before the code paths that can produce `Ready` (with some rare exceptions).
// Importantly, however, the flush is performed *after* notifications are queued with
// `Sink::start_send`.
for protocol_index in 0..self.protocols.len() {
match &mut self.protocols[protocol_index].state {
State::Open { out_substream: out_substream @ Some(_), .. } => {
match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) {
Poll::Pending | Poll::Ready(Ok(())) => {},
Poll::Ready(Err(_)) => {
*out_substream = None;
let event = NotifsHandlerOut::CloseDesired { protocol_index };
return Poll::Ready(ProtocolsHandlerEvent::Custom(event));
}
};
}
State::Closed { .. } |
State::Opening { .. } |
State::Open { out_substream: None, .. } |
State::OpenDesiredByRemote { .. } => {}
}
}
// Poll inbound substreams.
for protocol_index in 0..self.protocols.len() {
// Poll inbound substreams.
// Inbound substreams being closed is always tolerated, except for the
// `OpenDesiredByRemote` state which might need to be switched back to `Closed`.
match &mut self.protocols[protocol_index].state {
......@@ -763,68 +835,11 @@ impl ProtocolsHandler for NotifsHandler {
}
}
}
// Poll outbound substream.
match &mut self.protocols[protocol_index].state {
State::Open { out_substream: out_substream @ Some(_), .. } => {
match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) {
Poll::Pending | Poll::Ready(Ok(())) => {},
Poll::Ready(Err(_)) => {
*out_substream = None;
let event = NotifsHandlerOut::CloseDesired { protocol_index };
return Poll::Ready(ProtocolsHandlerEvent::Custom(event));
}
};
}
State::Closed { .. } |
State::Opening { .. } |
State::Open { out_substream: None, .. } |
State::OpenDesiredByRemote { .. } => {}
}
if let State::Open { notifications_sink_rx, out_substream: Some(out_substream), .. }
= &mut self.protocols[protocol_index].state
{
loop {
// Before we poll the notifications sink receiver, check that the substream
// is ready to accept a message.
match out_substream.poll_ready_unpin(cx) {
Poll::Ready(_) => {},
Poll::Pending => break
}
// Now that all substreams are ready for a message, grab what to send.
let message = match notifications_sink_rx.poll_next_unpin(cx) {
Poll::Ready(Some(msg)) => msg,
Poll::Ready(None) | Poll::Pending => break,
};
match message {
NotificationsSinkMessage::Notification { message } => {
let _ = out_substream.start_send_unpin(message);
// Calling `start_send_unpin` only queues the message. Actually
// emitting the message is done with `poll_flush`. In order to
// not introduce too much complexity, this flushing is done earlier
// in the body of this `poll()` method. As such, we schedule a task
// wake-up now in order to guarantee that `poll()` will be called
// again and the flush happening.
// At the time of the writing of this comment, a rewrite of this
// code is being planned. If you find this comment in the wild and
// the rewrite didn't happen, please consider a refactor.
cx.waker().wake_by_ref();
}
NotificationsSinkMessage::ForceClose => {
return Poll::Ready(
ProtocolsHandlerEvent::Close(NotifsHandlerError::SyncNotificationsClogged)
);
}
}
}
}
}
// This is the only place in this method that can return `Pending`.
// By putting it at the very bottom, we are guaranteed that everything has been properly
// polled.
Poll::Pending
}
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment