diff --git a/substrate/client/network/src/lib.rs b/substrate/client/network/src/lib.rs index b8e5d7582b96b46934eed9ba76f2742feb4e5d55..fc5cab321d1279224d2f44df1f5167f27a22f954 100644 --- a/substrate/client/network/src/lib.rs +++ b/substrate/client/network/src/lib.rs @@ -197,13 +197,14 @@ //! handshake message can be of length 0, in which case the sender has to send a single `0`. //! - The receiver then either immediately closes the substream, or answers with its own //! LEB128-prefixed protocol-specific handshake response. The message can be of length 0, in which -//! case a single `0` has to be sent back. The receiver is then encouraged to close its sending -//! side. +//! case a single `0` has to be sent back. //! - Once the handshake has completed, the notifications protocol is unidirectional. Only the //! node which initiated the substream can push notifications. If the remote wants to send //! notifications as well, it has to open its own undirectional substream. //! - Each notification must be prefixed with an LEB128-encoded length. The encoding of the //! messages is specific to each protocol. +//! - Either party can signal that it doesn't want a notifications substream anymore by closing +//! its writing side. The other party should respond by closing its own writing side soon after. //! //! The API of `sc-network` allows one to register user-defined notification protocols. //! `sc-network` automatically tries to open a substream towards each node for which the legacy diff --git a/substrate/client/network/src/protocol/generic_proto/handler/notif_in.rs b/substrate/client/network/src/protocol/generic_proto/handler/notif_in.rs index be78fb970e90b2524fd1478f48462e159dd1b28d..ddd78566fcd2a7b773205d49f00c26e885da413d 100644 --- a/substrate/client/network/src/protocol/generic_proto/handler/notif_in.rs +++ b/substrate/client/network/src/protocol/generic_proto/handler/notif_in.rs @@ -163,11 +163,9 @@ impl ProtocolsHandler for NotifsInHandler { } // Note that we drop the existing substream, which will send an equivalent to a TCP "RST" - // to the remote and force-close the substream. It might seem like an unclean way to get + // to the remote and force-close the substream. It might seem like an unclean way to get // rid of a substream. However, keep in mind that it is invalid for the remote to open - // multiple such substreams, and therefore sending a "RST" is the correct thing to do. - // Also note that we have already closed our writing side during the initial handshake, - // and we can't close "more" than that anyway. + // multiple such substreams, and therefore sending a "RST" is not an incorrect thing to do. self.substream = Some(proto); self.events_queue.push_back(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(msg))); diff --git a/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs b/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs index f1f41d5bccf8e74f2266bbdc834d58ed28f75dd2..80fd7761f8088482f309d8ec772586287df8a35e 100644 --- a/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs +++ b/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs @@ -22,12 +22,13 @@ /// higher-level logic. This message is prefixed with a variable-length integer message length. /// This message can be empty, in which case `0` is sent. /// - If node B accepts the substream, it sends back a message with the same properties. -/// Afterwards, the sending side of B is closed. /// - If instead B refuses the connection (which typically happens because no empty slot is /// available), then it immediately closes the substream without sending back anything. /// - Node A can then send notifications to B, prefixed with a variable-length integer indicating /// the length of the message. -/// - Node A closes its writing side if it doesn't want the notifications substream anymore. +/// - Either node A or node B can signal that it doesn't want this notifications substream anymore +/// by closing its writing side. The other party should respond by also closing their own +/// writing side soon after. /// /// Notification substreams are unidirectional. If A opens a substream with B, then B is /// encouraged but not required to open a substream to A as well. @@ -80,9 +81,13 @@ enum NotificationsInSubstreamHandshake { /// User gave us the handshake message. Trying to push it in the socket. PendingSend(Vec<u8>), /// Handshake message was pushed in the socket. Still need to flush. - Close, - /// Handshake message successfully sent. + Flush, + /// Handshake message successfully sent and flushed. Sent, + /// Remote has closed their writing side. We close our own writing side in return. + ClosingInResponseToRemote, + /// Both our side and the remote have closed their writing side. + BothSidesClosed, } /// A substream for outgoing notification messages. @@ -177,8 +182,6 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin, // This `Stream` implementation first tries to send back the handshake if necessary. loop { match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) { - NotificationsInSubstreamHandshake::Sent => - return Stream::poll_next(this.socket.as_mut(), cx), NotificationsInSubstreamHandshake::NotSent => { *this.handshake = NotificationsInSubstreamHandshake::NotSent; return Poll::Pending @@ -186,7 +189,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin, NotificationsInSubstreamHandshake::PendingSend(msg) => match Sink::poll_ready(this.socket.as_mut(), cx) { Poll::Ready(_) => { - *this.handshake = NotificationsInSubstreamHandshake::Close; + *this.handshake = NotificationsInSubstreamHandshake::Flush; match Sink::start_send(this.socket.as_mut(), io::Cursor::new(msg)) { Ok(()) => {}, Err(err) => return Poll::Ready(Some(Err(err))), @@ -197,15 +200,43 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin, return Poll::Pending } }, - NotificationsInSubstreamHandshake::Close => - match Sink::poll_close(this.socket.as_mut(), cx)? { + NotificationsInSubstreamHandshake::Flush => + match Sink::poll_flush(this.socket.as_mut(), cx)? { Poll::Ready(()) => *this.handshake = NotificationsInSubstreamHandshake::Sent, Poll::Pending => { - *this.handshake = NotificationsInSubstreamHandshake::Close; + *this.handshake = NotificationsInSubstreamHandshake::Flush; return Poll::Pending } }, + + NotificationsInSubstreamHandshake::Sent => { + match Stream::poll_next(this.socket.as_mut(), cx) { + Poll::Ready(None) => *this.handshake = + NotificationsInSubstreamHandshake::ClosingInResponseToRemote, + Poll::Ready(Some(msg)) => { + *this.handshake = NotificationsInSubstreamHandshake::Sent; + return Poll::Ready(Some(msg)) + }, + Poll::Pending => { + *this.handshake = NotificationsInSubstreamHandshake::Sent; + return Poll::Pending + }, + } + }, + + NotificationsInSubstreamHandshake::ClosingInResponseToRemote => + match Sink::poll_close(this.socket.as_mut(), cx)? { + Poll::Ready(()) => + *this.handshake = NotificationsInSubstreamHandshake::BothSidesClosed, + Poll::Pending => { + *this.handshake = NotificationsInSubstreamHandshake::ClosingInResponseToRemote; + return Poll::Pending + } + }, + + NotificationsInSubstreamHandshake::BothSidesClosed => + return Poll::Ready(None), } } }