// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Parachain statement table meant to be shared with a message router
//! and a consensus proposer.
use std::collections::hash_map::{HashMap, Entry};
use extrinsic_store::{Data, Store as ExtrinsicStore};
use table::{self, Table, Context as TableContextTrait};
use polkadot_primitives::{Block, BlockId, Hash, SessionKey};
use polkadot_primitives::parachain::{
asynchronous rob
committed
Id as ParaId, Collation, Extrinsic, CandidateReceipt,
AttestedCandidate, ParachainHost, PoVBlock
asynchronous rob
committed
use futures::prelude::*;
asynchronous rob
committed
use super::{GroupInfo, TableRouter};
use self::includable::IncludabilitySender;
use primitives::{ed25519, Pair};
use runtime_primitives::traits::ProvideRuntimeApi;
mod includable;
pub use self::includable::Includable;
pub use table::{SignedStatement, Statement};
pub use table::generic::Statement as GenericStatement;
// Context the statement table is evaluated against: the relay chain parent
// hash, the local signing key and the per-parachain group information.
struct TableContext {
// Parent hash that locally produced statements are signed against.
parent_hash: Hash,
// Local key pair used to sign statements.
key: Arc<ed25519::Pair>,
// Group information, keyed by parachain id.
groups: HashMap<ParaId, GroupInfo>,
}
impl table::Context for TableContext {
    // Whether `authority` is one of the validity guarantors of `group`.
    // An unknown group has no members.
    fn is_member_of(&self, authority: &SessionKey, group: &ParaId) -> bool {
        match self.groups.get(group) {
            Some(info) => info.validity_guarantors.contains(authority),
            None => false,
        }
    }

    // Number of validity votes needed for `group`.
    // An unknown group yields `usize::max_value()` (presumably so that its
    // threshold can never be reached).
    fn requisite_votes(&self, group: &ParaId) -> usize {
        match self.groups.get(group) {
            Some(info) => info.needed_validity,
            None => usize::max_value(),
        }
    }
}
impl TableContext {
fn local_id(&self) -> SessionKey {
}
fn sign_statement(&self, statement: table::Statement) -> table::SignedStatement {
let signature = ::sign_table_statement(&statement, &self.key, &self.parent_hash).into();
table::SignedStatement {
statement,
signature,
asynchronous rob
committed
Valid(PoVBlock, Extrinsic),
Invalid(PoVBlock), // should take proof.
}
// Progress of local validation work for a single candidate.
enum ValidationWork {
// Validation finished; carries the outcome.
Done(Validation),
// Validation work has been handed out but no result has been recorded yet.
InProgress,
// Validation could not be set up; carries a description of the problem.
Error(String),
}
#[cfg(test)]
impl ValidationWork {
    // True only for the `InProgress` state.
    fn is_in_progress(&self) -> bool {
        if let ValidationWork::InProgress = *self {
            true
        } else {
            false
        }
    }

    // True only for the `Done` state, whatever the recorded outcome.
    fn is_done(&self) -> bool {
        if let ValidationWork::Done(_) = *self {
            true
        } else {
            false
        }
    }
}
// A shared table object.
//
// Holds the mutable state of the shared table.
struct SharedTableInner {
// The underlying statement table.
table: Table<TableContext>,
// Includability senders that still need to be notified of candidate updates.
trackers: Vec<IncludabilitySender>,
// Local validation state, keyed by candidate hash.
validated: HashMap<Hash, ValidationWork>,
}
impl SharedTableInner {
// Import a single statement. Provide a handle to a table router and a function
// used to determine if a referenced candidate is valid.
//
// the statement producer, if any, will produce only statements concerning the same candidate
// as the one just imported
fn import_remote_statement<R: TableRouter>(
&mut self,
context: &TableContext,
router: &R,
statement: table::SignedStatement,
asynchronous rob
committed
) -> Option<ParachainWork<
<R::FetchValidationProof as IntoFuture>::Future,
>> {
let summary = match self.table.import_statement(context, statement) {
None => return None,
};
self.update_trackers(&summary.candidate, context);
let local_id = context.local_id();
let para_member = context.is_member_of(&local_id, &summary.group_id);
let digest = &summary.candidate;
// TODO: consider a strategy based on the number of candidate votes as well.
// https://github.com/paritytech/polkadot/issues/218
let do_validation = para_member && match self.validated.entry(digest.clone()) {
Entry::Occupied(_) => false,
Entry::Vacant(entry) => {
entry.insert(ValidationWork::InProgress);
true
}
};
match self.table.get_candidate(&digest) {
None => {
let message = format!(
"Table inconsistency detected. Summary returned for candidate {} \
but receipt not present in table.",
digest,
);
warn!(target: "validation", "{}", message);
self.validated.insert(digest.clone(), ValidationWork::Error(message));
None
}
asynchronous rob
committed
let fetch = router.fetch_pov_block(candidate).into_future();
Some(Work {
candidate_receipt: candidate.clone(),
asynchronous rob
committed
fetch,
})
}
}
} else {
None
};
extrinsic_store: self.extrinsic_store.clone(),
relay_parent: context.parent_hash.clone(),
work
})
}
// Recompute the includability of `candidate` and pass it to every tracker.
// A tracker whose `update_candidate` returns `true` is dropped from the list.
fn update_trackers(&mut self, candidate: &Hash, context: &TableContext) {
    let includable = self.table.candidate_includable(candidate, context);

    // Walk from the back: `swap_remove` moves the last element into the
    // vacated slot, and everything behind the cursor was already visited.
    let mut idx = self.trackers.len();
    while idx > 0 {
        idx -= 1;
        let finished = self.trackers[idx].update_candidate(candidate.clone(), includable);
        if finished {
            self.trackers.swap_remove(idx);
        }
    }
}
}
/// Produced after validating a candidate.
///
/// Pairs the statement to be made about the candidate with the outcome of
/// validation.
pub struct Validated {
/// A statement about the validity of the candidate.
statement: table::Statement,
/// The result of validation.
result: Validation,
}
impl Validated {
/// Note that we've validated a candidate with given hash and it is bad.
asynchronous rob
committed
pub fn known_bad(hash: Hash, collation: PoVBlock) -> Self {
Validated {
statement: GenericStatement::Invalid(hash),
asynchronous rob
committed
result: Validation::Invalid(collation),
}
}
/// Note that we've validated a candidate with given hash and it is good.
/// Extrinsic data required.
asynchronous rob
committed
pub fn known_good(hash: Hash, collation: PoVBlock, extrinsic: Extrinsic) -> Self {
Validated {
statement: GenericStatement::Valid(hash),
asynchronous rob
committed
result: Validation::Valid(collation, extrinsic),
}
}
/// Note that we've collated a candidate.
/// Extrinsic data required.
pub fn collated_local(
receipt: CandidateReceipt,
asynchronous rob
committed
collation: PoVBlock,
extrinsic: Extrinsic,
) -> Self {
Validated {
statement: GenericStatement::Candidate(receipt),
asynchronous rob
committed
result: Validation::Valid(collation, extrinsic),
asynchronous rob
committed
/// Get a reference to the proof-of-validation block.
///
/// The block is available for both valid and invalid results.
pub fn pov_block(&self) -> &PoVBlock {
    match self.result {
        Validation::Valid(ref block, _) => block,
        Validation::Invalid(ref block) => block,
    }
}
/// Get a reference to the extrinsic data, if any.
///
/// Only a valid result carries extrinsic data; an invalid one yields `None`.
pub fn extrinsic(&self) -> Option<&Extrinsic> {
    if let Validation::Valid(_, ref data) = self.result {
        Some(data)
    } else {
        None
    }
}
/// Future that performs parachain validation work.
///
/// Must be primed with a validation function (see `prime` / `prime_with`)
/// before it can be polled.
pub struct ParachainWork<Fetch> {
// The candidate receipt together with the future fetching its PoV block.
work: Work<Fetch>,
// Relay chain block hash handed to the validation function.
relay_parent: Hash,
// Store in which data of successfully validated candidates is made available.
extrinsic_store: ExtrinsicStore,
}
impl<Fetch: Future> ParachainWork<Fetch> {
/// Prime the parachain work with an API reference for extracting
/// chain information.
pub fn prime<P: ProvideRuntimeApi>(self, api: Arc<P>)
-> PrimedParachainWork<
asynchronous rob
committed
impl Send + FnMut(&BlockId, &Collation) -> Result<Extrinsic, ()>,
>
where
P: Send + Sync + 'static,
P::Api: ParachainHost<Block>,
{
asynchronous rob
committed
let validate = move |id: &_, collation: &_| {
let res = ::collation::validate_collation(
&*api,
id,
collation,
);
match res {
debug!(target: "validation", "Encountered bad collation: {}", e);
}
}
};
PrimedParachainWork { inner: self, validate }
}
/// Prime the parachain work with a custom validation function.
pub fn prime_with<F>(self, validate: F) -> PrimedParachainWork<Fetch, F>
asynchronous rob
committed
where F: FnMut(&BlockId, &Collation) -> Result<Extrinsic, ()>
{
PrimedParachainWork { inner: self, validate }
}
candidate_receipt: CandidateReceipt,
/// Primed statement producer.
pub struct PrimedParachainWork<Fetch, F> {
inner: ParachainWork<Fetch>,
impl<Fetch, F, Err> Future for PrimedParachainWork<Fetch, F>
asynchronous rob
committed
Fetch: Future<Item=PoVBlock,Error=Err>,
F: FnMut(&BlockId, &Collation) -> Result<Extrinsic, ()>,
fn poll(&mut self) -> Poll<Validated, Err> {
let work = &mut self.inner.work;
let candidate = &work.candidate_receipt;
asynchronous rob
committed
asynchronous rob
committed
let pov_block = try_ready!(work.fetch.poll());
let validation_res = (self.validate)(
&BlockId::hash(self.inner.relay_parent),
asynchronous rob
committed
&Collation { pov: pov_block.clone(), receipt: candidate.clone() },
let candidate_hash = candidate.hash();
asynchronous rob
committed
debug!(target: "validation", "Making validity statement about candidate {}: is_good? {:?}",
candidate_hash, validation_res.is_ok());
let (validity_statement, result) = match validation_res {
Err(()) => (
GenericStatement::Invalid(candidate_hash),
asynchronous rob
committed
Validation::Invalid(pov_block),
Ok(extrinsic) => {
self.inner.extrinsic_store.make_available(Data {
relay_parent: self.inner.relay_parent,
parachain_id: work.candidate_receipt.parachain_index,
candidate_hash,
asynchronous rob
committed
block_data: pov_block.block_data.clone(),
extrinsic: Some(extrinsic.clone()),
})?;
(
GenericStatement::Valid(candidate_hash),
asynchronous rob
committed
Validation::Valid(pov_block, extrinsic)
statement: validity_statement,
result,
}
}
/// A shared table object.
///
/// Clones share the same context and the same inner state; both are
/// reference-counted.
pub struct SharedTable {
// Table context (parent hash, signing key, groups); never mutated through this handle.
context: Arc<TableContext>,
// Mutable table state behind a mutex.
inner: Arc<Mutex<SharedTableInner>>,
}
impl Clone for SharedTable {
fn clone(&self) -> Self {
SharedTable {
context: self.context.clone(),
inner: self.inner.clone(),
}
}
}
impl SharedTable {
/// Create a new shared table.
///
/// Provide the key to sign with, and the parent hash of the relay chain
/// block being built.
pub fn new(
groups: HashMap<ParaId, GroupInfo>,
key: Arc<ed25519::Pair>,
parent_hash: Hash,
extrinsic_store: ExtrinsicStore,
) -> Self {
SharedTable {
context: Arc::new(TableContext { groups, key, parent_hash }),
inner: Arc::new(Mutex::new(SharedTableInner {
table: Table::default(),
/// Get the parent hash this table should hold statements localized to.
///
/// This is the `parent_hash` stored in the table's context.
pub fn consensus_parent_hash(&self) -> &Hash {
&self.context.parent_hash
}
/// Get the local validator session key.
///
/// Delegates to the table context's `local_id`.
pub fn session_key(&self) -> SessionKey {
self.context.local_id()
}
/// Get group info.
///
/// Returns the map from parachain id to `GroupInfo` held by the table's
/// context.
pub fn group_info(&self) -> &HashMap<ParaId, GroupInfo> {
&self.context.groups
}
/// Get extrinsic data for candidate with given hash, if any.
///
/// This will return `Some` for any candidates that have been validated
/// locally. Candidates that are unknown, still in progress, errored or
/// found invalid yield `None`.
pub(crate) fn extrinsic_data(&self, hash: &Hash) -> Option<Extrinsic> {
    match self.inner.lock().validated.get(hash) {
        Some(&ValidationWork::Done(Validation::Valid(_, ref extrinsic))) => Some(extrinsic.clone()),
        _ => None,
    }
}
/// Import a single statement with remote source, whose signature has already been checked.
///
/// Takes the inner lock and forwards to the inner table's
/// `import_remote_statement`, supplying this table's context.
///
/// The statement producer, if any, will produce only statements concerning the same candidate
/// as the one just imported
pub fn import_remote_statement<R: TableRouter>(
    &self,
    router: &R,
    statement: table::SignedStatement,
) -> Option<ParachainWork<
    <R::FetchValidationProof as IntoFuture>::Future,
>> {
    self.inner.lock().import_remote_statement(&*self.context, router, statement)
}
/// Import many statements at once.
///
/// Provide an iterator yielding remote, pre-checked statements.
///
/// The statement producer, if any, will produce only statements concerning the same candidate
/// as the one just imported
pub fn import_remote_statements<R, I, U>(&self, router: &R, iterable: I) -> U
where
R: TableRouter,
I: IntoIterator<Item=table::SignedStatement>,
asynchronous rob
committed
U: ::std::iter::FromIterator<Option<ParachainWork<
<R::FetchValidationProof as IntoFuture>::Future,
>>>,
{
let mut inner = self.inner.lock();
iterable.into_iter().map(move |statement| {
inner.import_remote_statement(&*self.context, router, statement)
}).collect()
/// Sign and import the result of candidate validation.
pub fn import_validated(&self, validated: Validated)
asynchronous rob
committed
{
let digest = match validated.statement {
GenericStatement::Candidate(ref c) => c.hash(),
GenericStatement::Valid(h) | GenericStatement::Invalid(h) => h,
let signed_statement = self.context.sign_statement(validated.statement);
let mut inner = self.inner.lock();
inner.table.import_statement(&*self.context, signed_statement.clone());
inner.validated.insert(digest, ValidationWork::Done(validated.result));
asynchronous rob
committed
}
/// Execute a closure using a specific candidate.
///
/// The closure is handed whatever the table returns for `digest`, and runs
/// while the inner lock is held.
///
/// Deadlocks if called recursively.
pub fn with_candidate<F, U>(&self, digest: &Hash, f: F) -> U
    where F: FnOnce(Option<&CandidateReceipt>) -> U
{
    let guard = self.inner.lock();
    let receipt = guard.table.get_candidate(digest);
    f(receipt)
}
/// Get a set of candidates that can be proposed.
pub fn proposed_set(&self) -> Vec<AttestedCandidate> {
use table::generic::{ValidityAttestation as GAttestation};
use polkadot_primitives::parachain::ValidityAttestation;
// we transform the types of the attestations gathered from the table
// into the type expected by the runtime. This may do signature
// aggregation in the future.
let table_attestations = self.inner.lock().table.proposed_candidates(&*self.context);
table_attestations.into_iter()
.map(|attested| AttestedCandidate {
candidate: attested.candidate,
validity_votes: attested.validity_votes.into_iter().map(|(a, v)| match v {
GAttestation::Implicit(s) => (a, ValidityAttestation::Implicit(s)),
GAttestation::Explicit(s) => (a, ValidityAttestation::Explicit(s)),
}).collect(),
})
.collect()
/// Get the number of total parachains.
///
/// Computed as the number of entries in the group info map.
pub fn num_parachains(&self) -> usize {
self.group_info().len()
}
/// Get the number of parachains whose candidates may be included.
///
/// Takes the inner lock and asks the underlying table.
pub fn includable_count(&self) -> usize {
self.inner.lock().table.includable_count()
}
/// Get all witnessed misbehavior.
///
/// Returns a clone of the table's misbehavior map, keyed by session key,
/// taken while holding the inner lock.
pub fn get_misbehavior(&self) -> HashMap<SessionKey, table::Misbehavior> {
self.inner.lock().table.get_misbehavior().clone()
}
/// Track includability of a given set of candidate hashes.
///
/// Each hash is paired with its current includability according to the
/// table. The sending half is kept for later updates only if it is not
/// already complete; the receiving half is returned.
pub fn track_includability<I>(&self, iterable: I) -> Includable
    where I: IntoIterator<Item=Hash>
{
    let context = &*self.context;
    let mut inner = self.inner.lock();

    let (sender, receiver) = includable::track(iterable.into_iter().map(|hash| {
        let is_includable = inner.table.candidate_includable(&hash, context);
        (hash, is_includable)
    }));

    // Nothing left to wait for: no need to register the sender.
    if sender.is_complete() {
        return receiver;
    }

    inner.trackers.push(sender);
    receiver
}
}
#[cfg(test)]
mod tests {
use super::*;
use substrate_keyring::AuthorityKeyring;
use primitives::crypto::UncheckedInto;
asynchronous rob
committed
use polkadot_primitives::parachain::{BlockData, ConsolidatedIngress};
use futures::future;
// Test helper: build a `PoVBlock` carrying `data` as its block data and an
// empty consolidated ingress.
fn pov_block_with_data(data: Vec<u8>) -> PoVBlock {
PoVBlock {
block_data: BlockData(data),
ingress: ConsolidatedIngress(Vec::new()),
}
}
#[derive(Clone)]
struct DummyRouter;
impl TableRouter for DummyRouter {
asynchronous rob
committed
type FetchValidationProof = future::FutureResult<PoVBlock,Self::Error>;
asynchronous rob
committed
fn local_collation(&self, _collation: Collation, _extrinsic: Extrinsic) {
asynchronous rob
committed
fn fetch_pov_block(&self, _candidate: &CandidateReceipt) -> Self::FetchValidationProof {
future::ok(pov_block_with_data(vec![1, 2, 3, 4, 5]))
}
#[test]
fn statement_triggers_fetch_and_evaluate() {
let mut groups = HashMap::new();
let para_id = ParaId::from(1);
let parent_hash = Default::default();
let local_key = Arc::new(AuthorityKeyring::Alice.pair());
let local_id = local_key.public();
let validity_other_key = AuthorityKeyring::Bob.pair();
let validity_other = validity_other_key.public();
groups.insert(para_id, GroupInfo {
validity_guarantors: [local_id, validity_other.clone()].iter().cloned().collect(),
needed_validity: 2,
});
let shared_table = SharedTable::new(
groups,
local_key.clone(),
parent_hash,
ExtrinsicStore::new_in_memory(),
);
let candidate = CandidateReceipt {
parachain_index: para_id,
head_data: ::polkadot_primitives::parachain::HeadData(vec![1, 2, 3, 4]),
balance_uploads: Vec::new(),
egress_queue_roots: Vec::new(),
fees: 1_000_000,
block_data_hash: [2; 32].into(),
};
let candidate_statement = GenericStatement::Candidate(candidate);
let signature = ::sign_table_statement(&candidate_statement, &validity_other_key, &parent_hash);
let signed_statement = ::table::generic::SignedStatement {
statement: candidate_statement,
signature: signature.into(),
sender: validity_other,
};
shared_table.import_remote_statement(
&DummyRouter,
signed_statement,
).expect("candidate and local validity group are same");
fn statement_triggers_fetch_and_validity() {
let mut groups = HashMap::new();
let para_id = ParaId::from(1);
let parent_hash = Default::default();
let local_key = Arc::new(AuthorityKeyring::Alice.pair());
let local_id = local_key.public();
let validity_other_key = AuthorityKeyring::Bob.pair();
let validity_other = validity_other_key.public();
groups.insert(para_id, GroupInfo {
validity_guarantors: [local_id, validity_other.clone()].iter().cloned().collect(),
needed_validity: 1,
});
let shared_table = SharedTable::new(
groups,
local_key.clone(),
parent_hash,
ExtrinsicStore::new_in_memory(),
);
let candidate = CandidateReceipt {
parachain_index: para_id,
head_data: ::polkadot_primitives::parachain::HeadData(vec![1, 2, 3, 4]),
balance_uploads: Vec::new(),
egress_queue_roots: Vec::new(),
fees: 1_000_000,
block_data_hash: [2; 32].into(),
};
let candidate_statement = GenericStatement::Candidate(candidate);
let signature = ::sign_table_statement(&candidate_statement, &validity_other_key, &parent_hash);
let signed_statement = ::table::generic::SignedStatement {
statement: candidate_statement,
signature: signature.into(),
sender: validity_other,
};
shared_table.import_remote_statement(
&DummyRouter,
signed_statement,
).expect("should produce work");
#[test]
fn evaluate_makes_block_data_available() {
let store = ExtrinsicStore::new_in_memory();
let relay_parent = [0; 32].into();
let para_id = 5.into();
asynchronous rob
committed
let pov_block = pov_block_with_data(vec![1, 2, 3]);
let candidate = CandidateReceipt {
parachain_index: para_id,
signature: Default::default(),
head_data: ::polkadot_primitives::parachain::HeadData(vec![1, 2, 3, 4]),
balance_uploads: Vec::new(),
egress_queue_roots: Vec::new(),
fees: 1_000_000,
block_data_hash: [2; 32].into(),
};
let hash = candidate.hash();
let producer: ParachainWork<future::FutureResult<_, ::std::io::Error>> = ParachainWork {
work: Work {
candidate_receipt: candidate,
asynchronous rob
committed
fetch: future::ok(pov_block.clone()),
},
relay_parent,
extrinsic_store: store.clone(),
};
asynchronous rob
committed
let validated = producer.prime_with(|_, _| Ok(Extrinsic { outgoing_messages: Vec::new() }))
.wait()
.unwrap();
asynchronous rob
committed
assert_eq!(validated.pov_block(), &pov_block);
assert_eq!(validated.statement, GenericStatement::Valid(hash));
asynchronous rob
committed
assert_eq!(store.block_data(relay_parent, hash).unwrap(), pov_block.block_data);
assert!(store.extrinsic(relay_parent, hash).is_some());
}
#[test]
fn full_availability() {
let store = ExtrinsicStore::new_in_memory();
let relay_parent = [0; 32].into();
let para_id = 5.into();
asynchronous rob
committed
let pov_block = pov_block_with_data(vec![1, 2, 3]);
let candidate = CandidateReceipt {
parachain_index: para_id,
signature: Default::default(),
head_data: ::polkadot_primitives::parachain::HeadData(vec![1, 2, 3, 4]),
balance_uploads: Vec::new(),
egress_queue_roots: Vec::new(),
fees: 1_000_000,
block_data_hash: [2; 32].into(),
};
let hash = candidate.hash();
work: Work {
candidate_receipt: candidate,
asynchronous rob
committed
fetch: future::ok::<_, ::std::io::Error>(pov_block.clone()),
},
relay_parent,
extrinsic_store: store.clone(),
};
asynchronous rob
committed
let validated = producer.prime_with(|_, _| Ok(Extrinsic { outgoing_messages: Vec::new() }))
.wait()
.unwrap();
asynchronous rob
committed
assert_eq!(validated.pov_block(), &pov_block);
asynchronous rob
committed
assert_eq!(store.block_data(relay_parent, hash).unwrap(), pov_block.block_data);
assert!(store.extrinsic(relay_parent, hash).is_some());
}
#[test]
fn does_not_dispatch_work_after_starting_validation() {
let mut groups = HashMap::new();
let para_id = ParaId::from(1);
let parent_hash = Default::default();
let local_key = Arc::new(AuthorityKeyring::Alice.pair());
let local_id = local_key.public();
let validity_other_key = AuthorityKeyring::Bob.pair();
let validity_other = validity_other_key.public();
validity_guarantors: [local_id, validity_other.clone()].iter().cloned().collect(),
needed_validity: 1,
});
let shared_table = SharedTable::new(
groups,
local_key.clone(),
parent_hash,
ExtrinsicStore::new_in_memory(),
);
let candidate = CandidateReceipt {
parachain_index: para_id,
signature: Default::default(),
head_data: ::polkadot_primitives::parachain::HeadData(vec![1, 2, 3, 4]),
balance_uploads: Vec::new(),
egress_queue_roots: Vec::new(),
fees: 1_000_000,
block_data_hash: [2; 32].into(),
};
let hash = candidate.hash();
let candidate_statement = GenericStatement::Candidate(candidate);
let signature = ::sign_table_statement(&candidate_statement, &validity_other_key, &parent_hash);
let signed_statement = ::table::generic::SignedStatement {
statement: candidate_statement,
signature: signature.into(),
sender: validity_other,
};
let _a = shared_table.import_remote_statement(
&DummyRouter,
signed_statement.clone(),
).expect("should produce work");
assert!(shared_table.inner.lock().validated.get(&hash).expect("validation has started").is_in_progress());
let b = shared_table.import_remote_statement(
&DummyRouter,
signed_statement.clone(),
);
assert!(b.is_none(), "cannot work when validation has started");
}
#[test]
fn does_not_dispatch_after_local_candidate() {
let mut groups = HashMap::new();
let para_id = ParaId::from(1);
asynchronous rob
committed
let pov_block = pov_block_with_data(vec![1, 2, 3]);
let extrinsic = Extrinsic { outgoing_messages: Vec::new() };
let parent_hash = Default::default();
let local_key = Arc::new(AuthorityKeyring::Alice.pair());
let local_id = local_key.public();
let validity_other_key = AuthorityKeyring::Bob.pair();
let validity_other = validity_other_key.public();
groups.insert(para_id, GroupInfo {
validity_guarantors: [local_id, validity_other].iter().cloned().collect(),
needed_validity: 1,
});
let shared_table = SharedTable::new(
groups,
local_key.clone(),
parent_hash,
ExtrinsicStore::new_in_memory(),
);
let candidate = CandidateReceipt {
parachain_index: para_id,
signature: Default::default(),
head_data: ::polkadot_primitives::parachain::HeadData(vec![1, 2, 3, 4]),
balance_uploads: Vec::new(),
egress_queue_roots: Vec::new(),
fees: 1_000_000,
block_data_hash: [2; 32].into(),
};
let hash = candidate.hash();
let signed_statement = shared_table.import_validated(Validated::collated_local(
candidate,
asynchronous rob
committed
pov_block,
extrinsic,
));
assert!(shared_table.inner.lock().validated.get(&hash).expect("validation has started").is_done());
let a = shared_table.import_remote_statement(
&DummyRouter,
signed_statement,
);
assert!(a.is_none());
}