diff --git a/prdoc/pr_7724.prdoc b/prdoc/pr_7724.prdoc
new file mode 100644
index 0000000000000000000000000000000000000000..30c1da3fc1553f1ca438ce753b0996effd2817f9
--- /dev/null
+++ b/prdoc/pr_7724.prdoc
@@ -0,0 +1,13 @@
+title: Terminate libp2p the outbound notification substream on io errors
+
+doc:
+  - audience: [Node Dev, Node Operator]
+    description: |
+      This PR handles a case where we called the poll_next on an outbound substream notification to check if the stream is closed.
+      It is entirely possible that the poll_next would return an io::error, for example end of file.
+      This PR ensures that we make the distinction between unexpected incoming data, and error originated from poll_next.
+      While at it, the bulk of the PR change propagates the PeerID from the network behavior, through the notification handler, to the notification outbound stream for logging purposes.
+
+crates:
+  - name: sc-network
+    bump: patch
diff --git a/substrate/client/network/src/protocol/notifications/behaviour.rs b/substrate/client/network/src/protocol/notifications/behaviour.rs
index 217ef304bd0fc6c96fffc33fdb08b8ae38a1c47d..f524d3f0d3d1d02b6618f5f974c06f1b8c44508c 100644
--- a/substrate/client/network/src/protocol/notifications/behaviour.rs
+++ b/substrate/client/network/src/protocol/notifications/behaviour.rs
@@ -703,7 +703,7 @@ impl Notifications {
 					self.events.push_back(ToSwarm::NotifyHandler {
 						peer_id,
 						handler: NotifyHandler::One(*connec_id),
-						event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
+						event: NotifsHandlerIn::Open { protocol_index: set_id.into(), peer_id },
 					});
 					*connec_state = ConnectionState::Opening;
 					*occ_entry.into_mut() = PeerState::Enabled { connections };
@@ -1062,7 +1062,10 @@ impl Notifications {
 					self.events.push_back(ToSwarm::NotifyHandler {
 						peer_id: incoming.peer_id,
 						handler: NotifyHandler::One(*connec_id),
-						event: NotifsHandlerIn::Open { protocol_index: incoming.set_id.into() },
+						event: NotifsHandlerIn::Open {
+							protocol_index: incoming.set_id.into(),
+							peer_id: incoming.peer_id,
+						},
 					});
 					*connec_state = ConnectionState::Opening;
 				}
@@ -1260,7 +1263,10 @@ impl NetworkBehaviour for Notifications {
 							self.events.push_back(ToSwarm::NotifyHandler {
 								peer_id,
 								handler: NotifyHandler::One(connection_id),
-								event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
+								event: NotifsHandlerIn::Open {
+									protocol_index: set_id.into(),
+									peer_id,
+								},
 							});
 
 							let mut connections = SmallVec::new();
@@ -1762,7 +1768,10 @@ impl NetworkBehaviour for Notifications {
 								self.events.push_back(ToSwarm::NotifyHandler {
 									peer_id,
 									handler: NotifyHandler::One(connection_id),
-									event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
+									event: NotifsHandlerIn::Open {
+										protocol_index: set_id.into(),
+										peer_id,
+									},
 								});
 								*connec_state = ConnectionState::Opening;
 							} else {
@@ -1849,7 +1858,10 @@ impl NetworkBehaviour for Notifications {
 								self.events.push_back(ToSwarm::NotifyHandler {
 									peer_id,
 									handler: NotifyHandler::One(connection_id),
-									event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
+									event: NotifsHandlerIn::Open {
+										protocol_index: set_id.into(),
+										peer_id,
+									},
 								});
 								*connec_state = ConnectionState::Opening;
 
@@ -2336,7 +2348,7 @@ impl NetworkBehaviour for Notifications {
 						self.events.push_back(ToSwarm::NotifyHandler {
 							peer_id,
 							handler: NotifyHandler::One(*connec_id),
-							event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
+							event: NotifsHandlerIn::Open { protocol_index: set_id.into(), peer_id },
 						});
 						*connec_state = ConnectionState::Opening;
 						*peer_state = PeerState::Enabled { connections: mem::take(connections) };
diff --git a/substrate/client/network/src/protocol/notifications/handler.rs b/substrate/client/network/src/protocol/notifications/handler.rs
index 416a35ad88c9ac1cf24675c37904998bb8d943f3..c5f0de9991c7121fb4d69ac92864166aede29d0c 100644
--- a/substrate/client/network/src/protocol/notifications/handler.rs
+++ b/substrate/client/network/src/protocol/notifications/handler.rs
@@ -264,6 +264,9 @@ pub enum NotifsHandlerIn {
 	Open {
 		/// Index of the protocol in the list of protocols passed at initialization.
 		protocol_index: usize,
+
+		/// The peer id of the remote.
+		peer_id: PeerId,
 	},
 
 	/// Instruct the handler to close the notification substreams, or reject any pending incoming
@@ -632,7 +635,7 @@ impl ConnectionHandler for NotifsHandler {
 
 	fn on_behaviour_event(&mut self, message: NotifsHandlerIn) {
 		match message {
-			NotifsHandlerIn::Open { protocol_index } => {
+			NotifsHandlerIn::Open { protocol_index, peer_id } => {
 				let protocol_info = &mut self.protocols[protocol_index];
 				match &mut protocol_info.state {
 					State::Closed { pending_opening } => {
@@ -642,6 +645,7 @@ impl ConnectionHandler for NotifsHandler {
 								protocol_info.config.fallback_names.clone(),
 								protocol_info.config.handshake.read().clone(),
 								protocol_info.config.max_notification_size,
+								peer_id,
 							);
 
 							self.events_queue.push_back(
@@ -663,6 +667,7 @@ impl ConnectionHandler for NotifsHandler {
 								protocol_info.config.fallback_names.clone(),
 								handshake_message.clone(),
 								protocol_info.config.max_notification_size,
+								peer_id,
 							);
 
 							self.events_queue.push_back(
@@ -1202,7 +1207,10 @@ pub mod tests {
 		.await;
 
 		// move the handler state to 'Opening'
-		handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
+		handler.on_behaviour_event(NotifsHandlerIn::Open {
+			protocol_index: 0,
+			peer_id: PeerId::random(),
+		});
 		assert!(std::matches!(
 			handler.protocols[0].state,
 			State::Opening { in_substream: Some(_), .. }
@@ -1273,7 +1281,10 @@ pub mod tests {
 		.await;
 
 		// move the handler state to 'Opening'
-		handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
+		handler.on_behaviour_event(NotifsHandlerIn::Open {
+			protocol_index: 0,
+			peer_id: PeerId::random(),
+		});
 		assert!(std::matches!(
 			handler.protocols[0].state,
 			State::Opening { in_substream: Some(_), .. }
@@ -1362,7 +1373,10 @@ pub mod tests {
 
 		// first instruct the handler to open a connection and then close it right after
 		// so the handler is in state `Closed { pending_opening: true }`
-		handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
+		handler.on_behaviour_event(NotifsHandlerIn::Open {
+			protocol_index: 0,
+			peer_id: PeerId::random(),
+		});
 		assert!(std::matches!(
 			handler.protocols[0].state,
 			State::Opening { in_substream: Some(_), .. }
@@ -1421,7 +1435,10 @@ pub mod tests {
 
 		// first instruct the handler to open a connection and then close it right after
 		// so the handler is in state `Closed { pending_opening: true }`
-		handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
+		handler.on_behaviour_event(NotifsHandlerIn::Open {
+			protocol_index: 0,
+			peer_id: PeerId::random(),
+		});
 		assert!(std::matches!(
 			handler.protocols[0].state,
 			State::Opening { in_substream: Some(_), .. }
@@ -1502,7 +1519,10 @@ pub mod tests {
 
 		// first instruct the handler to open a connection and then close it right after
 		// so the handler is in state `Closed { pending_opening: true }`
-		handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
+		handler.on_behaviour_event(NotifsHandlerIn::Open {
+			protocol_index: 0,
+			peer_id: PeerId::random(),
+		});
 		assert!(std::matches!(
 			handler.protocols[0].state,
 			State::Opening { in_substream: Some(_), .. }
@@ -1550,7 +1570,10 @@ pub mod tests {
 
 		// first instruct the handler to open a connection and then close it right after
 		// so the handler is in state `Closed { pending_opening: true }`
-		handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
+		handler.on_behaviour_event(NotifsHandlerIn::Open {
+			protocol_index: 0,
+			peer_id: PeerId::random(),
+		});
 		assert!(std::matches!(
 			handler.protocols[0].state,
 			State::Opening { in_substream: Some(_), .. }
diff --git a/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs b/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs
index b4d0de171a183935d38c928dc5b935b95d42e2e6..dba6a7b598acf74c5803f959216c16bdc6174a84 100644
--- a/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs
+++ b/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs
@@ -39,8 +39,11 @@ use crate::types::ProtocolName;
 use asynchronous_codec::Framed;
 use bytes::BytesMut;
 use futures::prelude::*;
-use libp2p::core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
-use log::{error, warn};
+use libp2p::{
+	core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
+	PeerId,
+};
+use log::{debug, error, warn};
 use unsigned_varint::codec::UviBytes;
 
 use std::{
@@ -78,6 +81,8 @@ pub struct NotificationsOut {
 	initial_message: Vec<u8>,
 	/// Maximum allowed size for a single notification.
 	max_notification_size: u64,
+	/// The peerID of the remote.
+	peer_id: PeerId,
 }
 
 /// A substream for incoming notification messages.
@@ -114,12 +119,15 @@ pub struct NotificationsOutSubstream<TSubstream> {
 	/// Substream where to send messages.
 	#[pin]
 	socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>,
+
+	/// The remote peer.
+	peer_id: PeerId,
 }
 
 #[cfg(test)]
 impl<TSubstream> NotificationsOutSubstream<TSubstream> {
 	pub fn new(socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>) -> Self {
-		Self { socket }
+		Self { socket, peer_id: PeerId::random() }
 	}
 }
 
@@ -349,6 +357,7 @@ impl NotificationsOut {
 		fallback_names: Vec<ProtocolName>,
 		initial_message: impl Into<Vec<u8>>,
 		max_notification_size: u64,
+		peer_id: PeerId,
 	) -> Self {
 		let initial_message = initial_message.into();
 		if initial_message.len() > MAX_HANDSHAKE_SIZE {
@@ -358,7 +367,7 @@ impl NotificationsOut {
 		let mut protocol_names = fallback_names;
 		protocol_names.insert(0, main_protocol_name.into());
 
-		Self { protocol_names, initial_message, max_notification_size }
+		Self { protocol_names, initial_message, max_notification_size, peer_id }
 	}
 }
 
@@ -414,7 +423,10 @@ where
 				} else {
 					Some(negotiated_name)
 				},
-				substream: NotificationsOutSubstream { socket: Framed::new(socket, codec) },
+				substream: NotificationsOutSubstream {
+					socket: Framed::new(socket, codec),
+					peer_id: self.peer_id,
+				},
 			})
 		})
 	}
@@ -465,11 +477,25 @@ where
 		// even if we don't write anything into it.
 		match Stream::poll_next(this.socket.as_mut(), cx) {
 			Poll::Pending => {},
-			Poll::Ready(Some(_)) => {
-				error!(
-					target: LOG_TARGET,
-					"Unexpected incoming data in `NotificationsOutSubstream`",
-				);
+			Poll::Ready(Some(result)) => match result {
+				Ok(_) => {
+					error!(
+						target: "sub-libp2p",
+						"Unexpected incoming data in `NotificationsOutSubstream` peer={:?}",
+						this.peer_id
+					);
+				},
+				Err(error) => {
+					debug!(
+						target: "sub-libp2p",
+						"Error while reading from `NotificationsOutSubstream` peer={:?} error={error:?}",
+						this.peer_id
+					);
+
+					// The expectation is that the remote has closed the substream.
+					// This is similar to the `Poll::Ready(None)` branch below.
+					return Poll::Ready(Err(NotificationsOutError::Terminated));
+				},
 			},
 			Poll::Ready(None) => return Poll::Ready(Err(NotificationsOutError::Terminated)),
 		}
@@ -537,7 +563,10 @@ mod tests {
 		NotificationsOutSubstream,
 	};
 	use futures::{channel::oneshot, future, prelude::*, SinkExt, StreamExt};
-	use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
+	use libp2p::{
+		core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo},
+		PeerId,
+	};
 	use std::{pin::Pin, task::Poll};
 	use tokio::net::{TcpListener, TcpStream};
 	use tokio_util::compat::TokioAsyncReadCompatExt;
@@ -557,7 +586,13 @@ mod tests {
 		NotificationsHandshakeError,
 	> {
 		let socket = TcpStream::connect(addr).await.unwrap();
-		let notifs_out = NotificationsOut::new("/test/proto/1", Vec::new(), handshake, 1024 * 1024);
+		let notifs_out = NotificationsOut::new(
+			"/test/proto/1",
+			Vec::new(),
+			handshake,
+			1024 * 1024,
+			PeerId::random(),
+		);
 		let (_, substream) = multistream_select::dialer_select_proto(
 			socket.compat(),
 			notifs_out.protocol_info(),
@@ -721,7 +756,13 @@ mod tests {
 		let client = tokio::spawn(async move {
 			let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
 			let NotificationsOutOpen { handshake, .. } = OutboundUpgrade::upgrade_outbound(
-				NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024),
+				NotificationsOut::new(
+					PROTO_NAME,
+					Vec::new(),
+					&b"initial message"[..],
+					1024 * 1024,
+					PeerId::random(),
+				),
 				socket.compat(),
 				ProtocolName::Static(PROTO_NAME),
 			)
@@ -766,6 +807,7 @@ mod tests {
 						Vec::new(),
 						&b"initial message"[..],
 						1024 * 1024,
+						PeerId::random(),
 					),
 					socket.compat(),
 					ProtocolName::Static(PROTO_NAME),