diff --git a/substrate/client/network/src/protocol/notifications/handler.rs b/substrate/client/network/src/protocol/notifications/handler.rs
index 99677cc45e54ef9da6d374dcfcfcea37b08078ba..2b350cd7fcfc835ec1c586f81ae651c22a11fced 100644
--- a/substrate/client/network/src/protocol/notifications/handler.rs
+++ b/substrate/client/network/src/protocol/notifications/handler.rs
@@ -159,6 +159,16 @@ enum State {
 	Closed {
 		/// True if an outgoing substream is still in the process of being opened.
 		pending_opening: bool,
+
+		/// Outbound substream that has been accepted by the remote. Being closed.
+		out_substream_closing: Option<NotificationsOutSubstream<NegotiatedSubstream>>,
+
+		/// Substream opened by the remote. Being closed.
+		in_substream_closing: Option<NotificationsInSubstream<NegotiatedSubstream>>,
+
+		/// Substream re-opened by the remote. Not to be closed after `in_substream_closing` has
+		/// been closed.
+		in_substream_reopened: Option<NotificationsInSubstream<NegotiatedSubstream>>,
 	},
 
 	/// Protocol is in the "Closed" state. A [`NotifsHandlerOut::OpenDesiredByRemote`] has been
@@ -167,6 +177,9 @@ enum State {
 		/// Substream opened by the remote and that hasn't been accepted/rejected yet.
 		in_substream: NotificationsInSubstream<NegotiatedSubstream>,
 
+		/// Outbound substream that has been accepted by the remote. Being closed.
+		out_substream_closing: Option<NotificationsOutSubstream<NegotiatedSubstream>>,
+
 		/// See [`State::Closed::pending_opening`].
 		pending_opening: bool,
 	},
@@ -177,8 +190,15 @@ enum State {
 	/// A [`NotifsHandlerOut::OpenResultOk`] or a [`NotifsHandlerOut::OpenResultErr`] event must
 	/// be emitted when transitionning to respectively [`State::Open`] or [`State::Closed`].
 	Opening {
-		/// Substream opened by the remote. If `Some`, has been accepted.
-		in_substream: Option<NotificationsInSubstream<NegotiatedSubstream>>,
+		/// Outbound substream that has been accepted by the remote. Being closed. An outbound
+		/// substream request has been emitted towards libp2p if and only if this field is `None`.
+		out_substream_closing: Option<NotificationsOutSubstream<NegotiatedSubstream>>,
+
+		/// Substream re-opened by the remote. Has been accepted.
+		in_substream_reopened: Option<NotificationsInSubstream<NegotiatedSubstream>>,
+
+		/// Substream opened by the remote. Being closed.
+		in_substream_closing: Option<NotificationsInSubstream<NegotiatedSubstream>>,
 	},
 
 	/// Protocol is in the "Open" state.
@@ -227,6 +247,9 @@ impl IntoProtocolsHandler for NotifsHandlerProto {
 					handshake,
 					state: State::Closed {
 						pending_opening: false,
+						in_substream_closing: None,
+						in_substream_reopened: None,
+						out_substream_closing: None,
 					},
 					max_notification_size: max_size,
 				}
@@ -487,7 +510,16 @@ impl ProtocolsHandler for NotifsHandler {
 	) {
 		let mut protocol_info = &mut self.protocols[protocol_index];
 		match protocol_info.state {
-			State::Closed { pending_opening } => {
+			State::Closed {
+				ref mut pending_opening,
+				ref mut out_substream_closing,
+				ref mut in_substream_closing,
+				ref mut in_substream_reopened
+			}
+				if in_substream_closing.is_none() && in_substream_reopened.is_none()
+			=> {
+				debug_assert!(!(out_substream_closing.is_some() && *pending_opening));
+
 				self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
 					NotifsHandlerOut::OpenDesiredByRemote {
 						protocol_index,
@@ -496,9 +528,31 @@ impl ProtocolsHandler for NotifsHandler {
 
 				protocol_info.state = State::OpenDesiredByRemote {
 					in_substream: new_substream,
-					pending_opening,
+					out_substream_closing: out_substream_closing.take(),
+					pending_opening: *pending_opening,
 				};
 			},
+
+			State::Opening { ref mut in_substream_closing, ref mut in_substream_reopened, .. } => {
+				*in_substream_closing = None;
+
+				// Create `handshake_message` on a separate line to be sure that the
+				// lock is released as soon as possible.
+				let handshake_message = protocol_info.handshake.read().clone();
+				new_substream.send_handshake(handshake_message);
+				*in_substream_reopened = Some(new_substream);
+			},
+
+			State::Open { ref mut in_substream, .. } if in_substream.is_none() => {
+				// Create `handshake_message` on a separate line to be sure that the
+				// lock is released as soon as possible.
+				let handshake_message = protocol_info.handshake.read().clone();
+				new_substream.send_handshake(handshake_message);
+				*in_substream = Some(new_substream);
+			},
+
+			State::Closed { .. } |
+			State::Open { .. } |
 			State::OpenDesiredByRemote { .. } => {
 				// If a substream already exists, silently drop the new one.
 				// Note that we drop the substream, which will send an equivalent to a
@@ -509,19 +563,6 @@ impl ProtocolsHandler for NotifsHandler {
 				// to do.
 				return;
 			},
-			State::Opening { ref mut in_substream, .. } |
-			State::Open { ref mut in_substream, .. } => {
-				if in_substream.is_some() {
-					// Same remark as above.
-					return;
-				}
-
-				// Create `handshake_message` on a separate line to be sure that the
-				// lock is released as soon as possible.
-				let handshake_message = protocol_info.handshake.read().clone();
-				new_substream.send_handshake(handshake_message);
-				*in_substream = Some(new_substream);
-			},
 		}
 	}
 
@@ -531,16 +572,24 @@ impl ProtocolsHandler for NotifsHandler {
 		protocol_index: Self::OutboundOpenInfo
 	) {
 		match self.protocols[protocol_index].state {
-			State::Closed { ref mut pending_opening } |
-			State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
+			State::Closed { ref mut pending_opening, ref mut out_substream_closing, .. } |
+			State::OpenDesiredByRemote { ref mut pending_opening, ref mut out_substream_closing, .. } => {
+				debug_assert!(out_substream_closing.is_none());
 				debug_assert!(*pending_opening);
+				*out_substream_closing = Some(substream);
 				*pending_opening = false;
 			}
 			State::Open { .. } => {
 				error!(target: "sub-libp2p", "☎️ State mismatch in notifications handler");
 				debug_assert!(false);
 			}
-			State::Opening { ref mut in_substream } => {
+			State::Opening {
+				ref mut in_substream_reopened, ref mut in_substream_closing,
+				ref mut out_substream_closing
+			} => {
+				debug_assert!(out_substream_closing.is_none());
+				debug_assert!(!(in_substream_reopened.is_some() && in_substream_closing.is_some()));
+
 				let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
 				let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
 				let notifications_sink = NotificationsSink {
@@ -554,7 +603,7 @@ impl ProtocolsHandler for NotifsHandler {
 				self.protocols[protocol_index].state = State::Open {
 					notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
 					out_substream: Some(substream),
-					in_substream: in_substream.take(),
+					in_substream: in_substream_reopened.take().or(in_substream_closing.take()),
 				};
 
 				self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
@@ -574,8 +623,13 @@ impl ProtocolsHandler for NotifsHandler {
 			NotifsHandlerIn::Open { protocol_index } => {
 				let protocol_info = &mut self.protocols[protocol_index];
 				match &mut protocol_info.state {
-					State::Closed { pending_opening } => {
-						if !*pending_opening {
+					State::Closed {
+						ref mut pending_opening,
+						ref mut in_substream_closing,
+						ref mut in_substream_reopened,
+						ref mut out_substream_closing
+					} => {
+						if !*pending_opening && out_substream_closing.is_none() {
 							let proto = NotificationsOut::new(
 								protocol_info.name.clone(),
 								protocol_info.handshake.read().clone(),
@@ -588,14 +642,31 @@ impl ProtocolsHandler for NotifsHandler {
 							});
 						}
 
+						debug_assert!(!(in_substream_reopened.is_some() && in_substream_closing.is_some()));
 						protocol_info.state = State::Opening {
-							in_substream: None,
+							in_substream_closing: in_substream_closing.take(),
+							in_substream_reopened: in_substream_reopened.take(),
+							out_substream_closing: out_substream_closing.take(),
 						};
 					},
-					State::OpenDesiredByRemote { pending_opening, in_substream } => {
+
+					State::OpenDesiredByRemote { ..  } => {
+						// The state change is done in two steps because of borrowing issues.
+						let (pending_opening, out_substream_closing, mut in_substream) = match
+							mem::replace(&mut protocol_info.state,
+								State::Opening {
+									in_substream_closing: None, in_substream_reopened: None,
+									out_substream_closing: None,
+								})
+						{
+							State::OpenDesiredByRemote { pending_opening, out_substream_closing, in_substream, .. } =>
+								(pending_opening, out_substream_closing, in_substream),
+							_ => unreachable!()
+						};
+
 						let handshake_message = protocol_info.handshake.read().clone();
 
-						if !*pending_opening {
+						if !pending_opening && out_substream_closing.is_none() {
 							let proto = NotificationsOut::new(
 								protocol_info.name.clone(),
 								handshake_message.clone(),
@@ -610,17 +681,13 @@ impl ProtocolsHandler for NotifsHandler {
 
 						in_substream.send_handshake(handshake_message);
 
-						// The state change is done in two steps because of borrowing issues.
-						let in_substream = match
-							mem::replace(&mut protocol_info.state, State::Opening { in_substream: None })
-						{
-							State::OpenDesiredByRemote { in_substream, .. } => in_substream,
-							_ => unreachable!()
-						};
 						protocol_info.state = State::Opening {
-							in_substream: Some(in_substream),
+							out_substream_closing,
+							in_substream_closing: None,
+							in_substream_reopened: Some(in_substream),
 						};
 					},
+
 					State::Opening { .. } |
 					State::Open { .. } => {
 						// As documented, it is forbidden to send an `Open` while there is already
@@ -632,15 +699,30 @@ impl ProtocolsHandler for NotifsHandler {
 			},
 
 			NotifsHandlerIn::Close { protocol_index } => {
-				match self.protocols[protocol_index].state {
-					State::Open { .. } => {
+				match &mut self.protocols[protocol_index].state {
+					State::Open { in_substream, out_substream, .. } => {
+						if let Some(in_substream) = in_substream.as_mut() {
+							in_substream.set_close_desired();
+						}
 						self.protocols[protocol_index].state = State::Closed {
+							in_substream_closing: in_substream.take(),
+							in_substream_reopened: None,
+							out_substream_closing: out_substream.take(),
 							pending_opening: false,
 						};
 					},
-					State::Opening { .. } => {
+					State::Opening { in_substream_closing, in_substream_reopened, out_substream_closing } => {
+						debug_assert!(!(in_substream_reopened.is_some() && in_substream_closing.is_some()));
+
+						let pending_opening = out_substream_closing.is_none();
+						if let Some(in_substream_reopened) = in_substream_reopened.as_mut() {
+							in_substream_reopened.set_close_desired();
+						}
 						self.protocols[protocol_index].state = State::Closed {
-							pending_opening: true,
+							in_substream_closing: in_substream_reopened.take().or(in_substream_closing.take()),
+							in_substream_reopened: None,
+							out_substream_closing: out_substream_closing.take(),
+							pending_opening,
 						};
 
 						self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
@@ -649,8 +731,23 @@ impl ProtocolsHandler for NotifsHandler {
 							}
 						));
 					},
-					State::OpenDesiredByRemote { pending_opening, .. } => {
+					State::OpenDesiredByRemote { .. } => {
+						let (mut in_substream, pending_opening, out_substream_closing) = match mem::replace(
+							&mut self.protocols[protocol_index].state,
+							State::Closed { pending_opening: false, in_substream_closing: None,
+											in_substream_reopened: None, out_substream_closing: None,
+										}
+						) {
+							State::OpenDesiredByRemote { in_substream, pending_opening, out_substream_closing } =>
+								(in_substream, pending_opening, out_substream_closing),
+							_ => unreachable!("Can only enter this branch after OpenDesiredByRemote; qed")
+						};
+
+						in_substream.set_close_desired();
 						self.protocols[protocol_index].state = State::Closed {
+							in_substream_closing: Some(in_substream),
+							in_substream_reopened: None,
+							out_substream_closing,
 							pending_opening,
 						};
 					}
@@ -672,14 +769,30 @@ impl ProtocolsHandler for NotifsHandler {
 		_: ProtocolsHandlerUpgrErr<NotificationsHandshakeError>
 	) {
 		match self.protocols[num].state {
-			State::Closed { ref mut pending_opening } |
-			State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
+			State::Closed { ref mut pending_opening, ref mut out_substream_closing, .. } |
+			State::OpenDesiredByRemote { ref mut pending_opening, ref mut out_substream_closing, .. } => {
+				debug_assert!(out_substream_closing.is_none());
 				debug_assert!(*pending_opening);
 				*pending_opening = false;
 			}
 
-			State::Opening { .. } => {
+			State::Opening {
+				ref mut out_substream_closing,
+				ref mut in_substream_closing,
+				ref mut in_substream_reopened,
+				..
+			} => {
+				debug_assert!(!(in_substream_reopened.is_some() && in_substream_closing.is_some()));
+				debug_assert!(out_substream_closing.is_none());
+
+				if let Some(in_substream_reopened) = in_substream_reopened.as_mut() {
+					in_substream_reopened.set_close_desired();
+				}
+
 				self.protocols[num].state = State::Closed {
+					out_substream_closing: None,
+					in_substream_closing: in_substream_reopened.take().or(in_substream_closing.take()),
+					in_substream_reopened: None,
 					pending_opening: false,
 				};
 
@@ -788,15 +901,67 @@ impl ProtocolsHandler for NotifsHandler {
 			}
 		}
 
+		// Try close outbound substreams that are marked for closing.
+		for protocol_index in 0..self.protocols.len() {
+			match &mut self.protocols[protocol_index].state {
+				State::Closed { out_substream_closing: ref mut substream @ Some(_), .. } |
+				State::OpenDesiredByRemote { out_substream_closing: ref mut substream @ Some(_), .. } |
+				State::Opening { out_substream_closing: ref mut substream @ Some(_), .. } => {
+					match Sink::poll_close(Pin::new(substream.as_mut().unwrap()), cx) {
+						Poll::Pending => {},
+						Poll::Ready(_) => {
+							*substream = None;
+
+							if matches!(self.protocols[protocol_index].state, State::Opening { .. }) {
+								let protocol_info = &mut self.protocols[protocol_index];
+								let proto = NotificationsOut::new(
+									protocol_info.name.clone(),
+									protocol_info.handshake.read().clone(),
+									protocol_info.max_notification_size
+								);
+
+								self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
+									protocol: SubstreamProtocol::new(proto, protocol_index)
+										.with_timeout(OPEN_TIMEOUT),
+								});
+							}
+						}
+					}
+				}
+				_ => {}
+			}
+
+			if let State::Closed {
+				pending_opening,
+				out_substream_closing: None,
+				in_substream_closing,
+				in_substream_reopened: ref mut in_substream_reopened @ Some(_),
+				..
+			} = &mut self.protocols[protocol_index].state {
+				debug_assert!(!*pending_opening);
+				debug_assert!(in_substream_closing.is_none());
+
+				self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
+					NotifsHandlerOut::OpenDesiredByRemote {
+						protocol_index,
+					}
+				));
+
+				self.protocols[protocol_index].state = State::OpenDesiredByRemote {
+					in_substream: in_substream_reopened.take()
+						.expect("The if let above ensures that this is Some ; qed"),
+					out_substream_closing: None,
+					pending_opening: false,
+				};
+			}
+		}
+
 		// Poll inbound substreams.
 		for protocol_index in 0..self.protocols.len() {
 			// Inbound substreams being closed is always tolerated, except for the
 			// `OpenDesiredByRemote` state which might need to be switched back to `Closed`.
 			match &mut self.protocols[protocol_index].state {
-				State::Closed { .. } |
-				State::Open { in_substream: None, .. } |
-				State::Opening { in_substream: None } => {}
-
+				State::Open { in_substream: None, .. } => {}
 				State::Open { in_substream: in_substream @ Some(_), .. } => {
 					match Stream::poll_next(Pin::new(in_substream.as_mut().unwrap()), cx) {
 						Poll::Pending => {},
@@ -812,13 +977,16 @@ impl ProtocolsHandler for NotifsHandler {
 					}
 				}
 
-				State::OpenDesiredByRemote { in_substream, pending_opening } => {
+				State::OpenDesiredByRemote { in_substream, pending_opening, out_substream_closing } => {
 					match NotificationsInSubstream::poll_process(Pin::new(in_substream), cx) {
 						Poll::Pending => {},
 						Poll::Ready(Ok(void)) => match void {},
 						Poll::Ready(Err(_)) => {
 							self.protocols[protocol_index].state = State::Closed {
 								pending_opening: *pending_opening,
+								in_substream_closing: None,
+								in_substream_reopened: None,
+								out_substream_closing: out_substream_closing.take(),
 							};
 							return Poll::Ready(ProtocolsHandlerEvent::Custom(
 								NotifsHandlerOut::CloseDesired { protocol_index }
@@ -827,13 +995,40 @@ impl ProtocolsHandler for NotifsHandler {
 					}
 				}
 
-				State::Opening { in_substream: in_substream @ Some(_), .. } => {
+				State::Opening { in_substream_closing: None, in_substream_reopened: None, .. } |
+				State::Closed { in_substream_closing: None, in_substream_reopened: None, .. } => {}
+
+				State::Opening {
+					in_substream_closing: ref mut in_substream @ Some(_),
+					in_substream_reopened: None,
+					..
+				} |
+				State::Opening {
+					in_substream_closing: None,
+					in_substream_reopened: ref mut in_substream @ Some(_),
+					..
+				} |
+				State::Closed {
+					in_substream_closing: ref mut in_substream @ Some(_),
+					in_substream_reopened: None,
+					..
+				} |
+				State::Closed {
+					in_substream_closing: None,
+					in_substream_reopened: ref mut in_substream @ Some(_),
+					..
+				} => {
 					match NotificationsInSubstream::poll_process(Pin::new(in_substream.as_mut().unwrap()), cx) {
 						Poll::Pending => {},
 						Poll::Ready(Ok(void)) => match void {},
 						Poll::Ready(Err(_)) => *in_substream = None,
 					}
 				}
+
+				State::Opening { in_substream_closing: Some(_), in_substream_reopened: Some(_), .. } |
+				State::Closed { in_substream_closing: Some(_), in_substream_reopened: Some(_), .. } => {
+					debug_assert!(false);
+				}
 			}
 		}
 
diff --git a/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs b/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs
index eba96441bcfdeab5c3175f3aeea7c01834cf8337..f76472a0de2c5a8b85998aa7c15c404ffed38051 100644
--- a/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs
+++ b/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs
@@ -88,10 +88,11 @@ enum NotificationsInSubstreamHandshake {
 	PendingSend(Vec<u8>),
 	/// Handshake message was pushed in the socket. Still need to flush.
 	Flush,
-	/// Handshake message successfully sent and flushed.
-	Sent,
-	/// Remote has closed their writing side. We close our own writing side in return.
-	ClosingInResponseToRemote,
+	/// Ready to receive notifications. Handshake message successfully sent and flushed, or
+	/// sending side closed before handshake sent.
+	Normal { write_side_open: bool },
+	/// Closing our writing side.
+	Closing { remote_write_open: bool },
 	/// Both our side and the remote have closed their writing side.
 	BothSidesClosed,
 }
@@ -169,8 +170,30 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
 impl<TSubstream> NotificationsInSubstream<TSubstream>
 where TSubstream: AsyncRead + AsyncWrite + Unpin,
 {
+	/// Closes the writing side of the substream, indicating to the remote that we would like this
+	/// substream to be closed.
+	pub fn set_close_desired(&mut self) {
+		match self.handshake {
+			NotificationsInSubstreamHandshake::PendingSend(_) |
+			NotificationsInSubstreamHandshake::Flush |
+			NotificationsInSubstreamHandshake::NotSent |
+			NotificationsInSubstreamHandshake::Normal { write_side_open: true } => {
+				self.handshake = NotificationsInSubstreamHandshake::Closing { remote_write_open: true };
+			}
+			NotificationsInSubstreamHandshake::Normal { write_side_open: false } |
+			NotificationsInSubstreamHandshake::Closing { .. } |
+			NotificationsInSubstreamHandshake::BothSidesClosed => {}
+		}
+	}
+
 	/// Sends the handshake in order to inform the remote that we accept the substream.
+	///
+	/// Has no effect if `set_close_desired` has been called.
 	pub fn send_handshake(&mut self, message: impl Into<Vec<u8>>) {
+		if matches!(self.handshake, NotificationsInSubstreamHandshake::Normal { write_side_open: false }) {
+			return;
+		}
+
 		if !matches!(self.handshake, NotificationsInSubstreamHandshake::NotSent) {
 			error!(target: "sub-libp2p", "Tried to send handshake twice");
 			return;
@@ -185,7 +208,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
 		let mut this = self.project();
 
 		loop {
-			match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) {
+			match mem::replace(this.handshake, NotificationsInSubstreamHandshake::NotSent) {
 				NotificationsInSubstreamHandshake::PendingSend(msg) =>
 					match Sink::poll_ready(this.socket.as_mut(), cx) {
 						Poll::Ready(_) => {
@@ -203,16 +226,28 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
 				NotificationsInSubstreamHandshake::Flush =>
 					match Sink::poll_flush(this.socket.as_mut(), cx)? {
 						Poll::Ready(()) =>
-							*this.handshake = NotificationsInSubstreamHandshake::Sent,
+							*this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open: true },
 						Poll::Pending => {
 							*this.handshake = NotificationsInSubstreamHandshake::Flush;
 							return Poll::Pending
 						}
 					},
 
+				NotificationsInSubstreamHandshake::Closing { remote_write_open } =>
+					match Sink::poll_close(this.socket.as_mut(), cx)? {
+						Poll::Ready(()) => if remote_write_open {
+							*this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open: false }
+						} else {
+							*this.handshake = NotificationsInSubstreamHandshake::BothSidesClosed;
+						},
+						Poll::Pending => {
+							*this.handshake = NotificationsInSubstreamHandshake::Closing { remote_write_open };
+							return Poll::Pending
+						}
+					},
+
 				st @ NotificationsInSubstreamHandshake::NotSent |
-				st @ NotificationsInSubstreamHandshake::Sent |
-				st @ NotificationsInSubstreamHandshake::ClosingInResponseToRemote |
+				st @ NotificationsInSubstreamHandshake::Normal { .. } |
 				st @ NotificationsInSubstreamHandshake::BothSidesClosed => {
 					*this.handshake = st;
 					return Poll::Pending;
@@ -232,7 +267,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
 
 		// This `Stream` implementation first tries to send back the handshake if necessary.
 		loop {
-			match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) {
+			match mem::replace(this.handshake, NotificationsInSubstreamHandshake::NotSent) {
 				NotificationsInSubstreamHandshake::NotSent => {
 					*this.handshake = NotificationsInSubstreamHandshake::NotSent;
 					return Poll::Pending
@@ -254,34 +289,39 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
 				NotificationsInSubstreamHandshake::Flush =>
 					match Sink::poll_flush(this.socket.as_mut(), cx)? {
 						Poll::Ready(()) =>
-							*this.handshake = NotificationsInSubstreamHandshake::Sent,
+							*this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open: true },
 						Poll::Pending => {
 							*this.handshake = NotificationsInSubstreamHandshake::Flush;
 							return Poll::Pending
 						}
 					},
 
-				NotificationsInSubstreamHandshake::Sent => {
+				NotificationsInSubstreamHandshake::Normal { write_side_open } => {
 					match Stream::poll_next(this.socket.as_mut(), cx) {
-						Poll::Ready(None) => *this.handshake =
-							NotificationsInSubstreamHandshake::ClosingInResponseToRemote,
+						Poll::Ready(None) if write_side_open =>
+							*this.handshake =
+								NotificationsInSubstreamHandshake::Closing { remote_write_open: false },
+						Poll::Ready(None) =>
+							*this.handshake = NotificationsInSubstreamHandshake::BothSidesClosed,
 						Poll::Ready(Some(msg)) => {
-							*this.handshake = NotificationsInSubstreamHandshake::Sent;
+							*this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open };
 							return Poll::Ready(Some(msg))
 						},
 						Poll::Pending => {
-							*this.handshake = NotificationsInSubstreamHandshake::Sent;
+							*this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open };
 							return Poll::Pending
 						},
 					}
 				},
 
-				NotificationsInSubstreamHandshake::ClosingInResponseToRemote =>
+				NotificationsInSubstreamHandshake::Closing { remote_write_open } =>
 					match Sink::poll_close(this.socket.as_mut(), cx)? {
+						Poll::Ready(()) if remote_write_open =>
+							*this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open: false },
 						Poll::Ready(()) =>
 							*this.handshake = NotificationsInSubstreamHandshake::BothSidesClosed,
 						Poll::Pending => {
-							*this.handshake = NotificationsInSubstreamHandshake::ClosingInResponseToRemote;
+							*this.handshake = NotificationsInSubstreamHandshake::Closing { remote_write_open };
 							return Poll::Pending
 						}
 					},