// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The Network Bridge Subsystem - protocol multiplexer for Polkadot.
use parity_scale_codec::{Encode, Decode};
use futures::prelude::*;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use sc_network::{
ObservedRole, ReputationChange, PeerId,
Event as NetworkEvent,
};
use sp_runtime::ConsensusEngineId;
use polkadot_subsystem::{
FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError,
SubsystemResult,
};
use polkadot_subsystem::messages::{NetworkBridgeEvent, NetworkBridgeMessage, AllMessages};
use node_primitives::{ProtocolId, View};
use polkadot_primitives::v1::{Block, Hash};
use std::collections::btree_map::{BTreeMap, Entry as BEntry};
use std::collections::hash_map::{HashMap, Entry as HEntry};
use std::pin::Pin;
use std::sync::Arc;
/// The maximum amount of heads a peer is allowed to have in their view at any time.
///
/// We use the same limit to compute the view sent to peers locally.
const MAX_VIEW_HEADS: usize = 5;
/// The engine ID of the polkadot network protocol.
pub const POLKADOT_ENGINE_ID: ConsensusEngineId = *b"dot2";
/// The protocol name.
pub const POLKADOT_PROTOCOL_NAME: &[u8] = b"/polkadot/2";
/// Reputation change reported for a peer when one of its notifications on the Polkadot
/// engine ID fails to decode as a `WireMessage`.
const MALFORMED_MESSAGE_COST: ReputationChange
= ReputationChange::new(-500, "Malformed Network-bridge message");
/// Reputation change for a message sent to an unknown protocol.
// NOTE(review): the use site is outside the visible part of this file; presumably applied when a
// `ProtocolMessage` names a protocol with no registered event producer - confirm.
const UNKNOWN_PROTO_COST: ReputationChange
= ReputationChange::new(-50, "Message sent to unknown protocol");
/// Reputation change for a malformed view.
// NOTE(review): the use site is outside the visible part of this file; presumably applied when a
// peer's view update violates `MAX_VIEW_HEADS` - confirm.
const MALFORMED_VIEW_COST: ReputationChange
= ReputationChange::new(-500, "Malformed view");
/// Messages received on the network.
#[derive(Debug, Encode, Decode, Clone)]
pub enum WireMessage {
/// A message from a peer on a specific protocol.
#[codec(index = "1")]
ProtocolMessage(ProtocolId, Vec),
/// A view update from a peer.
#[codec(index = "2")]
ViewUpdate(View),
}
/// Describes the notifications protocol used by the network bridge.
///
/// Returns the pair of [`POLKADOT_ENGINE_ID`](POLKADOT_ENGINE_ID) and the protocol name
/// (borrowed from [`POLKADOT_PROTOCOL_NAME`](POLKADOT_PROTOCOL_NAME)). Should be used during
/// network configuration or shortly after startup to register the protocol with the network
/// service.
pub fn notifications_protocol_info() -> (ConsensusEngineId, std::borrow::Cow<'static, [u8]>) {
    // The name is a `'static` byte string, so it can be handed out without allocating.
    let protocol_name = std::borrow::Cow::Borrowed(POLKADOT_PROTOCOL_NAME);
    (POLKADOT_ENGINE_ID, protocol_name)
}
/// An action to be carried out by the network.
#[derive(PartialEq)]
pub enum NetworkAction {
/// Note a change in reputation for a peer.
ReputationChange(PeerId, ReputationChange),
/// Write a notification to a given peer.
WriteNotification(PeerId, Vec),
}
/// An abstraction over networking for the purposes of this subsystem.
pub trait Network: Send + 'static {
/// Get a stream of all events occurring on the network. This may include events unrelated
/// to the Polkadot protocol - the user of this function should filter only for events related
/// to the [`POLKADOT_ENGINE_ID`](POLKADOT_ENGINE_ID).
fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent>;
/// Get access to an underlying sink for all network actions.
fn action_sink<'a>(&'a mut self) -> Pin<
Box + Send + 'a>
>;
/// Report a given peer as either beneficial (+) or costly (-) according to the given scalar.
fn report_peer(&mut self, who: PeerId, cost_benefit: ReputationChange)
-> BoxFuture>
{
async move {
self.action_sink().send(NetworkAction::ReputationChange(who, cost_benefit)).await
}.boxed()
}
/// Write a notification to a peer on the [`POLKADOT_ENGINE_ID`](POLKADOT_ENGINE_ID) topic.
fn write_notification(&mut self, who: PeerId, message: Vec)
-> BoxFuture>
{
async move {
self.action_sink().send(NetworkAction::WriteNotification(who, message)).await
}.boxed()
}
}
impl Network for Arc> {
fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> {
sc_network::NetworkService::event_stream(self, "polkadot-network-bridge").boxed()
}
fn action_sink<'a>(&'a mut self)
-> Pin + Send + 'a>>
{
use futures::task::{Poll, Context};
// wrapper around a NetworkService to make it act like a sink.
struct ActionSink<'b>(&'b sc_network::NetworkService);
impl<'b> Sink for ActionSink<'b> {
type Error = SubsystemError;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, action: NetworkAction) -> SubsystemResult<()> {
match action {
NetworkAction::ReputationChange(peer, cost_benefit) => self.0.report_peer(
peer,
cost_benefit,
),
NetworkAction::WriteNotification(peer, message) => self.0.write_notification(
peer,
POLKADOT_ENGINE_ID,
message,
),
}
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll> {
Poll::Ready(Ok(()))
}
}
Box::pin(ActionSink(&**self))
}
}
/// The network bridge subsystem.
///
/// Wraps the underlying network service `N`, which is handed to the subsystem's main loop
/// when the subsystem is started.
pub struct NetworkBridge<N>(N);

