// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Polkadot-specific network implementation.
//!
//! This manages routing for parachain statements, parachain block and outgoing message
//! data fetching, communication between collators and validators, and more.
mod collator_pool;
mod local_collations;
mod router;
pub mod validation;
pub mod gossip;
use codec::{Decode, Encode};
use futures::channel::{oneshot, mpsc};
use futures::prelude::*;
use futures::future::Either;
use polkadot_primitives::{Block, Hash, Header};
use polkadot_primitives::parachain::{
Id as ParaId, CollatorId, CandidateReceipt, Collation, PoVBlock,
StructuredUnroutedIngress, ValidatorId, OutgoingMessages, ErasureChunk,
};
use sc_network::{
PeerId, RequestId, Context, StatusMessage as GenericFullStatus,
specialization::NetworkSpecialization as Specialization,
};
use sc_network::consensus_gossip::{
self, TopicNotification, MessageRecipient as GossipMessageRecipient, ConsensusMessage,
};
use self::validation::{LiveValidationLeaves, RecentValidatorIds, InsertedRecentKey};
use self::collator_pool::{CollatorPool, Role, Action};
use self::local_collations::LocalCollations;
use log::{trace, debug, warn};
use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::task::{Context as PollContext, Poll};
use crate::gossip::{POLKADOT_ENGINE_ID, GossipMessage, ErasureChunkMessage};
#[cfg(test)]
mod tests;
/// Negative reputation changes. Each constant pairs a penalty value with the
/// human-readable reason string that is attached to it.
mod cost {
    use sc_network::ReputationChange;

    /// Penalty of 200, reason "Polkadot: Unexpected message".
    pub(super) const UNEXPECTED_MESSAGE: ReputationChange =
        ReputationChange::new(-200, "Polkadot: Unexpected message");

    /// Penalty of 200, reason "Polkadot: Unexpected role".
    pub(super) const UNEXPECTED_ROLE: ReputationChange =
        ReputationChange::new(-200, "Polkadot: Unexpected role");

    /// Penalty of 200, reason "Polkadot: Bad message".
    pub(super) const INVALID_FORMAT: ReputationChange =
        ReputationChange::new(-200, "Polkadot: Bad message");

    /// Penalty of 50, reason "Polkadot: Unknown peer".
    pub(super) const UNKNOWN_PEER: ReputationChange =
        ReputationChange::new(-50, "Polkadot: Unknown peer");

    /// Penalty of 100, reason "Polkadot: Known collator".
    pub(super) const COLLATOR_ALREADY_KNOWN: ReputationChange =
        ReputationChange::new(-100, "Polkadot: Known collator");

    /// Penalty of 1000, reason "Polkadot: Bad collation".
    pub(super) const BAD_COLLATION: ReputationChange =
        ReputationChange::new(-1000, "Polkadot: Bad collation");

    /// Penalty of 1000, reason "Polkadot: Bad POV block".
    pub(super) const BAD_POV_BLOCK: ReputationChange =
        ReputationChange::new(-1000, "Polkadot: Bad POV block");
}
/// Positive reputation changes. Each constant pairs a reward value with the
/// human-readable reason string that is attached to it.
mod benefit {
    use sc_network::ReputationChange;

    /// Reward of 20, reason "Polkadot: Expected message".
    pub(super) const EXPECTED_MESSAGE: ReputationChange =
        ReputationChange::new(20, "Polkadot: Expected message");

    /// Reward of 20, reason "Polkadot: Valid message format".
    pub(super) const VALID_FORMAT: ReputationChange =
        ReputationChange::new(20, "Polkadot: Valid message format");

    /// Reward of 5, reason "Polkadot: Known peer".
    pub(super) const KNOWN_PEER: ReputationChange =
        ReputationChange::new(5, "Polkadot: Known peer");

    /// Reward of 10, reason "Polkadot: New collator".
    pub(super) const NEW_COLLATOR: ReputationChange =
        ReputationChange::new(10, "Polkadot: New collator");

    /// Reward of 100, reason "Polkadot: Good collation".
    pub(super) const GOOD_COLLATION: ReputationChange =
        ReputationChange::new(100, "Polkadot: Good collation");

