// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see .
//! Parachain statement table meant to be shared with a message router
//! and a consensus proposer.
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use extrinsic_store::{Data, Store as ExtrinsicStore};
use table::{self, Table, Context as TableContextTrait};
use polkadot_primitives::{Hash, SessionKey};
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Collation, Extrinsic, CandidateReceipt};
use parking_lot::Mutex;
use futures::{future, prelude::*};
use super::{GroupInfo, TableRouter};
use self::includable::IncludabilitySender;
use primitives::ed25519;
mod includable;
pub use self::includable::Includable;
pub use table::{SignedStatement, Statement};
pub use table::generic::Statement as GenericStatement;
struct TableContext {
parent_hash: Hash,
key: Arc,
groups: HashMap,
}
impl table::Context for TableContext {
fn is_member_of(&self, authority: &SessionKey, group: &ParaId) -> bool {
self.groups.get(group).map_or(false, |g| g.validity_guarantors.contains(authority))
}
fn is_availability_guarantor_of(&self, authority: &SessionKey, group: &ParaId) -> bool {
self.groups.get(group).map_or(false, |g| g.availability_guarantors.contains(authority))
}
fn requisite_votes(&self, group: &ParaId) -> (usize, usize) {
self.groups.get(group).map_or(
(usize::max_value(), usize::max_value()),
|g| (g.needed_validity, g.needed_availability),
)
}
}
impl TableContext {
fn local_id(&self) -> SessionKey {
self.key.public().into()
}
fn sign_statement(&self, statement: table::Statement) -> table::SignedStatement {
let signature = ::sign_table_statement(&statement, &self.key, &self.parent_hash).into();
table::SignedStatement {
statement,
signature,
sender: self.local_id(),
}
}
}
// A shared table object.
struct SharedTableInner {
table: Table,
proposed_digest: Option,
checked_validity: HashSet,
checked_availability: HashSet,
trackers: Vec,
extrinsic_store: ExtrinsicStore,
}
impl SharedTableInner {
// Import a single statement. Provide a handle to a table router and a function
// used to determine if a referenced candidate is valid.
//
// the statement producer, if any, will produce only statements concerning the same candidate
// as the one just imported
fn import_remote_statement(
&mut self,
context: &TableContext,
router: &R,
statement: table::SignedStatement,
) -> Option::Future,
::Future,
>> {
let summary = match self.table.import_statement(context, statement) {
Some(summary) => summary,
None => return None,
};
self.update_trackers(&summary.candidate, context);
let local_id = context.local_id();
let is_validity_member = context.is_member_of(&local_id, &summary.group_id);
let is_availability_member =
context.is_availability_guarantor_of(&local_id, &summary.group_id);
let digest = &summary.candidate;
// TODO: consider a strategy based on the number of candidate votes as well.
// only check validity if this wasn't locally proposed.
let checking_validity = is_validity_member
&& self.proposed_digest.as_ref().map_or(true, |d| d != digest)
&& self.checked_validity.insert(digest.clone());
let checking_availability = is_availability_member
&& self.checked_availability.insert(digest.clone());
let work = if checking_validity || checking_availability {
match self.table.get_candidate(&digest) {
None => None, // TODO: handle table inconsistency somehow?
Some(candidate) => {
let fetch_block_data =
router.fetch_block_data(candidate).into_future().fuse();
let fetch_extrinsic = if checking_availability {
Some(
router.fetch_extrinsic_data(candidate).into_future().fuse()
)
} else {
None
};
Some(Work {
candidate_receipt: candidate.clone(),
fetch_block_data,
fetch_extrinsic,
evaluate: checking_validity,
ensure_available: checking_availability,
})
}
}
} else {
None
};
work.map(|work| StatementProducer {
produced_statements: Default::default(),
extrinsic_store: self.extrinsic_store.clone(),
relay_parent: context.parent_hash.clone(),
work
})
}
fn update_trackers(&mut self, candidate: &Hash, context: &TableContext) {
let includable = self.table.candidate_includable(candidate, context);
for i in (0..self.trackers.len()).rev() {
if self.trackers[i].update_candidate(candidate.clone(), includable) {
self.trackers.swap_remove(i);
}
}
}
}
/// Produced statements about a specific candidate.
/// Both may be `None`.
#[derive(Default)]
pub struct ProducedStatements {
/// A statement about the validity of the candidate.
pub validity: Option,
/// A statement about availability of data. If this is `Some`,
/// then `block_data` and `extrinsic` should be `Some` as well.
pub availability: Option,
/// Block data to ensure availability of.
pub block_data: Option,
/// Extrinsic data to ensure availability of.
pub extrinsic: Option,
}
/// Future that produces statements about a specific candidate.
pub struct StatementProducer {
produced_statements: ProducedStatements,
work: Work,
relay_parent: Hash,
extrinsic_store: ExtrinsicStore,
}
impl StatementProducer {
/// Attach a function for verifying fetched collation to the statement producer.
/// This will transform it into a future.
///
/// The collation-checking function should return `true` if known to be valid,
/// `false` if known to be invalid, and `None` if unable to determine.
pub fn prime Option>(self, check_candidate: C) -> PrimedStatementProducer {
PrimedStatementProducer {
inner: self,
check_candidate,
}
}
}
struct Work {
candidate_receipt: CandidateReceipt,
fetch_block_data: future::Fuse,
fetch_extrinsic: Option>,
evaluate: bool,
ensure_available: bool,
}
/// Primed statement producer.
pub struct PrimedStatementProducer {
inner: StatementProducer,
check_candidate: C,
}
impl Future for PrimedStatementProducer
where
D: Future,
E: Future,
C: FnMut(Collation) -> Option,
Err: From<::std::io::Error>,
{
type Item = ProducedStatements;
type Error = Err;
fn poll(&mut self) -> Poll {
let work = &mut self.inner.work;
let candidate = &work.candidate_receipt;
let statements = &mut self.inner.produced_statements;
let mut candidate_hash = None;
let mut candidate_hash = move ||
candidate_hash.get_or_insert_with(|| candidate.hash()).clone();
if let Async::Ready(block_data) = work.fetch_block_data.poll()? {
statements.block_data = Some(block_data.clone());
if work.evaluate {
let is_good = (self.check_candidate)(Collation {
block_data,
receipt: work.candidate_receipt.clone(),
});
let hash = candidate_hash();
debug!(target: "consensus", "Making validity statement about candidate {}: is_good? {:?}", hash, is_good);
statements.validity = match is_good {
Some(true) => Some(GenericStatement::Valid(hash)),
Some(false) => Some(GenericStatement::Invalid(hash)),
None => None,
};
work.evaluate = false;
}
}
if let Async::Ready(Some(extrinsic)) = work.fetch_extrinsic.poll()? {
if work.ensure_available {
let hash = candidate_hash();
debug!(target: "consensus", "Claiming candidate {} available.", hash);
statements.extrinsic = Some(extrinsic);
statements.availability = Some(GenericStatement::Available(hash));
work.ensure_available = false;
}
}
let done = match (work.evaluate, work.ensure_available) {
(false, false) => true,
_ => false,
};
if done {
// commit claimed-available data to disk before returning statements from the future.
if let (&Some(ref block), extrinsic) = (&statements.block_data, &statements.extrinsic) {
self.inner.extrinsic_store.make_available(Data {
relay_parent: self.inner.relay_parent,
parachain_id: work.candidate_receipt.parachain_index,
candidate_hash: candidate_hash(),
block_data: block.clone(),
extrinsic: extrinsic.clone(),
})?;
}
Ok(Async::Ready(::std::mem::replace(statements, Default::default())))
} else {
Ok(Async::NotReady)
}
}
}
/// A shared table object.
pub struct SharedTable {
context: Arc,
inner: Arc>,
}
impl Clone for SharedTable {
fn clone(&self) -> Self {
SharedTable {
context: self.context.clone(),
inner: self.inner.clone(),
}
}
}
impl SharedTable {
/// Create a new shared table.
///
/// Provide the key to sign with, and the parent hash of the relay chain
/// block being built.
pub fn new(
groups: HashMap,
key: Arc,
parent_hash: Hash,
extrinsic_store: ExtrinsicStore,
) -> Self {
SharedTable {
context: Arc::new(TableContext { groups, key, parent_hash }),
inner: Arc::new(Mutex::new(SharedTableInner {
table: Table::default(),
proposed_digest: None,
checked_validity: HashSet::new(),
checked_availability: HashSet::new(),
trackers: Vec::new(),
extrinsic_store,
}))
}
}
/// Get the parent hash this table should hold statements localized to.
pub fn consensus_parent_hash(&self) -> &Hash {
&self.context.parent_hash
}
/// Get the local validator session key.
pub fn session_key(&self) -> SessionKey {
self.context.local_id()
}
/// Get group info.
pub fn group_info(&self) -> &HashMap {
&self.context.groups
}
/// Import a single statement with remote source, whose signature has already been checked.
///
/// The statement producer, if any, will produce only statements concerning the same candidate
/// as the one just imported
pub fn import_remote_statement(
&self,
router: &R,
statement: table::SignedStatement,
) -> Option::Future,
::Future,
>> {
self.inner.lock().import_remote_statement(&*self.context, router, statement)
}
/// Import many statements at once.
///
/// Provide an iterator yielding remote, pre-checked statements.
///
/// The statement producer, if any, will produce only statements concerning the same candidate
/// as the one just imported
pub fn import_remote_statements(&self, router: &R, iterable: I) -> U
where
R: TableRouter,
I: IntoIterator,
U: ::std::iter::FromIterator