// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The "validation session" networking code built on top of the base network service.
//!
//! This fulfills the `polkadot_validation::Network` trait, providing a hook to be called
//! each time a validation session begins on a new chain head.
use sr_primitives::traits::{BlakeTwo256, ProvideRuntimeApi, Hash as HashT};
use substrate_network::Context as NetContext;
use polkadot_validation::{Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement};
use polkadot_primitives::{Block, BlockId, Hash, SessionKey};
use polkadot_primitives::parachain::{
Id as ParaId, Collation, Extrinsic, ParachainHost, Message, CandidateReceipt,
CollatorId, ValidatorId, PoVBlock,
};
use codec::{Encode, Decode};
use futures::prelude::*;
use futures::future::{self, Executor as FutureExecutor};
use futures::sync::mpsc;
use futures::sync::oneshot::{self, Receiver};
use std::collections::hash_map::{HashMap, Entry};
use std::io;
use std::sync::Arc;
use arrayvec::ArrayVec;
use tokio::runtime::TaskExecutor;
use parking_lot::Mutex;
use router::Router;
use gossip::{POLKADOT_ENGINE_ID, RegisteredMessageValidator, MessageValidationData};
use super::PolkadotProtocol;
pub use polkadot_validation::Incoming;
/// An executor suitable for dispatching async consensus tasks.
///
/// The single method hands a future over to the executor and returns nothing,
/// so a failure to schedule the task cannot be reported back to the caller.
pub trait Executor {
/// Spawn the future `f` on this executor.
// NOTE(review): the generic parameter list of `spawn` looks truncated (there is an
// unmatched `>`); confirm the bounds on `F` against the upstream source.
fn spawn + Send + 'static>(&self, f: F);
}
/// A wrapped futures::future::Executor.
///
/// A newtype whose single public field is the wrapped executor.
// NOTE(review): `T` is used but never declared here; the `<T>` parameter list appears
// to have been lost from this declaration.
pub struct WrappedExecutor(pub T);
// Adapts the wrapped executor (field `0`) to the local `Executor` trait.
// NOTE(review): the impl header and the `where` bound look truncated (missing `<T>` and
// an unmatched `>>`); confirm against the upstream source.
impl Executor for WrappedExecutor
where T: FutureExecutor + Send + 'static>>
{
/// Box the future and pass it to the wrapped executor's `execute`.
///
/// If `execute` returns an error it is logged as a warning under the
/// `"validation"` target and otherwise ignored.
fn spawn + Send + 'static>(&self, f: F) {
if let Err(e) = self.0.execute(Box::new(f)) {
warn!(target: "validation", "could not spawn consensus task: {:?}", e);
}
}
}
// Lets a tokio `TaskExecutor` be used directly as an `Executor`.
impl Executor for TaskExecutor {
/// Forward the future to `TaskExecutor::spawn`.
// NOTE(review): the path-form call presumably resolves to tokio's own inherent `spawn`
// rather than recursing into this trait method — confirm.
fn spawn + Send + 'static>(&self, f: F) {
TaskExecutor::spawn(self, f)
}
}
/// Basic functionality that a network has to fulfill.
///
/// Implementors must be `Send + Sync + 'static`.
// NOTE(review): several signatures below have lost their generic arguments
// (`UnboundedReceiver>`, bare `Vec`, undeclared `F`); restore them from upstream.
pub trait NetworkService: Send + Sync + 'static {
/// Get a stream of gossip messages for a given hash.
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver>;
/// Gossip a message on given topic.
fn gossip_message(&self, topic: Hash, message: Vec);
/// Drop a gossip topic.
fn drop_gossip(&self, topic: Hash);
/// Execute a closure with the polkadot protocol.
///
/// The closure receives mutable access to the `PolkadotProtocol` and to the
/// network context, and is consumed by the call (`FnOnce`).
fn with_spec(&self, with: F)
where F: FnOnce(&mut PolkadotProtocol, &mut NetContext);
}
// Implements the local `NetworkService` trait on top of `super::NetworkService`.
// NOTE(review): generic arguments are missing from several signatures below.
impl NetworkService for super::NetworkService {
/// Obtain the gossip receiver for `topic` under `POLKADOT_ENGINE_ID`.
///
/// The receiver is created inside the `with_gossip` closure and handed back
/// through a `std::sync::mpsc` channel; this method then waits on `recv()`.
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver> {
let (tx, rx) = std::sync::mpsc::channel();
self.with_gossip(move |gossip, _| {
let inner_rx = gossip.messages_for(POLKADOT_ENGINE_ID, topic);
// A failed send only means the receiving side below is already gone.
let _ = tx.send(inner_rx);
});
// `recv` fails only if `tx` was dropped without sending, i.e. the closure never ran.
match rx.recv() {
Ok(rx) => rx,
Err(_) => mpsc::unbounded().1, // return empty channel.
}
}
/// Gossip `message` on `topic` as a consensus message for `POLKADOT_ENGINE_ID`.
// NOTE(review): the meaning of the trailing `false` argument is not visible here.
fn gossip_message(&self, topic: Hash, message: Vec) {
self.gossip_consensus_message(topic, POLKADOT_ENGINE_ID, message, false);
}
/// Dropping a topic is a no-op in this implementation.
fn drop_gossip(&self, _topic: Hash) { }
/// Delegate to `super::NetworkService::with_spec`.
fn with_spec(&self, with: F)
where F: FnOnce(&mut PolkadotProtocol, &mut NetContext)
{
super::NetworkService::with_spec(self, with)
}
}
/// Params to a current validation session.
// NOTE(review): the type arguments of `Option` and `Vec` are missing below; elsewhere in
// this file the key is compared against `ValidatorId` values — confirm upstream.
pub struct SessionParams {
/// The local session key, if there is one.
pub local_session_key: Option,
/// The parent hash.
pub parent_hash: Hash,
/// The authorities.
pub authorities: Vec,
}
/// Wrapper around the network service
///
/// Bundles the handles that `instantiate_session` clones into each session
/// data fetcher.
// NOTE(review): the struct's generic parameter list and the `Arc` type arguments appear
// to have been stripped.
pub struct ValidationNetwork
{
// Shared handle to the network service.
network: Arc,
// Shared handle passed on to session data fetchers as `api`.
api: Arc
,
// Executor used to spawn background work.
executor: T,
// Gossip message validator; told about each live session.
message_validator: RegisteredMessageValidator,
// Cloneable exit handle handed to session data fetchers.
exit: E,
}
// NOTE(review): this region is damaged. The parameter list of `new` stops after
// `api: Arc`, and the `where` clause that follows seems to belong to a second `impl`
// block whose header is missing. Restore from the upstream source before relying on it.
impl
ValidationNetwork
{
/// Create a new consensus networking object.
pub fn new(
network: Arc,
exit: E,
message_validator: RegisteredMessageValidator,
api: Arc
where
P: ProvideRuntimeApi + Send + Sync + 'static,
P::Api: ParachainHost,
E: Clone + Future + Send + Sync + 'static,
N: NetworkService,
T: Clone + Executor + Send + Sync + 'static,
{
/// Instantiate session data fetcher at a parent hash.
///
/// If the used session key is new, it will be broadcast to peers.
/// If a validation session was already instantiated at this parent hash,
/// the underlying instance will be shared.
///
/// If there was already a validation session instantiated and a different
/// session key was set, then the new key will be ignored.
///
/// This implies that there can be multiple services instantiating validation
/// session instances safely, but they should all be coordinated on which session keys
/// are being used.
///
/// The fetcher is delivered through the returned oneshot receiver once the
/// closure passed to `with_spec` has run.
pub fn instantiate_session(&self, params: SessionParams)
-> oneshot::Receiver>
{
let parent_hash = params.parent_hash;
// Clone every handle up front so the closure below can own them.
let network = self.network.clone();
let api = self.api.clone();
let task_executor = self.executor.clone();
let exit = self.exit.clone();
let message_validator = self.message_validator.clone();
let (tx, rx) = oneshot::channel();
self.network.with_spec(move |spec, ctx| {
// before requesting messages, note live consensus session.
message_validator.note_session(
parent_hash,
MessageValidationData { authorities: params.authorities.clone() },
);
let session = spec.new_validation_session(ctx, params);
// A failed send means the caller dropped the receiver; nothing else to do.
let _ = tx.send(SessionDataFetcher {
network,
api,
task_executor,
parent_hash,
knowledge: session.knowledge().clone(),
exit,
fetch_incoming: session.fetched_incoming().clone(),
message_validator,
});
});
rx
}
}
/// A long-lived network which can create parachain statement routing processes on demand.
// NOTE(review): the impl header, its bounds and the first associated type are missing
// (only `impl` and a stray `;` remain); restore from the upstream source.
impl
;
type BuildTableRouter = Box + Send>;
/// Build the statement router for the session described by `table`.
///
/// Instantiates (or shares) a validation session at the table's consensus
/// parent hash using the table's session key, then resolves to a `Router`
/// once the session data fetcher is available.
fn communication_for(
&self,
table: Arc,
outgoing: polkadot_validation::Outgoing,
authorities: &[ValidatorId],
) -> Self::BuildTableRouter {
let parent_hash = table.consensus_parent_hash().clone();
let local_session_key = table.session_key();
let build_fetcher = self.instantiate_session(SessionParams {
local_session_key: Some(local_session_key),
parent_hash,
authorities: authorities.to_vec(),
});
let message_validator = self.message_validator.clone();
let executor = self.executor.clone();
let work = build_fetcher
// The receiver's error is reported as its `Debug` rendering.
.map_err(|e| format!("{:?}", e))
.map(move |fetcher| {
let table_router = Router::new(
table,
fetcher,
message_validator,
);
table_router.broadcast_egress(outgoing);
// Spawn a background task that imports each checked statement into the router.
let table_router_clone = table_router.clone();
let work = table_router.checked_statements()
.for_each(move |msg| { table_router_clone.import_statement(msg); Ok(()) });
executor.spawn(work);
table_router
});
Box::new(work)
}
}
/// Error when the network appears to be down.
///
/// In this file it is produced when a oneshot channel carrying a collation
/// fails, i.e. when `AwaitingCollation` sees an error from either receiver.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct NetworkDown;
/// A future that resolves when a collation is received.
///
/// Resolution happens in two stages: `outer` first yields a second receiver,
/// which is stored in `inner` and then polled for the collation itself.
// NOTE(review): the innermost type argument of the receivers has been stripped.
pub struct AwaitingCollation {
// Receiver that yields the inner receiver.
outer: ::futures::sync::oneshot::Receiver<::futures::sync::oneshot::Receiver>,
// Inner receiver; `None` until `outer` has resolved.
inner: Option<::futures::sync::oneshot::Receiver>
}
impl Future for AwaitingCollation {
    type Item = Collation;
    type Error = NetworkDown;

    /// Drive the two-stage receive.
    ///
    /// Once the inner receiver is known, every poll goes straight to it.
    /// Until then the outer receiver is polled; when it yields the inner
    /// receiver, that receiver is stored and polled immediately. A failure of
    /// either receiver is reported as `NetworkDown`.
    // NOTE(review): the return type's generic arguments appear to have been stripped.
    fn poll(&mut self) -> Poll {
        if let Some(inner) = self.inner.as_mut() {
            return inner.poll().map_err(|_| NetworkDown);
        }

        let inner = match self.outer.poll() {
            Ok(futures::Async::Ready(inner)) => inner,
            Ok(futures::Async::NotReady) => return Ok(futures::Async::NotReady),
            Err(_) => return Err(NetworkDown),
        };

        self.inner = Some(inner);
        self.poll()
    }
}
// Collation fetching for `ValidationNetwork`, routed through the protocol spec.
// NOTE(review): the generic parameter lists on `impl` and `ValidationNetwork` are missing.
impl
Collators for ValidationNetwork
where
P: ProvideRuntimeApi + Send + Sync + 'static,
P::Api: ParachainHost,
N: NetworkService,
{
type Error = NetworkDown;
type Collation = AwaitingCollation;
/// Request a collation for `parachain` at `relay_parent`.
///
/// The spec's `await_collation` result is passed back through a oneshot
/// channel, which the returned `AwaitingCollation` polls first.
fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation {
let (tx, rx) = ::futures::sync::oneshot::channel();
self.network.with_spec(move |spec, _| {
let collation = spec.await_collation(relay_parent, parachain);
// A failed send means the `AwaitingCollation` was already dropped.
let _ = tx.send(collation);
});
AwaitingCollation{outer: rx, inner: None}
}
/// Ask the protocol spec to disconnect the given collator.
fn note_bad_collator(&self, collator: CollatorId) {
self.network.with_spec(move |spec, ctx| spec.disconnect_bad_collator(ctx, collator));
}
}
// What is known about a single candidate: which validators hold its data and what is
// held locally.
// NOTE(review): the type arguments of `Vec` and `Option` have been stripped below.
#[derive(Default)]
struct KnowledgeEntry {
// Validators that issued any statement (candidate, valid or invalid) about it.
knows_block_data: Vec,
// Validators that proposed the candidate or declared it valid.
knows_extrinsic: Vec,
// Locally known proof-of-validation block, if any (set by `note_candidate`).
pov: Option,
// Locally known extrinsic, if any (set by `note_candidate`).
extrinsic: Option,
}
/// Tracks knowledge of peers.
// NOTE(review): the `HashMap` type arguments are missing; the methods below key the map
// by candidate hash and store a `KnowledgeEntry` per candidate.
pub(crate) struct Knowledge {
// Per-candidate knowledge entries.
candidates: HashMap,
}
impl Knowledge {
    /// Build an empty knowledge tracker with no candidates recorded.
    pub(crate) fn new() -> Self {
        let candidates = HashMap::new();
        Knowledge { candidates }
    }

    /// Record what `from` must know, based on a statement it issued.
    ///
    /// A validator that proposed a candidate or declared it valid is recorded
    /// as knowing both the block data and the extrinsic. A validator that
    /// declared it invalid is recorded as knowing the block data only, because
    /// the extrinsic data is generated by valid execution.
    pub(crate) fn note_statement(&mut self, from: ValidatorId, statement: &Statement) {
        let (candidate_hash, knows_extrinsic) = match *statement {
            GenericStatement::Candidate(ref receipt) => (receipt.hash(), true),
            GenericStatement::Valid(ref hash) => (*hash, true),
            GenericStatement::Invalid(ref hash) => (*hash, false),
        };

        let entry = self.candidates.entry(candidate_hash).or_insert_with(Default::default);
        if knows_extrinsic {
            entry.knows_extrinsic.push(from.clone());
        }
        entry.knows_block_data.push(from);
    }

    /// Record data for a candidate that was collated or seen locally.
    ///
    /// Values already stored for the candidate are kept; `pov` and `extrinsic`
    /// only fill slots that are still empty.
    // NOTE(review): the `Option` type arguments appear to have been stripped.
    pub(crate) fn note_candidate(&mut self, hash: Hash, pov: Option, extrinsic: Option) {
        let entry = self.candidates.entry(hash).or_insert_with(Default::default);
        if entry.pov.is_none() {
            entry.pov = pov;
        }
        if entry.extrinsic.is_none() {
            entry.extrinsic = extrinsic;
        }
    }
}
/// Receiver for incoming data.
///
/// Cloneable: it wraps a shared future, so several holders can wait on the
/// same result.
// NOTE(review): the type argument of `future::Shared` has been stripped (unmatched `>`).
#[derive(Clone)]
pub struct IncomingReceiver {
// Shared future that resolves to the incoming data.
inner: future::Shared>
}
impl Future for IncomingReceiver {
    type Item = Incoming;
    type Error = io::Error;

    /// Poll the shared future, cloning the incoming data out once it is ready.
    ///
    /// Any error from the shared future is reported as an `io::Error` of kind
    /// `Other`.
    // NOTE(review): the return type's generic arguments appear to have been stripped.
    fn poll(&mut self) -> Poll {
        let state = match self.inner.poll() {
            Ok(state) => state,
            Err(_) => {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    "Sending end of channel hung up",
                ))
            }
        };

        Ok(match state {
            Async::Ready(shared) => Async::Ready(Incoming::clone(&*shared)),
            Async::NotReady => Async::NotReady,
        })
    }
}
/// Gossip topic for incoming messages of `parachain` at block `parent_hash`.
///
/// The topic is the BlakeTwo256 hash of the parent hash bytes, followed by the
/// encoded parachain id, followed by the ASCII bytes `incoming`.
pub(crate) fn incoming_message_topic(parent_hash: Hash, parachain: ParaId) -> Hash {
    let mut preimage = parent_hash.as_ref().to_vec();
    parachain.using_encoded(|encoded_id| preimage.extend_from_slice(encoded_id));
    preimage.extend_from_slice(b"incoming");
    BlakeTwo256::hash(&preimage[..])
}
/// A current validation session instance.
///
/// Cloning is cheap with respect to the shared state: `knowledge` and
/// `fetch_incoming` are `Arc` handles, so clones refer to the same data.
// NOTE(review): the type arguments of `Arc` and `Option` have been stripped below.
#[derive(Clone)]
pub(crate) struct ValidationSession {
// Parent hash this session was instantiated at.
parent_hash: Hash,
// Shared knowledge about candidates.
knowledge: Arc>,
// Local session key, if one was supplied.
local_session_key: Option,
// Shared bookkeeping for incoming-message fetches.
fetch_incoming: Arc>,
}
impl ValidationSession {
/// Create a new validation session instance. Needs to be attached to the
/// network.
///
/// Starts with empty knowledge and empty incoming-fetch bookkeeping; the
/// `authorities` field of `params` is not used here.
pub(crate) fn new(params: SessionParams) -> Self {
ValidationSession {
parent_hash: params.parent_hash,
knowledge: Arc::new(Mutex::new(Knowledge::new())),
local_session_key: params.local_session_key,
fetch_incoming: Arc::new(Mutex::new(FetchIncoming::new())),
}
}
/// Get a handle to the shared knowledge relative to this consensus
/// instance.
// NOTE(review): the `Arc` type argument has been stripped from the return type.
pub(crate) fn knowledge(&self) -> &Arc> {
&self.knowledge
}
/// Get a handle to the shared list of parachains' incoming data fetch.
// NOTE(review): the `Arc` type argument has been stripped from the return type.
pub(crate) fn fetched_incoming(&self) -> &Arc> {
&self.fetch_incoming
}
// execute a closure with locally stored proof-of-validation for a candidate, or a slice of session identities
// we believe should have the data.
//
// The closure receives `Ok(pov)` when the PoV block is stored locally. Otherwise it
// receives `Err(validators)`, which is empty when the candidate is unknown.
// The knowledge lock is held while the closure runs.
// NOTE(review): the `<F, U>` parameter list is missing from the signature.
fn with_pov_block(&self, hash: &Hash, f: F) -> U
where F: FnOnce(Result<&PoVBlock, &[ValidatorId]>) -> U
{
let knowledge = self.knowledge.lock();
let res = knowledge.candidates.get(hash)
// Unknown candidate: nobody is known to have the data.
.ok_or(&[] as &_)
// Known candidate without a local PoV: report who knows the block data.
.and_then(|entry| entry.pov.as_ref().ok_or(&entry.knows_block_data[..]));
f(res)
}
}
// Maximum number of recent local session keys retained by `RecentValidatorIds`.
//
// 3 is chosen because sessions change infrequently and usually
// only the last 2 (current session and "last" session) are relevant.
// The extra one is an error boundary.
const RECENT_SESSIONS: usize = 3;
/// Result when inserting recent session key.
#[derive(PartialEq, Eq)]
pub(crate) enum InsertedRecentKey {
/// Key was already known.
AlreadyKnown,
/// Key was new and pushed out optional old item.
///
/// The payload is the evicted key, or `None` if there was still room.
// NOTE(review): the `Option` type argument has been stripped.
New(Option),
}
/// Wrapper for managing recent session keys.
///
/// Holds at most `RECENT_SESSIONS` keys in a fixed-capacity array-backed vector.
#[derive(Default)]
pub(crate) struct RecentValidatorIds {
// Stored keys in insertion order; the oldest sits at index 0.
inner: ArrayVec<[ValidatorId; RECENT_SESSIONS]>,
}
impl RecentValidatorIds {
    /// Record `key` as a recently used session key.
    ///
    /// Returns `AlreadyKnown` without touching the set if the key is already
    /// present. Otherwise the key is appended and `New` is returned, carrying
    /// the oldest key if it had to be evicted because the set already held
    /// `RECENT_SESSIONS` entries.
    pub(crate) fn insert(&mut self, key: ValidatorId) -> InsertedRecentKey {
        if self.inner.contains(&key) {
            return InsertedRecentKey::AlreadyKnown;
        }

        // Make room by evicting the oldest key when the set is full.
        let evicted = match self.inner.len() {
            RECENT_SESSIONS => Some(self.inner.remove(0)),
            _ => None,
        };

        self.inner.push(key);
        InsertedRecentKey::New(evicted)
    }

