diff --git a/substrate/client/network/src/service/out_events.rs b/substrate/client/network/src/service/out_events.rs
index 398c26793fd4157111169926b9c6f24c8727f8cf..ededccd5e32335d4e0ae53fef2f29b7f78db963a 100644
--- a/substrate/client/network/src/service/out_events.rs
+++ b/substrate/client/network/src/service/out_events.rs
@@ -34,37 +34,48 @@
 use crate::event::Event;
 
 use futures::{prelude::*, ready, stream::FusedStream};
-use log::error;
-use parking_lot::Mutex;
+use log::{debug, error};
 use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64};
 use std::{
 	backtrace::Backtrace,
 	cell::RefCell,
 	fmt,
 	pin::Pin,
-	sync::Arc,
 	task::{Context, Poll},
 };
 
+/// Log target for this file.
+pub const LOG_TARGET: &str = "sub-libp2p::out_events";
+
 /// Creates a new channel that can be associated to a [`OutChannels`].
 ///
 /// The name is used in Prometheus reports, the queue size threshold is used
 /// to warn if there are too many unprocessed events in the channel.
 pub fn channel(name: &'static str, queue_size_warning: usize) -> (Sender, Receiver) {
 	let (tx, rx) = async_channel::unbounded();
-	let metrics = Arc::new(Mutex::new(None));
 	let tx = Sender {
 		inner: tx,
 		name,
 		queue_size_warning,
-		warning_fired: false,
+		warning_fired: SenderWarningState::NotFired,
 		creation_backtrace: Backtrace::force_capture(),
-		metrics: metrics.clone(),
+		metrics: None,
 	};
-	let rx = Receiver { inner: rx, name, metrics };
+	let rx = Receiver { inner: rx, name, metrics: None };
 	(tx, rx)
 }
 
+/// A state of a sender warning that is used to avoid spamming the logs.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum SenderWarningState {
+	/// The warning has not been fired yet.
+	NotFired,
+	/// The warning has been fired, and the channel is full
+	FiredFull,
+	/// The warning has been fired and the channel is not full anymore.
+	FiredFree,
+}
+
 /// Sending side of a channel.
 ///
 /// Must be associated with an [`OutChannels`] before anything can be sent on it
