diff --git a/substrate/client/finality-grandpa/src/communication/tests.rs b/substrate/client/finality-grandpa/src/communication/tests.rs index dc37a1615f415f6a41f46cd458ffcb395209df23..bfc5b1d10a41379eaff642619554cd7a9798067c 100644 --- a/substrate/client/finality-grandpa/src/communication/tests.rs +++ b/substrate/client/finality-grandpa/src/communication/tests.rs @@ -295,6 +295,7 @@ fn good_commit_leads_to_relay() { let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened { remote: sender_id.clone(), protocol: GRANDPA_PROTOCOL_NAME.into(), + negotiated_fallback: None, role: ObservedRole::Full, }); @@ -308,6 +309,7 @@ fn good_commit_leads_to_relay() { let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened { remote: receiver_id.clone(), protocol: GRANDPA_PROTOCOL_NAME.into(), + negotiated_fallback: None, role: ObservedRole::Full, }); @@ -442,6 +444,7 @@ fn bad_commit_leads_to_report() { let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened { remote: sender_id.clone(), protocol: GRANDPA_PROTOCOL_NAME.into(), + negotiated_fallback: None, role: ObservedRole::Full, }); let _ = sender.unbounded_send(NetworkEvent::NotificationsReceived { diff --git a/substrate/client/finality-grandpa/src/lib.rs b/substrate/client/finality-grandpa/src/lib.rs index e1c3a2c1315408ca0c1361314afed4073b8b7a4c..672b08d0b714281506f00b0cdaba407970826695 100644 --- a/substrate/client/finality-grandpa/src/lib.rs +++ b/substrate/client/finality-grandpa/src/lib.rs @@ -690,6 +690,7 @@ pub struct GrandpaParams<Block: BlockT, C, N, SC, VR> { pub fn grandpa_peers_set_config() -> sc_network::config::NonDefaultSetConfig { sc_network::config::NonDefaultSetConfig { notifications_protocol: communication::GRANDPA_PROTOCOL_NAME.into(), + fallback_names: Vec::new(), // Notifications reach ~256kiB in size at the time of writing on Kusama and Polkadot. max_notification_size: 1024 * 1024, set_config: sc_network::config::SetConfig { @@ -1134,12 +1135,12 @@ fn local_authority_id( voters: &VoterSet<AuthorityId>, keystore: Option<&SyncCryptoStorePtr>, ) -> Option<AuthorityId> { - keystore.and_then(|keystore| { + keystore.and_then(|keystore| { voters .iter() .find(|(p, _)| { SyncCryptoStore::has_keys(&**keystore, &[(p.to_raw_vec(), AuthorityId::ID)]) }) - .map(|(p, _)| p.clone()) + .map(|(p, _)| p.clone()) }) } diff --git a/substrate/client/network-gossip/src/bridge.rs b/substrate/client/network-gossip/src/bridge.rs index 235ac98dc3968e9c085a6990c8fb35a048b2397d..fd9aac96c01026d34dd846c11c4654f2c8bf8581 100644 --- a/substrate/client/network-gossip/src/bridge.rs +++ b/substrate/client/network-gossip/src/bridge.rs @@ -188,7 +188,7 @@ impl<B: BlockT> Future for GossipEngine<B> { Event::SyncDisconnected { remote } => { this.network.remove_set_reserved(remote, this.protocol.clone()); } - Event::NotificationStreamOpened { remote, protocol, role } => { + Event::NotificationStreamOpened { remote, protocol, role, .. } => { if protocol != this.protocol { continue; } @@ -416,6 +416,7 @@ mod tests { Event::NotificationStreamOpened { remote: remote_peer.clone(), protocol: protocol.clone(), + negotiated_fallback: None, role: ObservedRole::Authority, } ).expect("Event stream is unbounded; qed."); @@ -575,6 +576,7 @@ mod tests { Event::NotificationStreamOpened { remote: remote_peer.clone(), protocol: protocol.clone(), + negotiated_fallback: None, role: ObservedRole::Authority, } ).expect("Event stream is unbounded; qed."); diff --git a/substrate/client/network/src/behaviour.rs b/substrate/client/network/src/behaviour.rs index a73685ed3bf32f0fe8ef75e85814a55151714fb4..17c38b6f95456e62c322999fb0631026cc01fdb1 100644 --- a/substrate/client/network/src/behaviour.rs +++ b/substrate/client/network/src/behaviour.rs @@ -124,6 +124,11 @@ pub enum BehaviourOut<B: BlockT> { remote: PeerId, /// The concerned protocol. Each protocol uses a different substream. protocol: Cow<'static, str>, + /// If the negotiation didn't use the main name of the protocol (the one in + /// `notifications_protocol`), then this field contains which name has actually been + /// used. + /// See also [`crate::Event::NotificationStreamOpened`]. + negotiated_fallback: Option<Cow<'static, str>>, /// Object that permits sending notifications to the peer. notifications_sink: NotificationsSink, /// Role of the remote. @@ -324,10 +329,13 @@ Behaviour<B> { &target, &self.block_request_protocol_name, buf, pending_response, IfDisconnected::ImmediateError, ); }, - CustomMessageOutcome::NotificationStreamOpened { remote, protocol, roles, notifications_sink } => { + CustomMessageOutcome::NotificationStreamOpened { + remote, protocol, negotiated_fallback, roles, notifications_sink + } => { self.events.push_back(BehaviourOut::NotificationStreamOpened { remote, protocol, + negotiated_fallback, role: reported_roles_to_observed_role(roles), notifications_sink: notifications_sink.clone(), }); diff --git a/substrate/client/network/src/config.rs b/substrate/client/network/src/config.rs index 3864b77d88be3e66d1d9cc4def1221f7b754f461..77618f27711486fa886a6059ccf2676e565605e8 100644 --- a/substrate/client/network/src/config.rs +++ b/substrate/client/network/src/config.rs @@ -541,6 +541,13 @@ pub struct NonDefaultSetConfig { /// > **Note**: This field isn't present for the default set, as this is handled internally /// > by the networking code. pub notifications_protocol: Cow<'static, str>, + /// If the remote reports that it doesn't support the protocol indicated in the + /// `notifications_protocol` field, then each of these fallback names will be tried one by + /// one. + /// + /// If a fallback is used, it will be reported in + /// [`crate::Event::NotificationStreamOpened::negotiated_fallback`]. + pub fallback_names: Vec<Cow<'static, str>>, /// Maximum allowed size of single notifications. pub max_notification_size: u64, /// Base configuration. @@ -553,6 +560,7 @@ impl NonDefaultSetConfig { NonDefaultSetConfig { notifications_protocol, max_notification_size, + fallback_names: Vec::new(), set_config: SetConfig { in_peers: 0, out_peers: 0, diff --git a/substrate/client/network/src/gossip/tests.rs b/substrate/client/network/src/gossip/tests.rs index b000cf575ddb3e205ca82bc5aa718f106c9e73dc..19ac002aac869316141f98e7bdb153711681961e 100644 --- a/substrate/client/network/src/gossip/tests.rs +++ b/substrate/client/network/src/gossip/tests.rs @@ -159,6 +159,7 @@ fn build_nodes_one_proto() extra_sets: vec![ config::NonDefaultSetConfig { notifications_protocol: PROTOCOL_NAME, + fallback_names: Vec::new(), max_notification_size: 1024 * 1024, set_config: Default::default() } @@ -173,6 +174,7 @@ fn build_nodes_one_proto() extra_sets: vec![ config::NonDefaultSetConfig { notifications_protocol: PROTOCOL_NAME, + fallback_names: Vec::new(), max_notification_size: 1024 * 1024, set_config: config::SetConfig { reserved_nodes: vec![config::MultiaddrWithPeerId { diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs index e0fa7a1cb467c57c12953a28295f6915ad90d764..6dafd8b85f3518dc704367b87f0bba945f46ff1a 100644 --- a/substrate/client/network/src/protocol.rs +++ b/substrate/client/network/src/protocol.rs @@ -362,12 +362,24 @@ impl<B: BlockT> Protocol<B> { genesis_hash, ).encode(); + let sync_protocol_config = notifications::ProtocolConfig { + name: block_announces_protocol, + fallback_names: Vec::new(), + handshake: block_announces_handshake, + max_notification_size: MAX_BLOCK_ANNOUNCE_SIZE, + }; + Notifications::new( peerset, - iter::once((block_announces_protocol, block_announces_handshake, MAX_BLOCK_ANNOUNCE_SIZE)) + iter::once(sync_protocol_config) .chain(network_config.extra_sets.iter() .zip(notifications_protocols_handshakes) - .map(|(s, hs)| (s.notifications_protocol.clone(), hs, s.max_notification_size)) + .map(|(s, hs)| notifications::ProtocolConfig { + name: s.notifications_protocol.clone(), + fallback_names: s.fallback_names.clone(), + handshake: hs, + max_notification_size: s.max_notification_size, + }) ), ) }; @@ -1154,6 +1166,8 @@ pub enum CustomMessageOutcome<B: BlockT> { NotificationStreamOpened { remote: PeerId, protocol: Cow<'static, str>, + /// See [`crate::Event::NotificationStreamOpened::negotiated_fallback`]. + negotiated_fallback: Option<Cow<'static, str>>, roles: Roles, notifications_sink: NotificationsSink }, @@ -1346,9 +1360,13 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> { }; let outcome = match event { - NotificationsOut::CustomProtocolOpen { peer_id, set_id, received_handshake, notifications_sink, .. } => { + NotificationsOut::CustomProtocolOpen { + peer_id, set_id, received_handshake, notifications_sink, negotiated_fallback + } => { // Set number 0 is hardcoded the default set of peers we sync from. if set_id == HARDCODED_PEERSETS_SYNC { + debug_assert!(negotiated_fallback.is_none()); + // `received_handshake` can be either a `Status` message if received from the // legacy substream ,or a `BlockAnnouncesHandshake` if received from the block // announces substream. @@ -1408,6 +1426,7 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> { CustomMessageOutcome::NotificationStreamOpened { remote: peer_id, protocol: self.notification_protocols[usize::from(set_id) - NUM_HARDCODED_PEERSETS].clone(), + negotiated_fallback, roles, notifications_sink, }, @@ -1419,6 +1438,7 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> { CustomMessageOutcome::NotificationStreamOpened { remote: peer_id, protocol: self.notification_protocols[usize::from(set_id) - NUM_HARDCODED_PEERSETS].clone(), + negotiated_fallback, roles: peer.info.roles, notifications_sink, } diff --git a/substrate/client/network/src/protocol/event.rs b/substrate/client/network/src/protocol/event.rs index fb2e3b33dd6807f1b5ac9a633015d4d189ee3207..c13980b3f43026836a90d781dbc24bd3143e224c 100644 --- a/substrate/client/network/src/protocol/event.rs +++ b/substrate/client/network/src/protocol/event.rs @@ -67,7 +67,16 @@ pub enum Event { /// Node we opened the substream with. remote: PeerId, /// The concerned protocol. Each protocol uses a different substream. + /// This is always equal to the value of + /// [`crate::config::NonDefaultSetConfig::notifications_protocol`] of one of the + /// configured sets. protocol: Cow<'static, str>, + /// If the negotiation didn't use the main name of the protocol (the one in + /// `notifications_protocol`), then this field contains which name has actually been + /// used. + /// Always contains a value equal to the value in + /// [`crate::config::NonDefaultSetConfig::fallback_names`]. + negotiated_fallback: Option<Cow<'static, str>>, /// Role of the remote. role: ObservedRole, }, diff --git a/substrate/client/network/src/protocol/notifications.rs b/substrate/client/network/src/protocol/notifications.rs index ef25795758b807f58613cffebd703b94cc9deea7..8739eb4948b77d56273913ea1c8c6891c7782039 100644 --- a/substrate/client/network/src/protocol/notifications.rs +++ b/substrate/client/network/src/protocol/notifications.rs @@ -19,7 +19,7 @@ //! Implementation of libp2p's `NetworkBehaviour` trait that establishes communications and opens //! notifications substreams. -pub use self::behaviour::{Notifications, NotificationsOut}; +pub use self::behaviour::{Notifications, NotificationsOut, ProtocolConfig}; pub use self::handler::{NotifsHandlerError, NotificationsSink, Ready}; mod behaviour; diff --git a/substrate/client/network/src/protocol/notifications/behaviour.rs b/substrate/client/network/src/protocol/notifications/behaviour.rs index d5112a9f981d7b68530ad47a315ead484644dcae..0a883543de52608b38e7bf6b2c7b9b41b41eb78e 100644 --- a/substrate/client/network/src/protocol/notifications/behaviour.rs +++ b/substrate/client/network/src/protocol/notifications/behaviour.rs @@ -17,7 +17,7 @@ // along with this program. If not, see <https://www.gnu.org/licenses/>. use crate::protocol::notifications::{ - handler::{NotificationsSink, NotifsHandlerProto, NotifsHandlerOut, NotifsHandlerIn} + handler::{self, NotificationsSink, NotifsHandlerProto, NotifsHandlerOut, NotifsHandlerIn} }; use bytes::BytesMut; @@ -95,10 +95,8 @@ use wasm_timer::Instant; /// accommodates for any number of connections. /// pub struct Notifications { - /// Notification protocols. Entries are only ever added and not removed. - /// Contains, for each protocol, the protocol name and the message to send as part of the - /// initial handshake. - notif_protocols: Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>, u64)>, + /// Notification protocols. Entries never change after initialization. + notif_protocols: Vec<handler::ProtocolConfig>, /// Receiver for instructions about who to connect to or disconnect from. peerset: sc_peerset::Peerset, @@ -130,6 +128,19 @@ pub struct Notifications { events: VecDeque<NetworkBehaviourAction<NotifsHandlerIn, NotificationsOut>>, } +/// Configuration for a notifications protocol. +#[derive(Debug, Clone)] +pub struct ProtocolConfig { + /// Name of the protocol. + pub name: Cow<'static, str>, + /// Names of the protocol to use if the main one isn't available. + pub fallback_names: Vec<Cow<'static, str>>, + /// Handshake of the protocol. + pub handshake: Vec<u8>, + /// Maximum allowed size for a notification. + pub max_notification_size: u64, +} + /// Identifier for a delay firing. #[derive(Debug, Copy, Clone, PartialEq, Eq)] struct DelayId(u64); @@ -311,6 +322,9 @@ pub enum NotificationsOut { peer_id: PeerId, /// Peerset set ID the substream is tied to. set_id: sc_peerset::SetId, + /// If `Some`, a fallback protocol name has been used rather the main protocol name. + /// Always matches one of the fallback names passed at initialization. + negotiated_fallback: Option<Cow<'static, str>>, /// Handshake that was sent to us. /// This is normally a "Status" message, but this is out of the concern of this code. received_handshake: Vec<u8>, @@ -358,10 +372,15 @@ impl Notifications { /// Creates a `CustomProtos`. pub fn new( peerset: sc_peerset::Peerset, - notif_protocols: impl Iterator<Item = (Cow<'static, str>, Vec<u8>, u64)>, + notif_protocols: impl Iterator<Item = ProtocolConfig>, ) -> Self { let notif_protocols = notif_protocols - .map(|(n, hs, sz)| (n, Arc::new(RwLock::new(hs)), sz)) + .map(|cfg| handler::ProtocolConfig { + name: cfg.name, + fallback_names: cfg.fallback_names, + handshake: Arc::new(RwLock::new(cfg.handshake)), + max_notification_size: cfg.max_notification_size, + }) .collect::<Vec<_>>(); assert!(!notif_protocols.is_empty()); @@ -385,7 +404,7 @@ impl Notifications { handshake_message: impl Into<Vec<u8>> ) { if let Some(p) = self.notif_protocols.get_mut(usize::from(set_id)) { - *p.1.write() = handshake_message.into(); + *p.handshake.write() = handshake_message.into(); } else { log::error!(target: "sub-libp2p", "Unknown handshake change set: {:?}", set_id); debug_assert!(false); @@ -1728,7 +1747,9 @@ impl NetworkBehaviour for Notifications { } } - NotifsHandlerOut::OpenResultOk { protocol_index, received_handshake, notifications_sink, .. } => { + NotifsHandlerOut::OpenResultOk { + protocol_index, negotiated_fallback, received_handshake, notifications_sink, .. + } => { let set_id = sc_peerset::SetId::from(protocol_index); trace!(target: "sub-libp2p", "Handler({}, {:?}) => OpenResultOk({:?})", @@ -1748,6 +1769,7 @@ impl NetworkBehaviour for Notifications { let event = NotificationsOut::CustomProtocolOpen { peer_id: source, set_id, + negotiated_fallback, received_handshake, notifications_sink: notifications_sink.clone(), }; diff --git a/substrate/client/network/src/protocol/notifications/handler.rs b/substrate/client/network/src/protocol/notifications/handler.rs index 99677cc45e54ef9da6d374dcfcfcea37b08078ba..3d38182c3c9d6d6461df3cd4ae9951ca259e2213 100644 --- a/substrate/client/network/src/protocol/notifications/handler.rs +++ b/substrate/client/network/src/protocol/notifications/handler.rs @@ -110,7 +110,7 @@ const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5); pub struct NotifsHandlerProto { /// Name of protocols, prototypes for upgrades for inbound substreams, and the message we /// send or respond with in the handshake. - protocols: Vec<(Cow<'static, str>, NotificationsIn, Arc<RwLock<Vec<u8>>>, u64)>, + protocols: Vec<ProtocolConfig>, } /// The actual handler once the connection has been established. @@ -135,20 +135,27 @@ pub struct NotifsHandler { >, } +/// Configuration for a notifications protocol. +#[derive(Debug, Clone)] +pub struct ProtocolConfig { + /// Name of the protocol. + pub name: Cow<'static, str>, + /// Names of the protocol to use if the main one isn't available. + pub fallback_names: Vec<Cow<'static, str>>, + /// Handshake of the protocol. The `RwLock` is locked every time a new substream is opened. + pub handshake: Arc<RwLock<Vec<u8>>>, + /// Maximum allowed size for a notification. + pub max_notification_size: u64, +} + /// Fields specific for each individual protocol. struct Protocol { - /// Name of the protocol. - name: Cow<'static, str>, + /// Other fields. + config: ProtocolConfig, /// Prototype for the inbound upgrade. in_upgrade: NotificationsIn, - /// Handshake to send when opening a substream or receiving an open request. - handshake: Arc<RwLock<Vec<u8>>>, - - /// Maximum allowed size of individual notifications. - max_notification_size: u64, - /// Current state of the substreams for this protocol. state: State, } @@ -214,21 +221,25 @@ impl IntoProtocolsHandler for NotifsHandlerProto { fn inbound_protocol(&self) -> UpgradeCollec<NotificationsIn> { self.protocols.iter() - .map(|(_, p, _, _)| p.clone()) + .map(|cfg| NotificationsIn::new(cfg.name.clone(), cfg.fallback_names.clone(), cfg.max_notification_size)) .collect::<UpgradeCollec<_>>() } fn into_handler(self, peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler { NotifsHandler { - protocols: self.protocols.into_iter().map(|(name, in_upgrade, handshake, max_size)| { + protocols: self.protocols.into_iter().map(|config| { + let in_upgrade = NotificationsIn::new( + config.name.clone(), + config.fallback_names.clone(), + config.max_notification_size + ); + Protocol { - name, + config, in_upgrade, - handshake, state: State::Closed { pending_opening: false, }, - max_notification_size: max_size, } }).collect(), peer_id: peer_id.clone(), @@ -271,6 +282,8 @@ pub enum NotifsHandlerOut { OpenResultOk { /// Index of the protocol in the list of protocols passed at initialization. protocol_index: usize, + /// Name of the protocol that was actually negotiated, if the default one wasn't available. + negotiated_fallback: Option<Cow<'static, str>>, /// The endpoint of the connection that is open for custom protocols. endpoint: ConnectedPoint, /// Handshake that was sent to us. @@ -445,18 +458,10 @@ impl NotifsHandlerProto { /// is always the same whether we open a substream ourselves or respond to handshake from /// the remote. pub fn new( - list: impl Into<Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>, u64)>>, + list: impl Into<Vec<ProtocolConfig>>, ) -> Self { - let protocols = list - .into() - .into_iter() - .map(|(proto_name, msg, max_notif_size)| { - (proto_name.clone(), NotificationsIn::new(proto_name, max_notif_size), msg, max_notif_size) - }) - .collect(); - NotifsHandlerProto { - protocols, + protocols: list.into(), } } } @@ -481,7 +486,7 @@ impl ProtocolsHandler for NotifsHandler { fn inject_fully_negotiated_inbound( &mut self, - ((_remote_handshake, mut new_substream), protocol_index): + (mut in_substream_open, protocol_index): <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output, (): () ) { @@ -495,7 +500,7 @@ impl ProtocolsHandler for NotifsHandler { )); protocol_info.state = State::OpenDesiredByRemote { - in_substream: new_substream, + in_substream: in_substream_open.substream, pending_opening, }; }, @@ -518,16 +523,16 @@ impl ProtocolsHandler for NotifsHandler { // Create `handshake_message` on a separate line to be sure that the // lock is released as soon as possible. - let handshake_message = protocol_info.handshake.read().clone(); - new_substream.send_handshake(handshake_message); - *in_substream = Some(new_substream); + let handshake_message = protocol_info.config.handshake.read().clone(); + in_substream_open.substream.send_handshake(handshake_message); + *in_substream = Some(in_substream_open.substream); }, } } fn inject_fully_negotiated_outbound( &mut self, - (handshake, substream): <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output, + new_open: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output, protocol_index: Self::OutboundOpenInfo ) { match self.protocols[protocol_index].state { @@ -553,15 +558,16 @@ impl ProtocolsHandler for NotifsHandler { self.protocols[protocol_index].state = State::Open { notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(), - out_substream: Some(substream), + out_substream: Some(new_open.substream), in_substream: in_substream.take(), }; self.events_queue.push_back(ProtocolsHandlerEvent::Custom( NotifsHandlerOut::OpenResultOk { protocol_index, + negotiated_fallback: new_open.negotiated_fallback, endpoint: self.endpoint.clone(), - received_handshake: handshake, + received_handshake: new_open.handshake, notifications_sink } )); @@ -577,9 +583,10 @@ impl ProtocolsHandler for NotifsHandler { State::Closed { pending_opening } => { if !*pending_opening { let proto = NotificationsOut::new( - protocol_info.name.clone(), - protocol_info.handshake.read().clone(), - protocol_info.max_notification_size + protocol_info.config.name.clone(), + protocol_info.config.fallback_names.clone(), + protocol_info.config.handshake.read().clone(), + protocol_info.config.max_notification_size ); self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest { @@ -593,13 +600,14 @@ impl ProtocolsHandler for NotifsHandler { }; }, State::OpenDesiredByRemote { pending_opening, in_substream } => { - let handshake_message = protocol_info.handshake.read().clone(); + let handshake_message = protocol_info.config.handshake.read().clone(); if !*pending_opening { let proto = NotificationsOut::new( - protocol_info.name.clone(), + protocol_info.config.name.clone(), + protocol_info.config.fallback_names.clone(), handshake_message.clone(), - protocol_info.max_notification_size, + protocol_info.config.max_notification_size, ); self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest { diff --git a/substrate/client/network/src/protocol/notifications/tests.rs b/substrate/client/network/src/protocol/notifications/tests.rs index 8efe897afec3a9b5ce787d8fec47c857c973c56f..4c7461c94b20d33360a8f272deb2e3781279a545 100644 --- a/substrate/client/network/src/protocol/notifications/tests.rs +++ b/substrate/client/network/src/protocol/notifications/tests.rs @@ -18,7 +18,7 @@ #![cfg(test)] -use crate::protocol::notifications::{Notifications, NotificationsOut}; +use crate::protocol::notifications::{Notifications, NotificationsOut, ProtocolConfig}; use futures::prelude::*; use libp2p::{PeerId, Multiaddr, Transport}; @@ -80,7 +80,12 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) { }); let behaviour = CustomProtoWithAddr { - inner: Notifications::new(peerset, iter::once(("/foo".into(), Vec::new(), 1024 * 1024))), + inner: Notifications::new(peerset, iter::once(ProtocolConfig { + name: "/foo".into(), + fallback_names: Vec::new(), + handshake: Vec::new(), + max_notification_size: 1024 * 1024 + })), addrs: addrs .iter() .enumerate() diff --git a/substrate/client/network/src/protocol/notifications/upgrade.rs b/substrate/client/network/src/protocol/notifications/upgrade.rs index b23e5eab06d9e06568133e05335ee34635230609..35ae6917272a271952ab15424b5ad6bd8c4ba336 100644 --- a/substrate/client/network/src/protocol/notifications/upgrade.rs +++ b/substrate/client/network/src/protocol/notifications/upgrade.rs @@ -19,8 +19,10 @@ pub use self::collec::UpgradeCollec; pub use self::notifications::{ NotificationsIn, + NotificationsInOpen, NotificationsInSubstream, NotificationsOut, + NotificationsOutOpen, NotificationsOutSubstream, NotificationsHandshakeError, NotificationsOutError, diff --git a/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs b/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs index eba96441bcfdeab5c3175f3aeea7c01834cf8337..e2ef26c81eba9b9672123fb60c572f6793c5d427 100644 --- a/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs +++ b/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs @@ -41,7 +41,7 @@ use futures::prelude::*; use asynchronous_codec::Framed; use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade}; use log::error; -use std::{borrow::Cow, convert::{Infallible, TryFrom as _}, io, iter, mem, pin::Pin, task::{Context, Poll}}; +use std::{borrow::Cow, convert::{Infallible, TryFrom as _}, io, mem, pin::Pin, task::{Context, Poll}, vec}; use unsigned_varint::codec::UviBytes; /// Maximum allowed size of the two handshake messages, in bytes. @@ -52,7 +52,8 @@ const MAX_HANDSHAKE_SIZE: usize = 1024; #[derive(Debug, Clone)] pub struct NotificationsIn { /// Protocol name to use when negotiating the substream. - protocol_name: Cow<'static, str>, + /// The first one is the main name, while the other ones are fall backs. + protocol_names: Vec<Cow<'static, str>>, /// Maximum allowed size for a single notification. max_notification_size: u64, } @@ -62,7 +63,8 @@ pub struct NotificationsIn { #[derive(Debug, Clone)] pub struct NotificationsOut { /// Protocol name to use when negotiating the substream. - protocol_name: Cow<'static, str>, + /// The first one is the main name, while the other ones are fall backs. + protocol_names: Vec<Cow<'static, str>>, /// Message to send when we start the handshake. initial_message: Vec<u8>, /// Maximum allowed size for a single notification. @@ -106,51 +108,54 @@ pub struct NotificationsOutSubstream<TSubstream> { impl NotificationsIn { /// Builds a new potential upgrade. - pub fn new(protocol_name: impl Into<Cow<'static, str>>, max_notification_size: u64) -> Self { + pub fn new( + main_protocol_name: impl Into<Cow<'static, str>>, + fallback_names: Vec<Cow<'static, str>>, + max_notification_size: u64 + ) -> Self { + let mut protocol_names = fallback_names; + protocol_names.insert(0, main_protocol_name.into()); + NotificationsIn { - protocol_name: protocol_name.into(), + protocol_names, max_notification_size, } } } impl UpgradeInfo for NotificationsIn { - type Info = Cow<'static, [u8]>; - type InfoIter = iter::Once<Self::Info>; + type Info = StringProtocolName; + type InfoIter = vec::IntoIter<Self::Info>; fn protocol_info(&self) -> Self::InfoIter { - let bytes: Cow<'static, [u8]> = match &self.protocol_name { - Cow::Borrowed(s) => Cow::Borrowed(s.as_bytes()), - Cow::Owned(s) => Cow::Owned(s.as_bytes().to_vec()) - }; - iter::once(bytes) + self.protocol_names.iter().cloned().map(StringProtocolName).collect::<Vec<_>>().into_iter() } } impl<TSubstream> InboundUpgrade<TSubstream> for NotificationsIn where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, { - type Output = (Vec<u8>, NotificationsInSubstream<TSubstream>); + type Output = NotificationsInOpen<TSubstream>; type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>; type Error = NotificationsHandshakeError; fn upgrade_inbound( self, mut socket: TSubstream, - _: Self::Info, + negotiated_name: Self::Info, ) -> Self::Future { Box::pin(async move { - let initial_message_len = unsigned_varint::aio::read_usize(&mut socket).await?; - if initial_message_len > MAX_HANDSHAKE_SIZE { + let handshake_len = unsigned_varint::aio::read_usize(&mut socket).await?; + if handshake_len > MAX_HANDSHAKE_SIZE { return Err(NotificationsHandshakeError::TooLarge { - requested: initial_message_len, + requested: handshake_len, max: MAX_HANDSHAKE_SIZE, }); } - let mut initial_message = vec![0u8; initial_message_len]; - if !initial_message.is_empty() { - socket.read_exact(&mut initial_message).await?; + let mut handshake = vec![0u8; handshake_len]; + if !handshake.is_empty() { + socket.read_exact(&mut handshake).await?; } let mut codec = UviBytes::default(); @@ -161,11 +166,30 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, handshake: NotificationsInSubstreamHandshake::NotSent, }; - Ok((initial_message, substream)) + Ok(NotificationsInOpen { + handshake, + negotiated_fallback: if negotiated_name.0 == self.protocol_names[0] { + None + } else { + Some(negotiated_name.0) + }, + substream, + }) }) } } +/// Yielded by the [`NotificationsIn`] after a successfuly upgrade. +pub struct NotificationsInOpen<TSubstream> { + /// Handshake sent by the remote. + pub handshake: Vec<u8>, + /// If the negotiated name is not the "main" protocol name but a fallback, contains the + /// name of the negotiated fallback. + pub negotiated_fallback: Option<Cow<'static, str>>, + /// Implementation of `Stream` that allows receives messages from the substream. + pub substream: NotificationsInSubstream<TSubstream>, +} + impl<TSubstream> NotificationsInSubstream<TSubstream> where TSubstream: AsyncRead + AsyncWrite + Unpin, { @@ -296,7 +320,8 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin, impl NotificationsOut { /// Builds a new potential upgrade. pub fn new( - protocol_name: impl Into<Cow<'static, str>>, + main_protocol_name: impl Into<Cow<'static, str>>, + fallback_names: Vec<Cow<'static, str>>, initial_message: impl Into<Vec<u8>>, max_notification_size: u64, ) -> Self { @@ -305,38 +330,47 @@ impl NotificationsOut { error!(target: "sub-libp2p", "Outbound networking handshake is above allowed protocol limit"); } + let mut protocol_names = fallback_names; + protocol_names.insert(0, main_protocol_name.into()); + NotificationsOut { - protocol_name: protocol_name.into(), + protocol_names, initial_message, max_notification_size, } } } +/// Implementation of the `ProtocolName` trait, where the protocol name is a string. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct StringProtocolName(Cow<'static, str>); + +impl upgrade::ProtocolName for StringProtocolName { + fn protocol_name(&self) -> &[u8] { + self.0.as_bytes() + } +} + impl UpgradeInfo for NotificationsOut { - type Info = Cow<'static, [u8]>; - type InfoIter = iter::Once<Self::Info>; + type Info = StringProtocolName; + type InfoIter = vec::IntoIter<Self::Info>; fn protocol_info(&self) -> Self::InfoIter { - let bytes: Cow<'static, [u8]> = match &self.protocol_name { - Cow::Borrowed(s) => Cow::Borrowed(s.as_bytes()), - Cow::Owned(s) => Cow::Owned(s.as_bytes().to_vec()) - }; - iter::once(bytes) + self.protocol_names.iter().cloned().map(StringProtocolName).collect::<Vec<_>>().into_iter() } } impl<TSubstream> OutboundUpgrade<TSubstream> for NotificationsOut where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, { - type Output = (Vec<u8>, NotificationsOutSubstream<TSubstream>); + type Output = NotificationsOutOpen<TSubstream>; type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>; type Error = NotificationsHandshakeError; fn upgrade_outbound( self, mut socket: TSubstream, - _: Self::Info, + negotiated_name: Self::Info, ) -> Self::Future { Box::pin(async move { upgrade::write_with_len_prefix(&mut socket, &self.initial_message).await?; @@ -358,13 +392,32 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, let mut codec = UviBytes::default(); codec.set_max_len(usize::try_from(self.max_notification_size).unwrap_or(usize::max_value())); - Ok((handshake, NotificationsOutSubstream { - socket: Framed::new(socket, codec), - })) + Ok(NotificationsOutOpen { + handshake, + negotiated_fallback: if negotiated_name.0 == self.protocol_names[0] { + None + } else { + Some(negotiated_name.0) + }, + substream: NotificationsOutSubstream { + socket: Framed::new(socket, codec), + } + }) }) } } +/// Yielded by the [`NotificationsOut`] after a successfuly upgrade. +pub struct NotificationsOutOpen<TSubstream> { + /// Handshake returned by the remote. + pub handshake: Vec<u8>, + /// If the negotiated name is not the "main" protocol name but a fallback, contains the + /// name of the negotiated fallback. + pub negotiated_fallback: Option<Cow<'static, str>>, + /// Implementation of `Sink` that allows sending messages on the substream. + pub substream: NotificationsOutSubstream<TSubstream>, +} + impl<TSubstream> Sink<Vec<u8>> for NotificationsOutSubstream<TSubstream> where TSubstream: AsyncRead + AsyncWrite + Unpin, { @@ -436,7 +489,7 @@ pub enum NotificationsOutError { #[cfg(test)] mod tests { - use super::{NotificationsIn, NotificationsOut}; + use super::{NotificationsIn, NotificationsInOpen, NotificationsOut, NotificationsOutOpen}; use async_std::net::{TcpListener, TcpStream}; use futures::{prelude::*, channel::oneshot}; @@ -450,9 +503,9 @@ mod tests { let client = async_std::task::spawn(async move { let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap(); - let (handshake, mut substream) = upgrade::apply_outbound( + let NotificationsOutOpen { handshake, mut substream, .. } = upgrade::apply_outbound( socket, - NotificationsOut::new(PROTO_NAME, &b"initial message"[..], 1024 * 1024), + NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024), upgrade::Version::V1 ).await.unwrap(); @@ -465,12 +518,12 @@ mod tests { listener_addr_tx.send(listener.local_addr().unwrap()).unwrap(); let (socket, _) = listener.accept().await.unwrap(); - let (initial_message, mut substream) = upgrade::apply_inbound( + let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound( socket, - NotificationsIn::new(PROTO_NAME, 1024 * 1024) + NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024) ).await.unwrap(); - assert_eq!(initial_message, b"initial message"); + assert_eq!(handshake, b"initial message"); substream.send_handshake(&b"hello world"[..]); let msg = substream.next().await.unwrap().unwrap(); @@ -489,9 +542,9 @@ mod tests { let client = async_std::task::spawn(async move { let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap(); - let (handshake, mut substream) = upgrade::apply_outbound( + let NotificationsOutOpen { handshake, mut substream, .. } = upgrade::apply_outbound( socket, - NotificationsOut::new(PROTO_NAME, vec![], 1024 * 1024), + NotificationsOut::new(PROTO_NAME, Vec::new(), vec![], 1024 * 1024), upgrade::Version::V1 ).await.unwrap(); @@ -504,12 +557,12 @@ mod tests { listener_addr_tx.send(listener.local_addr().unwrap()).unwrap(); let (socket, _) = listener.accept().await.unwrap(); - let (initial_message, mut substream) = upgrade::apply_inbound( + let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound( socket, - NotificationsIn::new(PROTO_NAME, 1024 * 1024) + NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024) ).await.unwrap(); - assert!(initial_message.is_empty()); + assert!(handshake.is_empty()); substream.send_handshake(vec![]); let msg = substream.next().await.unwrap().unwrap(); @@ -528,7 +581,7 @@ mod tests { let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap(); let outcome = upgrade::apply_outbound( socket, - NotificationsOut::new(PROTO_NAME, &b"hello"[..], 1024 * 1024), + NotificationsOut::new(PROTO_NAME, Vec::new(), &b"hello"[..], 1024 * 1024), upgrade::Version::V1 ).await; @@ -543,12 +596,12 @@ mod tests { listener_addr_tx.send(listener.local_addr().unwrap()).unwrap(); let (socket, _) = listener.accept().await.unwrap(); - let (initial_msg, substream) = upgrade::apply_inbound( + let NotificationsInOpen { handshake, substream, .. } = upgrade::apply_inbound( socket, - NotificationsIn::new(PROTO_NAME, 1024 * 1024) + NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024) ).await.unwrap(); - assert_eq!(initial_msg, b"hello"); + assert_eq!(handshake, b"hello"); // We successfully upgrade to the protocol, but then close the substream. drop(substream); @@ -567,7 +620,7 @@ mod tests { let ret = upgrade::apply_outbound( socket, // We check that an initial message that is too large gets refused. - NotificationsOut::new(PROTO_NAME, (0..32768).map(|_| 0).collect::<Vec<_>>(), 1024 * 1024), + NotificationsOut::new(PROTO_NAME, Vec::new(), (0..32768).map(|_| 0).collect::<Vec<_>>(), 1024 * 1024), upgrade::Version::V1 ).await; assert!(ret.is_err()); @@ -580,7 +633,7 @@ mod tests { let (socket, _) = listener.accept().await.unwrap(); let ret = upgrade::apply_inbound( socket, - NotificationsIn::new(PROTO_NAME, 1024 * 1024) + NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024) ).await; assert!(ret.is_err()); }); @@ -597,7 +650,7 @@ mod tests { let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap(); let ret = upgrade::apply_outbound( socket, - NotificationsOut::new(PROTO_NAME, &b"initial message"[..], 1024 * 1024), + NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024), upgrade::Version::V1 ).await; assert!(ret.is_err()); @@ -608,11 +661,11 @@ mod tests { listener_addr_tx.send(listener.local_addr().unwrap()).unwrap(); let (socket, _) = listener.accept().await.unwrap(); - let (initial_message, mut substream) = upgrade::apply_inbound( + let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound( socket, - NotificationsIn::new(PROTO_NAME, 1024 * 1024) + NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024) ).await.unwrap(); - assert_eq!(initial_message, b"initial message"); + assert_eq!(handshake, b"initial message"); // We check that a handshake that is too large gets refused. substream.send_handshake((0..32768).map(|_| 0).collect::<Vec<_>>()); diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 99036c5effad8be7305bd0f490117642a41d06be..03b71b8c86f5ec7c46a4fff898c890005df1cd52 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -1541,7 +1541,7 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> { } }, Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened { - remote, protocol, notifications_sink, role + remote, protocol, negotiated_fallback, notifications_sink, role })) => { if let Some(metrics) = this.metrics.as_ref() { metrics.notifications_streams_opened_total @@ -1554,6 +1554,7 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> { this.event_streams.send(Event::NotificationStreamOpened { remote, protocol, + negotiated_fallback, role, }); }, diff --git a/substrate/client/network/src/service/tests.rs b/substrate/client/network/src/service/tests.rs index dd4a0597cbcbc13a09c1f0f59ee150ca0f18d9a8..4e5bba8f7d33f52b92849097e5c3688301558b92 100644 --- a/substrate/client/network/src/service/tests.rs +++ b/substrate/client/network/src/service/tests.rs @@ -159,6 +159,7 @@ fn build_nodes_one_proto() extra_sets: vec![ config::NonDefaultSetConfig { notifications_protocol: PROTOCOL_NAME, + fallback_names: Vec::new(), max_notification_size: 1024 * 1024, set_config: Default::default() } @@ -172,6 +173,7 @@ fn build_nodes_one_proto() extra_sets: vec![ config::NonDefaultSetConfig { notifications_protocol: PROTOCOL_NAME, + fallback_names: Vec::new(), max_notification_size: 1024 * 1024, set_config: config::SetConfig { reserved_nodes: vec![config::MultiaddrWithPeerId { @@ -328,6 +330,7 @@ fn lots_of_incoming_peers_works() { extra_sets: vec![ config::NonDefaultSetConfig { notifications_protocol: PROTOCOL_NAME, + fallback_names: Vec::new(), max_notification_size: 1024 * 1024, set_config: config::SetConfig { in_peers: u32::max_value(), @@ -353,6 +356,7 @@ fn lots_of_incoming_peers_works() { extra_sets: vec![ config::NonDefaultSetConfig { notifications_protocol: PROTOCOL_NAME, + fallback_names: Vec::new(), max_notification_size: 1024 * 1024, set_config: config::SetConfig { reserved_nodes: vec![config::MultiaddrWithPeerId { @@ -456,6 +460,81 @@ fn notifications_back_pressure() { }); } +#[test] +fn fallback_name_working() { + // Node 1 supports the protocols "new" and "old". Node 2 only supports "old". Checks whether + // they can connect. + + const NEW_PROTOCOL_NAME: Cow<'static, str> = + Cow::Borrowed("/new-shiny-protocol-that-isnt-PROTOCOL_NAME"); + + let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())]; + + let (node1, mut events_stream1) = build_test_full_node(config::NetworkConfiguration { + extra_sets: vec![ + config::NonDefaultSetConfig { + notifications_protocol: NEW_PROTOCOL_NAME.clone(), + fallback_names: vec![PROTOCOL_NAME], + max_notification_size: 1024 * 1024, + set_config: Default::default() + } + ], + listen_addresses: vec![listen_addr.clone()], + transport: config::TransportConfig::MemoryOnly, + .. config::NetworkConfiguration::new_local() + }); + + let (_, mut events_stream2) = build_test_full_node(config::NetworkConfiguration { + extra_sets: vec![ + config::NonDefaultSetConfig { + notifications_protocol: PROTOCOL_NAME, + fallback_names: Vec::new(), + max_notification_size: 1024 * 1024, + set_config: config::SetConfig { + reserved_nodes: vec![config::MultiaddrWithPeerId { + multiaddr: listen_addr, + peer_id: node1.local_peer_id().clone(), + }], + .. Default::default() + } + } + ], + listen_addresses: vec![], + transport: config::TransportConfig::MemoryOnly, + .. config::NetworkConfiguration::new_local() + }); + + let receiver = async_std::task::spawn(async move { + // Wait for the `NotificationStreamOpened`. + loop { + match events_stream2.next().await.unwrap() { + Event::NotificationStreamOpened { protocol, negotiated_fallback, .. } => { + assert_eq!(protocol, PROTOCOL_NAME); + assert_eq!(negotiated_fallback, None); + break + }, + _ => {} + }; + } + }); + + async_std::task::block_on(async move { + // Wait for the `NotificationStreamOpened`. + loop { + match events_stream1.next().await.unwrap() { + Event::NotificationStreamOpened { protocol, negotiated_fallback, .. } => { + assert_eq!(protocol, NEW_PROTOCOL_NAME); + assert_eq!(negotiated_fallback, Some(PROTOCOL_NAME)); + break + }, + _ => {} + }; + } + + receiver.await; + }); +} + #[test] #[should_panic(expected = "don't match the transport")] fn ensure_listen_addresses_consistent_with_transport_memory() { diff --git a/substrate/client/network/src/transactions.rs b/substrate/client/network/src/transactions.rs index b694182e6a231f58c9159ec9809775e30b2350a2..8a7dd78c834ce875de57906aeab93e5184dca76e 100644 --- a/substrate/client/network/src/transactions.rs +++ b/substrate/client/network/src/transactions.rs @@ -136,6 +136,7 @@ impl TransactionsHandlerPrototype { pub fn set_config(&self) -> config::NonDefaultSetConfig { config::NonDefaultSetConfig { notifications_protocol: self.protocol_name.clone(), + fallback_names: Vec::new(), max_notification_size: MAX_TRANSACTIONS_SIZE, set_config: config::SetConfig { in_peers: 0, @@ -318,7 +319,7 @@ impl<B: BlockT + 'static, H: ExHashT> TransactionsHandler<B, H> { } }, - Event::NotificationStreamOpened { remote, protocol, role } if protocol == self.protocol_name => { + Event::NotificationStreamOpened { remote, protocol, role, .. } if protocol == self.protocol_name => { let _was_in = self.peers.insert(remote, Peer { known_transactions: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS) .expect("Constant is nonzero")), diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index 689eca8aac5dd42521dd51cc2a3623eca8b36738..8e56005dad25d38eac4bc337c588665629b109f4 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -742,6 +742,7 @@ pub trait TestNetFactory: Sized where <Self::BlockImport as BlockImport<Block>>: network_config.extra_sets = config.notifications_protocols.into_iter().map(|p| { NonDefaultSetConfig { notifications_protocol: p, + fallback_names: Vec::new(), max_notification_size: 1024 * 1024, set_config: Default::default() }