    /// View the stored keys as a slice, oldest first.
    pub(crate) fn as_slice(&self) -> &[ValidatorId] {
        &self.inner[..]
    }

    /// Drop every stored key equal to `key`; does nothing if it is absent.
    fn remove(&mut self, key: &ValidatorId) {
        self.inner.retain(|stored| stored != key);
    }
}
/// Manages requests and keys for live validation session instances.
// NOTE(review): the `HashMap` type arguments are missing; the methods below key the map
// by parent hash and store a `(refcount, ValidationSession)` pair.
pub(crate) struct LiveValidationSessions {
// recent local session keys.
recent: RecentValidatorIds,
// live validation session instances, on `parent_hash`. refcount retained alongside.
live_instances: HashMap,
}
impl LiveValidationSessions {
/// Create a new `LiveValidationSessions`
pub(crate) fn new() -> Self {
LiveValidationSessions {
recent: Default::default(),
live_instances: HashMap::new(),
}
}
/// Note new validation session. If the used session key is new,
/// it returns it to be broadcasted to peers.
///
/// If there was already a validation session instantiated and a different
/// session key was set, then the new key will be ignored.
///
/// Each call increments the refcount for `params.parent_hash`; the session
/// stays live until `remove` has been called the same number of times.
// NOTE(review): the `Option` type argument in the return type has been stripped.
pub(crate) fn new_validation_session(
&mut self,
params: SessionParams,
) -> (ValidationSession, Option) {
let parent_hash = params.parent_hash.clone();
let key = params.local_session_key.clone();
// Borrow `recent` on its own so the closure does not capture all of `self`.
let recent = &mut self.recent;
// Inserts the key into the recent set and returns it only if it was not known before.
let mut check_new_key = || {
let inserted_key = key.clone().map(|key| recent.insert(key));
if let Some(InsertedRecentKey::New(_)) = inserted_key {
key.clone()
} else {
None
}
};
// A session already exists at this parent hash: share it and bump the refcount.
if let Some(&mut (ref mut rc, ref mut prev)) = self.live_instances.get_mut(&parent_hash) {
// Only adopt the supplied key if the existing session has none.
let maybe_new = if prev.local_session_key.is_none() {
prev.local_session_key = key.clone();
check_new_key()
} else {
None
};
*rc += 1;
return (prev.clone(), maybe_new)
}
// First session at this parent hash: start the refcount at 1.
let session = ValidationSession::new(params);
self.live_instances.insert(parent_hash, (1, session.clone()));
(session, check_new_key())
}
/// Remove validation session. true indicates that it was actually removed.
///
/// Decrements the refcount for `parent_hash` and removes the session only
/// when it reaches zero. Returns `false` for an unknown hash or while other
/// references remain.
pub(crate) fn remove(&mut self, parent_hash: Hash) -> bool {
let maybe_removed = if let Entry::Occupied(mut entry) = self.live_instances.entry(parent_hash) {
entry.get_mut().0 -= 1;
if entry.get().0 == 0 {
let (_, session) = entry.remove();
Some(session)
} else {
None
}
} else {
None
};
let session = match maybe_removed {
None => return false,
Some(s) => s,
};
// Forget the removed session's key unless another live session still uses it.
if let Some(ref key) = session.local_session_key {
let key_still_used = self.live_instances.values()
.any(|c| c.1.local_session_key.as_ref() == Some(key));
if !key_still_used {
self.recent.remove(key)
}
}
true
}
/// Recent session keys as a slice.
pub(crate) fn recent_keys(&self) -> &[ValidatorId] {
self.recent.as_slice()
}
/// Call a closure with pov-data from validation session at parent hash for a given
/// candidate-receipt hash.
///
/// This calls the closure with `Ok(data)` where the session and data are live,
/// `Err(Some(keys))` when the session is live but the data unknown, with a list of keys
/// who have the data, and `Err(None)` where the session is unknown.
// NOTE(review): the `<F, U>` parameter list is missing from the signature.
pub(crate) fn with_pov_block(&self, parent_hash: &Hash, c_hash: &Hash, f: F) -> U
where F: FnOnce(Result<&PoVBlock, Option<&[ValidatorId]>>) -> U
{
match self.live_instances.get(parent_hash) {
Some(c) => c.1.with_pov_block(c_hash, |res| f(res.map_err(Some))),
None => f(Err(None))
}
}
}
/// Receiver for block data.
///
/// Resolves in two stages: `outer` yields a second receiver, which is stored
/// in `inner` and then polled for the PoV block.
// NOTE(review): the receivers' type arguments have been stripped (unmatched `>`).
pub struct PoVReceiver {
// Receiver that yields the inner receiver.
outer: Receiver>,
// Inner receiver; `None` until `outer` has resolved.
inner: Option>
}
impl Future for PoVReceiver {
    type Item = PoVBlock;
    type Error = io::Error;

