Newer
Older
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Polkadot-specific network implementation.
//!
//! This manages routing for parachain statements, parachain block and extrinsic data fetching,
//! communication between collators and validators, and more.
mod router;
pub mod validation;
use futures::sync::oneshot;
use polkadot_primitives::{Block, SessionKey, Hash, Header};
asynchronous rob
committed
use polkadot_primitives::parachain::{
Id as ParaId, BlockData, CollatorId, CandidateReceipt, Collation, PoVBlock,
StructuredUnroutedIngress,
asynchronous rob
committed
};
use substrate_network::{PeerId, RequestId, Context};
use substrate_network::{message, generic_message};
use substrate_network::specialization::NetworkSpecialization as Specialization;
use substrate_network::StatusMessage as GenericFullStatus;
use self::validation::{LiveValidationSessions, RecentValidatorIds, InsertedRecentKey};
use self::collator_pool::{CollatorPool, Role, Action};
use self::local_collations::LocalCollations;
use std::collections::{HashMap, HashSet};
#[cfg(test)]
mod tests;
mod cost {
pub(super) const UNEXPECTED_MESSAGE: i32 = -200;
pub(super) const INVALID_FORMAT: i32 = -200;
pub(super) const UNKNOWN_PEER: i32 = -50;
pub(super) const COLLATOR_ALREADY_KNOWN: i32 = -100;
pub(super) const BAD_COLLATION: i32 = -1000;
pub(super) const BAD_POV_BLOCK: i32 = -1000;
}
mod benefit {
pub(super) const EXPECTED_MESSAGE: i32 = 20;
pub(super) const VALID_FORMAT: i32 = 20;
pub(super) const KNOWN_PEER: i32 = 5;
pub(super) const NEW_COLLATOR: i32 = 10;
pub(super) const GOOD_COLLATION: i32 = 100;
pub(super) const GOOD_POV_BLOCK: i32 = 100;
type FullStatus = GenericFullStatus<Block>;
/// Specialization of the network service for the polkadot protocol.
pub type NetworkService = ::substrate_network::NetworkService<Block, PolkadotProtocol>;
/// Status of a Polkadot node.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct Status {
collating_for: Option<(CollatorId, ParaId)>,
}
asynchronous rob
committed
struct PoVBlockRequest {
attempted_peers: HashSet<SessionKey>,
validation_session_parent: Hash,
candidate_hash: Hash,
block_data_hash: Hash,
asynchronous rob
committed
sender: oneshot::Sender<PoVBlock>,
canon_roots: StructuredUnroutedIngress,
asynchronous rob
committed
}
impl PoVBlockRequest {
// Attempt to process a response. If the provided block is invalid,
// this returns an error result containing the unmodified request.
//
// If `Ok(())` is returned, that indicates that the request has been processed.
fn process_response(self, pov_block: PoVBlock) -> Result<(), Self> {
if pov_block.block_data.hash() != self.block_data_hash {
return Err(self);
}
match polkadot_validation::validate_incoming(&self.canon_roots, &pov_block.ingress) {
Ok(()) => {
let _ = self.sender.send(pov_block);
Ok(())
}
Err(_) => Err(self)
}
}
}
asynchronous rob
committed
// ensures collator-protocol messages are sent in correct order.
// session key must be sent before collator role.
enum CollatorState {
Fresh,
RolePending(Role),
Primed(Option<Role>),
asynchronous rob
committed
}
impl CollatorState {
fn send_key<F: FnMut(Message)>(&mut self, key: SessionKey, mut f: F) {
f(Message::SessionKey(key));
if let CollatorState::RolePending(role) = *self {
asynchronous rob
committed
f(Message::CollatorRole(role));
*self = CollatorState::Primed(Some(role));
asynchronous rob
committed
}
}
fn set_role<F: FnMut(Message)>(&mut self, role: Role, mut f: F) {
if let CollatorState::Primed(ref mut r) = *self {
asynchronous rob
committed
f(Message::CollatorRole(role));
*r = Some(role);
asynchronous rob
committed
} else {
*self = CollatorState::RolePending(role);
}
}
fn role(&self) -> Option<Role> {
match *self {
CollatorState::Fresh => None,
CollatorState::RolePending(role) => Some(role),
CollatorState::Primed(role) => role,
}
}
asynchronous rob
committed
}
struct PeerInfo {
collating_for: Option<(CollatorId, ParaId)>,
validator_keys: RecentValidatorIds,
asynchronous rob
committed
collator_state: CollatorState,
}
impl PeerInfo {
fn should_send_key(&self) -> bool {
self.claimed_validator || self.collating_for.is_some()
}
}
/// Polkadot-specific messages.
pub enum Message {
/// As a validator, tell the peer your current session key.
// TODO: do this with a cryptographic proof of some kind
// https://github.com/paritytech/polkadot/issues/47
asynchronous rob
committed
/// Requesting parachain proof-of-validation block (relay_parent, candidate_hash).
RequestPovBlock(RequestId, Hash, Hash),
/// Provide requested proof-of-validation block data by candidate hash or nothing if unknown.
PovBlock(RequestId, Option<PoVBlock>),
/// Request block data (relay_parent, candidate_hash)
RequestBlockData(RequestId, Hash, Hash),
asynchronous rob
committed
/// Provide requested block data by candidate hash or nothing.
BlockData(RequestId, Option<BlockData>),
/// Tell a collator their role.
CollatorRole(Role),
/// A collation provided by a peer. Relay parent and collation.
Collation(Hash, Collation),
}
fn send_polkadot_message(ctx: &mut Context<Block>, to: PeerId, message: Message) {
trace!(target: "p_net", "Sending polkadot message to {}: {:?}", to, message);
let encoded = message.encode();
ctx.send_chain_specific(to, encoded)
}
/// Polkadot protocol attachment for substrate.
pub struct PolkadotProtocol {
peers: HashMap<PeerId, PeerInfo>,
collating_for: Option<(CollatorId, ParaId)>,
collators: CollatorPool,
validators: HashMap<SessionKey, PeerId>,
local_collations: LocalCollations<Collation>,
live_validation_sessions: LiveValidationSessions,
asynchronous rob
committed
in_flight: HashMap<(RequestId, PeerId), PoVBlockRequest>,
pending: Vec<PoVBlockRequest>,
extrinsic_store: Option<::av_store::Store>,
next_req_id: u64,
}
impl PolkadotProtocol {
/// Instantiate a polkadot protocol handler.
pub fn new(collating_for: Option<(CollatorId, ParaId)>) -> Self {
PolkadotProtocol {
peers: HashMap::new(),
collators: CollatorPool::new(),
collating_for,
validators: HashMap::new(),
local_collations: LocalCollations::new(),
live_validation_sessions: LiveValidationSessions::new(),
in_flight: HashMap::new(),
pending: Vec::new(),
next_req_id: 1,
}
}
/// Fetch block data by candidate receipt.
asynchronous rob
committed
fn fetch_pov_block(
&mut self,
ctx: &mut Context<Block>,
candidate: &CandidateReceipt,
relay_parent: Hash,
canon_roots: StructuredUnroutedIngress,
asynchronous rob
committed
) -> oneshot::Receiver<PoVBlock> {
let (tx, rx) = oneshot::channel();
asynchronous rob
committed
self.pending.push(PoVBlockRequest {
attempted_peers: Default::default(),
validation_session_parent: relay_parent,
candidate_hash: candidate.hash(),
block_data_hash: candidate.block_data_hash,
sender: tx,
asynchronous rob
committed
canon_roots,
});
self.dispatch_pending_requests(ctx);
rx
}
/// Note new validation session.
fn new_validation_session(
&mut self,
ctx: &mut Context<Block>,
params: validation::SessionParams,
) -> validation::ValidationSession {
let (session, new_local) = self.live_validation_sessions
.new_validation_session(params);
if let Some(new_local) = new_local {
asynchronous rob
committed
for (id, peer_data) in self.peers.iter_mut()
.filter(|&(_, ref info)| info.should_send_key())
peer_data.collator_state.send_key(new_local.clone(), |msg| send_polkadot_message(
asynchronous rob
committed
msg
));
}
}
// true indicates that it was removed actually.
fn remove_validation_session(&mut self, parent_hash: Hash) -> bool {
self.live_validation_sessions.remove(parent_hash)
}
fn dispatch_pending_requests(&mut self, ctx: &mut Context<Block>) {
let mut new_pending = Vec::new();
let validator_keys = &mut self.validators;
let next_req_id = &mut self.next_req_id;
let in_flight = &mut self.in_flight;
for mut pending in ::std::mem::replace(&mut self.pending, Vec::new()) {
let parent = pending.validation_session_parent;
let c_hash = pending.candidate_hash;
asynchronous rob
committed
let still_pending = self.live_validation_sessions.with_pov_block(&parent, &c_hash, |x| match x {
Ok(data @ &_) => {
// answer locally.
let _ = pending.sender.send(data.clone());
None
}
Err(Some(known_keys)) => {
let next_peer = known_keys.iter()
.filter_map(|x| validator_keys.get(x).map(|id| (x.clone(), id.clone())))
.find(|&(ref key, _)| pending.attempted_peers.insert(key.clone()))
.map(|(_, id)| id);
// dispatch to peer
if let Some(who) = next_peer {
let req_id = *next_req_id;
*next_req_id += 1;
send_polkadot_message(
ctx,
who.clone(),
asynchronous rob
committed
Message::RequestPovBlock(req_id, parent, c_hash),
);
in_flight.insert((req_id, who), pending);
None
} else {
Some(pending)
}
}
Err(None) => None, // no such known validation session. prune out.
});
if let Some(pending) = still_pending {
new_pending.push(pending);
}
}
self.pending = new_pending;
}
fn on_polkadot_message(&mut self, ctx: &mut Context<Block>, who: PeerId, msg: Message) {
trace!(target: "p_net", "Polkadot message from {}: {:?}", who, msg);
match msg {
Message::SessionKey(key) => self.on_session_key(ctx, who, key),
asynchronous rob
committed
Message::RequestPovBlock(req_id, relay_parent, candidate_hash) => {
let pov_block = self.live_validation_sessions.with_pov_block(
&relay_parent,
&candidate_hash,
|res| res.ok().map(|b| b.clone()),
);
send_polkadot_message(ctx, who, Message::PovBlock(req_id, pov_block));
}
Message::RequestBlockData(req_id, relay_parent, candidate_hash) => {
let block_data = self.live_validation_sessions
asynchronous rob
committed
.with_pov_block(
&relay_parent,
&candidate_hash,
asynchronous rob
committed
|res| res.ok().map(|b| b.block_data.clone()),
.or_else(|| self.extrinsic_store.as_ref()
.and_then(|s| s.block_data(relay_parent, candidate_hash))
);
send_polkadot_message(ctx, who, Message::BlockData(req_id, block_data));
}
asynchronous rob
committed
Message::PovBlock(req_id, data) => self.on_pov_block(ctx, who, req_id, data),
Message::BlockData(_req_id, _data) => {
// current block data is never requested bare by the node.
ctx.report_peer(who, cost::UNEXPECTED_MESSAGE);
asynchronous rob
committed
}
Message::Collation(relay_parent, collation) => self.on_collation(ctx, who, relay_parent, collation),
Message::CollatorRole(role) => self.on_new_role(ctx, who, role),
fn on_session_key(&mut self, ctx: &mut Context<Block>, who: PeerId, key: SessionKey) {
Some(peer) => peer,
None => {
trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", who);
return
}
};
if !info.claimed_validator {
ctx.report_peer(who, cost::UNEXPECTED_MESSAGE);
ctx.report_peer(who.clone(), benefit::EXPECTED_MESSAGE);
let local_collations = &mut self.local_collations;
let new_collations = match info.validator_keys.insert(key.clone()) {
InsertedRecentKey::AlreadyKnown => Vec::new(),
InsertedRecentKey::New(Some(old_key)) => {
self.validators.remove(&old_key);
local_collations.fresh_key(&old_key, &key)
InsertedRecentKey::New(None) => info.collator_state.role()
.map(|r| local_collations.note_validator_role(key.clone(), r))
.unwrap_or_else(Vec::new),
};
for (relay_parent, collation) in new_collations {
send_polkadot_message(
ctx,
who.clone(),
Message::Collation(relay_parent, collation),
)
}
self.dispatch_pending_requests(ctx);
}
asynchronous rob
committed
fn on_pov_block(
&mut self,
ctx: &mut Context<Block>,
who: PeerId,
req_id: RequestId,
pov_block: Option<PoVBlock>,
) {
match self.in_flight.remove(&(req_id, who.clone())) {
asynchronous rob
committed
Some(mut req) => {
match pov_block {
Some(pov_block) => {
match req.process_response(pov_block) {
Ok(()) => {
ctx.report_peer(who, benefit::GOOD_POV_BLOCK);
return;
}
Err(r) => {
ctx.report_peer(who, cost::BAD_POV_BLOCK);
req = r;
}
}
},
None => {
ctx.report_peer(who, benefit::EXPECTED_MESSAGE);
}
}
self.pending.push(req);
self.dispatch_pending_requests(ctx);
}
None => ctx.report_peer(who, cost::UNEXPECTED_MESSAGE),
}
}
// when a validator sends us (a collator) a new role.
fn on_new_role(&mut self, ctx: &mut Context<Block>, who: PeerId, role: Role) {
let info = match self.peers.get_mut(&who) {
Some(peer) => peer,
None => {
trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", who);
asynchronous rob
committed
debug!(target: "p_net", "New collator role {:?} from {}", role, who);
if info.validator_keys.as_slice().is_empty() {
ctx.report_peer(who, cost::UNEXPECTED_MESSAGE);
} else {
// update role for all saved session keys for this validator.
let local_collations = &mut self.local_collations;
for (relay_parent, collation) in info.validator_keys
.as_slice()
.iter()
.cloned()
.flat_map(|k| local_collations.note_validator_role(k, role))
{
asynchronous rob
committed
debug!(target: "p_net", "Broadcasting collation on relay parent {:?}", relay_parent);
send_polkadot_message(
ctx,
who.clone(),
Message::Collation(relay_parent, collation),
)
}
}
}
impl Specialization<Block> for PolkadotProtocol {
fn status(&self) -> Vec<u8> {
Status { collating_for: self.collating_for.clone() }.encode()
}
fn on_connect(&mut self, ctx: &mut Context<Block>, who: PeerId, status: FullStatus) {
let local_status = match Status::decode(&mut &status.chain_status[..]) {
Some(status) => status,
None => {
}
};
let validator = status.roles.contains(substrate_network::config::Roles::AUTHORITY);
asynchronous rob
committed
let mut peer_info = PeerInfo {
collating_for: local_status.collating_for.clone(),
validator_keys: Default::default(),
asynchronous rob
committed
claimed_validator: validator,
collator_state: CollatorState::Fresh,
};
if let Some((ref acc_id, ref para_id)) = local_status.collating_for {
asynchronous rob
committed
if self.collator_peer(acc_id.clone()).is_some() {
ctx.report_peer(who, cost::COLLATOR_ALREADY_KNOWN);
ctx.report_peer(who.clone(), benefit::NEW_COLLATOR);
let collator_role = self.collators.on_new_collator(acc_id.clone(), para_id.clone());
asynchronous rob
committed
peer_info.collator_state.set_role(collator_role, |msg| send_polkadot_message(
who.clone(),
asynchronous rob
committed
msg,
));
}
// send session keys.
if peer_info.should_send_key() {
for local_session_key in self.live_validation_sessions.recent_keys() {
peer_info.collator_state.send_key(local_session_key.clone(), |msg| send_polkadot_message(
ctx,
who.clone(),
msg,
));
}
}
asynchronous rob
committed
self.peers.insert(who, peer_info);
self.dispatch_pending_requests(ctx);
}
fn on_disconnect(&mut self, ctx: &mut Context<Block>, who: PeerId) {
if let Some((acc_id, _)) = info.collating_for {
let new_primary = self.collators.on_disconnect(acc_id)
asynchronous rob
committed
.and_then(|new_primary| self.collator_peer(new_primary));
asynchronous rob
committed
if let Some((new_primary, primary_info)) = new_primary {
primary_info.collator_state.set_role(Role::Primary, |msg| send_polkadot_message(
new_primary.clone(),
asynchronous rob
committed
msg,
));
}
}
for key in info.validator_keys.as_slice().iter() {
self.validators.remove(key);
self.local_collations.on_disconnect(key);
}
{
let pending = &mut self.pending;
self.in_flight.retain(|&(_, ref peer), val| {
if !retain {
asynchronous rob
committed
// swap with a dummy value which will be dropped immediately.
let (sender, _) = oneshot::channel();
asynchronous rob
committed
pending.push(::std::mem::replace(val, PoVBlockRequest {
attempted_peers: Default::default(),
validation_session_parent: Default::default(),
candidate_hash: Default::default(),
block_data_hash: Default::default(),
canon_roots: StructuredUnroutedIngress(Vec::new()),
sender,
}));
}
retain
});
}
self.dispatch_pending_requests(ctx);
}
}
fn on_message(&mut self, ctx: &mut Context<Block>, who: PeerId, message: &mut Option<message::Message<Block>>) {
match message.take() {
Some(generic_message::Message::ChainSpecific(raw)) => {
match Message::decode(&mut raw.as_slice()) {
Some(msg) => {
ctx.report_peer(who.clone(), benefit::VALID_FORMAT);
self.on_polkadot_message(ctx, who, msg)
},
ctx.report_peer(who, cost::INVALID_FORMAT);
*message = Some(generic_message::Message::ChainSpecific(raw));
}
}
}
_ => {}
}
}
fn maintain_peers(&mut self, ctx: &mut Context<Block>) {
self.collators.collect_garbage(None);
self.local_collations.collect_garbage(None);
self.dispatch_pending_requests(ctx);
for collator_action in self.collators.maintain_peers() {
match collator_action {
Action::Disconnect(collator) => self.disconnect_bad_collator(ctx, collator),
asynchronous rob
committed
Action::NewRole(account_id, role) => if let Some((collator, info)) = self.collator_peer(account_id) {
info.collator_state.set_role(role, |msg| send_polkadot_message(
collator.clone(),
asynchronous rob
committed
msg,
))
},
}
}
}
fn on_block_imported(&mut self, _ctx: &mut Context<Block>, hash: Hash, header: &Header) {
self.collators.collect_garbage(Some(&hash));
self.local_collations.collect_garbage(Some(&header.parent_hash));
}
}
impl PolkadotProtocol {
// we received a collation from a peer
fn on_collation(&mut self, ctx: &mut Context<Block>, from: PeerId, relay_parent: Hash, collation: Collation) {
let collation_para = collation.receipt.parachain_index;
let collated_acc = collation.receipt.collator.clone();
match self.peers.get(&from) {
None => ctx.report_peer(from, cost::UNKNOWN_PEER),
Some(peer_info) => {
ctx.report_peer(from.clone(), benefit::KNOWN_PEER);
match peer_info.collating_for {
None => ctx.report_peer(from, cost::UNEXPECTED_MESSAGE),
Some((ref acc_id, ref para_id)) => {
ctx.report_peer(from.clone(), benefit::EXPECTED_MESSAGE);
let structurally_valid = para_id == &collation_para && acc_id == &collated_acc;
if structurally_valid && collation.receipt.check_signature().is_ok() {
debug!(target: "p_net", "Received collation for parachain {:?} from peer {}", para_id, from);
ctx.report_peer(from, benefit::GOOD_COLLATION);
self.collators.on_collation(acc_id.clone(), relay_parent, collation)
} else {
ctx.report_peer(from, cost::INVALID_FORMAT)
};
}
}
},
}
}
fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId) -> oneshot::Receiver<Collation> {
let (tx, rx) = oneshot::channel();
asynchronous rob
committed
debug!(target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}", para_id, relay_parent);
self.collators.await_collation(relay_parent, para_id, tx);
rx
}
// get connected peer with given account ID for collation.
fn collator_peer(&mut self, collator_id: CollatorId) -> Option<(PeerId, &mut PeerInfo)> {
let check_info = |info: &PeerInfo| info
.collating_for
.as_ref()
.map_or(false, |&(ref acc_id, _)| acc_id == &collator_id);
asynchronous rob
committed
.iter_mut()
.filter(|&(_, ref info)| check_info(&**info))
.map(|(who, info)| (who.clone(), info))
.next()
}
// disconnect a collator by account-id.
fn disconnect_bad_collator(&mut self, ctx: &mut Context<Block>, collator_id: CollatorId) {
if let Some((who, _)) = self.collator_peer(collator_id) {
ctx.report_peer(who, cost::BAD_COLLATION)
}
}
}
impl PolkadotProtocol {
/// Add a local collation and broadcast it to the necessary peers.
pub fn add_local_collation(
&mut self,
ctx: &mut Context<Block>,
relay_parent: Hash,
targets: HashSet<SessionKey>,
collation: Collation,
) {
asynchronous rob
committed
debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}",
relay_parent, collation.receipt.parachain_index);
for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) {
match self.validators.get(&primary) {
asynchronous rob
committed
Some(who) => {
debug!(target: "p_net", "Sending local collation to {:?}", primary);
send_polkadot_message(
ctx,
who.clone(),
asynchronous rob
committed
Message::Collation(relay_parent, cloned_collation),
)
},
None =>
warn!(target: "polkadot_network", "Encountered tracked but disconnected validator {:?}", primary),
}
}
/// register availability store.
pub fn register_availability_store(&mut self, extrinsic_store: ::av_store::Store) {
self.extrinsic_store = Some(extrinsic_store);
}