diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index f24fff6791f69d2104d2120f1cac5dc0e22b8fae..a4343dc085b240f45279a4b62732133b67da2b03 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -5247,6 +5247,7 @@ dependencies = [ "polkadot-node-primitives", "polkadot-primitives", "sc-network", + "strum 0.20.0", ] [[package]] @@ -8415,7 +8416,7 @@ dependencies = [ "lazy_static", "sp-core", "sp-runtime", - "strum", + "strum 0.16.0", ] [[package]] @@ -8805,7 +8806,16 @@ version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6138f8f88a16d90134763314e3fc76fa3ed6a7db4725d6acf9a3ef95a3188d22" dependencies = [ - "strum_macros", + "strum_macros 0.16.0", +] + +[[package]] +name = "strum" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7318c509b5ba57f18533982607f24070a55d353e90d4cae30c467cdb2ad5ac5c" +dependencies = [ + "strum_macros 0.20.1", ] [[package]] @@ -8820,6 +8830,18 @@ dependencies = [ "syn 1.0.58", ] +[[package]] +name = "strum_macros" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee8bc6b87a5112aeeab1f4a9f7ab634fe6cbefc4850006df31267f4cfb9e3149" +dependencies = [ + "heck", + "proc-macro2 1.0.24", + "quote 1.0.7", + "syn 1.0.58", +] + [[package]] name = "substrate-bip39" version = "0.4.2" diff --git a/polkadot/node/network/bridge/src/action.rs b/polkadot/node/network/bridge/src/action.rs new file mode 100644 index 0000000000000000000000000000000000000000..27cbeefbbd2a4f8c96ce75178cfa3cfacebe5b2d --- /dev/null +++ b/polkadot/node/network/bridge/src/action.rs @@ -0,0 +1,184 @@ +// Copyright 2020-2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. +// + +use futures::channel::mpsc; + +use parity_scale_codec::Decode; +use polkadot_node_network_protocol::{ + peer_set::PeerSet, v1 as protocol_v1, PeerId, ReputationChange, +}; +use polkadot_primitives::v1::{AuthorityDiscoveryId, BlockNumber}; +use polkadot_subsystem::messages::NetworkBridgeMessage; +use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal}; +use sc_network::Event as NetworkEvent; + +use polkadot_node_network_protocol::ObservedRole; + +use super::{WireMessage, LOG_TARGET, MALFORMED_MESSAGE_COST}; + +/// Internal type combining all actions a `NetworkBridge` might perform. +/// +/// Both messages coming from the network (`NetworkEvent`) and messages coming from other +/// subsystems (`FromOverseer`) will be converted to `Action` in `run_network` before being +/// processed. +#[derive(Debug)] +pub(crate) enum Action { + /// Ask network to send a validation message. + SendValidationMessages(Vec<(Vec<PeerId>, protocol_v1::ValidationProtocol)>), + + /// Ask network to send a collation message. + SendCollationMessages(Vec<(Vec<PeerId>, protocol_v1::CollationProtocol)>), + + /// Ask network to connect to validators. + ConnectToValidators { + validator_ids: Vec<AuthorityDiscoveryId>, + connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>, + }, + + /// Report a peer to the network implementation (decreasing/increasing its reputation). + ReportPeer(PeerId, ReputationChange), + + /// A subsystem updates us on the relay chain leaves we consider active. + /// + /// Implementation will send `WireMessage::ViewUpdate` message to peers as appropriate to the + /// change. + ActiveLeaves(ActiveLeavesUpdate), + + /// A subsystem updates our view on the latest finalized block. + /// + /// This information is used for view updates, see also `ActiveLeaves`. + BlockFinalized(BlockNumber), + + /// Network tells us about a new peer. + PeerConnected(PeerSet, PeerId, ObservedRole), + + /// Network tells us about a peer that left. + PeerDisconnected(PeerSet, PeerId), + + /// Messages from the network targeted to other subsystems. + PeerMessages( + PeerId, + Vec<WireMessage<protocol_v1::ValidationProtocol>>, + Vec<WireMessage<protocol_v1::CollationProtocol>>, + ), + + Abort, + Nop, +} + +impl From<polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>>> for Action { + #[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))] + fn from(res: polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>>) -> Self { + match res { + Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(active_leaves))) => { + Action::ActiveLeaves(active_leaves) + } + Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number))) => { + Action::BlockFinalized(number) + } + Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => Action::Abort, + Ok(FromOverseer::Communication { msg }) => match msg { + NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep), + NetworkBridgeMessage::SendValidationMessage(peers, msg) => { + Action::SendValidationMessages(vec![(peers, msg)]) + } + NetworkBridgeMessage::SendCollationMessage(peers, msg) => { + Action::SendCollationMessages(vec![(peers, msg)]) + } + NetworkBridgeMessage::SendValidationMessages(msgs) => { + Action::SendValidationMessages(msgs) + } + NetworkBridgeMessage::SendCollationMessages(msgs) => { + Action::SendCollationMessages(msgs) + } + NetworkBridgeMessage::ConnectToValidators { + validator_ids, + connected, + } => Action::ConnectToValidators { + validator_ids, + connected, + }, + }, + Err(e) => { + tracing::warn!(target: LOG_TARGET, err = ?e, "Shutting down Network Bridge due to error"); + Action::Abort + } + } + } +} + +impl From<Option<NetworkEvent>> for Action { + #[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))] + fn from(event: Option<NetworkEvent>) -> Action { + match event { + None => { + tracing::info!( + target: LOG_TARGET, + "Shutting down Network Bridge: underlying event stream concluded" + ); + Action::Abort + } + Some(NetworkEvent::Dht(_)) + | Some(NetworkEvent::SyncConnected { .. }) + | Some(NetworkEvent::SyncDisconnected { .. }) => Action::Nop, + Some(NetworkEvent::NotificationStreamOpened { + remote, + protocol, + role, + }) => { + let role = role.into(); + PeerSet::try_from_protocol_name(&protocol).map_or(Action::Nop, |peer_set| { + Action::PeerConnected(peer_set, remote, role) + }) + } + Some(NetworkEvent::NotificationStreamClosed { remote, protocol }) => { + PeerSet::try_from_protocol_name(&protocol).map_or(Action::Nop, |peer_set| { + Action::PeerDisconnected(peer_set, remote) + }) + } + Some(NetworkEvent::NotificationsReceived { remote, messages }) => { + let v_messages: Result<Vec<_>, _> = messages + .iter() + .filter(|(protocol, _)| protocol == &PeerSet::Validation.into_protocol_name()) + .map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref())) + .collect(); + + let v_messages = match v_messages { + Err(_) => return Action::ReportPeer(remote, MALFORMED_MESSAGE_COST), + Ok(v) => v, + }; + + let c_messages: Result<Vec<_>, _> = messages + .iter() + .filter(|(protocol, _)| protocol == &PeerSet::Collation.into_protocol_name()) + .map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref())) + .collect(); + + match c_messages { + Err(_) => Action::ReportPeer(remote, MALFORMED_MESSAGE_COST), + Ok(c_messages) => { + if v_messages.is_empty() && c_messages.is_empty() { + Action::Nop + } else { + Action::PeerMessages(remote, v_messages, c_messages) + } + } + } + } + } + } +} diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 3512ee48eecf2bab1ff1b37373ef4307024fa359..dc0f0903bd333ec06ea77e3631b2d4df839e53b7 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -22,14 +22,9 @@ use parity_scale_codec::{Encode, Decode}; use futures::prelude::*; -use futures::future::BoxFuture; -use futures::stream::BoxStream; -use futures::channel::mpsc; - -use sc_network::Event as NetworkEvent; use polkadot_subsystem::{ - ActiveLeavesUpdate, FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError, + ActiveLeavesUpdate, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError, SubsystemResult, JaegerSpan, }; use polkadot_subsystem::messages::{ @@ -37,27 +32,41 @@ use polkadot_subsystem::messages::{ BitfieldDistributionMessage, PoVDistributionMessage, StatementDistributionMessage, CollatorProtocolMessage, }; -use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash, BlockNumber}; +use polkadot_primitives::v1::{Hash, BlockNumber}; use polkadot_node_network_protocol::{ - ObservedRole, ReputationChange, PeerId, PeerSet, View, NetworkBridgeEvent, v1 as protocol_v1, OurView, + ReputationChange, PeerId, peer_set::PeerSet, View, NetworkBridgeEvent, v1 as protocol_v1, OurView, }; +/// Peer set infos for network initialization. +/// +/// To be added to [`NetworkConfiguration::extra_sets`]. +pub use polkadot_node_network_protocol::peer_set::peer_sets_info; + use std::collections::{HashMap, hash_map}; use std::iter::ExactSizeIterator; -use std::pin::Pin; use std::sync::Arc; mod validator_discovery; +/// Internally used `Action` type. +/// +/// All requested `NetworkBridgeMessage` user actions and `NetworkEvent` network messages are +/// translated to `Action` before being processed by `run_network`. +mod action; +use action::Action; + +/// Actual interfacing to the network based on the `Network` trait. +/// +/// Defines the `Network` trait with an implementation for an `Arc<NetworkService>`. +mod network; +use network::{Network, send_message}; + + /// The maximum amount of heads a peer is allowed to have in their view at any time. /// /// We use the same limit to compute the view sent to peers locally. const MAX_VIEW_HEADS: usize = 5; -/// The protocol name for the validation peer-set. -pub const VALIDATION_PROTOCOL_NAME: &'static str = "/polkadot/validation/1"; -/// The protocol name for the collation peer-set. -pub const COLLATION_PROTOCOL_NAME: &'static str = "/polkadot/collation/1"; const MALFORMED_MESSAGE_COST: ReputationChange = ReputationChange::new(-500, "Malformed Network-bridge message"); const UNCONNECTED_PEERSET_COST: ReputationChange = ReputationChange::new(-50, "Message sent to un-connected peer-set"); @@ -67,7 +76,9 @@ const EMPTY_VIEW_COST: ReputationChange = ReputationChange::new(-500, "Peer sent // network bridge log target const LOG_TARGET: &'static str = "network_bridge"; -/// Messages received on the network. +/// Messages from and to the network. +/// +/// As transmitted to and received from subsystems. #[derive(Debug, Encode, Decode, Clone)] pub enum WireMessage<M> { /// A message from a peer on a specific protocol. @@ -78,136 +89,10 @@ pub enum WireMessage<M> { ViewUpdate(View), } -/// Information about the extra peers set. Should be used during network configuration -/// to register the protocol with the network service. -pub fn peers_sets_info() -> Vec<sc_network::config::NonDefaultSetConfig> { - vec![ - sc_network::config::NonDefaultSetConfig { - notifications_protocol: VALIDATION_PROTOCOL_NAME.into(), - set_config: sc_network::config::SetConfig { - in_peers: 25, - out_peers: 0, - reserved_nodes: Vec::new(), - non_reserved_mode: sc_network::config::NonReservedPeerMode::Accept, - }, - }, - sc_network::config::NonDefaultSetConfig { - notifications_protocol: COLLATION_PROTOCOL_NAME.into(), - set_config: sc_network::config::SetConfig { - in_peers: 25, - out_peers: 0, - reserved_nodes: Vec::new(), - non_reserved_mode: sc_network::config::NonReservedPeerMode::Accept, - }, - } - ] -} - -/// An action to be carried out by the network. -#[derive(Debug, PartialEq)] -pub enum NetworkAction { - /// Note a change in reputation for a peer. - ReputationChange(PeerId, ReputationChange), - /// Write a notification to a given peer on the given peer-set. - WriteNotification(PeerId, PeerSet, Vec<u8>), -} - -/// An abstraction over networking for the purposes of this subsystem. -pub trait Network: Send + 'static { - /// Get a stream of all events occurring on the network. This may include events unrelated - /// to the Polkadot protocol - the user of this function should filter only for events related - /// to the [`VALIDATION_PROTOCOL_NAME`](VALIDATION_PROTOCOL_NAME) - /// or [`COLLATION_PROTOCOL_NAME`](COLLATION_PROTOCOL_NAME) - fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent>; - - /// Get access to an underlying sink for all network actions. - fn action_sink<'a>(&'a mut self) -> Pin< - Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a> - >; - - /// Report a given peer as either beneficial (+) or costly (-) according to the given scalar. - fn report_peer(&mut self, who: PeerId, cost_benefit: ReputationChange) - -> BoxFuture<SubsystemResult<()>> - { - async move { - self.action_sink().send(NetworkAction::ReputationChange(who, cost_benefit)).await - }.boxed() - } - - /// Write a notification to a peer on the given peer-set's protocol. - fn write_notification(&mut self, who: PeerId, peer_set: PeerSet, message: Vec<u8>) - -> BoxFuture<SubsystemResult<()>> - { - async move { - self.action_sink().send(NetworkAction::WriteNotification(who, peer_set, message)).await - }.boxed() - } -} - -impl Network for Arc<sc_network::NetworkService<Block, Hash>> { - fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> { - sc_network::NetworkService::event_stream(self, "polkadot-network-bridge").boxed() - } - - #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - fn action_sink<'a>(&'a mut self) - -> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>> - { - use futures::task::{Poll, Context}; - - // wrapper around a NetworkService to make it act like a sink. - struct ActionSink<'b>(&'b sc_network::NetworkService<Block, Hash>); - - impl<'b> Sink<NetworkAction> for ActionSink<'b> { - type Error = SubsystemError; - - fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> { - Poll::Ready(Ok(())) - } - - fn start_send(self: Pin<&mut Self>, action: NetworkAction) -> SubsystemResult<()> { - match action { - NetworkAction::ReputationChange(peer, cost_benefit) => { - tracing::debug!(target: LOG_TARGET, "Changing reputation: {:?} for {}", cost_benefit, peer); - self.0.report_peer( - peer, - cost_benefit, - ) - } - NetworkAction::WriteNotification(peer, peer_set, message) => { - match peer_set { - PeerSet::Validation => self.0.write_notification( - peer, - VALIDATION_PROTOCOL_NAME.into(), - message, - ), - PeerSet::Collation => self.0.write_notification( - peer, - COLLATION_PROTOCOL_NAME.into(), - message, - ), - } - } - } - - Ok(()) - } - - fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> { - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> { - Poll::Ready(Ok(())) - } - } - - Box::pin(ActionSink(&**self)) - } -} /// The network bridge subsystem. pub struct NetworkBridge<N, AD> { + /// `Network` trait implementing type. network_service: N, authority_discovery_service: AD, } @@ -256,114 +141,193 @@ struct PeerData { view: View, } -#[derive(Debug)] -enum Action { - SendValidationMessages(Vec<(Vec<PeerId>, protocol_v1::ValidationProtocol)>), - SendCollationMessages(Vec<(Vec<PeerId>, protocol_v1::CollationProtocol)>), - ConnectToValidators { - validator_ids: Vec<AuthorityDiscoveryId>, - connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>, - }, - ReportPeer(PeerId, ReputationChange), - - ActiveLeaves(ActiveLeavesUpdate), - BlockFinalized(BlockNumber), - - PeerConnected(PeerSet, PeerId, ObservedRole), - PeerDisconnected(PeerSet, PeerId), - PeerMessages( - PeerId, - Vec<WireMessage<protocol_v1::ValidationProtocol>>, - Vec<WireMessage<protocol_v1::CollationProtocol>>, - ), - - Abort, - Nop, -} +/// Main driver, processing network events and messages from other subsystems. +#[tracing::instrument(skip(network_service, authority_discovery_service, ctx), fields(subsystem = LOG_TARGET))] +async fn run_network<N, AD>( + mut network_service: N, + mut authority_discovery_service: AD, + mut ctx: impl SubsystemContext<Message=NetworkBridgeMessage>, +) -> SubsystemResult<()> +where + N: Network + validator_discovery::Network, + AD: validator_discovery::AuthorityDiscovery, +{ + let mut event_stream = network_service.event_stream().fuse(); -#[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))] -fn action_from_overseer_message( - res: polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>>, -) -> Action { - match res { - Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(active_leaves))) - => Action::ActiveLeaves(active_leaves), - Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number))) - => Action::BlockFinalized(number), - Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => Action::Abort, - Ok(FromOverseer::Communication { msg }) => match msg { - NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep), - NetworkBridgeMessage::SendValidationMessage(peers, msg) - => Action::SendValidationMessages(vec![(peers, msg)]), - NetworkBridgeMessage::SendCollationMessage(peers, msg) - => Action::SendCollationMessages(vec![(peers, msg)]), - NetworkBridgeMessage::SendValidationMessages(msgs) - => Action::SendValidationMessages(msgs), - NetworkBridgeMessage::SendCollationMessages(msgs) - => Action::SendCollationMessages(msgs), - NetworkBridgeMessage::ConnectToValidators { validator_ids, connected } - => Action::ConnectToValidators { validator_ids, connected }, - }, - Err(e) => { - tracing::warn!(target: LOG_TARGET, err = ?e, "Shutting down Network Bridge due to error"); - Action::Abort - } - } -} + // Most recent heads are at the back. + let mut live_heads: Vec<(Hash, Arc<JaegerSpan>)> = Vec::with_capacity(MAX_VIEW_HEADS); + let mut local_view = View::default(); + let mut finalized_number = 0; -#[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))] -fn action_from_network_message(event: Option<NetworkEvent>) -> Action { - match event { - None => { - tracing::info!(target: LOG_TARGET, "Shutting down Network Bridge: underlying event stream concluded"); - Action::Abort - } - Some(NetworkEvent::Dht(_)) | - Some(NetworkEvent::SyncConnected { .. }) | - Some(NetworkEvent::SyncDisconnected { .. }) => Action::Nop, - Some(NetworkEvent::NotificationStreamOpened { remote, protocol, role }) => { - let role = role.into(); - match protocol { - x if x == VALIDATION_PROTOCOL_NAME - => Action::PeerConnected(PeerSet::Validation, remote, role), - x if x == COLLATION_PROTOCOL_NAME - => Action::PeerConnected(PeerSet::Collation, remote, role), - _ => Action::Nop, + let mut validation_peers: HashMap<PeerId, PeerData> = HashMap::new(); + let mut collation_peers: HashMap<PeerId, PeerData> = HashMap::new(); + + let mut validator_discovery = validator_discovery::Service::<N, AD>::new(); + + loop { + + let action = { + let subsystem_next = ctx.recv().fuse(); + let mut net_event_next = event_stream.next().fuse(); + futures::pin_mut!(subsystem_next); + + futures::select! { + subsystem_msg = subsystem_next => Action::from(subsystem_msg), + net_event = net_event_next => Action::from(net_event), } - } - Some(NetworkEvent::NotificationStreamClosed { remote, protocol }) => { - match protocol { - x if x == VALIDATION_PROTOCOL_NAME - => Action::PeerDisconnected(PeerSet::Validation, remote), - x if x == COLLATION_PROTOCOL_NAME - => Action::PeerDisconnected(PeerSet::Collation, remote), - _ => Action::Nop, + }; + + match action { + Action::Nop => {} + Action::Abort => return Ok(()), + + Action::SendValidationMessages(msgs) => { + for (peers, msg) in msgs { + send_message( + &mut network_service, + peers, + PeerSet::Validation, + WireMessage::ProtocolMessage(msg), + ).await? + } } - } - Some(NetworkEvent::NotificationsReceived { remote, messages }) => { - let v_messages: Result<Vec<_>, _> = messages.iter() - .filter(|(protocol, _)| protocol == &VALIDATION_PROTOCOL_NAME) - .map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref())) - .collect(); - - let v_messages = match v_messages { - Err(_) => return Action::ReportPeer(remote, MALFORMED_MESSAGE_COST), - Ok(v) => v, - }; - - let c_messages: Result<Vec<_>, _> = messages.iter() - .filter(|(protocol, _)| protocol == &COLLATION_PROTOCOL_NAME) - .map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref())) - .collect(); - - match c_messages { - Err(_) => Action::ReportPeer(remote, MALFORMED_MESSAGE_COST), - Ok(c_messages) => if v_messages.is_empty() && c_messages.is_empty() { - Action::Nop - } else { - Action::PeerMessages(remote, v_messages, c_messages) - }, + + Action::SendCollationMessages(msgs) => { + for (peers, msg) in msgs { + send_message( + &mut network_service, + peers, + PeerSet::Collation, + WireMessage::ProtocolMessage(msg), + ).await? + } + } + + Action::ConnectToValidators { + validator_ids, + connected, + } => { + let (ns, ads) = validator_discovery.on_request( + validator_ids, + connected, + network_service, + authority_discovery_service, + ).await; + network_service = ns; + authority_discovery_service = ads; + }, + + Action::ReportPeer(peer, rep) => network_service.report_peer(peer, rep).await?, + + Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => { + live_heads.extend(activated); + live_heads.retain(|h| !deactivated.contains(&h.0)); + + update_our_view( + &mut network_service, + &mut ctx, + &live_heads, + &mut local_view, + finalized_number, + &validation_peers, + &collation_peers, + ).await?; } + + Action::BlockFinalized(number) => { + debug_assert!(finalized_number < number); + + // we don't send the view updates here, but delay them until the next `Action::ActiveLeaves` + // otherwise it might break assumptions of some of the subsystems + // that we never send the same `ActiveLeavesUpdate` + // this is fine, we will get `Action::ActiveLeaves` on block finalization anyway + finalized_number = number; + }, + + Action::PeerConnected(peer_set, peer, role) => { + let peer_map = match peer_set { + PeerSet::Validation => &mut validation_peers, + PeerSet::Collation => &mut collation_peers, + }; + + validator_discovery.on_peer_connected(&peer, &mut authority_discovery_service).await; + + match peer_map.entry(peer.clone()) { + hash_map::Entry::Occupied(_) => continue, + hash_map::Entry::Vacant(vacant) => { + let _ = vacant.insert(PeerData { + view: View::default(), + }); + + match peer_set { + PeerSet::Validation => dispatch_validation_events_to_all( + vec![ + NetworkBridgeEvent::PeerConnected(peer.clone(), role), + NetworkBridgeEvent::PeerViewChange( + peer, + View::default(), + ), + ], + &mut ctx, + ).await, + PeerSet::Collation => dispatch_collation_events_to_all( + vec![ + NetworkBridgeEvent::PeerConnected(peer.clone(), role), + NetworkBridgeEvent::PeerViewChange( + peer, + View::default(), + ), + ], + &mut ctx, + ).await, + } + } + } + } + Action::PeerDisconnected(peer_set, peer) => { + let peer_map = match peer_set { + PeerSet::Validation => &mut validation_peers, + PeerSet::Collation => &mut collation_peers, + }; + + validator_discovery.on_peer_disconnected(&peer); + + if peer_map.remove(&peer).is_some() { + match peer_set { + PeerSet::Validation => dispatch_validation_event_to_all( + NetworkBridgeEvent::PeerDisconnected(peer), + &mut ctx, + ).await, + PeerSet::Collation => dispatch_collation_event_to_all( + NetworkBridgeEvent::PeerDisconnected(peer), + &mut ctx, + ).await, + } + } + }, + Action::PeerMessages(peer, v_messages, c_messages) => { + if !v_messages.is_empty() { + let events = handle_peer_messages( + peer.clone(), + &mut validation_peers, + v_messages, + &mut network_service, + ).await?; + + dispatch_validation_events_to_all(events, &mut ctx).await; + } + + if !c_messages.is_empty() { + let events = handle_peer_messages( + peer.clone(), + &mut collation_peers, + c_messages, + &mut network_service, + ).await?; + + dispatch_collation_events_to_all(events, &mut ctx).await; + } + }, } } } @@ -497,41 +461,6 @@ async fn send_collation_message<I>( send_message(net, peers, PeerSet::Collation, message).await } -async fn send_message<M, I>( - net: &mut impl Network, - peers: I, - peer_set: PeerSet, - message: WireMessage<M>, -) -> SubsystemResult<()> - where - M: Encode + Clone, - I: IntoIterator<Item=PeerId>, - I::IntoIter: ExactSizeIterator, -{ - let mut message_producer = stream::iter({ - let peers = peers.into_iter(); - let n_peers = peers.len(); - let mut message = Some(message.encode()); - - peers.enumerate().map(move |(i, peer)| { - // optimization: avoid cloning the message for the last peer in the - // list. The message payload can be quite large. If the underlying - // network used `Bytes` this would not be necessary. - let message = if i == n_peers - 1 { - message.take() - .expect("Only taken in last iteration of loop, never afterwards; qed") - } else { - message.as_ref() - .expect("Only taken in last iteration of loop, we are not there yet; qed") - .clone() - }; - - Ok(NetworkAction::WriteNotification(peer, peer_set, message)) - }) - }); - - net.action_sink().send_all(&mut message_producer).await -} async fn dispatch_validation_event_to_all( event: NetworkBridgeEvent<protocol_v1::ValidationProtocol>, @@ -597,210 +526,27 @@ async fn dispatch_collation_events_to_all<I>( ctx.send_messages(events.into_iter().flat_map(messages_for)).await } -#[tracing::instrument(skip(network_service, authority_discovery_service, ctx), fields(subsystem = LOG_TARGET))] -async fn run_network<N, AD>( - mut network_service: N, - mut authority_discovery_service: AD, - mut ctx: impl SubsystemContext<Message=NetworkBridgeMessage>, -) -> SubsystemResult<()> -where - N: Network + validator_discovery::Network, - AD: validator_discovery::AuthorityDiscovery, -{ - let mut event_stream = network_service.event_stream().fuse(); - - // Most recent heads are at the back. - let mut live_heads: Vec<(Hash, Arc<JaegerSpan>)> = Vec::with_capacity(MAX_VIEW_HEADS); - let mut local_view = View::default(); - let mut finalized_number = 0; - - let mut validation_peers: HashMap<PeerId, PeerData> = HashMap::new(); - let mut collation_peers: HashMap<PeerId, PeerData> = HashMap::new(); - - let mut validator_discovery = validator_discovery::Service::<N, AD>::new(); - - loop { - - let action = { - let subsystem_next = ctx.recv().fuse(); - let mut net_event_next = event_stream.next().fuse(); - futures::pin_mut!(subsystem_next); - - futures::select! { - subsystem_msg = subsystem_next => action_from_overseer_message(subsystem_msg), - net_event = net_event_next => action_from_network_message(net_event), - } - }; - - match action { - Action::Nop => {} - Action::Abort => return Ok(()), - - Action::SendValidationMessages(msgs) => { - for (peers, msg) in msgs { - send_message( - &mut network_service, - peers, - PeerSet::Validation, - WireMessage::ProtocolMessage(msg), - ).await? - } - } - - Action::SendCollationMessages(msgs) => { - for (peers, msg) in msgs { - send_message( - &mut network_service, - peers, - PeerSet::Collation, - WireMessage::ProtocolMessage(msg), - ).await? - } - } - - Action::ConnectToValidators { - validator_ids, - connected, - } => { - let (ns, ads) = validator_discovery.on_request( - validator_ids, - connected, - network_service, - authority_discovery_service, - ).await; - network_service = ns; - authority_discovery_service = ads; - }, - - Action::ReportPeer(peer, rep) => network_service.report_peer(peer, rep).await?, - - Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => { - live_heads.extend(activated); - live_heads.retain(|h| !deactivated.contains(&h.0)); - - update_our_view( - &mut network_service, - &mut ctx, - &live_heads, - &mut local_view, - finalized_number, - &validation_peers, - &collation_peers, - ).await?; - } - - Action::BlockFinalized(number) => { - debug_assert!(finalized_number < number); - - // we don't send the view updates here, but delay them until the next `Action::ActiveLeaves` - // otherwise it might break assumptions of some of the subsystems - // that we never send the same `ActiveLeavesUpdate` - // this is fine, we will get `Action::ActiveLeaves` on block finalization anyway - finalized_number = number; - }, - - Action::PeerConnected(peer_set, peer, role) => { - let peer_map = match peer_set { - PeerSet::Validation => &mut validation_peers, - PeerSet::Collation => &mut collation_peers, - }; - validator_discovery.on_peer_connected(&peer, &mut authority_discovery_service).await; - - match peer_map.entry(peer.clone()) { - hash_map::Entry::Occupied(_) => continue, - hash_map::Entry::Vacant(vacant) => { - let _ = vacant.insert(PeerData { - view: View::default(), - }); - - match peer_set { - PeerSet::Validation => dispatch_validation_events_to_all( - vec![ - NetworkBridgeEvent::PeerConnected(peer.clone(), role), - NetworkBridgeEvent::PeerViewChange( - peer, - View::default(), - ), - ], - &mut ctx, - ).await, - PeerSet::Collation => dispatch_collation_events_to_all( - vec![ - NetworkBridgeEvent::PeerConnected(peer.clone(), role), - NetworkBridgeEvent::PeerViewChange( - peer, - View::default(), - ), - ], - &mut ctx, - ).await, - } - } - } - } - Action::PeerDisconnected(peer_set, peer) => { - let peer_map = match peer_set { - PeerSet::Validation => &mut validation_peers, - PeerSet::Collation => &mut collation_peers, - }; - - validator_discovery.on_peer_disconnected(&peer); - - if peer_map.remove(&peer).is_some() { - match peer_set { - PeerSet::Validation => dispatch_validation_event_to_all( - NetworkBridgeEvent::PeerDisconnected(peer), - &mut ctx, - ).await, - PeerSet::Collation => dispatch_collation_event_to_all( - NetworkBridgeEvent::PeerDisconnected(peer), - &mut ctx, - ).await, - } - } - }, - Action::PeerMessages(peer, v_messages, c_messages) => { - if !v_messages.is_empty() { - let events = handle_peer_messages( - peer.clone(), - &mut validation_peers, - v_messages, - &mut network_service, - ).await?; - - dispatch_validation_events_to_all(events, &mut ctx).await; - } - - if !c_messages.is_empty() { - let events = handle_peer_messages( - peer.clone(), - &mut collation_peers, - c_messages, - &mut network_service, - ).await?; - - dispatch_collation_events_to_all(events, &mut ctx).await; - } - }, - } - } -} #[cfg(test)] mod tests { use super::*; use futures::executor; + use futures::stream::BoxStream; + use std::pin::Pin; + use std::sync::Arc; use std::borrow::Cow; - use std::sync::Arc; use std::collections::HashSet; use async_trait::async_trait; use parking_lot::Mutex; use assert_matches::assert_matches; + use sc_network::Event as NetworkEvent; + use polkadot_subsystem::messages::{StatementDistributionMessage, BitfieldDistributionMessage}; + use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal}; use polkadot_node_subsystem_test_helpers::{ SingleItemSink, SingleItemStream, TestSubsystemContextHandle, }; @@ -808,6 +554,10 @@ mod tests { use polkadot_node_network_protocol::view; use sc_network::Multiaddr; use sp_keyring::Sr25519Keyring; + use polkadot_primitives::v1::AuthorityDiscoveryId; + use polkadot_node_network_protocol::ObservedRole; + + use crate::network::{Network, NetworkAction}; // The subsystem's view of the network - only supports a single call to `event_stream`. struct TestNetwork { @@ -845,13 +595,6 @@ mod tests { ) } - fn peer_set_protocol(peer_set: PeerSet) -> std::borrow::Cow<'static, str> { - match peer_set { - PeerSet::Validation => VALIDATION_PROTOCOL_NAME.into(), - PeerSet::Collation => COLLATION_PROTOCOL_NAME.into(), - } - } - impl Network for TestNetwork { fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> { self.net_events.lock() @@ -908,7 +651,7 @@ mod tests { async fn connect_peer(&mut self, peer: PeerId, peer_set: PeerSet, role: ObservedRole) { self.send_network_event(NetworkEvent::NotificationStreamOpened { remote: peer, - protocol: peer_set_protocol(peer_set), + protocol: peer_set.into_protocol_name(), role: role.into(), }).await; } @@ -916,14 +659,14 @@ mod tests { async fn disconnect_peer(&mut self, peer: PeerId, peer_set: PeerSet) { self.send_network_event(NetworkEvent::NotificationStreamClosed { remote: peer, - protocol: peer_set_protocol(peer_set), + protocol: peer_set.into_protocol_name(), }).await; } async fn peer_message(&mut self, peer: PeerId, peer_set: PeerSet, message: Vec<u8>) { self.send_network_event(NetworkEvent::NotificationsReceived { remote: peer, - messages: vec![(peer_set_protocol(peer_set), message.into())], + messages: vec![(peer_set.into_protocol_name(), message.into())], }).await; } diff --git a/polkadot/node/network/bridge/src/network.rs b/polkadot/node/network/bridge/src/network.rs new file mode 100644 index 0000000000000000000000000000000000000000..c6a2bc4bdd2da8986cbe2b7da0220fc1f7d5efd7 --- /dev/null +++ b/polkadot/node/network/bridge/src/network.rs @@ -0,0 +1,183 @@ +// Copyright 2020-2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +use std::pin::Pin; +use std::sync::Arc; + +use futures::future::BoxFuture; +use futures::prelude::*; +use futures::stream::BoxStream; + +use parity_scale_codec::Encode; + +use sc_network::Event as NetworkEvent; + +use super::LOG_TARGET; +use polkadot_node_network_protocol::{peer_set::PeerSet, PeerId, ReputationChange}; +use polkadot_primitives::v1::{Block, Hash}; +use polkadot_subsystem::{SubsystemError, SubsystemResult}; + +/// Send a message to the network. +/// +/// This function is only used internally by the network-bridge, which is responsible to only send +/// messages that are compatible with the passed peer set, as that is currently not enforced by +/// this function. These are messages of type `WireMessage` parameterized on the matching type. +pub(crate) async fn send_message<M, I>( + net: &mut impl Network, + peers: I, + peer_set: PeerSet, + message: M, +) -> SubsystemResult<()> +where + M: Encode + Clone, + I: IntoIterator<Item = PeerId>, + I::IntoIter: ExactSizeIterator, +{ + let mut message_producer = stream::iter({ + let peers = peers.into_iter(); + let n_peers = peers.len(); + let mut message = Some(message.encode()); + + peers.enumerate().map(move |(i, peer)| { + // optimization: avoid cloning the message for the last peer in the + // list. The message payload can be quite large. If the underlying + // network used `Bytes` this would not be necessary. + let message = if i == n_peers - 1 { + message + .take() + .expect("Only taken in last iteration of loop, never afterwards; qed") + } else { + message + .as_ref() + .expect("Only taken in last iteration of loop, we are not there yet; qed") + .clone() + }; + + Ok(NetworkAction::WriteNotification(peer, peer_set, message)) + }) + }); + + net.action_sink().send_all(&mut message_producer).await +} + +/// An action to be carried out by the network. +/// +/// This type is used for implementing `Sink` in order to cummunicate asynchronously with the +/// underlying network implementation in the `Network` trait. +#[derive(Debug, PartialEq)] +pub enum NetworkAction { + /// Note a change in reputation for a peer. + ReputationChange(PeerId, ReputationChange), + /// Write a notification to a given peer on the given peer-set. + WriteNotification(PeerId, PeerSet, Vec<u8>), +} + +/// An abstraction over networking for the purposes of this subsystem. +/// +pub trait Network: Send + 'static { + /// Get a stream of all events occurring on the network. This may include events unrelated + /// to the Polkadot protocol - the user of this function should filter only for events related + /// to the [`VALIDATION_PROTOCOL_NAME`](VALIDATION_PROTOCOL_NAME) + /// or [`COLLATION_PROTOCOL_NAME`](COLLATION_PROTOCOL_NAME) + fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent>; + + /// Get access to an underlying sink for all network actions. + fn action_sink<'a>( + &'a mut self, + ) -> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>>; + + /// Report a given peer as either beneficial (+) or costly (-) according to the given scalar. + fn report_peer( + &mut self, + who: PeerId, + cost_benefit: ReputationChange, + ) -> BoxFuture<SubsystemResult<()>> { + async move { + self.action_sink() + .send(NetworkAction::ReputationChange(who, cost_benefit)) + .await + } + .boxed() + } + + /// Write a notification to a peer on the given peer-set's protocol. + fn write_notification( + &mut self, + who: PeerId, + peer_set: PeerSet, + message: Vec<u8>, + ) -> BoxFuture<SubsystemResult<()>> { + async move { + self.action_sink() + .send(NetworkAction::WriteNotification(who, peer_set, message)) + .await + } + .boxed() + } +} + +impl Network for Arc<sc_network::NetworkService<Block, Hash>> { + fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> { + sc_network::NetworkService::event_stream(self, "polkadot-network-bridge").boxed() + } + + #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] + fn action_sink<'a>( + &'a mut self, + ) -> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>> { + use futures::task::{Context, Poll}; + + // wrapper around a NetworkService to make it act like a sink. + struct ActionSink<'b>(&'b sc_network::NetworkService<Block, Hash>); + + impl<'b> Sink<NetworkAction> for ActionSink<'b> { + type Error = SubsystemError; + + fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> { + Poll::Ready(Ok(())) + } + + fn start_send(self: Pin<&mut Self>, action: NetworkAction) -> SubsystemResult<()> { + match action { + NetworkAction::ReputationChange(peer, cost_benefit) => { + tracing::debug!( + target: LOG_TARGET, + "Changing reputation: {:?} for {}", + cost_benefit, + peer + ); + self.0.report_peer(peer, cost_benefit) + } + NetworkAction::WriteNotification(peer, peer_set, message) => self + .0 + .write_notification(peer, peer_set.into_protocol_name(), message), + } + + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> { + Poll::Ready(Ok(())) + } + } + + Box::pin(ActionSink(&**self)) + } +} diff --git a/polkadot/node/network/bridge/src/validator_discovery.rs b/polkadot/node/network/bridge/src/validator_discovery.rs index 89e72a7aa9cfe44df53bba9cca0ed117d952c84b..926aa37066496a261ea8c63902b4567ebaab0ea7 100644 --- a/polkadot/node/network/bridge/src/validator_discovery.rs +++ b/polkadot/node/network/bridge/src/validator_discovery.rs @@ -28,6 +28,7 @@ use sc_network::multiaddr::{Multiaddr, Protocol}; use sc_authority_discovery::Service as AuthorityDiscoveryService; use polkadot_node_network_protocol::PeerId; use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash}; +use polkadot_node_network_protocol::peer_set::PeerSet; const LOG_TARGET: &str = "validator_discovery"; @@ -276,24 +277,24 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> { // ask the network to connect to these nodes and not disconnect // from them until removed from the set if let Err(e) = network_service.add_peers_to_reserved_set( - super::COLLATION_PROTOCOL_NAME.into(), + PeerSet::Collation.into_protocol_name(), multiaddr_to_add.clone(), ).await { tracing::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress"); } if let Err(e) = network_service.add_peers_to_reserved_set( - super::VALIDATION_PROTOCOL_NAME.into(), + PeerSet::Validation.into_protocol_name(), multiaddr_to_add, ).await { tracing::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress"); } // the addresses are known to be valid let _ = network_service.remove_peers_from_reserved_set( - super::COLLATION_PROTOCOL_NAME.into(), + PeerSet::Collation.into_protocol_name(), multiaddr_to_remove.clone() ).await; let _ = network_service.remove_peers_from_reserved_set( - super::VALIDATION_PROTOCOL_NAME.into(), + PeerSet::Validation.into_protocol_name(), multiaddr_to_remove ).await; diff --git a/polkadot/node/network/protocol/Cargo.toml b/polkadot/node/network/protocol/Cargo.toml index 148cc14efd20b2561a90766f5c1b5d17642a95d7..f06f2ccd4e06428d1ec7f735c45f063cd8a7e888 100644 --- a/polkadot/node/network/protocol/Cargo.toml +++ b/polkadot/node/network/protocol/Cargo.toml @@ -11,3 +11,4 @@ polkadot-node-primitives = { path = "../../primitives" } polkadot-node-jaeger = { path = "../../jaeger" } parity-scale-codec = { version = "1.3.6", default-features = false, features = ["derive"] } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } +strum = { version = "0.20", features = ["derive"] } diff --git a/polkadot/node/network/protocol/src/lib.rs b/polkadot/node/network/protocol/src/lib.rs index 35dc3b3a1702ad7efa758a92d1a7ba1f82be8e23..c833ba4b151570caed3a118808cc8fed46e662f9 100644 --- a/polkadot/node/network/protocol/src/lib.rs +++ b/polkadot/node/network/protocol/src/lib.rs @@ -29,6 +29,10 @@ pub use polkadot_node_jaeger::JaegerSpan; #[doc(hidden)] pub use std::sync::Arc; + +/// Peer-sets and protocols used for parachains. +pub mod peer_set; + /// A unique identifier of a request. pub type RequestId = u64; @@ -47,15 +51,6 @@ impl fmt::Display for WrongVariant { impl std::error::Error for WrongVariant {} -/// The peer-sets that the network manages. Different subsystems will use different peer-sets. -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum PeerSet { - /// The validation peer-set is responsible for all messages related to candidate validation and communication among validators. - Validation, - /// The collation peer-set is used for validator<>collator communication. - Collation, -} - /// The advertised role of a node. #[derive(Debug, Clone, Copy, PartialEq)] pub enum ObservedRole { diff --git a/polkadot/node/network/protocol/src/peer_set.rs b/polkadot/node/network/protocol/src/peer_set.rs new file mode 100644 index 0000000000000000000000000000000000000000..3554aea5c0c38a3d4af60eba83d183251cdc524e --- /dev/null +++ b/polkadot/node/network/protocol/src/peer_set.rs @@ -0,0 +1,90 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +//! All peersets and protocols used for parachains. + +use sc_network::config::{NonDefaultSetConfig, SetConfig}; +use std::borrow::Cow; +use strum::{EnumIter, IntoEnumIterator}; + +/// The peer-sets and thus the protocols which are used for the network. +#[derive(Debug, Clone, Copy, PartialEq, EnumIter)] +pub enum PeerSet { + /// The validation peer-set is responsible for all messages related to candidate validation and communication among validators. + Validation, + /// The collation peer-set is used for validator<>collator communication. + Collation, +} + +impl PeerSet { + /// Get `sc_network` peer set configurations for each peerset. + /// + /// Those should be used in the network configuration to register the protocols with the + /// network service. + pub fn get_info(self) -> NonDefaultSetConfig { + let protocol = self.into_protocol_name(); + match self { + PeerSet::Validation => NonDefaultSetConfig { + notifications_protocol: protocol, + set_config: sc_network::config::SetConfig { + in_peers: 25, + out_peers: 0, + reserved_nodes: Vec::new(), + non_reserved_mode: sc_network::config::NonReservedPeerMode::Accept, + }, + }, + PeerSet::Collation => NonDefaultSetConfig { + notifications_protocol: protocol, + set_config: SetConfig { + in_peers: 25, + out_peers: 0, + reserved_nodes: Vec::new(), + non_reserved_mode: sc_network::config::NonReservedPeerMode::Accept, + }, + }, + } + } + + /// Get the protocol name associated with each peer set as static str. + pub const fn get_protocol_name_static(self) -> &'static str { + match self { + PeerSet::Validation => "/polkadot/validation/1", + PeerSet::Collation => "/polkadot/collation/1", + } + } + + /// Convert a peer set into a protocol name as understood by Substrate. + pub fn into_protocol_name(self) -> Cow<'static, str> { + self.get_protocol_name_static().into() + } + + /// Try parsing a protocol name into a peer set. + pub fn try_from_protocol_name(name: &Cow<'static, str>) -> Option<PeerSet> { + match name { + n if n == &PeerSet::Validation.into_protocol_name() => Some(PeerSet::Validation), + n if n == &PeerSet::Collation.into_protocol_name() => Some(PeerSet::Collation), + _ => None, + } + } +} + +/// Get `NonDefaultSetConfig`s for all available peer sets. +/// +/// Should be used during network configuration (added to [`NetworkConfiguration::extra_sets`]) +/// or shortly after startup to register the protocols with the network service. +pub fn peer_sets_info() -> Vec<sc_network::config::NonDefaultSetConfig> { + PeerSet::iter().map(PeerSet::get_info).collect() +} diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index ed86833888ad98653381788816cf7cd994ce0423..0b37e3ae48f6d57816cc1e31fa3a83d029eb011e 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -567,7 +567,7 @@ pub fn new_full<RuntimeApi, Executor>( // Substrate nodes. config.network.extra_sets.push(grandpa::grandpa_peers_set_config()); #[cfg(feature = "real-overseer")] - config.network.extra_sets.extend(polkadot_network_bridge::peers_sets_info()); + config.network.extra_sets.extend(polkadot_network_bridge::peer_sets_info()); let (network, network_status_sinks, system_rpc_tx, network_starter) = service::build_network(service::BuildNetworkParams { diff --git a/polkadot/roadmap/implementers-guide/src/node/utility/network-bridge.md b/polkadot/roadmap/implementers-guide/src/node/utility/network-bridge.md index 9f51094336f4ef3314a27bba74848984558abd69..abcff82a16b69e2d7c2a089a705e534192952a22 100644 --- a/polkadot/roadmap/implementers-guide/src/node/utility/network-bridge.md +++ b/polkadot/roadmap/implementers-guide/src/node/utility/network-bridge.md @@ -32,7 +32,7 @@ Output: This network bridge sends messages of these types over the network. ```rust -enum ProtocolMessage<M> { +enum WireMessage<M> { ProtocolMessage(M), ViewUpdate(View), } @@ -41,8 +41,8 @@ enum ProtocolMessage<M> { and instantiates this type twice, once using the [`ValidationProtocolV1`][VP1] message type, and once with the [`CollationProtocolV1`][CP1] message type. ```rust -type ValidationV1Message = ProtocolMessage<ValidationProtocolV1>; -type CollationV1Message = ProtocolMessage<CollationProtocolV1>; +type ValidationV1Message = WireMessage<ValidationProtocolV1>; +type CollationV1Message = WireMessage<CollationProtocolV1>; ``` ### Startup