    /// Reward of 100, reason "Polkadot: Good POV block".
    pub(super) const GOOD_POV_BLOCK: ReputationChange =
        ReputationChange::new(100, "Polkadot: Good POV block");
}
type FullStatus = GenericFullStatus;
/// Specialization of the network service for the polkadot protocol.
pub type PolkadotNetworkService = sc_network::NetworkService;
/// Basic functionality that a network has to fulfill.
pub trait NetworkService: Send + Sync + 'static {
/// Get a stream of gossip messages for a given hash.
fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream;
/// Gossip a message on given topic.
fn gossip_message(&self, topic: Hash, message: GossipMessage);
/// Execute a closure with the gossip service.
fn with_gossip(&self, with: F)
where F: FnOnce(&mut dyn GossipService, &mut dyn Context);
/// Execute a closure with the polkadot protocol.
fn with_spec(&self, with: F)
where F: FnOnce(&mut PolkadotProtocol, &mut dyn Context);
}
/// This is a newtype that implements a [`ProvideGossipMessages`] shim trait.
///
/// For any wrapped [`NetworkService`] type it implements a [`ProvideGossipMessages`].
/// For more details see documentation of [`ProvideGossipMessages`].
///
/// The wrapped service is held behind an `Arc`, so cloning the shim only
/// bumps a reference count.
///
/// [`NetworkService`]: ./trait.NetworkService.html
/// [`ProvideGossipMessages`]: ../polkadot_availability_store/trait.ProvideGossipMessages.html
pub struct AvailabilityNetworkShim<T>(pub std::sync::Arc<T>);
impl av_store::ProvideGossipMessages for AvailabilityNetworkShim
where T: NetworkService
{
fn gossip_messages_for(&self, topic: Hash)
-> Box + Unpin + Send>
{
Box::new(self.0.gossip_messages_for(topic)
.filter_map(|(msg, _)| async move {
match msg {
GossipMessage::ErasureChunk(chunk) => {
Some((chunk.relay_parent, chunk.candidate_hash, chunk.chunk))
},
_ => None,
}
})
.boxed()
)
}
fn gossip_erasure_chunk(
&self,
relay_parent: Hash,
candidate_hash: Hash,
erasure_root: Hash,
chunk: ErasureChunk
) {
let topic = av_store::erasure_coding_topic(relay_parent, erasure_root, chunk.index);
self.0.gossip_message(
topic,
GossipMessage::ErasureChunk(ErasureChunkMessage {
chunk,
relay_parent,
candidate_hash,
})
)
}
}
// Implemented by hand rather than derived so that no `T: Clone` bound is
// required: only the inner `Arc` is cloned.
impl<T> Clone for AvailabilityNetworkShim<T> {
    /// Returns a new shim sharing the same underlying network service.
    fn clone(&self) -> Self {
        AvailabilityNetworkShim(self.0.clone())
    }
}
impl NetworkService for PolkadotNetworkService {
fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
let (tx, rx) = std::sync::mpsc::channel();
PolkadotNetworkService::with_gossip(self, move |gossip, _| {
let inner_rx = gossip.messages_for(POLKADOT_ENGINE_ID, topic);
let _ = tx.send(inner_rx);
});
let topic_stream = match rx.recv() {
Ok(rx) => rx,
Err(_) => mpsc::unbounded().1, // return empty channel.
};
GossipMessageStream::new(Box::new(topic_stream))
}
fn gossip_message(&self, topic: Hash, message: GossipMessage) {
self.gossip_consensus_message(
topic,
POLKADOT_ENGINE_ID,
message.encode(),
GossipMessageRecipient::BroadcastToAll,
);
}
fn with_gossip(&self, with: F)
where F: FnOnce(&mut dyn GossipService, &mut dyn Context)
{
PolkadotNetworkService::with_gossip(self, move |gossip, ctx| with(gossip, ctx))
}
fn with_spec(&self, with: F)
where F: FnOnce(&mut PolkadotProtocol, &mut dyn Context)
{
PolkadotNetworkService::with_spec(self, with)
}
}
/// A gossip network subservice.
pub trait GossipService {
fn send_message(&mut self, ctx: &mut dyn Context, who: &PeerId, message: ConsensusMessage);
fn multicast(&mut self, ctx: &mut dyn Context, topic: &Hash, message: ConsensusMessage);
}
impl GossipService for consensus_gossip::ConsensusGossip {
fn send_message(&mut self, ctx: &mut dyn Context, who: &PeerId, message: ConsensusMessage) {
consensus_gossip::ConsensusGossip::send_message(self, ctx, who, message)
}
fn multicast(&mut self, ctx: &mut dyn Context, topic: &Hash, message: ConsensusMessage) {
consensus_gossip::ConsensusGossip::multicast(self, ctx, *topic, message, false)
}
}
/// A stream of gossip messages and an optional sender for a topic.
pub struct GossipMessageStream {
topic_stream: Box + Unpin + Send>,
}
impl GossipMessageStream {
/// Create a new instance with the given topic stream.
pub fn new(topic_stream: Box + Unpin + Send>) -> Self {
Self {
topic_stream,
}
}
}
impl Stream for GossipMessageStream {
type Item = (GossipMessage, Option);
fn poll_next(self: Pin<&mut Self>, cx: &mut PollContext) -> Poll