@@ -78,13 +89,14 @@ pub struct Sender {
 	name: &'static str,
 	/// Threshold queue size to generate an error message in the logs.
 	queue_size_warning: usize,
-	/// We generate the error message only once to not spam the logs.
-	warning_fired: bool,
+	/// We generate the error message only once to not spam the logs after the first error.
+	/// Subsequently we indicate channel fullness on debug level.
+	warning_fired: SenderWarningState,
 	/// Backtrace of a place where the channel was created.
 	creation_backtrace: Backtrace,
 	/// Clone of [`Receiver::metrics`]. Will be initialized when [`Sender`] is added to
 	/// [`OutChannels`] with `OutChannels::push()`.
-	metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>,
+	metrics: Option<Metrics>,
 }
 
 impl fmt::Debug for Sender {
@@ -95,8 +107,7 @@ impl fmt::Debug for Sender {
 
 impl Drop for Sender {
 	fn drop(&mut self) {
-		let metrics = self.metrics.lock();
-		if let Some(Some(metrics)) = metrics.as_ref().map(|m| &**m) {
+		if let Some(metrics) = self.metrics.as_ref() {
 			metrics.num_channels.with_label_values(&[self.name]).dec();
 		}
 	}
@@ -108,7 +119,7 @@ pub struct Receiver {
 	name: &'static str,
 	/// Initially contains `None`, and will be set to a value once the corresponding [`Sender`]
 	/// is assigned to an instance of [`OutChannels`].
-	metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>,
+	metrics: Option<Metrics>,
 }
 
 impl Stream for Receiver {
@@ -116,13 +127,8 @@ impl Stream for Receiver {
 
 	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Event>> {
 		if let Some(ev) = ready!(Pin::new(&mut self.inner).poll_next(cx)) {
-			let metrics = self.metrics.lock().clone();
-			match metrics.as_ref().map(|m| m.as_ref()) {
-				Some(Some(metrics)) => metrics.event_out(&ev, self.name),
-				Some(None) => (), // no registry
-				None => log::warn!(
-					"Inconsistency in out_events: event happened before sender associated"
-				),
+			if let Some(metrics) = &self.metrics {
+				metrics.event_out(&ev, self.name);
 			}
 			Poll::Ready(Some(ev))
 		} else {
@@ -151,7 +157,7 @@ pub struct OutChannels {
 	event_streams: Vec<Sender>,
 	/// The metrics we collect. A clone of this is sent to each [`Receiver`] associated with this
 	/// object.
-	metrics: Arc<Option<Metrics>>,
+	metrics: Option<Metrics>,
 }
 
 impl OutChannels {
@@ -160,17 +166,15 @@ impl OutChannels {
 		let metrics =
 			if let Some(registry) = registry { Some(Metrics::register(registry)?) } else { None };
 
-		Ok(Self { event_streams: Vec::new(), metrics: Arc::new(metrics) })
+		Ok(Self { event_streams: Vec::new(), metrics })
 	}
 
 	/// Adds a new [`Sender`] to the collection.
-	pub fn push(&mut self, sender: Sender) {
-		let mut metrics = sender.metrics.lock();
-		debug_assert!(metrics.is_none());
-		*metrics = Some(self.metrics.clone());
-		drop(metrics);
+	pub fn push(&mut self, mut sender: Sender) {
+		debug_assert!(sender.metrics.is_none());
+		sender.metrics = self.metrics.clone();
 
-		if let Some(metrics) = &*self.metrics {
+		if let Some(metrics) = &self.metrics {
 			metrics.num_channels.with_label_values(&[sender.name]).inc();
 		}
 
@@ -180,22 +184,42 @@ impl OutChannels {
 	/// Sends an event.
 	pub fn send(&mut self, event: Event) {
 		self.event_streams.retain_mut(|sender| {
-			if sender.inner.len() >= sender.queue_size_warning && !sender.warning_fired {
-				sender.warning_fired = true;
-				error!(
-					"The number of unprocessed events in channel `{}` exceeded {}.\n\
-					 The channel was created at:\n{:}\n
-					 The last event was sent from:\n{:}",
-					sender.name,
-					sender.queue_size_warning,
-					sender.creation_backtrace,
-					Backtrace::force_capture(),
+			let current_pending = sender.inner.len();
+			if current_pending >= sender.queue_size_warning {
+				if sender.warning_fired == SenderWarningState::NotFired {
+					error!(
+						"The number of unprocessed events in channel `{}` exceeded {}.\n\
+						 The channel was created at:\n{:}\n
+						 The last event was sent from:\n{:}",
+						sender.name,
+						sender.queue_size_warning,
+						sender.creation_backtrace,
+						Backtrace::force_capture(),
+					);
+				} else if sender.warning_fired == SenderWarningState::FiredFree {
+					// We don't want to spam the logs, so we only log on debug level
+					debug!(
+						target: LOG_TARGET,
+						"Channel `{}` is overflowed again. Number of events: {}",
+						sender.name, current_pending
+					);
+				}
+				sender.warning_fired = SenderWarningState::FiredFull;
+			} else if sender.warning_fired == SenderWarningState::FiredFull &&
+				current_pending < sender.queue_size_warning.wrapping_div(2)
+			{
+				sender.warning_fired = SenderWarningState::FiredFree;
+				debug!(
+					target: LOG_TARGET,
+					"Channel `{}` is no longer overflowed. Number of events: {}",
+					sender.name, current_pending
 				);
 			}
+
 			sender.inner.try_send(event.clone()).is_ok()
 		});
 
-		if let Some(metrics) = &*self.metrics {
+		if let Some(metrics) = &self.metrics {
 			for ev in &self.event_streams {
 				metrics.event_in(&event, ev.name);
 			}
@@ -211,6 +235,7 @@ impl fmt::Debug for OutChannels {
 	}
 }
 
+#[derive(Clone)]
 struct Metrics {
 	// This list is ordered alphabetically
 	events_total: CounterVec<U64>,