Newer
Older
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The Network Bridge Subsystem - protocol multiplexer for Polkadot.
Bastian Köcher
committed
#![deny(unused_crate_dependencies)]
use parity_scale_codec::{Encode, Decode};
use futures::prelude::*;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use sc_network::Event as NetworkEvent;
ActiveLeavesUpdate, FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError,
use polkadot_subsystem::messages::{
NetworkBridgeMessage, AllMessages, AvailabilityDistributionMessage,
BitfieldDistributionMessage, PoVDistributionMessage, StatementDistributionMessage,
CollatorProtocolMessage,
};
use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash};
use polkadot_node_network_protocol::{
ObservedRole, ReputationChange, PeerId, PeerSet, View, NetworkBridgeEvent, v1 as protocol_v1
};
use std::collections::{HashMap, hash_map};
use std::iter::ExactSizeIterator;
use std::pin::Pin;
use std::sync::Arc;
mod validator_discovery;
/// The maximum amount of heads a peer is allowed to have in their view at any time.
///
/// We use the same limit to compute the view sent to peers locally.
const MAX_VIEW_HEADS: usize = 5;

/// The protocol name for the validation peer-set.
// The `'static` lifetime is implied for references in `const` items, so it is elided.
pub const VALIDATION_PROTOCOL_NAME: &str = "/polkadot/validation/1";

/// The protocol name for the collation peer-set.
pub const COLLATION_PROTOCOL_NAME: &str = "/polkadot/collation/1";
/// Reputation cost of -500, reported with the reason "Malformed Network-bridge message".
const MALFORMED_MESSAGE_COST: ReputationChange =
    ReputationChange::new(-500, "Malformed Network-bridge message");

/// Reputation cost of -50, reported with the reason "Message sent to un-connected peer-set".
const UNCONNECTED_PEERSET_COST: ReputationChange =
    ReputationChange::new(-50, "Message sent to un-connected peer-set");

/// Reputation cost of -500, reported with the reason "Malformed view".
const MALFORMED_VIEW_COST: ReputationChange =
    ReputationChange::new(-500, "Malformed view");
/// Log target of the network bridge subsystem.
// The `'static` lifetime is implied for references in `const` items, so it is elided.
const LOG_TARGET: &str = "network_bridge";
/// Messages received on the network.
#[derive(Debug, Encode, Decode, Clone)]
/// A message from a peer on a specific protocol.
#[codec(index = "1")]
/// A view update from a peer.
#[codec(index = "2")]
ViewUpdate(View),
}
/// Information about the notifications protocol. Should be used during network configuration
/// or shortly after startup to register the protocol with the network service.
pub fn notifications_protocol_info() -> Vec<std::borrow::Cow<'static, str>> {
VALIDATION_PROTOCOL_NAME.into(),
COLLATION_PROTOCOL_NAME.into(),
}
/// An action to be carried out by the network.
///
/// Values of this type are the items accepted by the sink returned from
/// [`Network::action_sink`].
pub enum NetworkAction {
/// Note a change in reputation for a peer.
///
/// Carries the affected peer and the reputation change to report for it.
ReputationChange(PeerId, ReputationChange),
/// Write a notification to a given peer on the given peer-set.
///
/// Carries the target peer, the peer-set whose protocol is used, and the raw
/// message bytes.
WriteNotification(PeerId, PeerSet, Vec<u8>),
}
/// An abstraction over networking for the purposes of this subsystem.
pub trait Network: Send + 'static {
/// Get a stream of all events occurring on the network. This may include events unrelated
/// to the Polkadot protocol - the user of this function should filter only for events related
/// to the [`VALIDATION_PROTOCOL_NAME`](VALIDATION_PROTOCOL_NAME)
/// or [`COLLATION_PROTOCOL_NAME`](COLLATION_PROTOCOL_NAME)
fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent>;
/// Get access to an underlying sink for all network actions.
fn action_sink<'a>(&'a mut self) -> Pin<
Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>
>;
/// Report a given peer as either beneficial (+) or costly (-) according to the given scalar.
fn report_peer(&mut self, who: PeerId, cost_benefit: ReputationChange)
-> BoxFuture<SubsystemResult<()>>
{
async move {
self.action_sink().send(NetworkAction::ReputationChange(who, cost_benefit)).await
}.boxed()
}
/// Write a notification to a peer on the given peer-set's protocol.
fn write_notification(&mut self, who: PeerId, peer_set: PeerSet, message: Vec<u8>)
-> BoxFuture<SubsystemResult<()>>
{
async move {
self.action_sink().send(NetworkAction::WriteNotification(who, peer_set, message)).await
}.boxed()
}
}
impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> {
sc_network::NetworkService::event_stream(self, "polkadot-network-bridge").boxed()
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn action_sink<'a>(&'a mut self)
-> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>>
{
use futures::task::{Poll, Context};
// wrapper around a NetworkService to make it act like a sink.
struct ActionSink<'b>(&'b sc_network::NetworkService<Block, Hash>);
impl<'b> Sink<NetworkAction> for ActionSink<'b> {
type Error = SubsystemError;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, action: NetworkAction) -> SubsystemResult<()> {
match action {
NetworkAction::ReputationChange(peer, cost_benefit) => {
tracing::debug!("reputation: {:?} for {}", cost_benefit, peer);
self.0.report_peer(
peer,
cost_benefit,
)
}
NetworkAction::WriteNotification(peer, peer_set, message) => {
match peer_set {
PeerSet::Validation => self.0.write_notification(
peer,
VALIDATION_PROTOCOL_NAME.into(),
message,
),
PeerSet::Collation => self.0.write_notification(
peer,
COLLATION_PROTOCOL_NAME.into(),
}
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> {
Poll::Ready(Ok(()))
}
}
Box::pin(ActionSink(&**self))
}
}
/// The network bridge subsystem.
Loading full blame...