// Copyright 2017 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Polkadot is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . //! The "consensus" networking code built on top of the base network service. //! This fulfills the `polkadot_consensus::Network` trait, providing a hook to be called //! each time consensus begins on a new chain head. use bft; use ed25519; use substrate_network::{self as net, generic_message as msg}; use substrate_network::consensus_gossip::ConsensusMessage; use polkadot_api::{PolkadotApi, LocalPolkadotApi}; use polkadot_consensus::{Network, SharedTable, Collators}; use polkadot_primitives::{AccountId, Block, Hash, SessionKey}; use polkadot_primitives::parachain::{Id as ParaId, Collation}; use codec::Decode; use futures::prelude::*; use futures::sync::mpsc; use std::sync::Arc; use tokio::runtime::TaskExecutor; use parking_lot::Mutex; use super::{Message, NetworkService, Knowledge, CurrentConsensus}; use router::Router; /// Sink for output BFT messages. pub struct BftSink { network: Arc, parent_hash: Hash, _marker: ::std::marker::PhantomData, } impl Sink for BftSink { type SinkItem = bft::Communication; // TODO: replace this with the ! type when that's stabilized type SinkError = E; fn start_send(&mut self, message: bft::Communication) -> ::futures::StartSend, E> { let network_message = net::LocalizedBftMessage { message: match message { ::rhododendron::Communication::Consensus(c) => msg::BftMessage::Consensus(match c { ::rhododendron::LocalizedMessage::Propose(proposal) => msg::SignedConsensusMessage::Propose(msg::SignedConsensusProposal { round_number: proposal.round_number as u32, proposal: proposal.proposal, digest: proposal.digest, sender: proposal.sender, digest_signature: proposal.digest_signature.signature, full_signature: proposal.full_signature.signature, }), ::rhododendron::LocalizedMessage::Vote(vote) => msg::SignedConsensusMessage::Vote(msg::SignedConsensusVote { sender: vote.sender, signature: vote.signature.signature, vote: match vote.vote { ::rhododendron::Vote::Prepare(r, h) => msg::ConsensusVote::Prepare(r as u32, h), ::rhododendron::Vote::Commit(r, h) => msg::ConsensusVote::Commit(r as u32, h), ::rhododendron::Vote::AdvanceRound(r) => msg::ConsensusVote::AdvanceRound(r as u32), } }), }), ::rhododendron::Communication::Auxiliary(justification) => { let unchecked: bft::UncheckedJustification<_> = justification.uncheck().into(); msg::BftMessage::Auxiliary(unchecked.into()) } }, parent_hash: self.parent_hash, }; self.network.with_spec( move |spec, ctx| spec.consensus_gossip.multicast_bft_message(ctx, network_message) ); Ok(::futures::AsyncSink::Ready) } fn poll_complete(&mut self) -> ::futures::Poll<(), E> { Ok(Async::Ready(())) } } // check signature and authority validity of message. fn process_bft_message(msg: msg::LocalizedBftMessage, local_id: &SessionKey, authorities: &[SessionKey]) -> Result>, bft::Error> { Ok(Some(match msg.message { msg::BftMessage::Consensus(c) => ::rhododendron::Communication::Consensus(match c { msg::SignedConsensusMessage::Propose(proposal) => ::rhododendron::LocalizedMessage::Propose({ if &proposal.sender == local_id { return Ok(None) } let proposal = ::rhododendron::LocalizedProposal { round_number: proposal.round_number as usize, proposal: proposal.proposal, digest: proposal.digest, sender: proposal.sender, digest_signature: ed25519::LocalizedSignature { signature: proposal.digest_signature, signer: ed25519::Public(proposal.sender.into()), }, full_signature: ed25519::LocalizedSignature { signature: proposal.full_signature, signer: ed25519::Public(proposal.sender.into()), } }; bft::check_proposal(authorities, &msg.parent_hash, &proposal)?; trace!(target: "bft", "importing proposal message for round {} from {}", proposal.round_number, Hash::from(proposal.sender.0)); proposal }), msg::SignedConsensusMessage::Vote(vote) => ::rhododendron::LocalizedMessage::Vote({ if &vote.sender == local_id { return Ok(None) } let vote = ::rhododendron::LocalizedVote { sender: vote.sender, signature: ed25519::LocalizedSignature { signature: vote.signature, signer: ed25519::Public(vote.sender.0), }, vote: match vote.vote { msg::ConsensusVote::Prepare(r, h) => ::rhododendron::Vote::Prepare(r as usize, h), msg::ConsensusVote::Commit(r, h) => ::rhododendron::Vote::Commit(r as usize, h), msg::ConsensusVote::AdvanceRound(r) => ::rhododendron::Vote::AdvanceRound(r as usize), } }; bft::check_vote::(authorities, &msg.parent_hash, &vote)?; trace!(target: "bft", "importing vote {:?} from {}", vote.vote, Hash::from(vote.sender.0)); vote }), }), msg::BftMessage::Auxiliary(a) => { let justification = bft::UncheckedJustification::from(a); // TODO: get proper error let justification: Result<_, bft::Error> = bft::check_prepare_justification::(authorities, msg.parent_hash, justification) .map_err(|_| bft::ErrorKind::InvalidJustification.into()); ::rhododendron::Communication::Auxiliary(justification?) }, })) } // task that processes all gossipped consensus messages, // checking signatures struct MessageProcessTask { inner_stream: mpsc::UnboundedReceiver>, bft_messages: mpsc::UnboundedSender>, validators: Vec, table_router: Router

