diff --git a/substrate/client/network/src/protocol/notifications/handler.rs b/substrate/client/network/src/protocol/notifications/handler.rs
index ec3760d5257664ed38a8afbe5f4057040603c055..99677cc45e54ef9da6d374dcfcfcea37b08078ba 100644
--- a/substrate/client/network/src/protocol/notifications/handler.rs
+++ b/substrate/client/network/src/protocol/notifications/handler.rs
@@ -188,10 +188,10 @@ enum State {
 		/// We use two different channels in order to have two different channel sizes, but from
 		/// the receiving point of view, the two channels are the same.
 		/// The receivers are fused in case the user drops the [`NotificationsSink`] entirely.
-		notifications_sink_rx: stream::Select<
+		notifications_sink_rx: stream::Peekable<stream::Select<
 			stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
 			stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>
-		>,
+		>>,
 
 		/// Outbound substream that has been accepted by the remote.
 		///
@@ -552,7 +552,7 @@ impl ProtocolsHandler for NotifsHandler {
 				};
 
 				self.protocols[protocol_index].state = State::Open {
-					notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()),
+					notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
 					out_substream: Some(substream),
 					in_substream: in_substream.take(),
 				};
@@ -716,8 +716,80 @@ impl ProtocolsHandler for NotifsHandler {
 			return Poll::Ready(ev);
 		}
 
+		// For each open substream, try send messages from `notifications_sink_rx` to the
+		// substream.
+		for protocol_index in 0..self.protocols.len() {
+			if let State::Open { notifications_sink_rx, out_substream: Some(out_substream), .. }
+				= &mut self.protocols[protocol_index].state
+			{
+				loop {
+					// Only proceed with `out_substream.poll_ready_unpin` if there is an element
+					// available in `notifications_sink_rx`. This avoids waking up the task when
+					// a substream is ready to send if there isn't actually something to send.
+					match Pin::new(&mut *notifications_sink_rx).as_mut().poll_peek(cx) {
+						Poll::Ready(Some(&NotificationsSinkMessage::ForceClose)) => {
+							return Poll::Ready(
+								ProtocolsHandlerEvent::Close(NotifsHandlerError::SyncNotificationsClogged)
+							);
+						},
+						Poll::Ready(Some(&NotificationsSinkMessage::Notification { .. })) => {},
+						Poll::Ready(None) | Poll::Pending => break,
+					}
+
+					// Before we extract the element from `notifications_sink_rx`, check that the
+					// substream is ready to accept a message.
+					match out_substream.poll_ready_unpin(cx) {
+						Poll::Ready(_) => {},
+						Poll::Pending => break
+					}
+
+					// Now that the substream is ready for a message, grab what to send.
+					let message = match notifications_sink_rx.poll_next_unpin(cx) {
+						Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) => message,
+						Poll::Ready(Some(NotificationsSinkMessage::ForceClose))
+						| Poll::Ready(None)
+						| Poll::Pending => {
+							// Should never be reached, as per `poll_peek` above.
+							debug_assert!(false);
+							break;
+						}
+					};
+
+					let _ = out_substream.start_send_unpin(message);
+					// Note that flushing is performed later down this function.
+				}
+			}
+		}
+
+		// Flush all outbound substreams.
+		// When `poll` returns `Poll::Ready`, the libp2p `Swarm` may decide to no longer call
+		// `poll` again before it is ready to accept more events.
+		// In order to make sure that substreams are flushed as soon as possible, the flush is
+		// performed before the code paths that can produce `Ready` (with some rare exceptions).
+		// Importantly, however, the flush is performed *after* notifications are queued with
+		// `Sink::start_send`.
+		for protocol_index in 0..self.protocols.len() {
+			match &mut self.protocols[protocol_index].state {
+				State::Open { out_substream: out_substream @ Some(_), .. } => {
+					match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) {
+						Poll::Pending | Poll::Ready(Ok(())) => {},
+						Poll::Ready(Err(_)) => {
+							*out_substream = None;
+							let event = NotifsHandlerOut::CloseDesired { protocol_index };
+							return Poll::Ready(ProtocolsHandlerEvent::Custom(event));
+						}
+					};
+				}
+
+				State::Closed { .. } |
+				State::Opening { .. } |
+				State::Open { out_substream: None, .. } |
+				State::OpenDesiredByRemote { .. } => {}
+			}
+		}
+
+		// Poll inbound substreams.
 		for protocol_index in 0..self.protocols.len() {
-			// Poll inbound substreams.
 			// Inbound substreams being closed is always tolerated, except for the
 			// `OpenDesiredByRemote` state which might need to be switched back to `Closed`.
 			match &mut self.protocols[protocol_index].state {
@@ -763,68 +835,11 @@ impl ProtocolsHandler for NotifsHandler {
 					}
 				}
 			}
-
-			// Poll outbound substream.
-			match &mut self.protocols[protocol_index].state {
-				State::Open { out_substream: out_substream @ Some(_), .. } => {
-					match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) {
-						Poll::Pending | Poll::Ready(Ok(())) => {},
-						Poll::Ready(Err(_)) => {
-							*out_substream = None;
-							let event = NotifsHandlerOut::CloseDesired { protocol_index };
-							return Poll::Ready(ProtocolsHandlerEvent::Custom(event));
-						}
-					};
-				}
-
-				State::Closed { .. } |
-				State::Opening { .. } |
-				State::Open { out_substream: None, .. } |
-				State::OpenDesiredByRemote { .. } => {}
-			}
-
-			if let State::Open { notifications_sink_rx, out_substream: Some(out_substream), .. }
-				= &mut self.protocols[protocol_index].state
-			{
-				loop {
-					// Before we poll the notifications sink receiver, check that the substream
-					// is ready to accept a message.
-					match out_substream.poll_ready_unpin(cx) {
-						Poll::Ready(_) => {},
-						Poll::Pending => break
-					}
-
-					// Now that all substreams are ready for a message, grab what to send.
-					let message = match notifications_sink_rx.poll_next_unpin(cx) {
-						Poll::Ready(Some(msg)) => msg,
-						Poll::Ready(None) | Poll::Pending => break,
-					};
-
-					match message {
-						NotificationsSinkMessage::Notification { message } => {
-							let _ = out_substream.start_send_unpin(message);
-
-							// Calling `start_send_unpin` only queues the message. Actually
-							// emitting the message is done with `poll_flush`. In order to
-							// not introduce too much complexity, this flushing is done earlier
-							// in the body of this `poll()` method. As such, we schedule a task
-							// wake-up now in order to guarantee that `poll()` will be called
-							// again and the flush happening.
-							// At the time of the writing of this comment, a rewrite of this
-							// code is being planned. If you find this comment in the wild and
-							// the rewrite didn't happen, please consider a refactor.
-							cx.waker().wake_by_ref();
-						}
-						NotificationsSinkMessage::ForceClose => {
-							return Poll::Ready(
-								ProtocolsHandlerEvent::Close(NotifsHandlerError::SyncNotificationsClogged)
-							);
-						}
-					}
-				}
-			}
 		}
 
+		// This is the only place in this method that can return `Pending`.
+		// By putting it at the very bottom, we are guaranteed that everything has been properly
+		// polled.
 		Poll::Pending
 	}
 }