// Copyright 2017 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Polkadot is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . //! Polkadot-specific network implementation. //! //! This manages gossip of consensus messages for BFT and for parachain statements, //! parachain block and extrinsic data fetching, communication between collators and validators, //! and more. extern crate substrate_bft as bft; extern crate parity_codec as codec; extern crate substrate_network; extern crate substrate_primitives; extern crate polkadot_api; extern crate polkadot_availability_store as av_store; extern crate polkadot_consensus; extern crate polkadot_primitives; extern crate futures; extern crate parking_lot; extern crate tokio; extern crate rhododendron; #[macro_use] extern crate log; #[macro_use] extern crate parity_codec_derive; mod collator_pool; mod local_collations; mod router; pub mod consensus; use codec::{Decode, Encode}; use futures::sync::oneshot; use parking_lot::Mutex; use polkadot_consensus::{Statement, SignedStatement, GenericStatement}; use polkadot_primitives::{AccountId, Block, SessionKey, Hash, Header}; use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt, Collation}; use substrate_network::{NodeIndex, RequestId, Context, Severity}; use substrate_network::consensus_gossip::ConsensusGossip; use substrate_network::{message, generic_message}; use substrate_network::specialization::Specialization; use substrate_network::StatusMessage as GenericFullStatus; use self::collator_pool::{CollatorPool, Role, Action}; use self::local_collations::LocalCollations; use std::collections::{HashMap, HashSet}; use std::sync::Arc; #[cfg(test)] mod tests; /// Polkadot protocol id. pub const DOT_PROTOCOL_ID: ::substrate_network::ProtocolId = *b"dot"; type FullStatus = GenericFullStatus; /// Specialization of the network service for the polkadot protocol. pub type NetworkService = ::substrate_network::Service; /// Status of a Polkadot node. #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] pub struct Status { collating_for: Option<(AccountId, ParaId)>, } struct BlockDataRequest { attempted_peers: HashSet, consensus_parent: Hash, candidate_hash: Hash, block_data_hash: Hash, sender: oneshot::Sender, } // ensures collator-protocol messages are sent in correct order. // session key must be sent before collator role. enum CollatorState { Fresh, RolePending(Role), Primed, } impl CollatorState { fn send_key(&mut self, key: SessionKey, mut f: F) { f(Message::SessionKey(key)); if let CollatorState::RolePending(role) = ::std::mem::replace(self, CollatorState::Primed) { f(Message::CollatorRole(role)); } } fn set_role(&mut self, role: Role, mut f: F) { if let CollatorState::Primed = *self { f(Message::CollatorRole(role)); } else { *self = CollatorState::RolePending(role); } } } struct PeerInfo { collating_for: Option<(AccountId, ParaId)>, validator_key: Option, claimed_validator: bool, collator_state: CollatorState, } #[derive(Default)] struct KnowledgeEntry { knows_block_data: Vec, knows_extrinsic: Vec, block_data: Option, extrinsic: Option, } /// Tracks knowledge of peers. struct Knowledge { candidates: HashMap, } impl Knowledge { pub fn new() -> Self { Knowledge { candidates: HashMap::new(), } } fn note_statement(&mut self, from: SessionKey, statement: &Statement) { match *statement { GenericStatement::Candidate(ref c) => { let mut entry = self.candidates.entry(c.hash()).or_insert_with(Default::default); entry.knows_block_data.push(from); entry.knows_extrinsic.push(from); } GenericStatement::Available(ref hash) => { let mut entry = self.candidates.entry(*hash).or_insert_with(Default::default); entry.knows_block_data.push(from); entry.knows_extrinsic.push(from); } GenericStatement::Valid(ref hash) | GenericStatement::Invalid(ref hash) => self.candidates.entry(*hash) .or_insert_with(Default::default) .knows_block_data .push(from), } } fn note_candidate(&mut self, hash: Hash, block_data: Option, extrinsic: Option) { let entry = self.candidates.entry(hash).or_insert_with(Default::default); entry.block_data = entry.block_data.take().or(block_data); entry.extrinsic = entry.extrinsic.take().or(extrinsic); } } struct CurrentConsensus { knowledge: Arc>, parent_hash: Hash, local_session_key: SessionKey, } impl CurrentConsensus { // get locally stored block data for a candidate. fn block_data(&self, relay_parent: &Hash, hash: &Hash) -> Option { if relay_parent != &self.parent_hash { return None } self.knowledge.lock().candidates.get(hash) .and_then(|entry| entry.block_data.clone()) } } /// Polkadot-specific messages. #[derive(Debug, Encode, Decode)] pub enum Message { /// signed statement and localized parent hash. Statement(Hash, SignedStatement), /// As a validator, tell the peer your current session key. // TODO: do this with a cryptographic proof of some kind SessionKey(SessionKey), /// Requesting parachain block data by (relay_parent, candidate_hash). RequestBlockData(RequestId, Hash, Hash), /// Provide block data by candidate hash or nothing if unknown. BlockData(RequestId, Option), /// Tell a collator their role. CollatorRole(Role), /// A collation provided by a peer. Relay parent and collation. Collation(Hash, Collation), } fn send_polkadot_message(ctx: &mut Context, to: NodeIndex, message: Message) { trace!(target: "p_net", "Sending polkadot message to {}: {:?}", to, message); let encoded = message.encode(); ctx.send_message(to, generic_message::Message::ChainSpecific(encoded)) } /// Polkadot protocol attachment for substrate. pub struct PolkadotProtocol { peers: HashMap, collating_for: Option<(AccountId, ParaId)>, consensus_gossip: ConsensusGossip, collators: CollatorPool, validators: HashMap, local_collations: LocalCollations, live_consensus: Option, in_flight: HashMap<(RequestId, NodeIndex), BlockDataRequest>, pending: Vec, extrinsic_store: Option<::av_store::Store>, next_req_id: u64, } impl PolkadotProtocol { /// Instantiate a polkadot protocol handler. pub fn new(collating_for: Option<(AccountId, ParaId)>) -> Self { PolkadotProtocol { peers: HashMap::new(), consensus_gossip: ConsensusGossip::new(), collators: CollatorPool::new(), collating_for, validators: HashMap::new(), local_collations: LocalCollations::new(), live_consensus: None, in_flight: HashMap::new(), pending: Vec::new(), extrinsic_store: None, next_req_id: 1, } } /// Gossip a consensus statement. fn gossip_statement(&mut self, ctx: &mut Context, parent_hash: Hash, statement: SignedStatement) { // TODO: something more targeted than gossip. let raw = Message::Statement(parent_hash, statement).encode(); self.consensus_gossip.multicast_chain_specific(ctx, raw, parent_hash); } /// Fetch block data by candidate receipt. fn fetch_block_data(&mut self, ctx: &mut Context, candidate: &CandidateReceipt, relay_parent: Hash) -> oneshot::Receiver { let (tx, rx) = oneshot::channel(); self.pending.push(BlockDataRequest { attempted_peers: Default::default(), consensus_parent: relay_parent, candidate_hash: candidate.hash(), block_data_hash: candidate.block_data_hash, sender: tx, }); self.dispatch_pending_requests(ctx); rx } /// Note new consensus session. fn new_consensus(&mut self, ctx: &mut Context, consensus: CurrentConsensus) { let old_data = self.live_consensus.as_ref().map(|c| (c.parent_hash, c.local_session_key)); if Some(&consensus.local_session_key) != old_data.as_ref().map(|&(_, ref key)| key) { for (id, peer_data) in self.peers.iter_mut() .filter(|&(_, ref info)| info.claimed_validator || info.collating_for.is_some()) { peer_data.collator_state.send_key(consensus.local_session_key, |msg| send_polkadot_message( ctx, *id, msg )); } } self.live_consensus = Some(consensus); self.consensus_gossip.collect_garbage(old_data.as_ref().map(|&(ref hash, _)| hash)); } fn dispatch_pending_requests(&mut self, ctx: &mut Context) { let consensus = match self.live_consensus { Some(ref mut c) => c, None => { self.pending.clear(); return; } }; let knowledge = consensus.knowledge.lock(); let mut new_pending = Vec::new(); for mut pending in ::std::mem::replace(&mut self.pending, Vec::new()) { if pending.consensus_parent != consensus.parent_hash { continue } if let Some(entry) = knowledge.candidates.get(&pending.candidate_hash) { // answer locally if let Some(ref data) = entry.block_data { let _ = pending.sender.send(data.clone()); continue; } let validator_keys = &mut self.validators; let next_peer = entry.knows_block_data.iter() .filter_map(|x| validator_keys.get(x).map(|id| (*x, *id))) .find(|&(ref key, _)| pending.attempted_peers.insert(*key)) .map(|(_, id)| id); // dispatch to peer if let Some(who) = next_peer { let req_id = self.next_req_id; self.next_req_id += 1; send_polkadot_message( ctx, who, Message::RequestBlockData(req_id, pending.consensus_parent, pending.candidate_hash) ); self.in_flight.insert((req_id, who), pending); continue; } } new_pending.push(pending); } self.pending = new_pending; } fn on_polkadot_message(&mut self, ctx: &mut Context, who: NodeIndex, raw: Vec, msg: Message) { trace!(target: "p_net", "Polkadot message from {}: {:?}", who, msg); match msg { Message::Statement(parent_hash, _statement) => self.consensus_gossip.on_chain_specific(ctx, who, raw, parent_hash), Message::SessionKey(key) => self.on_session_key(ctx, who, key), Message::RequestBlockData(req_id, relay_parent, candidate_hash) => { let block_data = self.live_consensus.as_ref() .and_then(|c| c.block_data(&relay_parent, &candidate_hash)) .or_else(|| self.extrinsic_store.as_ref() .and_then(|s| s.block_data(relay_parent, candidate_hash)) ); send_polkadot_message(ctx, who, Message::BlockData(req_id, block_data)); } Message::BlockData(req_id, data) => self.on_block_data(ctx, who, req_id, data), Message::Collation(relay_parent, collation) => self.on_collation(ctx, who, relay_parent, collation), Message::CollatorRole(role) => self.on_new_role(ctx, who, role), } } fn on_session_key(&mut self, ctx: &mut Context, who: NodeIndex, key: SessionKey) { { let info = match self.peers.get_mut(&who) { Some(peer) => peer, None => { trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", who); return } }; if !info.claimed_validator { ctx.report_peer(who, Severity::Bad("Session key broadcasted without setting authority role")); return; } if let Some(old_key) = ::std::mem::replace(&mut info.validator_key, Some(key)) { self.validators.remove(&old_key); for (relay_parent, collation) in self.local_collations.fresh_key(&old_key, &key) { send_polkadot_message( ctx, who, Message::Collation(relay_parent, collation), ) } } self.validators.insert(key, who); } self.dispatch_pending_requests(ctx); } fn on_block_data(&mut self, ctx: &mut Context, who: NodeIndex, req_id: RequestId, data: Option) { match self.in_flight.remove(&(req_id, who)) { Some(req) => { if let Some(data) = data { if data.hash() == req.block_data_hash { let _ = req.sender.send(data); return } } self.pending.push(req); self.dispatch_pending_requests(ctx); } None => ctx.report_peer(who, Severity::Bad("Unexpected block data response")), } } // when a validator sends us (a collator) a new role. fn on_new_role(&mut self, ctx: &mut Context, who: NodeIndex, role: Role) { let info = match self.peers.get(&who) { Some(peer) => peer, None => { trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", who); return } }; debug!(target: "p_net", "New collator role {:?} from {}", role, who); match info.validator_key { None => ctx.report_peer( who, Severity::Bad("Sent collator role without registering first as validator"), ), Some(key) => for (relay_parent, collation) in self.local_collations.note_validator_role(key, role) { debug!(target: "p_net", "Broadcasting collation on relay parent {:?}", relay_parent); send_polkadot_message( ctx, who, Message::Collation(relay_parent, collation), ) }, } } } impl Specialization for PolkadotProtocol { fn status(&self) -> Vec { Status { collating_for: self.collating_for.clone() }.encode() } fn on_connect(&mut self, ctx: &mut Context, who: NodeIndex, status: FullStatus) { let local_status = match Status::decode(&mut &status.chain_status[..]) { Some(status) => status, None => { Status { collating_for: None } } }; let validator = status.roles.contains(substrate_network::Roles::AUTHORITY); let send_key = validator || local_status.collating_for.is_some(); let mut peer_info = PeerInfo { collating_for: local_status.collating_for, validator_key: None, claimed_validator: validator, collator_state: CollatorState::Fresh, }; if let Some((ref acc_id, ref para_id)) = local_status.collating_for { if self.collator_peer(acc_id.clone()).is_some() { ctx.report_peer(who, Severity::Useless("Unknown Polkadot-specific reason")); return } let collator_role = self.collators.on_new_collator(acc_id.clone(), para_id.clone()); peer_info.collator_state.set_role(collator_role, |msg| send_polkadot_message( ctx, who, msg, )); } if let (true, &Some(ref consensus)) = (send_key, &self.live_consensus) { peer_info.collator_state.send_key(consensus.local_session_key, |msg| send_polkadot_message( ctx, who, msg, )); } self.peers.insert(who, peer_info); self.consensus_gossip.new_peer(ctx, who, status.roles); self.dispatch_pending_requests(ctx); } fn on_disconnect(&mut self, ctx: &mut Context, who: NodeIndex) { if let Some(info) = self.peers.remove(&who) { if let Some((acc_id, _)) = info.collating_for { let new_primary = self.collators.on_disconnect(acc_id) .and_then(|new_primary| self.collator_peer(new_primary)); if let Some((new_primary, primary_info)) = new_primary { primary_info.collator_state.set_role(Role::Primary, |msg| send_polkadot_message( ctx, new_primary, msg, )); } } if let Some(validator_key) = info.validator_key { self.validators.remove(&validator_key); self.local_collations.on_disconnect(&validator_key); } { let pending = &mut self.pending; self.in_flight.retain(|&(_, ref peer), val| { let retain = peer != &who; if !retain { let (sender, _) = oneshot::channel(); pending.push(::std::mem::replace(val, BlockDataRequest { attempted_peers: Default::default(), consensus_parent: Default::default(), candidate_hash: Default::default(), block_data_hash: Default::default(), sender, })); } retain }); } self.consensus_gossip.peer_disconnected(ctx, who); self.dispatch_pending_requests(ctx); } } fn on_message(&mut self, ctx: &mut Context, who: NodeIndex, message: message::Message) { match message { generic_message::Message::BftMessage(msg) => { trace!(target: "p_net", "Polkadot BFT message from {}: {:?}", who, msg); // TODO: check signature here? what if relevant block is unknown? self.consensus_gossip.on_bft_message(ctx, who, msg) } generic_message::Message::ChainSpecific(raw) => { match Message::decode(&mut raw.as_slice()) { Some(msg) => self.on_polkadot_message(ctx, who, raw, msg), None => { trace!(target: "p_net", "Bad message from {}", who); ctx.report_peer(who, Severity::Bad("Invalid polkadot protocol message format")); } } } _ => {} } } fn on_abort(&mut self) { self.consensus_gossip.abort(); } fn maintain_peers(&mut self, ctx: &mut Context) { self.consensus_gossip.collect_garbage(None); self.collators.collect_garbage(None); self.local_collations.collect_garbage(None); self.dispatch_pending_requests(ctx); for collator_action in self.collators.maintain_peers() { match collator_action { Action::Disconnect(collator) => self.disconnect_bad_collator(ctx, collator), Action::NewRole(account_id, role) => if let Some((collator, info)) = self.collator_peer(account_id) { info.collator_state.set_role(role, |msg| send_polkadot_message( ctx, collator, msg, )) }, } } } fn on_block_imported(&mut self, _ctx: &mut Context, hash: Hash, header: &Header) { self.collators.collect_garbage(Some(&hash)); self.local_collations.collect_garbage(Some(&header.parent_hash)); } } impl PolkadotProtocol { // we received a collation from a peer fn on_collation(&mut self, ctx: &mut Context, from: NodeIndex, relay_parent: Hash, collation: Collation) { let collation_para = collation.receipt.parachain_index; let collated_acc = collation.receipt.collator; match self.peers.get(&from) { None => ctx.report_peer(from, Severity::Useless("Unknown Polkadot specific reason")), Some(peer_info) => match peer_info.collating_for { None => ctx.report_peer(from, Severity::Bad("Sent collation without registering collator intent")), Some((ref acc_id, ref para_id)) => { let structurally_valid = para_id == &collation_para && acc_id == &collated_acc; if structurally_valid && collation.receipt.check_signature().is_ok() { debug!(target: "p_net", "Received collation for parachain {:?} from peer {}", para_id, from); self.collators.on_collation(acc_id.clone(), relay_parent, collation) } else { ctx.report_peer(from, Severity::Bad("Sent malformed collation")) }; } }, } } fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId) -> oneshot::Receiver { let (tx, rx) = oneshot::channel(); debug!(target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}", para_id, relay_parent); self.collators.await_collation(relay_parent, para_id, tx); rx } // get connected peer with given account ID for collation. fn collator_peer(&mut self, account_id: AccountId) -> Option<(NodeIndex, &mut PeerInfo)> { let check_info = |info: &PeerInfo| info .collating_for .as_ref() .map_or(false, |&(ref acc_id, _)| acc_id == &account_id); self.peers .iter_mut() .filter(|&(_, ref info)| check_info(&**info)) .map(|(who, info)| (*who, info)) .next() } // disconnect a collator by account-id. fn disconnect_bad_collator(&mut self, ctx: &mut Context, account_id: AccountId) { if let Some((who, _)) = self.collator_peer(account_id) { ctx.report_peer(who, Severity::Bad("Consensus layer determined the given collator misbehaved")) } } } impl PolkadotProtocol { /// Add a local collation and broadcast it to the necessary peers. pub fn add_local_collation( &mut self, ctx: &mut Context, relay_parent: Hash, targets: HashSet, collation: Collation, ) { debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}", relay_parent, collation.receipt.parachain_index); for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) { match self.validators.get(&primary) { Some(who) => { debug!(target: "p_net", "Sending local collation to {:?}", primary); send_polkadot_message( ctx, *who, Message::Collation(relay_parent, cloned_collation), ) }, None => warn!(target: "polkadot_network", "Encountered tracked but disconnected validator {:?}", primary), } } } /// register availability store. pub fn register_availability_store(&mut self, extrinsic_store: ::av_store::Store) { self.extrinsic_store = Some(extrinsic_store); } }