    /// Drive the two-stage receive.
    ///
    /// Polls the inner receiver once it is known; before that, polls the outer
    /// receiver and, when it yields the inner one, stores and polls it right
    /// away. A failure of either receiver becomes an `io::Error` of kind `Other`.
    // NOTE(review): the return type's generic arguments appear to have been stripped.
    fn poll(&mut self) -> Poll {
        let hung_up = |_| io::Error::new(
            io::ErrorKind::Other,
            "Sending end of channel hung up",
        );

        if let Some(inner) = self.inner.as_mut() {
            return inner.poll().map_err(hung_up);
        }

        match self.outer.poll() {
            Ok(Async::Ready(inner)) => {
                self.inner = Some(inner);
                self.poll()
            }
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(e) => Err(hung_up(e)),
        }
    }
}
/// Wrapper around bookkeeping for tracking which parachains we're fetching incoming messages
/// for.
// NOTE(review): the `HashMap` type arguments are missing; `fetch_with_work` keys the map
// by `ParaId` and stores an `IncomingReceiver` per parachain.
pub(crate) struct FetchIncoming {
// Signal used to create the exit futures that fetch work is raced against.
exit_signal: ::exit_future::Signal,
// Receivers for fetches that have already been requested.
parachains_fetching: HashMap,
}
impl FetchIncoming {
// Create empty bookkeeping with a fresh exit signal and no fetches registered.
fn new() -> Self {
FetchIncoming {
exit_signal: ::exit_future::signal_only(),
parachains_fetching: HashMap::new(),
}
}
// registers intent to fetch incoming. returns an optional piece of work
// that, if some, is needed to be run to completion in order for the future to
// resolve.
//
// If a fetch for `para_id` was already registered, the existing receiver is cloned and
// no work is returned; `make_work` is not called in that case.
//
// impl Future has a bug here where it wrongly assigns a `'static` bound to `M`.
// NOTE(review): the `<M, W>` parameter list and several type arguments are missing from
// the signature below.
fn fetch_with_work(&mut self, para_id: ParaId, make_work: M)
-> (IncomingReceiver, Option + Send>>) where
M: FnOnce() -> W,
W: Future> + Send + 'static,
{
let (tx, rx) = match self.parachains_fetching.entry(para_id) {
Entry::Occupied(entry) => return (entry.get().clone(), None),
Entry::Vacant(entry) => {
// has not been requested yet.
let (tx, rx) = oneshot::channel();
let rx = IncomingReceiver { inner: rx.shared() };
entry.insert(rx.clone());
(tx, rx)
}
};
let exit = self.exit_signal.make_exit();
let work = make_work()
// Forward a `Some` result to the receiver; on `None`, `tx` is dropped unsent.
.map(move |incoming| if let Some(i) = incoming { let _ = tx.send(i); })
// Finish as soon as either the work or the exit future completes.
.select2(exit)
// Discard the outcome: the returned work always resolves to `Ok(())`.
.then(|_| Ok(()));
(rx, Some(Box::new(work)))
}
}
/// Can fetch data for a given validation session
pub struct SessionDataFetcher