, } impl MessageProcessTask

{ fn process_message(&self, msg: ConsensusMessage) -> Option> { match msg { ConsensusMessage::Bft(msg) => { let local_id = self.table_router.session_key(); match process_bft_message(msg, &local_id, &self.validators[..]) { Ok(Some(msg)) => { if let Err(_) = self.bft_messages.unbounded_send(msg) { // if the BFT receiving stream has ended then // we should just bail. trace!(target: "bft", "BFT message stream appears to have closed"); return Some(Async::Ready(())); } } Ok(None) => {} // ignored local message Err(e) => { debug!("Message validation failed: {:?}", e); } } } ConsensusMessage::ChainSpecific(msg, _) => { debug!(target: "consensus", "Processing consensus statement for live consensus"); if let Some(Message::Statement(parent_hash, statement)) = Decode::decode(&mut msg.as_slice()) { if ::polkadot_consensus::check_statement(&statement.statement, &statement.signature, statement.sender, &parent_hash) { self.table_router.import_statement(statement); } } } } None } } impl Future for MessageProcessTask

{ type Item = (); type Error = (); fn poll(&mut self) -> Poll<(), ()> { loop { match self.inner_stream.poll() { Ok(Async::Ready(Some(val))) => if let Some(async) = self.process_message(val) { return Ok(async); }, Ok(Async::Ready(None)) => return Ok(Async::Ready(())), Ok(Async::NotReady) => return Ok(Async::NotReady), Err(e) => debug!(target: "p_net", "Error getting consensus message: {:?}", e), } } } } /// Input stream from the consensus network. pub struct InputAdapter { input: mpsc::UnboundedReceiver>, } impl Stream for InputAdapter { type Item = bft::Communication; type Error = ::polkadot_consensus::Error; fn poll(&mut self) -> Poll, Self::Error> { match self.input.poll() { Err(_) | Ok(Async::Ready(None)) => Err(bft::InputStreamConcluded.into()), Ok(x) => Ok(x) } } } /// Wrapper around the network service pub struct ConsensusNetwork

{ network: Arc, api: Arc

, } impl

ConsensusNetwork

{ /// Create a new consensus networking object. pub fn new(network: Arc, api: Arc

) -> Self { ConsensusNetwork { network, api } } } impl

Clone for ConsensusNetwork

{ fn clone(&self) -> Self { ConsensusNetwork { network: self.network.clone(), api: self.api.clone(), } } } /// A long-lived network which can create parachain statement and BFT message routing processes on demand. impl Network for ConsensusNetwork

{ type TableRouter = Router

; /// The input stream of BFT messages. Should never logically conclude. type Input = InputAdapter; /// The output sink of BFT messages. Messages sent here should eventually pass to all /// current validators. type Output = BftSink<::polkadot_consensus::Error>; /// Instantiate a table router using the given shared table. fn communication_for(&self, validators: &[SessionKey], table: Arc, task_executor: TaskExecutor) -> (Self::TableRouter, Self::Input, Self::Output) { let parent_hash = table.consensus_parent_hash().clone(); let sink = BftSink { network: self.network.clone(), parent_hash, _marker: Default::default(), }; let (bft_send, bft_recv) = mpsc::unbounded(); let knowledge = Arc::new(Mutex::new(Knowledge::new())); let local_session_key = table.session_key(); let table_router = Router::new( table, self.network.clone(), self.api.clone(), task_executor.clone(), parent_hash, knowledge.clone(), ); // spin up a task in the background that processes all incoming statements // TODO: propagate statements on a timer? let process_task = self.network.with_spec(|spec, ctx| { spec.new_consensus(ctx, CurrentConsensus { knowledge, parent_hash, local_session_key, }); MessageProcessTask { inner_stream: spec.consensus_gossip.messages_for(parent_hash), bft_messages: bft_send, validators: validators.to_vec(), table_router: table_router.clone(), } }); match process_task { Some(task) => task_executor.spawn(task), None => warn!(target: "p_net", "Cannot process incoming messages: network appears to be down"), } (table_router, InputAdapter { input: bft_recv }, sink) } } /// Error when the network appears to be down. #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub struct NetworkDown; /// A future that resolves when a collation is received. pub struct AwaitingCollation(Option<::futures::sync::oneshot::Receiver>); impl Future for AwaitingCollation { type Item = Collation; type Error = NetworkDown; fn poll(&mut self) -> Poll { match self.0.poll().map_err(|_| NetworkDown)? { Async::Ready(None) => Err(NetworkDown), Async::Ready(Some(x)) => Ok(Async::Ready(x)), Async::NotReady => Ok(Async::NotReady), } } } impl Collators for ConsensusNetwork

{ type Error = NetworkDown; type Collation = AwaitingCollation; fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation { AwaitingCollation( self.network.with_spec(|spec, _| spec.await_collation(relay_parent, parachain)) ) } fn note_bad_collator(&self, collator: AccountId) { self.network.with_spec(|spec, ctx| spec.disconnect_bad_collator(ctx, collator)); } }