// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see .
//! The "consensus" networking code built on top of the base network service.
//!
//! This fulfills the `polkadot_consensus::Network` trait, providing a hook to be called
//! each time consensus begins on a new chain head.
use sr_primitives::traits::ProvideRuntimeApi;
use substrate_network::consensus_gossip::ConsensusMessage;
use polkadot_consensus::{Network, SharedTable, Collators, Statement, GenericStatement};
use polkadot_primitives::{AccountId, Block, Hash, SessionKey};
use polkadot_primitives::parachain::{Id as ParaId, Collation, Extrinsic, ParachainHost, BlockData};
use codec::Decode;
use futures::prelude::*;
use futures::sync::mpsc;
use std::collections::HashMap;
use std::sync::Arc;
use arrayvec::ArrayVec;
use tokio::runtime::TaskExecutor;
use parking_lot::Mutex;
use super::NetworkService;
use router::Router;
// task that processes all gossipped consensus messages,
// checking signatures
struct MessageProcessTask
{
inner_stream: mpsc::UnboundedReceiver,
parent_hash: Hash,
table_router: Router,
exit: E,
}
impl
MessageProcessTask
where
P: ProvideRuntimeApi + Send + Sync + 'static,
P::Api: ParachainHost,
E: Future- + Clone + Send + 'static,
{
fn process_message(&self, msg: ConsensusMessage) -> Option> {
use polkadot_consensus::SignedStatement;
debug!(target: "consensus", "Processing consensus statement for live consensus");
if let Some(statement) = SignedStatement::decode(&mut msg.as_slice()) {
if ::polkadot_consensus::check_statement(
&statement.statement,
&statement.signature,
statement.sender,
&self.parent_hash
) {
self.table_router.import_statement(statement, self.exit.clone());
}
}
None
}
}
impl
Future for MessageProcessTask
where
P: ProvideRuntimeApi + Send + Sync + 'static,
P::Api: ParachainHost,
E: Future- + Clone + Send + 'static,
{
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
loop {
match self.inner_stream.poll() {
Ok(Async::Ready(Some(val))) => if let Some(async) = self.process_message(val) {
return Ok(async);
},
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => debug!(target: "p_net", "Error getting consensus message: {:?}", e),
}
}
}
}
/// Wrapper around the network service
pub struct ConsensusNetwork
{
network: Arc,
api: Arc,
exit: E,
}
impl
ConsensusNetwork
{
/// Create a new consensus networking object.
pub fn new(network: Arc, exit: E, api: Arc) -> Self {
ConsensusNetwork { network, exit, api }
}
}
impl
Clone for ConsensusNetwork
{
fn clone(&self) -> Self {
ConsensusNetwork {
network: self.network.clone(),
exit: self.exit.clone(),
api: self.api.clone(),
}
}
}
/// A long-lived network which can create parachain statement routing processes on demand.
impl
Network for ConsensusNetwork
where
P: ProvideRuntimeApi + Send + Sync + 'static,
P::Api: ParachainHost,
E: Clone + Future- + Send + 'static,
{
type TableRouter = Router
;
/// Instantiate a table router using the given shared table.
fn communication_for(
&self,
_validators: &[SessionKey],
table: Arc,
task_executor: TaskExecutor,
) -> Self::TableRouter {
let parent_hash = table.consensus_parent_hash().clone();
let knowledge = Arc::new(Mutex::new(Knowledge::new()));
let local_session_key = table.session_key();
let table_router = Router::new(
table,
self.network.clone(),
self.api.clone(),
task_executor.clone(),
parent_hash,
knowledge.clone(),
);
let attestation_topic = table_router.gossip_topic();
let exit = self.exit.clone();
// spin up a task in the background that processes all incoming statements
// TODO: propagate statements on a timer?
let inner_stream = self.network.consensus_gossip().write().messages_for(attestation_topic);
let process_task = self.network
.with_spec(|spec, ctx| {
spec.new_consensus(ctx, parent_hash, CurrentConsensus {
knowledge,
local_session_key,
});
MessageProcessTask {
inner_stream,
parent_hash,
table_router: table_router.clone(),
exit,
}
})
.then(|_| Ok(()));
task_executor.spawn(process_task);
table_router
}
}
/// Error when the network appears to be down.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct NetworkDown;
/// A future that resolves when a collation is received.
pub struct AwaitingCollation(::futures::sync::oneshot::Receiver);
impl Future for AwaitingCollation {
type Item = Collation;
type Error = NetworkDown;
fn poll(&mut self) -> Poll {
self.0.poll().map_err(|_| NetworkDown)
}
}
impl Collators for ConsensusNetwork
where P::Api: ParachainHost,
{
type Error = NetworkDown;
type Collation = AwaitingCollation;
fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation {
AwaitingCollation(
self.network.with_spec(|spec, _| spec.await_collation(relay_parent, parachain))
)
}
fn note_bad_collator(&self, collator: AccountId) {
self.network.with_spec(|spec, ctx| spec.disconnect_bad_collator(ctx, collator));
}
}
#[derive(Default)]
struct KnowledgeEntry {
knows_block_data: Vec,
knows_extrinsic: Vec,
block_data: Option,
extrinsic: Option,
}
/// Tracks knowledge of peers.
pub(crate) struct Knowledge {
candidates: HashMap,
}
impl Knowledge {
/// Create a new knowledge instance.
pub(crate) fn new() -> Self {
Knowledge {
candidates: HashMap::new(),
}
}
/// Note a statement seen from another validator.
pub(crate) fn note_statement(&mut self, from: SessionKey, statement: &Statement) {
// those proposing the candidate or declaring it valid know everything.
// those claiming it invalid do not have the extrinsic data as it is
// generated by valid execution.
match *statement {
GenericStatement::Candidate(ref c) => {
let mut entry = self.candidates.entry(c.hash()).or_insert_with(Default::default);
entry.knows_block_data.push(from);
entry.knows_extrinsic.push(from);
}
GenericStatement::Valid(ref hash) => {
let mut entry = self.candidates.entry(*hash).or_insert_with(Default::default);
entry.knows_block_data.push(from);
entry.knows_extrinsic.push(from);
}
GenericStatement::Invalid(ref hash) => self.candidates.entry(*hash)
.or_insert_with(Default::default)
.knows_block_data
.push(from),
}
}
/// Note a candidate collated or seen locally.
pub(crate) fn note_candidate(&mut self, hash: Hash, block_data: Option, extrinsic: Option) {
let entry = self.candidates.entry(hash).or_insert_with(Default::default);
entry.block_data = entry.block_data.take().or(block_data);
entry.extrinsic = entry.extrinsic.take().or(extrinsic);
}
}
/// A current consensus instance.
pub(crate) struct CurrentConsensus {
knowledge: Arc>,
local_session_key: SessionKey,
}
impl CurrentConsensus {
#[cfg(test)]
pub(crate) fn new(knowledge: Arc>, local_session_key: SessionKey) -> Self {
CurrentConsensus {
knowledge,
local_session_key
}
}
// execute a closure with locally stored block data for a candidate, or a slice of session identities
// we believe should have the data.
fn with_block_data(&self, hash: &Hash, f: F) -> U
where F: FnOnce(Result<&BlockData, &[SessionKey]>) -> U
{
let knowledge = self.knowledge.lock();
let res = knowledge.candidates.get(hash)
.ok_or(&[] as &_)
.and_then(|entry| entry.block_data.as_ref().ok_or(&entry.knows_block_data[..]));
f(res)
}
}
// 3 is chosen because sessions change infrequently and usually
// only the last 2 (current session and "last" session) are relevant.
// the extra is an error boundary.
const RECENT_SESSIONS: usize = 3;
/// Result when inserting recent session key.
#[derive(PartialEq, Eq)]
pub(crate) enum InsertedRecentKey {
/// Key was already known.
AlreadyKnown,
/// Key was new and pushed out optional old item.
New(Option),
}
/// Wrapper for managing recent session keys.
#[derive(Default)]
pub(crate) struct RecentSessionKeys {
inner: ArrayVec<[SessionKey; RECENT_SESSIONS]>,
}
impl RecentSessionKeys {
/// Insert a new session key. This returns one to be pushed out if the
/// set is full.
pub(crate) fn insert(&mut self, key: SessionKey) -> InsertedRecentKey {
if self.inner.contains(&key) { return InsertedRecentKey::AlreadyKnown }
let old = if self.inner.len() == RECENT_SESSIONS {
Some(self.inner.remove(0))
} else {
None
};
self.inner.push(key);
InsertedRecentKey::New(old)
}
/// As a slice.
pub(crate) fn as_slice(&self) -> &[SessionKey] {
&*self.inner
}
fn remove(&mut self, key: &SessionKey) {
self.inner.retain(|k| k != key)
}
}
/// Manages requests and session keys for live consensus instances.
pub(crate) struct LiveConsensusInstances {
// recent local session keys.
recent: RecentSessionKeys,
// live consensus instances, on `parent_hash`.
live_instances: HashMap,
}
impl LiveConsensusInstances {
/// Create a new `LiveConsensusInstances`
pub(crate) fn new() -> Self {
LiveConsensusInstances {
recent: Default::default(),
live_instances: HashMap::new(),
}
}
/// Note new consensus session. If the used session key is new,
/// it returns it to be broadcasted to peers.
pub(crate) fn new_consensus(
&mut self,
parent_hash: Hash,
consensus: CurrentConsensus,
) -> Option {
let inserted_key = self.recent.insert(consensus.local_session_key);
let maybe_new = if let InsertedRecentKey::New(_) = inserted_key {
Some(consensus.local_session_key)
} else {
None
};
self.live_instances.insert(parent_hash, consensus);
maybe_new
}
/// Remove consensus session.
pub(crate) fn remove(&mut self, parent_hash: &Hash) {
if let Some(consensus) = self.live_instances.remove(parent_hash) {
let key_still_used = self.live_instances.values()
.any(|c| c.local_session_key == consensus.local_session_key);
if !key_still_used {
self.recent.remove(&consensus.local_session_key)
}
}
}
/// Recent session keys as a slice.
pub(crate) fn recent_keys(&self) -> &[SessionKey] {
self.recent.as_slice()
}
/// Call a closure with block data from consensus session at parent hash.
///
/// This calls the closure with `Some(data)` where the session and data are live,
/// `Err(Some(keys))` when the session is live but the data unknown, with a list of keys
/// who have the data, and `Err(None)` where the session is unknown.
pub(crate) fn with_block_data(&self, parent_hash: &Hash, c_hash: &Hash, f: F) -> U
where F: FnOnce(Result<&BlockData, Option<&[SessionKey]>>) -> U
{
match self.live_instances.get(parent_hash) {
Some(c) => c.with_block_data(c_hash, |res| f(res.map_err(Some))),
None => f(Err(None))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn last_keys_works() {
let a = [1; 32].into();
let b = [2; 32].into();
let c = [3; 32].into();
let d = [4; 32].into();
let mut recent = RecentSessionKeys::default();
match recent.insert(a) {
InsertedRecentKey::New(None) => {},
_ => panic!("is new, not at capacity"),
}
match recent.insert(a) {
InsertedRecentKey::AlreadyKnown => {},
_ => panic!("not new"),
}
match recent.insert(b) {
InsertedRecentKey::New(None) => {},
_ => panic!("is new, not at capacity"),
}
match recent.insert(b) {
InsertedRecentKey::AlreadyKnown => {},
_ => panic!("not new"),
}
match recent.insert(c) {
InsertedRecentKey::New(None) => {},
_ => panic!("is new, not at capacity"),
}
match recent.insert(c) {
InsertedRecentKey::AlreadyKnown => {},
_ => panic!("not new"),
}
match recent.insert(d) {
InsertedRecentKey::New(Some(old)) => assert_eq!(old, a),
_ => panic!("is new, and at capacity"),
}
match recent.insert(d) {
InsertedRecentKey::AlreadyKnown => {},
_ => panic!("not new"),
}
}
}