From fb922e2794ee208457d5522409427b1796e0ecc6 Mon Sep 17 00:00:00 2001 From: Pierre Krieger <pierre.krieger1708@gmail.com> Date: Tue, 23 Mar 2021 11:02:07 +0100 Subject: [PATCH] Refactor NotifsHandler::poll (#8422) * Refactor a bit NotifsHandler::poll * Avoid some spurious wake-ups --- .../src/protocol/notifications/handler.rs | 143 ++++++++++-------- 1 file changed, 79 insertions(+), 64 deletions(-) diff --git a/substrate/client/network/src/protocol/notifications/handler.rs b/substrate/client/network/src/protocol/notifications/handler.rs index ec3760d5257..99677cc45e5 100644 --- a/substrate/client/network/src/protocol/notifications/handler.rs +++ b/substrate/client/network/src/protocol/notifications/handler.rs @@ -188,10 +188,10 @@ enum State { /// We use two different channels in order to have two different channel sizes, but from /// the receiving point of view, the two channels are the same. /// The receivers are fused in case the user drops the [`NotificationsSink`] entirely. - notifications_sink_rx: stream::Select< + notifications_sink_rx: stream::Peekable<stream::Select< stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>, stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>> - >, + >>, /// Outbound substream that has been accepted by the remote. /// @@ -552,7 +552,7 @@ impl ProtocolsHandler for NotifsHandler { }; self.protocols[protocol_index].state = State::Open { - notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()), + notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(), out_substream: Some(substream), in_substream: in_substream.take(), }; @@ -716,8 +716,80 @@ impl ProtocolsHandler for NotifsHandler { return Poll::Ready(ev); } + // For each open substream, try send messages from `notifications_sink_rx` to the + // substream. + for protocol_index in 0..self.protocols.len() { + if let State::Open { notifications_sink_rx, out_substream: Some(out_substream), .. } + = &mut self.protocols[protocol_index].state + { + loop { + // Only proceed with `out_substream.poll_ready_unpin` if there is an element + // available in `notifications_sink_rx`. This avoids waking up the task when + // a substream is ready to send if there isn't actually something to send. + match Pin::new(&mut *notifications_sink_rx).as_mut().poll_peek(cx) { + Poll::Ready(Some(&NotificationsSinkMessage::ForceClose)) => { + return Poll::Ready( + ProtocolsHandlerEvent::Close(NotifsHandlerError::SyncNotificationsClogged) + ); + }, + Poll::Ready(Some(&NotificationsSinkMessage::Notification { .. })) => {}, + Poll::Ready(None) | Poll::Pending => break, + } + + // Before we extract the element from `notifications_sink_rx`, check that the + // substream is ready to accept a message. + match out_substream.poll_ready_unpin(cx) { + Poll::Ready(_) => {}, + Poll::Pending => break + } + + // Now that the substream is ready for a message, grab what to send. + let message = match notifications_sink_rx.poll_next_unpin(cx) { + Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) => message, + Poll::Ready(Some(NotificationsSinkMessage::ForceClose)) + | Poll::Ready(None) + | Poll::Pending => { + // Should never be reached, as per `poll_peek` above. + debug_assert!(false); + break; + } + }; + + let _ = out_substream.start_send_unpin(message); + // Note that flushing is performed later down this function. + } + } + } + + // Flush all outbound substreams. + // When `poll` returns `Poll::Ready`, the libp2p `Swarm` may decide to no longer call + // `poll` again before it is ready to accept more events. + // In order to make sure that substreams are flushed as soon as possible, the flush is + // performed before the code paths that can produce `Ready` (with some rare exceptions). + // Importantly, however, the flush is performed *after* notifications are queued with + // `Sink::start_send`. + for protocol_index in 0..self.protocols.len() { + match &mut self.protocols[protocol_index].state { + State::Open { out_substream: out_substream @ Some(_), .. } => { + match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) { + Poll::Pending | Poll::Ready(Ok(())) => {}, + Poll::Ready(Err(_)) => { + *out_substream = None; + let event = NotifsHandlerOut::CloseDesired { protocol_index }; + return Poll::Ready(ProtocolsHandlerEvent::Custom(event)); + } + }; + } + + State::Closed { .. } | + State::Opening { .. } | + State::Open { out_substream: None, .. } | + State::OpenDesiredByRemote { .. } => {} + } + } + + // Poll inbound substreams. for protocol_index in 0..self.protocols.len() { - // Poll inbound substreams. // Inbound substreams being closed is always tolerated, except for the // `OpenDesiredByRemote` state which might need to be switched back to `Closed`. match &mut self.protocols[protocol_index].state { @@ -763,68 +835,11 @@ impl ProtocolsHandler for NotifsHandler { } } } - - // Poll outbound substream. - match &mut self.protocols[protocol_index].state { - State::Open { out_substream: out_substream @ Some(_), .. } => { - match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) { - Poll::Pending | Poll::Ready(Ok(())) => {}, - Poll::Ready(Err(_)) => { - *out_substream = None; - let event = NotifsHandlerOut::CloseDesired { protocol_index }; - return Poll::Ready(ProtocolsHandlerEvent::Custom(event)); - } - }; - } - - State::Closed { .. } | - State::Opening { .. } | - State::Open { out_substream: None, .. } | - State::OpenDesiredByRemote { .. } => {} - } - - if let State::Open { notifications_sink_rx, out_substream: Some(out_substream), .. } - = &mut self.protocols[protocol_index].state - { - loop { - // Before we poll the notifications sink receiver, check that the substream - // is ready to accept a message. - match out_substream.poll_ready_unpin(cx) { - Poll::Ready(_) => {}, - Poll::Pending => break - } - - // Now that all substreams are ready for a message, grab what to send. - let message = match notifications_sink_rx.poll_next_unpin(cx) { - Poll::Ready(Some(msg)) => msg, - Poll::Ready(None) | Poll::Pending => break, - }; - - match message { - NotificationsSinkMessage::Notification { message } => { - let _ = out_substream.start_send_unpin(message); - - // Calling `start_send_unpin` only queues the message. Actually - // emitting the message is done with `poll_flush`. In order to - // not introduce too much complexity, this flushing is done earlier - // in the body of this `poll()` method. As such, we schedule a task - // wake-up now in order to guarantee that `poll()` will be called - // again and the flush happening. - // At the time of the writing of this comment, a rewrite of this - // code is being planned. If you find this comment in the wild and - // the rewrite didn't happen, please consider a refactor. - cx.waker().wake_by_ref(); - } - NotificationsSinkMessage::ForceClose => { - return Poll::Ready( - ProtocolsHandlerEvent::Close(NotifsHandlerError::SyncNotificationsClogged) - ); - } - } - } - } } + // This is the only place in this method that can return `Pending`. + // By putting it at the very bottom, we are guaranteed that everything has been properly + // polled. Poll::Pending } } -- GitLab