// Copyright 2017 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Polkadot is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . //! Parachain statement table meant to be shared with a message router //! and a consensus proposer. use std::collections::{HashMap, HashSet}; use std::sync::Arc; use table::{self, Table, Context as TableContextTrait}; use table::generic::Statement as GenericStatement; use collation::Collation; use polkadot_primitives::Hash; use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt}; use primitives::AuthorityId; use parking_lot::Mutex; use futures::{future, prelude::*}; use super::{GroupInfo, TableRouter}; use self::includable::IncludabilitySender; mod includable; pub use self::includable::Includable; struct TableContext { parent_hash: Hash, key: Arc<::ed25519::Pair>, groups: HashMap, } impl table::Context for TableContext { fn is_member_of(&self, authority: &AuthorityId, group: &ParaId) -> bool { self.groups.get(group).map_or(false, |g| g.validity_guarantors.contains(authority)) } fn is_availability_guarantor_of(&self, authority: &AuthorityId, group: &ParaId) -> bool { self.groups.get(group).map_or(false, |g| g.availability_guarantors.contains(authority)) } fn requisite_votes(&self, group: &ParaId) -> (usize, usize) { self.groups.get(group).map_or( (usize::max_value(), usize::max_value()), |g| (g.needed_validity, g.needed_availability), ) } } impl TableContext { fn local_id(&self) -> AuthorityId { self.key.public().into() } fn sign_statement(&self, statement: table::Statement) -> table::SignedStatement { let signature = ::sign_table_statement(&statement, &self.key, &self.parent_hash).into(); table::SignedStatement { statement, signature, sender: self.local_id(), } } } /// Source of statements pub enum StatementSource { /// Locally produced statement. Local, /// Received statement from remote source, with optional sender. Remote(Option), } // A shared table object. struct SharedTableInner { table: Table, proposed_digest: Option, checked_validity: HashSet, checked_availability: HashSet, trackers: Vec, } impl SharedTableInner { // Import a single statement. Provide a handle to a table router and a function // used to determine if a referenced candidate is valid. fn import_statement bool>( &mut self, context: &TableContext, router: &R, statement: table::SignedStatement, statement_source: StatementSource, check_candidate: C, ) -> StatementProducer< ::Future, ::Future, C, > { // this blank producer does nothing until we attach some futures // and set a candidate digest. let received_from = match statement_source { StatementSource::Local => return Default::default(), StatementSource::Remote(from) => from, }; let summary = match self.table.import_statement(context, statement, received_from) { Some(summary) => summary, None => return Default::default(), }; self.update_trackers(&summary.candidate, context); let local_id = context.local_id(); let is_validity_member = context.is_member_of(&local_id, &summary.group_id); let is_availability_member = context.is_availability_guarantor_of(&local_id, &summary.group_id); let digest = &summary.candidate; // TODO: consider a strategy based on the number of candidate votes as well. // only check validity if this wasn't locally proposed. let checking_validity = is_validity_member && self.proposed_digest.as_ref().map_or(true, |d| d != digest) && self.checked_validity.insert(digest.clone()); let checking_availability = is_availability_member && self.checked_availability.insert(digest.clone()); let work = if checking_validity || checking_availability { match self.table.get_candidate(&digest) { None => None, // TODO: handle table inconsistency somehow? Some(candidate) => { let fetch_block_data = router.fetch_block_data(candidate).into_future().fuse(); let fetch_extrinsic = if checking_availability { Some( router.fetch_extrinsic_data(candidate).into_future().fuse() ) } else { None }; Some(Work { candidate_receipt: candidate.clone(), fetch_block_data, fetch_extrinsic, evaluate: checking_validity, check_candidate, }) } } } else { None }; StatementProducer { produced_statements: Default::default(), work, } } fn update_trackers(&mut self, candidate: &Hash, context: &TableContext) { let includable = self.table.candidate_includable(candidate, context); for i in (0..self.trackers.len()).rev() { if self.trackers[i].update_candidate(candidate.clone(), includable) { self.trackers.swap_remove(i); } } } } /// Produced statements about a specific candidate. /// Both may be `None`. #[derive(Default)] pub struct ProducedStatements { /// A statement about the validity of the candidate. pub validity: Option, /// A statement about availability of data. If this is `Some`, /// then `block_data` and `extrinsic` should be `Some` as well. pub availability: Option, /// Block data to ensure availability of. pub block_data: Option, /// Extrinsic data to ensure availability of. pub extrinsic: Option, } /// Future that produces statements about a specific candidate. pub struct StatementProducer { produced_statements: ProducedStatements, work: Option>, } struct Work { candidate_receipt: CandidateReceipt, fetch_block_data: future::Fuse, fetch_extrinsic: Option>, evaluate: bool, check_candidate: C } impl Default for StatementProducer { fn default() -> Self { StatementProducer { produced_statements: Default::default(), work: None, } } } impl Future for StatementProducer where D: Future, E: Future, C: FnMut(Collation) -> bool, { type Item = ProducedStatements; type Error = Err; fn poll(&mut self) -> Poll { let work = match self.work { Some(ref mut work) => work, None => return Ok(Async::Ready(::std::mem::replace(&mut self.produced_statements, Default::default()))), }; if let Async::Ready(block_data) = work.fetch_block_data.poll()? { self.produced_statements.block_data = Some(block_data.clone()); if work.evaluate { let is_good = (work.check_candidate)(Collation { block_data, receipt: work.candidate_receipt.clone(), }); let hash = work.candidate_receipt.hash(); self.produced_statements.validity = Some(if is_good { GenericStatement::Valid(hash) } else { GenericStatement::Invalid(hash) }); } } if let Some(ref mut fetch_extrinsic) = work.fetch_extrinsic { if let Async::Ready(extrinsic) = fetch_extrinsic.poll()? { self.produced_statements.extrinsic = Some(extrinsic); } } let done = self.produced_statements.block_data.is_some() && { if work.evaluate { true } else if self.produced_statements.extrinsic.is_some() { self.produced_statements.availability = Some(GenericStatement::Available(work.candidate_receipt.hash())); true } else { false } }; if done { Ok(Async::Ready(::std::mem::replace(&mut self.produced_statements, Default::default()))) } else { Ok(Async::NotReady) } } } /// A shared table object. pub struct SharedTable { context: Arc, inner: Arc>, } impl Clone for SharedTable { fn clone(&self) -> Self { SharedTable { context: self.context.clone(), inner: self.inner.clone(), } } } impl SharedTable { /// Create a new shared table. /// /// Provide the key to sign with, and the parent hash of the relay chain /// block being built. pub fn new(groups: HashMap, key: Arc<::ed25519::Pair>, parent_hash: Hash) -> Self { SharedTable { context: Arc::new(TableContext { groups, key, parent_hash }), inner: Arc::new(Mutex::new(SharedTableInner { table: Table::default(), proposed_digest: None, checked_validity: HashSet::new(), checked_availability: HashSet::new(), trackers: Vec::new(), })) } } /// Get group info. pub fn group_info(&self) -> &HashMap { &self.context.groups } /// Import a single statement. Provide a handle to a table router /// for dispatching any other requests which come up. pub fn import_statement bool>( &self, router: &R, statement: table::SignedStatement, received_from: StatementSource, check_candidate: C, ) -> StatementProducer<::Future, ::Future, C> { self.inner.lock().import_statement(&*self.context, router, statement, received_from, check_candidate) } /// Sign and import a local statement. pub fn sign_and_import( &self, router: &R, statement: table::Statement, ) { let proposed_digest = match statement { GenericStatement::Candidate(ref c) => Some(c.hash()), _ => None, }; let signed_statement = self.context.sign_statement(statement); let mut inner = self.inner.lock(); if proposed_digest.is_some() { inner.proposed_digest = proposed_digest; } let producer = inner.import_statement( &*self.context, router, signed_statement, StatementSource::Local, |_| true, ); assert!(producer.work.is_none(), "local statement import never leads to additional work; qed"); } /// Import many statements at once. /// /// Provide an iterator yielding pairs of (statement, statement_source). pub fn import_statements(&self, router: &R, iterable: I) -> U where R: TableRouter, I: IntoIterator, C: FnMut(Collation) -> bool, U: ::std::iter::FromIterator::Future, ::Future, C, >>, { let mut inner = self.inner.lock(); iterable.into_iter().map(move |(statement, statement_source, check_candidate)| { inner.import_statement(&*self.context, router, statement, statement_source, check_candidate) }).collect() } /// Execute a closure using a specific candidate. /// /// Deadlocks if called recursively. pub fn with_candidate(&self, digest: &Hash, f: F) -> U where F: FnOnce(Option<&CandidateReceipt>) -> U { let inner = self.inner.lock(); f(inner.table.get_candidate(digest)) } /// Execute a closure using the current proposed set. /// /// Deadlocks if called recursively. pub fn with_proposal(&self, f: F) -> U where F: FnOnce(Vec<&CandidateReceipt>) -> U { let inner = self.inner.lock(); f(inner.table.proposed_candidates(&*self.context)) } /// Get the number of parachains which have available candidates. pub fn includable_count(&self) -> usize { self.inner.lock().table.includable_count() } /// Get all witnessed misbehavior. pub fn get_misbehavior(&self) -> HashMap { self.inner.lock().table.get_misbehavior().clone() } /// Fill a statement batch. pub fn fill_batch(&self, batch: &mut B) { self.inner.lock().table.fill_batch(batch); } /// Track includability of a given set of candidate hashes. pub fn track_includability(&self, iterable: I) -> Includable where I: IntoIterator { let mut inner = self.inner.lock(); let (tx, rx) = includable::track(iterable.into_iter().map(|x| { let includable = inner.table.candidate_includable(&x, &*self.context); (x, includable) })); if !tx.is_complete() { inner.trackers.push(tx); } rx } } #[cfg(test)] mod tests { use super::*; use substrate_keyring::Keyring; #[derive(Clone)] struct DummyRouter; impl TableRouter for DummyRouter { type Error = (); type FetchCandidate = ::futures::future::Empty; type FetchExtrinsic = ::futures::future::Empty; /// Note local candidate data, making it available on the network to other validators. fn local_candidate_data(&self, _hash: Hash, _block_data: BlockData, _extrinsic: Extrinsic) { } /// Fetch block data for a specific candidate. fn fetch_block_data(&self, _candidate: &CandidateReceipt) -> Self::FetchCandidate { ::futures::future::empty() } /// Fetch extrinsic data for a specific candidate. fn fetch_extrinsic_data(&self, _candidate: &CandidateReceipt) -> Self::FetchExtrinsic { ::futures::future::empty() } } #[test] fn statement_triggers_fetch_and_evaluate() { let mut groups = HashMap::new(); let para_id = ParaId::from(1); let local_id = Keyring::Alice.to_raw_public().into(); let local_key = Arc::new(Keyring::Alice.pair()); let validity_other = Keyring::Bob.to_raw_public().into(); let validity_other_key = Keyring::Bob.pair(); let parent_hash = Default::default(); groups.insert(para_id, GroupInfo { validity_guarantors: [local_id, validity_other].iter().cloned().collect(), availability_guarantors: Default::default(), needed_validity: 2, needed_availability: 0, }); let shared_table = SharedTable::new(groups, local_key.clone(), parent_hash); let candidate = CandidateReceipt { parachain_index: para_id, collator: [1; 32].into(), head_data: ::polkadot_primitives::parachain::HeadData(vec![1, 2, 3, 4]), balance_uploads: Vec::new(), egress_queue_roots: Vec::new(), fees: 1_000_000, }; let candidate_statement = GenericStatement::Candidate(candidate); let signature = ::sign_table_statement(&candidate_statement, &validity_other_key, &parent_hash); let signed_statement = ::table::generic::SignedStatement { statement: candidate_statement, signature: signature.into(), sender: validity_other, }; let producer = shared_table.import_statement( &DummyRouter, signed_statement, StatementSource::Remote(None), |_| true, ); assert!(producer.work.is_some(), "candidate and local validity group are same"); assert!(producer.work.as_ref().unwrap().evaluate, "should evaluate validity"); } #[test] fn statement_triggers_fetch_and_availability() { let mut groups = HashMap::new(); let para_id = ParaId::from(1); let local_id = Keyring::Alice.to_raw_public().into(); let local_key = Arc::new(Keyring::Alice.pair()); let validity_other = Keyring::Bob.to_raw_public().into(); let validity_other_key = Keyring::Bob.pair(); let parent_hash = Default::default(); groups.insert(para_id, GroupInfo { validity_guarantors: [validity_other].iter().cloned().collect(), availability_guarantors: [local_id].iter().cloned().collect(), needed_validity: 1, needed_availability: 1, }); let shared_table = SharedTable::new(groups, local_key.clone(), parent_hash); let candidate = CandidateReceipt { parachain_index: para_id, collator: [1; 32].into(), head_data: ::polkadot_primitives::parachain::HeadData(vec![1, 2, 3, 4]), balance_uploads: Vec::new(), egress_queue_roots: Vec::new(), fees: 1_000_000, }; let candidate_statement = GenericStatement::Candidate(candidate); let signature = ::sign_table_statement(&candidate_statement, &validity_other_key, &parent_hash); let signed_statement = ::table::generic::SignedStatement { statement: candidate_statement, signature: signature.into(), sender: validity_other, }; let producer = shared_table.import_statement( &DummyRouter, signed_statement, StatementSource::Remote(None), |_| true, ); assert!(producer.work.is_some(), "candidate and local availability group are same"); assert!(producer.work.as_ref().unwrap().fetch_extrinsic.is_some(), "should fetch extrinsic when guaranteeing availability"); assert!(!producer.work.as_ref().unwrap().evaluate, "should not evaluate validity"); } }