impl<N> NetworkBridge<N> {
    /// Create a new network bridge subsystem with underlying network service.
    ///
    /// This assumes that the network service has had the notifications protocol for the network
    /// bridge already registered. See [`notifications_protocol_info`](notifications_protocol_info).
    pub fn new(net_service: N) -> Self {
        NetworkBridge(net_service)
    }
}
impl Subsystem for NetworkBridge
where
Net: Network,
Context: SubsystemContext,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
// Swallow error because failure is fatal to the node and we log with more precision
// within `run_network`.
SpawnedSubsystem {
name: "network-bridge-subsystem",
future: run_network(self.0, ctx).map(|_| ()).boxed(),
}
}
}
/// Per-peer state tracked by the network bridge.
// NOTE(review): presumably stored as the value type of the peer map keyed by `PeerId`
// (see `update_view`'s `peers` argument) - confirm against the rest of the file.
struct PeerData {
/// Latest view sent by the peer.
view: View,
/// The role of the peer.
role: ObservedRole,
}
#[derive(Debug)]
enum Action {
RegisterEventProducer(ProtocolId, fn(NetworkBridgeEvent) -> AllMessages),
SendMessage(Vec, ProtocolId, Vec),
ReportPeer(PeerId, ReputationChange),
StartWork(Hash),
StopWork(Hash),
PeerConnected(PeerId, ObservedRole),
PeerDisconnected(PeerId),
PeerMessages(PeerId, Vec),
Abort,
}
fn action_from_overseer_message(
res: polkadot_subsystem::SubsystemResult>,
) -> Action {
match res {
Ok(FromOverseer::Signal(OverseerSignal::StartWork(relay_parent)))
=> Action::StartWork(relay_parent),
Ok(FromOverseer::Signal(OverseerSignal::StopWork(relay_parent)))
=> Action::StopWork(relay_parent),
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => Action::Abort,
Ok(FromOverseer::Communication { msg }) => match msg {
NetworkBridgeMessage::RegisterEventProducer(protocol_id, message_producer)
=> Action::RegisterEventProducer(protocol_id, message_producer),
NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep),
NetworkBridgeMessage::SendMessage(peers, protocol, message)
=> Action::SendMessage(peers, protocol, message),
},
Err(e) => {
log::warn!("Shutting down Network Bridge due to error {:?}", e);
Action::Abort
}
}
}
fn action_from_network_message(event: Option) -> Option {
match event {
None => {
log::info!("Shutting down Network Bridge: underlying event stream concluded");
Some(Action::Abort)
}
Some(NetworkEvent::Dht(_)) => None,
Some(NetworkEvent::NotificationStreamOpened { remote, engine_id, role }) => {
if engine_id == POLKADOT_ENGINE_ID {
Some(Action::PeerConnected(remote, role))
} else {
None
}
}
Some(NetworkEvent::NotificationStreamClosed { remote, engine_id }) => {
if engine_id == POLKADOT_ENGINE_ID {
Some(Action::PeerDisconnected(remote))
} else {
None
}
}
Some(NetworkEvent::NotificationsReceived { remote, messages }) => {
let v: Result, _> = messages.iter()
.filter(|(engine_id, _)| engine_id == &POLKADOT_ENGINE_ID)
.map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
.collect();
match v {
Err(_) => Some(Action::ReportPeer(remote, MALFORMED_MESSAGE_COST)),
Ok(v) => if v.is_empty() {
None
} else {
Some(Action::PeerMessages(remote, v))
}
}
}
}
}
/// Build a [`View`] from the given heads.
///
/// The view holds at most `MAX_VIEW_HEADS` entries: the trailing entries of `live_heads`, in
/// reverse order (last entry first).
// NOTE(review): presumably the most recently activated heads sit at the end of `live_heads`,
// which would make the view "newest first" - confirm against the caller.
fn construct_view(live_heads: &[Hash]) -> View {
    // Index of the first head that still fits within the limit (0 when all of them fit).
    let first_kept = live_heads.len().saturating_sub(MAX_VIEW_HEADS);
    let kept = &live_heads[first_kept..];
    View(kept.iter().rev().cloned().collect())
}
async fn dispatch_update_to_all(
update: NetworkBridgeEvent,
event_producers: impl IntoIterator AllMessages>,
ctx: &mut impl SubsystemContext,
) -> polkadot_subsystem::SubsystemResult<()> {
// collect messages here to avoid the borrow lasting across await boundary.
let messages: Vec<_> = event_producers.into_iter()
.map(|producer| producer(update.clone()))
.collect();
ctx.send_messages(messages).await
}
async fn update_view(
peers: &HashMap,
live_heads: &[Hash],
net: &mut impl Network,
local_view: &mut View,
) -> SubsystemResult