diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs index 6bfa95511a15d7eef1d8bf1036737743c24a927a..bd858a358625847d7b8ff954005017566384682e 100644 --- a/polkadot/collator/src/lib.rs +++ b/polkadot/collator/src/lib.rs @@ -71,7 +71,10 @@ use futures::{future, Stream, Future, IntoFuture}; use client::BlockchainEvents; use primitives::{ed25519, Pair}; use polkadot_primitives::{BlockId, SessionKey, Hash, Block}; -use polkadot_primitives::parachain::{self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId, Extrinsic}; +use polkadot_primitives::parachain::{ + self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId, Extrinsic, + PoVBlock, +}; use polkadot_cli::{PolkadotService, CustomConfiguration, CoreApi, ParachainHost}; use polkadot_cli::{Worker, IntoExit, ProvideRuntimeApi, TaskExecutor}; use polkadot_network::validation::{ValidationNetwork, SessionParams}; @@ -148,10 +151,10 @@ pub fn collate<'a, R, P>( P: ParachainContext + 'a, { let ingress = relay_context.unrouted_egress(local_id).into_future().map_err(Error::Polkadot); - ingress.and_then(move |ConsolidatedIngress(ingress)| { + ingress.and_then(move |ingress| { let (block_data, head_data, mut extrinsic) = para_context.produce_candidate( last_head, - ingress.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg))) + ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg))) ).map_err(Error::Collator)?; let block_data_hash = block_data.hash(); @@ -170,10 +173,12 @@ pub fn collate<'a, R, P>( block_data_hash, }; - // not necessary to send extrinsic because it is recomputed from execution. Ok(parachain::Collation { receipt, - block_data, + pov: PoVBlock { + block_data, + ingress, + }, }) }) } diff --git a/polkadot/network/src/collator_pool.rs b/polkadot/network/src/collator_pool.rs index bfec39736fe778bb710abe2850cc477d844a1b83..511f4ee51e66568d09760aeaa5c7fc82378a3780 100644 --- a/polkadot/network/src/collator_pool.rs +++ b/polkadot/network/src/collator_pool.rs @@ -220,9 +220,18 @@ impl CollatorPool { mod tests { use super::*; use substrate_primitives::crypto::UncheckedInto; - use polkadot_primitives::parachain::{CandidateReceipt, BlockData, HeadData}; + use polkadot_primitives::parachain::{ + CandidateReceipt, BlockData, PoVBlock, HeadData, ConsolidatedIngress, + }; use futures::Future; + fn make_pov(block_data: Vec<u8>) -> PoVBlock { + PoVBlock { + block_data: BlockData(block_data), + ingress: ConsolidatedIngress(Vec::new()), + } + } + #[test] fn disconnect_primary_gives_new_primary() { let mut pool = CollatorPool::new(); @@ -272,7 +281,7 @@ mod tests { fees: 0, block_data_hash: [3; 32].into(), }, - block_data: BlockData(vec![4, 5, 6]), + pov: make_pov(vec![4, 5, 6]), }); rx1.wait().unwrap(); @@ -299,7 +308,7 @@ mod tests { fees: 0, block_data_hash: [3; 32].into(), }, - block_data: BlockData(vec![4, 5, 6]), + pov: make_pov(vec![4, 5, 6]), }); let (tx, rx) = oneshot::channel(); diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index b979481dc892c0bbfee05ad7979060059cfa9735..dd9665237481e42798547ede959a5ce153998d52 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -56,7 +56,10 @@ pub mod gossip; use codec::{Decode, Encode}; use futures::sync::oneshot; use polkadot_primitives::{Block, SessionKey, Hash, Header}; -use polkadot_primitives::parachain::{Id as ParaId, CollatorId, BlockData, CandidateReceipt, Collation}; +use polkadot_primitives::parachain::{ + Id as ParaId, BlockData, CollatorId, CandidateReceipt, Collation, PoVBlock, + ConsolidatedIngressRoots, +}; use substrate_network::{PeerId, RequestId, Context, Severity}; use substrate_network::{message, generic_message}; use substrate_network::specialization::NetworkSpecialization as Specialization; @@ -84,12 +87,33 @@ pub struct Status { collating_for: Option<(CollatorId, ParaId)>, } -struct BlockDataRequest { +struct PoVBlockRequest { attempted_peers: HashSet<SessionKey>, validation_session_parent: Hash, candidate_hash: Hash, block_data_hash: Hash, - sender: oneshot::Sender<BlockData>, + sender: oneshot::Sender<PoVBlock>, + canon_roots: ConsolidatedIngressRoots, +} + +impl PoVBlockRequest { + // Attempt to process a response. If the provided block is invalid, + // this returns an error result containing the unmodified request. + // + // If `Ok(())` is returned, that indicates that the request has been processed. + fn process_response(self, pov_block: PoVBlock) -> Result<(), Self> { + if pov_block.block_data.hash() != self.block_data_hash { + return Err(self); + } + + match polkadot_validation::validate_incoming(&self.canon_roots, &pov_block.ingress) { + Ok(()) => { + let _ = self.sender.send(pov_block); + Ok(()) + } + Err(_) => Err(self) + } + } } // ensures collator-protocol messages are sent in correct order. @@ -147,9 +171,13 @@ pub enum Message { // TODO: do this with a cryptographic proof of some kind // https://github.com/paritytech/polkadot/issues/47 SessionKey(SessionKey), - /// Requesting parachain block data by (relay_parent, candidate_hash). + /// Requesting parachain proof-of-validation block (relay_parent, candidate_hash). + RequestPovBlock(RequestId, Hash, Hash), + /// Provide requested proof-of-validation block data by candidate hash or nothing if unknown. + PovBlock(RequestId, Option<PoVBlock>), + /// Request block data (relay_parent, candidate_hash) RequestBlockData(RequestId, Hash, Hash), - /// Provide block data by candidate hash or nothing if unknown. + /// Provide requested block data by candidate hash or nothing. BlockData(RequestId, Option<BlockData>), /// Tell a collator their role. CollatorRole(Role), @@ -171,8 +199,8 @@ pub struct PolkadotProtocol { validators: HashMap<SessionKey, PeerId>, local_collations: LocalCollations<Collation>, live_validation_sessions: LiveValidationSessions, - in_flight: HashMap<(RequestId, PeerId), BlockDataRequest>, - pending: Vec<BlockDataRequest>, + in_flight: HashMap<(RequestId, PeerId), PoVBlockRequest>, + pending: Vec<PoVBlockRequest>, extrinsic_store: Option<::av_store::Store>, next_req_id: u64, } @@ -195,15 +223,22 @@ impl PolkadotProtocol { } /// Fetch block data by candidate receipt. - fn fetch_block_data(&mut self, ctx: &mut Context<Block>, candidate: &CandidateReceipt, relay_parent: Hash) -> oneshot::Receiver<BlockData> { + fn fetch_pov_block( + &mut self, + ctx: &mut Context<Block>, + candidate: &CandidateReceipt, + relay_parent: Hash, + canon_roots: ConsolidatedIngressRoots, + ) -> oneshot::Receiver<PoVBlock> { let (tx, rx) = oneshot::channel(); - self.pending.push(BlockDataRequest { + self.pending.push(PoVBlockRequest { attempted_peers: Default::default(), validation_session_parent: relay_parent, candidate_hash: candidate.hash(), block_data_hash: candidate.block_data_hash, sender: tx, + canon_roots, }); self.dispatch_pending_requests(ctx); @@ -250,7 +285,7 @@ impl PolkadotProtocol { let parent = pending.validation_session_parent; let c_hash = pending.candidate_hash; - let still_pending = self.live_validation_sessions.with_block_data(&parent, &c_hash, |x| match x { + let still_pending = self.live_validation_sessions.with_pov_block(&parent, &c_hash, |x| match x { Ok(data @ &_) => { // answer locally. let _ = pending.sender.send(data.clone()); @@ -270,7 +305,7 @@ impl PolkadotProtocol { send_polkadot_message( ctx, who.clone(), - Message::RequestBlockData(req_id, parent, c_hash), + Message::RequestPovBlock(req_id, parent, c_hash), ); in_flight.insert((req_id, who), pending); @@ -295,12 +330,21 @@ impl PolkadotProtocol { trace!(target: "p_net", "Polkadot message from {}: {:?}", who, msg); match msg { Message::SessionKey(key) => self.on_session_key(ctx, who, key), + Message::RequestPovBlock(req_id, relay_parent, candidate_hash) => { + let pov_block = self.live_validation_sessions.with_pov_block( + &relay_parent, + &candidate_hash, + |res| res.ok().map(|b| b.clone()), + ); + + send_polkadot_message(ctx, who, Message::PovBlock(req_id, pov_block)); + } Message::RequestBlockData(req_id, relay_parent, candidate_hash) => { let block_data = self.live_validation_sessions - .with_block_data( + .with_pov_block( &relay_parent, &candidate_hash, - |res| res.ok().map(|b| b.clone()), + |res| res.ok().map(|b| b.block_data.clone()), ) .or_else(|| self.extrinsic_store.as_ref() .and_then(|s| s.block_data(relay_parent, candidate_hash)) @@ -308,7 +352,11 @@ impl PolkadotProtocol { send_polkadot_message(ctx, who, Message::BlockData(req_id, block_data)); } - Message::BlockData(req_id, data) => self.on_block_data(ctx, who, req_id, data), + Message::PovBlock(req_id, data) => self.on_pov_block(ctx, who, req_id, data), + Message::BlockData(_req_id, _data) => { + // current block data is never requested bare by the node. + ctx.report_peer(who, Severity::Bad("Peer sent un-requested block data".to_string())); + } Message::Collation(relay_parent, collation) => self.on_collation(ctx, who, relay_parent, collation), Message::CollatorRole(role) => self.on_new_role(ctx, who, role), } @@ -355,13 +403,19 @@ impl PolkadotProtocol { self.dispatch_pending_requests(ctx); } - fn on_block_data(&mut self, ctx: &mut Context<Block>, who: PeerId, req_id: RequestId, data: Option<BlockData>) { + fn on_pov_block( + &mut self, + ctx: &mut Context<Block>, + who: PeerId, + req_id: RequestId, + pov_block: Option<PoVBlock>, + ) { match self.in_flight.remove(&(req_id, who.clone())) { - Some(req) => { - if let Some(data) = data { - if data.hash() == req.block_data_hash { - let _ = req.sender.send(data); - return + Some(mut req) => { + if let Some(pov_block) = pov_block { + match req.process_response(pov_block) { + Ok(()) => return, + Err(r) => { req = r; } } } @@ -486,12 +540,14 @@ impl Specialization<Block> for PolkadotProtocol { self.in_flight.retain(|&(_, ref peer), val| { let retain = peer != &who; if !retain { + // swap with a dummy value which will be dropped immediately. let (sender, _) = oneshot::channel(); - pending.push(::std::mem::replace(val, BlockDataRequest { + pending.push(::std::mem::replace(val, PoVBlockRequest { attempted_peers: Default::default(), validation_session_parent: Default::default(), candidate_hash: Default::default(), block_data_hash: Default::default(), + canon_roots: ConsolidatedIngressRoots(Vec::new()), sender, })); } diff --git a/polkadot/network/src/router.rs b/polkadot/network/src/router.rs index 53b35c003a844354d2015dd0813774e4f613e56c..9adbb3b644399d7dc47159873b1653b1563236cb 100644 --- a/polkadot/network/src/router.rs +++ b/polkadot/network/src/router.rs @@ -29,7 +29,8 @@ use polkadot_validation::{ }; use polkadot_primitives::{Block, Hash, SessionKey}; use polkadot_primitives::parachain::{ - BlockData, Extrinsic, CandidateReceipt, ParachainHost, Id as ParaId, Message + Extrinsic, CandidateReceipt, ParachainHost, Id as ParaId, Message, + Collation, PoVBlock, }; use gossip::RegisteredMessageValidator; @@ -41,7 +42,7 @@ use std::collections::{HashMap, HashSet}; use std::io; use std::sync::Arc; -use validation::{self, SessionDataFetcher, NetworkService, Executor, Incoming}; +use validation::{self, SessionDataFetcher, NetworkService, Executor}; type IngressPairRef<'a> = (ParaId, &'a [Message]); @@ -92,6 +93,12 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> { .map(|msg| msg.statement) } + /// Get access to the session data fetcher. + #[cfg(test)] + pub(crate) fn fetcher(&self) -> &SessionDataFetcher<P, E, N, T> { + &self.fetcher + } + fn parent_hash(&self) -> Hash { self.fetcher.parent_hash() } @@ -201,7 +208,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w fn create_work<D>(&self, candidate_hash: Hash, producer: ParachainWork<D>) -> impl Future<Item=(),Error=()> + Send + 'static where - D: Future<Item=(BlockData, Incoming),Error=io::Error> + Send + 'static, + D: Future<Item=PoVBlock,Error=io::Error> + Send + 'static, { let table = self.table.clone(); let network = self.network().clone(); @@ -213,7 +220,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w // store the data before broadcasting statements, so other peers can fetch. knowledge.lock().note_candidate( candidate_hash, - Some(validated.block_data().clone()), + Some(validated.pov_block().clone()), validated.extrinsic().cloned(), ); @@ -234,26 +241,21 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh E: Future<Item=(),Error=()> + Clone + Send + 'static, { type Error = io::Error; - type FetchCandidate = validation::BlockDataReceiver; - type FetchIncoming = validation::IncomingReceiver; + type FetchValidationProof = validation::PoVReceiver; - fn local_candidate(&self, receipt: CandidateReceipt, block_data: BlockData, extrinsic: Extrinsic) { + fn local_collation(&self, collation: Collation, extrinsic: Extrinsic) { // produce a signed statement - let hash = receipt.hash(); - let validated = Validated::collated_local(receipt, block_data.clone(), extrinsic.clone()); + let hash = collation.receipt.hash(); + let validated = Validated::collated_local(collation.receipt, collation.pov.clone(), extrinsic.clone()); let statement = self.table.import_validated(validated); // give to network to make available. - self.fetcher.knowledge().lock().note_candidate(hash, Some(block_data), Some(extrinsic)); + self.fetcher.knowledge().lock().note_candidate(hash, Some(collation.pov), Some(extrinsic)); self.network().gossip_message(self.attestation_topic, statement.encode()); } - fn fetch_block_data(&self, candidate: &CandidateReceipt) -> Self::FetchCandidate { - self.fetcher.fetch_block_data(candidate) - } - - fn fetch_incoming(&self, parachain: ParaId) -> Self::FetchIncoming { - self.fetcher.fetch_incoming(parachain) + fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> Self::FetchValidationProof { + self.fetcher.fetch_pov_block(candidate) } } diff --git a/polkadot/network/src/tests/mod.rs b/polkadot/network/src/tests/mod.rs index 5e6f0ebea147fb2c0be41f6ed371ef3fa26279a5..8d3cf21b10c87327c3df93fc088b508b2c54407c 100644 --- a/polkadot/network/src/tests/mod.rs +++ b/polkadot/network/src/tests/mod.rs @@ -21,7 +21,10 @@ use validation::SessionParams; use polkadot_validation::GenericStatement; use polkadot_primitives::{Block, Hash, SessionKey}; -use polkadot_primitives::parachain::{CandidateReceipt, HeadData, BlockData, CollatorId, ValidatorId}; +use polkadot_primitives::parachain::{ + CandidateReceipt, HeadData, PoVBlock, BlockData, CollatorId, ValidatorId, + ConsolidatedIngressRoots, +}; use substrate_primitives::crypto::UncheckedInto; use codec::Encode; use substrate_network::{ @@ -74,6 +77,14 @@ impl TestContext { } } + +fn make_pov(block_data: Vec<u8>) -> PoVBlock { + PoVBlock { + block_data: BlockData(block_data), + ingress: polkadot_primitives::parachain::ConsolidatedIngress(Vec::new()), + } +} + fn make_status(status: &Status, roles: Roles) -> FullStatus { FullStatus { version: 1, @@ -164,7 +175,13 @@ fn fetches_from_those_with_knowledge() { let knowledge = session.knowledge(); knowledge.lock().note_statement(a_key.clone(), &GenericStatement::Valid(candidate_hash)); - let recv = protocol.fetch_block_data(&mut TestContext::default(), &candidate_receipt, parent_hash); + let canon_roots = ConsolidatedIngressRoots(Vec::new()); + let recv = protocol.fetch_pov_block( + &mut TestContext::default(), + &candidate_receipt, + parent_hash, + canon_roots, + ); // connect peer A { @@ -178,7 +195,7 @@ fn fetches_from_those_with_knowledge() { let mut ctx = TestContext::default(); on_message(&mut protocol, &mut ctx, peer_a.clone(), Message::SessionKey(a_key.clone())); assert!(protocol.validators.contains_key(&a_key)); - assert!(ctx.has_message(peer_a.clone(), Message::RequestBlockData(1, parent_hash, candidate_hash))); + assert!(ctx.has_message(peer_a.clone(), Message::RequestPovBlock(1, parent_hash, candidate_hash))); } knowledge.lock().note_statement(b_key.clone(), &GenericStatement::Valid(candidate_hash)); @@ -188,7 +205,7 @@ fn fetches_from_those_with_knowledge() { let mut ctx = TestContext::default(); protocol.on_connect(&mut ctx, peer_b.clone(), make_status(&status, Roles::AUTHORITY)); on_message(&mut protocol, &mut ctx, peer_b.clone(), Message::SessionKey(b_key.clone())); - assert!(!ctx.has_message(peer_b.clone(), Message::RequestBlockData(2, parent_hash, candidate_hash))); + assert!(!ctx.has_message(peer_b.clone(), Message::RequestPovBlock(2, parent_hash, candidate_hash))); } @@ -197,15 +214,16 @@ fn fetches_from_those_with_knowledge() { let mut ctx = TestContext::default(); protocol.on_disconnect(&mut ctx, peer_a.clone()); assert!(!protocol.validators.contains_key(&a_key)); - assert!(ctx.has_message(peer_b.clone(), Message::RequestBlockData(2, parent_hash, candidate_hash))); + assert!(ctx.has_message(peer_b.clone(), Message::RequestPovBlock(2, parent_hash, candidate_hash))); } // peer B comes back with block data. { let mut ctx = TestContext::default(); - on_message(&mut protocol, &mut ctx, peer_b, Message::BlockData(2, Some(block_data.clone()))); + let pov_block = make_pov(block_data.0); + on_message(&mut protocol, &mut ctx, peer_b, Message::PovBlock(2, Some(pov_block.clone()))); drop(protocol); - assert_eq!(recv.wait().unwrap(), block_data); + assert_eq!(recv.wait().unwrap(), pov_block); } } diff --git a/polkadot/network/src/tests/validation.rs b/polkadot/network/src/tests/validation.rs index d9cf88b622259cd4511b4ddc3dbfd43b06b69a2d..3562ac2fa7d6359f78794627134ddc52048bbc79 100644 --- a/polkadot/network/src/tests/validation.rs +++ b/polkadot/network/src/tests/validation.rs @@ -22,10 +22,12 @@ use substrate_primitives::{NativeOrEncoded, ExecutionContext}; use substrate_keyring::AuthorityKeyring; use {PolkadotProtocol}; -use polkadot_validation::{SharedTable, MessagesFrom, Network, TableRouter}; +use polkadot_validation::{SharedTable, MessagesFrom, Network}; use polkadot_primitives::{SessionKey, Block, Hash, Header, BlockId}; -use polkadot_primitives::parachain::{Id as ParaId, Chain, DutyRoster, ParachainHost, OutgoingMessage, - ValidatorId}; +use polkadot_primitives::parachain::{ + Id as ParaId, Chain, DutyRoster, ParachainHost, OutgoingMessage, + ValidatorId, ConsolidatedIngressRoots, +}; use parking_lot::Mutex; use substrate_client::error::Result as ClientResult; use substrate_client::runtime_api::{Core, RuntimeVersion, ApiExt}; @@ -158,7 +160,7 @@ struct ApiData { validators: Vec<ValidatorId>, duties: Vec<Chain>, active_parachains: Vec<ParaId>, - ingress: HashMap<ParaId, Vec<(ParaId, Hash)>>, + ingress: HashMap<ParaId, ConsolidatedIngressRoots>, } #[derive(Default, Clone)] @@ -293,7 +295,7 @@ impl ParachainHost<Block> for RuntimeApi { _: ExecutionContext, id: Option<ParaId>, _: Vec<u8>, - ) -> ClientResult<NativeOrEncoded<Option<Vec<(ParaId, Hash)>>>> { + ) -> ClientResult<NativeOrEncoded<Option<ConsolidatedIngressRoots>>> { let id = id.unwrap(); Ok(NativeOrEncoded::Native(self.data.lock().ingress.get(&id).cloned())) } @@ -358,7 +360,7 @@ impl IngressBuilder { } } - fn build(self) -> HashMap<ParaId, Vec<(ParaId, Hash)>> { + fn build(self) -> HashMap<ParaId, ConsolidatedIngressRoots> { let mut map = HashMap::new(); for ((source, target), messages) in self.egress { map.entry(target).or_insert_with(Vec::new) @@ -369,7 +371,7 @@ impl IngressBuilder { roots.sort_by_key(|&(para_id, _)| para_id); } - map + map.into_iter().map(|(k, v)| (k, ConsolidatedIngressRoots(v))).collect() } } @@ -471,11 +473,11 @@ fn ingress_fetch_works() { }; // make sure everyone can get ingress for their own parachain. - let fetch_a = router_a.then(move |r| r.unwrap() + let fetch_a = router_a.then(move |r| r.unwrap().fetcher() .fetch_incoming(id_a).map_err(|_| format!("Could not fetch ingress_a"))); - let fetch_b = router_b.then(move |r| r.unwrap() + let fetch_b = router_b.then(move |r| r.unwrap().fetcher() .fetch_incoming(id_b).map_err(|_| format!("Could not fetch ingress_b"))); - let fetch_c = router_c.then(move |r| r.unwrap() + let fetch_c = router_c.then(move |r| r.unwrap().fetcher() .fetch_incoming(id_c).map_err(|_| format!("Could not fetch ingress_c"))); let work = fetch_a.join3(fetch_b, fetch_c); diff --git a/polkadot/network/src/validation.rs b/polkadot/network/src/validation.rs index 6cde1a5234ca72dde1f289f4a58efdf1e425188f..373eae5a4ddcf331098ac443a1df542f82857afc 100644 --- a/polkadot/network/src/validation.rs +++ b/polkadot/network/src/validation.rs @@ -22,10 +22,10 @@ use sr_primitives::traits::{BlakeTwo256, ProvideRuntimeApi, Hash as HashT}; use substrate_network::Context as NetContext; use polkadot_validation::{Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement}; -use polkadot_primitives::{Block, Hash, SessionKey}; +use polkadot_primitives::{Block, BlockId, Hash, SessionKey}; use polkadot_primitives::parachain::{ - Id as ParaId, Collation, Extrinsic, ParachainHost, BlockData, Message, CandidateReceipt, - CollatorId, ValidatorId, + Id as ParaId, Collation, Extrinsic, ParachainHost, Message, CandidateReceipt, + CollatorId, ValidatorId, PoVBlock, }; use codec::{Encode, Decode}; @@ -325,7 +325,7 @@ impl<P, E: Clone, N, T: Clone> Collators for ValidationNetwork<P, E, N, T> where struct KnowledgeEntry { knows_block_data: Vec<ValidatorId>, knows_extrinsic: Vec<ValidatorId>, - block_data: Option<BlockData>, + pov: Option<PoVBlock>, extrinsic: Option<Extrinsic>, } @@ -366,9 +366,9 @@ impl Knowledge { } /// Note a candidate collated or seen locally. - pub(crate) fn note_candidate(&mut self, hash: Hash, block_data: Option<BlockData>, extrinsic: Option<Extrinsic>) { + pub(crate) fn note_candidate(&mut self, hash: Hash, pov: Option<PoVBlock>, extrinsic: Option<Extrinsic>) { let entry = self.candidates.entry(hash).or_insert_with(Default::default); - entry.block_data = entry.block_data.take().or(block_data); + entry.pov = entry.pov.take().or(pov); entry.extrinsic = entry.extrinsic.take().or(extrinsic); } } @@ -436,15 +436,15 @@ impl ValidationSession { &self.fetch_incoming } - // execute a closure with locally stored block data for a candidate, or a slice of session identities + // execute a closure with locally stored proof-of-validation for a candidate, or a slice of session identities // we believe should have the data. - fn with_block_data<F, U>(&self, hash: &Hash, f: F) -> U - where F: FnOnce(Result<&BlockData, &[ValidatorId]>) -> U + fn with_pov_block<F, U>(&self, hash: &Hash, f: F) -> U + where F: FnOnce(Result<&PoVBlock, &[ValidatorId]>) -> U { let knowledge = self.knowledge.lock(); let res = knowledge.candidates.get(hash) .ok_or(&[] as &_) - .and_then(|entry| entry.block_data.as_ref().ok_or(&entry.knows_block_data[..])); + .and_then(|entry| entry.pov.as_ref().ok_or(&entry.knows_block_data[..])); f(res) } @@ -590,32 +590,33 @@ impl LiveValidationSessions { self.recent.as_slice() } - /// Call a closure with block data from validation session at parent hash. + /// Call a closure with pov-data from validation session at parent hash for a given + /// candidate-receipt hash. /// /// This calls the closure with `Some(data)` where the session and data are live, /// `Err(Some(keys))` when the session is live but the data unknown, with a list of keys /// who have the data, and `Err(None)` where the session is unknown. - pub(crate) fn with_block_data<F, U>(&self, parent_hash: &Hash, c_hash: &Hash, f: F) -> U - where F: FnOnce(Result<&BlockData, Option<&[ValidatorId]>>) -> U + pub(crate) fn with_pov_block<F, U>(&self, parent_hash: &Hash, c_hash: &Hash, f: F) -> U + where F: FnOnce(Result<&PoVBlock, Option<&[ValidatorId]>>) -> U { match self.live_instances.get(parent_hash) { - Some(c) => c.1.with_block_data(c_hash, |res| f(res.map_err(Some))), + Some(c) => c.1.with_pov_block(c_hash, |res| f(res.map_err(Some))), None => f(Err(None)) } } } /// Receiver for block data. -pub struct BlockDataReceiver { - outer: Receiver<Receiver<BlockData>>, - inner: Option<Receiver<BlockData>> +pub struct PoVReceiver { + outer: Receiver<Receiver<PoVBlock>>, + inner: Option<Receiver<PoVBlock>> } -impl Future for BlockDataReceiver { - type Item = BlockData; +impl Future for PoVReceiver { + type Item = PoVBlock; type Error = io::Error; - fn poll(&mut self) -> Poll<BlockData, io::Error> { + fn poll(&mut self) -> Poll<PoVBlock, io::Error> { let map_err = |_| io::Error::new( io::ErrorKind::Other, "Sending end of channel hung up", @@ -746,22 +747,34 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> SessionDataFetcher<P, E, N, T> where T: Clone + Executor + Send + 'static, E: Future<Item=(),Error=()> + Clone + Send + 'static, { - /// Fetch block data for the given candidate receipt. - pub fn fetch_block_data(&self, candidate: &CandidateReceipt) -> BlockDataReceiver { + /// Fetch PoV block for the given candidate receipt. + pub fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> PoVReceiver { + let parachain = candidate.parachain_index.clone(); let parent_hash = self.parent_hash; + + let canon_roots = self.api.runtime_api().ingress(&BlockId::hash(parent_hash), parachain) + .map_err(|e| + format!( + "Cannot fetch ingress for parachain {:?} at {:?}: {:?}", + parachain, + parent_hash, + e, + ) + ); + let candidate = candidate.clone(); let (tx, rx) = ::futures::sync::oneshot::channel(); self.network.with_spec(move |spec, ctx| { - let inner_rx = spec.fetch_block_data(ctx, &candidate, parent_hash); - let _ = tx.send(inner_rx); + if let Ok(Some(canon_roots)) = canon_roots { + let inner_rx = spec.fetch_pov_block(ctx, &candidate, parent_hash, canon_roots); + let _ = tx.send(inner_rx); + } }); - BlockDataReceiver { outer: rx, inner: None } + PoVReceiver { outer: rx, inner: None } } /// Fetch incoming messages for a parachain. pub fn fetch_incoming(&self, parachain: ParaId) -> IncomingReceiver { - use polkadot_primitives::BlockId; - let (rx, work) = self.fetch_incoming.lock().fetch_with_work(parachain.clone(), move || { let parent_hash: Hash = self.parent_hash(); let topic = incoming_message_topic(parent_hash, parachain); @@ -778,7 +791,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> SessionDataFetcher<P, E, N, T> where canon_roots.into_future() .and_then(move |ingress_roots| match ingress_roots { None => Err(format!("No parachain {:?} registered at {}", parachain, parent_hash)), - Some(roots) => Ok(roots.into_iter().collect()) + Some(roots) => Ok(roots.0.into_iter().collect()) }) .and_then(move |ingress_roots| ComputeIngress { inner: gossip_messages, diff --git a/polkadot/parachain/src/lib.rs b/polkadot/parachain/src/lib.rs index 7ae53995977cc74845aebc566d589107e1b2e553..323e241a76cc6c916ba0af353348ea09e650d969 100644 --- a/polkadot/parachain/src/lib.rs +++ b/polkadot/parachain/src/lib.rs @@ -41,7 +41,6 @@ //! for setting up a parachain WASM module in Rust. #![cfg_attr(not(feature = "std"), no_std)] -#![cfg_attr(not(feature = "std"), feature(alloc))] /// Re-export of parity-codec. pub extern crate parity_codec as codec; diff --git a/polkadot/primitives/src/lib.rs b/polkadot/primitives/src/lib.rs index de3bbdad2651191b241cfe6ddae4a8c74a9f3ff0..3cb21f78a5bdced9f626721695d8d6205c41220d 100644 --- a/polkadot/primitives/src/lib.rs +++ b/polkadot/primitives/src/lib.rs @@ -19,7 +19,6 @@ #![warn(missing_docs)] #![cfg_attr(not(feature = "std"), no_std)] -#![cfg_attr(not(feature = "std"), feature(alloc))] extern crate parity_codec as codec; extern crate substrate_primitives as primitives; diff --git a/polkadot/primitives/src/parachain.rs b/polkadot/primitives/src/parachain.rs index 563d6f4ab4cbf5ec4c7c711b53249429215cdacb..5183f408f9bcf0631485a562ba0218b9eafac177 100644 --- a/polkadot/primitives/src/parachain.rs +++ b/polkadot/primitives/src/parachain.rs @@ -159,26 +159,52 @@ impl Ord for CandidateReceipt { } /// A full collation. -#[derive(PartialEq, Eq, Clone, Encode, Decode)] -#[cfg_attr(feature = "std", derive(Debug))] +#[derive(PartialEq, Eq, Clone)] +#[cfg_attr(feature = "std", derive(Debug, Encode, Decode))] pub struct Collation { - /// Block data. - pub block_data: BlockData, /// Candidate receipt itself. pub receipt: CandidateReceipt, + /// A proof-of-validation for the receipt. + pub pov: PoVBlock, } -/// Parachain ingress queue message. +/// A Proof-of-Validation block. #[derive(PartialEq, Eq, Clone)] -#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Encode, Decode, Debug))] +#[cfg_attr(feature = "std", derive(Debug, Encode, Decode))] +pub struct PoVBlock { + /// Block data. + pub block_data: BlockData, + /// Ingress for the parachain. + pub ingress: ConsolidatedIngress, +} + +/// Parachain ingress queue message. +#[derive(PartialEq, Eq, Clone, Decode)] +#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Encode, Debug))] pub struct Message(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec<u8>); +/// Consolidated ingress roots. +/// +/// This is an ordered vector of other parachains' egress queue roots, +/// obtained according to the routing rules. The same parachain may appear +/// twice. +#[derive(Default, PartialEq, Eq, Clone, Encode)] +#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug, Decode))] +pub struct ConsolidatedIngressRoots(pub Vec<(Id, Hash)>); + +impl From<Vec<(Id, Hash)>> for ConsolidatedIngressRoots { + fn from(v: Vec<(Id, Hash)>) -> Self { + ConsolidatedIngressRoots(v) + } +} + /// Consolidated ingress queue data. /// /// This is just an ordered vector of other parachains' egress queues, -/// obtained according to the routing rules. -#[derive(Default, PartialEq, Eq, Clone)] -#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))] +/// obtained according to the routing rules. The same parachain may appear +/// twice. +#[derive(Default, PartialEq, Eq, Clone, Decode)] +#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Encode, Debug))] pub struct ConsolidatedIngress(pub Vec<(Id, Vec<Message>)>); /// Parachain block data. @@ -283,7 +309,7 @@ decl_runtime_apis! { fn parachain_code(id: Id) -> Option<Vec<u8>>; /// Get the ingress roots to a specific parachain at a /// block. - fn ingress(to: Id) -> Option<Vec<(Id, Hash)>>; + fn ingress(to: Id) -> Option<ConsolidatedIngressRoots>; } } diff --git a/polkadot/runtime/src/lib.rs b/polkadot/runtime/src/lib.rs index 3aebc5d1ce6f138e86cd90d27ddfd7664f6c6b62..01be12a0e5f30b517a7a6f9dac77617812a52f19 100644 --- a/polkadot/runtime/src/lib.rs +++ b/polkadot/runtime/src/lib.rs @@ -363,8 +363,8 @@ impl_runtime_apis! { fn parachain_code(id: parachain::Id) -> Option<Vec<u8>> { Parachains::parachain_code(&id) } - fn ingress(to: parachain::Id) -> Option<Vec<(parachain::Id, Hash)>> { - Parachains::ingress(to) + fn ingress(to: parachain::Id) -> Option<parachain::ConsolidatedIngressRoots> { + Parachains::ingress(to).map(Into::into) } } diff --git a/polkadot/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.compact.wasm b/polkadot/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.compact.wasm index c40326d196ee2527eb6f640c0bb60beba0425341..69f822132b82e0a475e6845c1206431a20f3f21d 100644 Binary files a/polkadot/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.compact.wasm and b/polkadot/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.compact.wasm differ diff --git a/polkadot/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.wasm b/polkadot/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.wasm index 1094d6dcdd7e09401e1834e914fa4f64eb45c6e4..dc9228ec89b6adaee841de79f3b259bba2c79df9 100755 Binary files a/polkadot/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.wasm and b/polkadot/runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.wasm differ diff --git a/polkadot/validation/src/attestation_service.rs b/polkadot/validation/src/attestation_service.rs index 0df832921e7c6c50727042a3716d21c63474d072..6593454cdef906591cd9fc86c77e5e933bf35b07 100644 --- a/polkadot/validation/src/attestation_service.rs +++ b/polkadot/validation/src/attestation_service.rs @@ -42,7 +42,7 @@ use tokio::runtime::TaskExecutor; use tokio::runtime::current_thread::Runtime as LocalRuntime; use tokio::timer::Interval; -use super::{Network, Collators, TableRouter}; +use super::{Network, Collators}; /// Gets a list of the candidates in a block. pub(crate) fn fetch_candidates<P: BlockBody<Block>>(client: &P, block: &BlockId) @@ -117,7 +117,6 @@ pub(crate) fn start<C, N, P>( P::Api: ParachainHost<Block> + Core<Block> + BlockBuilder<Block>, N: Network + Send + Sync + 'static, N::TableRouter: Send + 'static, - <<N::TableRouter as TableRouter>::FetchIncoming as IntoFuture>::Future: Send + 'static, <N::BuildTableRouter as IntoFuture>::Future: Send + 'static, { const TIMER_DELAY: Duration = Duration::from_secs(5); diff --git a/polkadot/validation/src/collation.rs b/polkadot/validation/src/collation.rs index f260274f505a1c07e976bfc8f60f246d8a826ac5..7c06296d24fab50757f85c355780da833a8d3b4c 100644 --- a/polkadot/validation/src/collation.rs +++ b/polkadot/validation/src/collation.rs @@ -23,10 +23,12 @@ use std::sync::Arc; use polkadot_primitives::{Block, Hash, BlockId, parachain::CollatorId}; use polkadot_primitives::parachain::{Id as ParaId, Collation, Extrinsic, OutgoingMessage}; -use polkadot_primitives::parachain::{CandidateReceipt, ParachainHost}; +use polkadot_primitives::parachain::{ + ConsolidatedIngress, ConsolidatedIngressRoots, CandidateReceipt, ParachainHost, +}; use runtime_primitives::traits::ProvideRuntimeApi; use parachain::{wasm_executor::{self, ExternalitiesError}, MessageRef}; -use super::Incoming; +use error_chain::bail; use futures::prelude::*; @@ -60,7 +62,6 @@ pub struct CollationFetch<C: Collators, P> { relay_parent_hash: Hash, relay_parent: BlockId, collators: C, - incoming: Incoming, live_fetch: Option<<C::Collation as IntoFuture>::Future>, client: Arc<P>, } @@ -72,7 +73,6 @@ impl<C: Collators, P> CollationFetch<C, P> { relay_parent_hash: Hash, collators: C, client: Arc<P>, - incoming: Incoming, ) -> Self { CollationFetch { relay_parent: BlockId::hash(relay_parent_hash), @@ -81,7 +81,6 @@ impl<C: Collators, P> CollationFetch<C, P> { client, parachain, live_fetch: None, - incoming, } } @@ -104,7 +103,7 @@ impl<C: Collators, P: ProvideRuntimeApi> Future for CollationFetch<C, P> fn poll(&mut self) -> Poll<(Collation, Extrinsic), C::Error> { loop { - let x = { + let collation = { let parachain = self.parachain.clone(); let (r, c) = (self.relay_parent_hash, &self.collators); let poll = self.live_fetch @@ -114,16 +113,18 @@ impl<C: Collators, P: ProvideRuntimeApi> Future for CollationFetch<C, P> try_ready!(poll) }; - match validate_collation(&*self.client, &self.relay_parent, &x, &self.incoming) { + let res = validate_collation(&*self.client, &self.relay_parent, &collation); + + match res { Ok(e) => { - return Ok(Async::Ready((x, e))) + return Ok(Async::Ready((collation, e))) } Err(e) => { debug!("Failed to validate parachain due to API error: {}", e); // just continue if we got a bad collation or failed to validate self.live_fetch = None; - self.collators.note_bad_collator(x.receipt.collator) + self.collators.note_bad_collator(collation.receipt.collator) } } } @@ -145,15 +146,33 @@ error_chain! { display("Collated for inactive parachain: {:?}", id), } EgressRootMismatch(id: ParaId, expected: Hash, got: Hash) { - description("Got unexpected egress route."), + description("Got unexpected egress root."), + display( + "Got unexpected egress root to {:?}. (expected: {:?}, got {:?})", + id, expected, got + ), + } + IngressRootMismatch(id: ParaId, expected: Hash, got: Hash) { + description("Got unexpected ingress root."), display( - "Got unexpected egress route to {:?}. (expected: {:?}, got {:?})", + "Got unexpected ingress root to {:?}. (expected: {:?}, got {:?})", id, expected, got ), } - MissingEgressRoute(expected: Option<ParaId>, got: Option<ParaId>) { - description("Missing or extra egress route."), - display("Missing or extra egress route. (expected: {:?}, got {:?})", expected, got), + IngressChainMismatch(expected: ParaId, got: ParaId) { + description("Got ingress from wrong chain"), + display( + "Got ingress from wrong chain. (expected: {:?}, got {:?})", + expected, got + ), + } + IngressCanonicalityMismatch(expected: usize, got: usize) { + description("Ingress canonicality mismatch."), + display("Got data for {} roots, expected {}", got, expected), + } + MissingEgressRoot(expected: Option<ParaId>, got: Option<ParaId>) { + description("Missing or extra egress root."), + display("Missing or extra egress root. (expected: {:?}, got {:?})", expected, got), } WrongHeadData(expected: Vec<u8>, got: Vec<u8>) { description("Parachain validation produced wrong head data."), @@ -206,11 +225,11 @@ fn check_extrinsic( let mut expected_egress_roots = expected_egress_roots.iter(); while let Some(batch_target) = messages_iter.peek().map(|o| o.target) { let expected_root = match expected_egress_roots.next() { - None => return Err(ErrorKind::MissingEgressRoute(Some(batch_target), None).into()), + None => return Err(ErrorKind::MissingEgressRoot(Some(batch_target), None).into()), Some(&(id, ref root)) => if id == batch_target { root } else { - return Err(ErrorKind::MissingEgressRoute(Some(batch_target), Some(id)).into()); + return Err(ErrorKind::MissingEgressRoot(Some(batch_target), Some(id)).into()); } }; @@ -234,7 +253,7 @@ fn check_extrinsic( // also check that there are no more additional expected roots. if let Some((next_target, _)) = expected_egress_roots.next() { - return Err(ErrorKind::MissingEgressRoute(None, Some(*next_target)).into()); + return Err(ErrorKind::MissingEgressRoot(None, Some(*next_target)).into()); } } @@ -277,16 +296,38 @@ impl Externalities { } } +/// Validate incoming messages against expected roots. +pub fn validate_incoming( + roots: &ConsolidatedIngressRoots, + ingress: &ConsolidatedIngress, +) -> Result<(), Error> { + if roots.0.len() != ingress.0.len() { + bail!(ErrorKind::IngressCanonicalityMismatch(roots.0.len(), ingress.0.len())); + } + + let all_iter = roots.0.iter().zip(&ingress.0); + for ((expected_id, root), (got_id, messages)) in all_iter { + if expected_id != got_id { + bail!(ErrorKind::IngressChainMismatch(*expected_id, *got_id)); + } + + let got_root = message_queue_root(messages.iter().map(|msg| &msg.0[..])); + if &got_root != root { + bail!(ErrorKind::IngressRootMismatch(*expected_id, *root, got_root)); + } + } + + Ok(()) +} + /// Check whether a given collation is valid. Returns `Ok` on success, error otherwise. /// /// This assumes that basic validity checks have been done: /// - Block data hash is the same as linked in candidate receipt. -/// - incoming messages have been validated against canonical ingress roots pub fn validate_collation<P>( client: &P, relay_parent: &BlockId, collation: &Collation, - incoming: &Incoming, ) -> Result<Extrinsic, Error> where P: ProvideRuntimeApi, P::Api: ParachainHost<Block>, @@ -301,10 +342,14 @@ pub fn validate_collation<P>( let chain_head = api.parachain_head(relay_parent, para_id)? .ok_or_else(|| ErrorKind::InactiveParachain(para_id))?; + let roots = api.ingress(relay_parent, para_id)? + .ok_or_else(|| ErrorKind::InactiveParachain(para_id))?; + validate_incoming(&roots, &collation.pov.ingress)?; + let params = ValidationParams { parent_head: chain_head, - block_data: collation.block_data.0.clone(), - ingress: incoming.iter() + block_data: collation.pov.block_data.0.clone(), + ingress: collation.pov.ingress.0.iter() .flat_map(|&(source, ref messages)| { messages.iter().map(move |msg| IncomingMessage { source, diff --git a/polkadot/validation/src/lib.rs b/polkadot/validation/src/lib.rs index 0597f8cfe26e515e0d2c3a6290e69aca316597ea..6ffdace74668d9910a723ba6053be94f6fc06ef3 100644 --- a/polkadot/validation/src/lib.rs +++ b/polkadot/validation/src/lib.rs @@ -78,8 +78,9 @@ use extrinsic_store::Store as ExtrinsicStore; use parking_lot::Mutex; use polkadot_primitives::{Hash, Block, BlockId, BlockNumber, Header, SessionKey}; use polkadot_primitives::parachain::{ - Id as ParaId, Chain, DutyRoster, BlockData, Extrinsic as ParachainExtrinsic, CandidateReceipt, - ParachainHost, AttestedCandidate, Statement as PrimitiveStatement, Message, OutgoingMessage, CollatorSignature + Id as ParaId, Chain, DutyRoster, Extrinsic as ParachainExtrinsic, CandidateReceipt, + ParachainHost, AttestedCandidate, Statement as PrimitiveStatement, Message, OutgoingMessage, CollatorSignature, + Collation, PoVBlock, }; use primitives::{Pair, ed25519}; use runtime_primitives::{traits::{ProvideRuntimeApi, Header as HeaderT}, ApplyError}; @@ -97,7 +98,9 @@ use runtime_aura::timestamp::TimestampInherentData; use ed25519::Public as AuthorityId; -pub use self::collation::{validate_collation, message_queue_root, egress_roots, Collators}; +pub use self::collation::{ + validate_collation, validate_incoming, message_queue_root, egress_roots, Collators, +}; pub use self::error::{ErrorKind, Error}; pub use self::shared_table::{ SharedTable, ParachainWork, PrimedParachainWork, Validated, Statement, SignedStatement, @@ -146,24 +149,14 @@ pub trait TableRouter: Clone { /// Errors when fetching data from the network. type Error: std::fmt::Debug; /// Future that resolves when candidate data is fetched. - type FetchCandidate: IntoFuture<Item=BlockData,Error=Self::Error>; - /// Fetch incoming messages for a candidate. - type FetchIncoming: IntoFuture<Item=Incoming,Error=Self::Error>; + type FetchValidationProof: IntoFuture<Item=PoVBlock,Error=Self::Error>; /// Call with local candidate data. This will make the data available on the network, /// and sign, import, and broadcast a statement about the candidate. - fn local_candidate(&self, candidate: CandidateReceipt, block_data: BlockData, extrinsic: ParachainExtrinsic); - - /// Fetch block data for a specific candidate. - fn fetch_block_data(&self, candidate: &CandidateReceipt) -> Self::FetchCandidate; + fn local_collation(&self, collation: Collation, extrinsic: ParachainExtrinsic); - /// Fetches the incoming message data to a parachain from the network. Incoming data should be - /// checked. - /// - /// The `ParachainHost::ingress` function can be used to fetch incoming roots, - /// and the `message_queue_root` function can be used to check that messages actually have - /// expected root. - fn fetch_incoming(&self, id: ParaId) -> Self::FetchIncoming; + /// Fetch validation proof for a specific candidate. + fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> Self::FetchValidationProof; } /// A long-lived network which can create parachain statement and BFT message routing processes on demand. @@ -291,7 +284,6 @@ impl<C, N, P> ParachainValidation<C, N, P> where P::Api: ParachainHost<Block> + BlockBuilderApi<Block>, <C::Collation as IntoFuture>::Future: Send + 'static, N::TableRouter: Send + 'static, - <<N::TableRouter as TableRouter>::FetchIncoming as IntoFuture>::Future: Send + 'static, <N::BuildTableRouter as IntoFuture>::Future: Send + 'static, { /// Get an attestation table for given parent hash. @@ -400,23 +392,13 @@ impl<C, N, P> ParachainValidation<C, N, P> where let extrinsic_store = self.extrinsic_store.clone(); let with_router = move |router: N::TableRouter| { - let fetch_incoming = router.fetch_incoming(validation_para) - .into_future() - .map_err(|e| format!("{:?}", e)); - - // fetch incoming messages to our parachain from network and - // then fetch a local collation. - let collation_work = fetch_incoming - .map_err(|e| String::clone(&e)) - .and_then(move |incoming| { - CollationFetch::new( - validation_para, - relay_parent, - collators, - client, - incoming, - ).map_err(|e| format!("{:?}", e)) - }); + // fetch a local collation from connected collators. + let collation_work = CollationFetch::new( + validation_para, + relay_parent, + collators, + client, + ); collation_work.then(move |result| match result { Ok((collation, extrinsic)) => { @@ -424,7 +406,7 @@ impl<C, N, P> ParachainValidation<C, N, P> where relay_parent, parachain_id: collation.receipt.parachain_index, candidate_hash: collation.receipt.hash(), - block_data: collation.block_data.clone(), + block_data: collation.pov.block_data.clone(), extrinsic: Some(extrinsic.clone()), }); @@ -432,11 +414,7 @@ impl<C, N, P> ParachainValidation<C, N, P> where Ok(()) => { // TODO: https://github.com/paritytech/polkadot/issues/51 // Erasure-code and provide merkle branches. - router.local_candidate( - collation.receipt, - collation.block_data, - extrinsic, - ) + router.local_collation(collation, extrinsic); } Err(e) => warn!( target: "validation", @@ -448,7 +426,7 @@ impl<C, N, P> ParachainValidation<C, N, P> where Ok(()) } Err(e) => { - warn!(target: "validation", "Failed to collate candidate: {}", e); + warn!(target: "validation", "Failed to collate candidate: {:?}", e); Ok(()) } }) @@ -493,7 +471,6 @@ impl<C, N, P, TxApi> ProposerFactory<C, N, P, TxApi> where P::Api: ParachainHost<Block> + Core<Block> + BlockBuilderApi<Block>, N: Network + Send + Sync + 'static, N::TableRouter: Send + 'static, - <<N::TableRouter as TableRouter>::FetchIncoming as IntoFuture>::Future: Send + 'static, <N::BuildTableRouter as IntoFuture>::Future: Send + 'static, TxApi: PoolChainApi, { @@ -543,7 +520,6 @@ impl<C, N, P, TxApi> consensus::Environment<Block> for ProposerFactory<C, N, P, P::Api: ParachainHost<Block> + BlockBuilderApi<Block>, <C::Collation as IntoFuture>::Future: Send + 'static, N::TableRouter: Send + 'static, - <<N::TableRouter as TableRouter>::FetchIncoming as IntoFuture>::Future: Send + 'static, <N::BuildTableRouter as IntoFuture>::Future: Send + 'static, { type Proposer = Proposer<P, TxApi>; diff --git a/polkadot/validation/src/shared_table/mod.rs b/polkadot/validation/src/shared_table/mod.rs index 92e420d35523dca1c8dd40c1b879e4a8d2c27c1c..2f54331a7ef5a36d03c6a1ce2e25627a12ab682a 100644 --- a/polkadot/validation/src/shared_table/mod.rs +++ b/polkadot/validation/src/shared_table/mod.rs @@ -24,14 +24,14 @@ use extrinsic_store::{Data, Store as ExtrinsicStore}; use table::{self, Table, Context as TableContextTrait}; use polkadot_primitives::{Block, BlockId, Hash, SessionKey}; use polkadot_primitives::parachain::{ - Id as ParaId, BlockData, Collation, Extrinsic, CandidateReceipt, - AttestedCandidate, ParachainHost + Id as ParaId, Collation, Extrinsic, CandidateReceipt, + AttestedCandidate, ParachainHost, PoVBlock }; use parking_lot::Mutex; -use futures::{future, prelude::*}; +use futures::prelude::*; -use super::{GroupInfo, Incoming, TableRouter}; +use super::{GroupInfo, TableRouter}; use self::includable::IncludabilitySender; use primitives::{ed25519, Pair}; use runtime_primitives::traits::ProvideRuntimeApi; @@ -75,8 +75,8 @@ impl TableContext { } pub(crate) enum Validation { - Valid(BlockData, Extrinsic), - Invalid(BlockData), // should take proof. + Valid(PoVBlock, Extrinsic), + Invalid(PoVBlock), // should take proof. } enum ValidationWork { @@ -121,10 +121,9 @@ impl SharedTableInner { context: &TableContext, router: &R, statement: table::SignedStatement, - ) -> Option<ParachainWork<future::Join< - <R::FetchCandidate as IntoFuture>::Future, - <R::FetchIncoming as IntoFuture>::Future, - >>> { + ) -> Option<ParachainWork< + <R::FetchValidationProof as IntoFuture>::Future, + >> { let summary = match self.table.import_statement(context, statement) { Some(summary) => summary, None => return None, @@ -149,7 +148,6 @@ impl SharedTableInner { }; let work = if do_validation { - let fetch_incoming = router.fetch_incoming(summary.group_id); match self.table.get_candidate(&digest) { None => { let message = format!( @@ -163,11 +161,11 @@ impl SharedTableInner { None } Some(candidate) => { - let fetch_block_data = router.fetch_block_data(candidate).into_future(); + let fetch = router.fetch_pov_block(candidate).into_future(); Some(Work { candidate_receipt: candidate.clone(), - fetch: fetch_block_data.join(fetch_incoming), + fetch, }) } } @@ -202,19 +200,19 @@ pub struct Validated { impl Validated { /// Note that we've validated a candidate with given hash and it is bad. - pub fn known_bad(hash: Hash, block_data: BlockData) -> Self { + pub fn known_bad(hash: Hash, collation: PoVBlock) -> Self { Validated { statement: GenericStatement::Invalid(hash), - result: Validation::Invalid(block_data), + result: Validation::Invalid(collation), } } /// Note that we've validated a candidate with given hash and it is good. /// Extrinsic data required. - pub fn known_good(hash: Hash, block_data: BlockData, extrinsic: Extrinsic) -> Self { + pub fn known_good(hash: Hash, collation: PoVBlock, extrinsic: Extrinsic) -> Self { Validated { statement: GenericStatement::Valid(hash), - result: Validation::Valid(block_data, extrinsic), + result: Validation::Valid(collation, extrinsic), } } @@ -222,17 +220,17 @@ impl Validated { /// Extrinsic data required. pub fn collated_local( receipt: CandidateReceipt, - block_data: BlockData, + collation: PoVBlock, extrinsic: Extrinsic, ) -> Self { Validated { statement: GenericStatement::Candidate(receipt), - result: Validation::Valid(block_data, extrinsic), + result: Validation::Valid(collation, extrinsic), } } - /// Get a reference to the block data. - pub fn block_data(&self) -> &BlockData { + /// Get a reference to the proof-of-validation block. + pub fn pov_block(&self) -> &PoVBlock { match self.result { Validation::Valid(ref b, _) | Validation::Invalid(ref b) => b, } @@ -260,18 +258,17 @@ impl<Fetch: Future> ParachainWork<Fetch> { pub fn prime<P: ProvideRuntimeApi>(self, api: Arc<P>) -> PrimedParachainWork< Fetch, - impl Send + FnMut(&BlockId, &Collation, &Incoming) -> Result<Extrinsic, ()>, + impl Send + FnMut(&BlockId, &Collation) -> Result<Extrinsic, ()>, > where P: Send + Sync + 'static, P::Api: ParachainHost<Block>, { - let validate = move |id: &_, collation: &_, incoming: &_| { + let validate = move |id: &_, collation: &_| { let res = ::collation::validate_collation( &*api, id, collation, - incoming, ); match res { @@ -288,7 +285,7 @@ impl<Fetch: Future> ParachainWork<Fetch> { /// Prime the parachain work with a custom validation function. pub fn prime_with<F>(self, validate: F) -> PrimedParachainWork<Fetch, F> - where F: FnMut(&BlockId, &Collation, &Incoming) -> Result<Extrinsic, ()> + where F: FnMut(&BlockId, &Collation) -> Result<Extrinsic, ()> { PrimedParachainWork { inner: self, validate } } @@ -307,8 +304,8 @@ pub struct PrimedParachainWork<Fetch, F> { impl<Fetch, F, Err> Future for PrimedParachainWork<Fetch, F> where - Fetch: Future<Item=(BlockData, Incoming),Error=Err>, - F: FnMut(&BlockId, &Collation, &Incoming) -> Result<Extrinsic, ()>, + Fetch: Future<Item=PoVBlock,Error=Err>, + F: FnMut(&BlockId, &Collation) -> Result<Extrinsic, ()>, Err: From<::std::io::Error>, { type Item = Validated; @@ -318,11 +315,10 @@ impl<Fetch, F, Err> Future for PrimedParachainWork<Fetch, F> let work = &mut self.inner.work; let candidate = &work.candidate_receipt; - let (block, incoming) = try_ready!(work.fetch.poll()); + let pov_block = try_ready!(work.fetch.poll()); let validation_res = (self.validate)( &BlockId::hash(self.inner.relay_parent), - &Collation { block_data: block.clone(), receipt: candidate.clone() }, - &incoming, + &Collation { pov: pov_block.clone(), receipt: candidate.clone() }, ); let candidate_hash = candidate.hash(); @@ -333,20 +329,20 @@ impl<Fetch, F, Err> Future for PrimedParachainWork<Fetch, F> let (validity_statement, result) = match validation_res { Err(()) => ( GenericStatement::Invalid(candidate_hash), - Validation::Invalid(block), + Validation::Invalid(pov_block), ), Ok(extrinsic) => { self.inner.extrinsic_store.make_available(Data { relay_parent: self.inner.relay_parent, parachain_id: work.candidate_receipt.parachain_index, candidate_hash, - block_data: block.clone(), + block_data: pov_block.block_data.clone(), extrinsic: Some(extrinsic.clone()), })?; ( GenericStatement::Valid(candidate_hash), - Validation::Valid(block, extrinsic) + Validation::Valid(pov_block, extrinsic) ) } }; @@ -431,10 +427,9 @@ impl SharedTable { &self, router: &R, statement: table::SignedStatement, - ) -> Option<ParachainWork<future::Join< - <R::FetchCandidate as IntoFuture>::Future, - <R::FetchIncoming as IntoFuture>::Future, - >>> { + ) -> Option<ParachainWork< + <R::FetchValidationProof as IntoFuture>::Future, + >> { self.inner.lock().import_remote_statement(&*self.context, router, statement) } @@ -448,10 +443,9 @@ impl SharedTable { where R: TableRouter, I: IntoIterator<Item=table::SignedStatement>, - U: ::std::iter::FromIterator<Option<ParachainWork<future::Join< - <R::FetchCandidate as IntoFuture>::Future, - <R::FetchIncoming as IntoFuture>::Future, - >>>>, + U: ::std::iter::FromIterator<Option<ParachainWork< + <R::FetchValidationProof as IntoFuture>::Future, + >>>, { let mut inner = self.inner.lock(); @@ -547,24 +541,27 @@ mod tests { use super::*; use substrate_keyring::AuthorityKeyring; use primitives::crypto::UncheckedInto; + use polkadot_primitives::parachain::{BlockData, ConsolidatedIngress}; + use futures::future; + + fn pov_block_with_data(data: Vec<u8>) -> PoVBlock { + PoVBlock { + block_data: BlockData(data), + ingress: ConsolidatedIngress(Vec::new()), + } + } #[derive(Clone)] struct DummyRouter; impl TableRouter for DummyRouter { type Error = ::std::io::Error; - type FetchCandidate = ::futures::future::FutureResult<BlockData,Self::Error>; - type FetchIncoming = ::futures::future::FutureResult<Incoming,Self::Error>; - - fn local_candidate(&self, _candidate: CandidateReceipt, _block_data: BlockData, _extrinsic: Extrinsic) { - - } + type FetchValidationProof = future::FutureResult<PoVBlock,Self::Error>; - fn fetch_block_data(&self, _candidate: &CandidateReceipt) -> Self::FetchCandidate { - future::ok(BlockData(vec![1, 2, 3, 4, 5])) + fn local_collation(&self, _collation: Collation, _extrinsic: Extrinsic) { } - fn fetch_incoming(&self, _para_id: ParaId) -> Self::FetchIncoming { - future::ok(Vec::new()) + fn fetch_pov_block(&self, _candidate: &CandidateReceipt) -> Self::FetchValidationProof { + future::ok(pov_block_with_data(vec![1, 2, 3, 4, 5])) } } @@ -675,7 +672,7 @@ mod tests { let store = ExtrinsicStore::new_in_memory(); let relay_parent = [0; 32].into(); let para_id = 5.into(); - let block_data = BlockData(vec![1, 2, 3]); + let pov_block = pov_block_with_data(vec![1, 2, 3]); let candidate = CandidateReceipt { parachain_index: para_id, @@ -693,20 +690,20 @@ mod tests { let producer: ParachainWork<future::FutureResult<_, ::std::io::Error>> = ParachainWork { work: Work { candidate_receipt: candidate, - fetch: future::ok((block_data.clone(), Vec::new())), + fetch: future::ok(pov_block.clone()), }, relay_parent, extrinsic_store: store.clone(), }; - let validated = producer.prime_with(|_, _, _| Ok(Extrinsic { outgoing_messages: Vec::new() })) + let validated = producer.prime_with(|_, _| Ok(Extrinsic { outgoing_messages: Vec::new() })) .wait() .unwrap(); - assert_eq!(validated.block_data(), &block_data); + assert_eq!(validated.pov_block(), &pov_block); assert_eq!(validated.statement, GenericStatement::Valid(hash)); - assert_eq!(store.block_data(relay_parent, hash).unwrap(), block_data); + assert_eq!(store.block_data(relay_parent, hash).unwrap(), pov_block.block_data); assert!(store.extrinsic(relay_parent, hash).is_some()); } @@ -715,7 +712,7 @@ mod tests { let store = ExtrinsicStore::new_in_memory(); let relay_parent = [0; 32].into(); let para_id = 5.into(); - let block_data = BlockData(vec![1, 2, 3]); + let pov_block = pov_block_with_data(vec![1, 2, 3]); let candidate = CandidateReceipt { parachain_index: para_id, @@ -733,19 +730,19 @@ mod tests { let producer = ParachainWork { work: Work { candidate_receipt: candidate, - fetch: future::ok::<_, ::std::io::Error>((block_data.clone(), Vec::new())), + fetch: future::ok::<_, ::std::io::Error>(pov_block.clone()), }, relay_parent, extrinsic_store: store.clone(), }; - let validated = producer.prime_with(|_, _, _| Ok(Extrinsic { outgoing_messages: Vec::new() })) + let validated = producer.prime_with(|_, _| Ok(Extrinsic { outgoing_messages: Vec::new() })) .wait() .unwrap(); - assert_eq!(validated.block_data(), &block_data); + assert_eq!(validated.pov_block(), &pov_block); - assert_eq!(store.block_data(relay_parent, hash).unwrap(), block_data); + assert_eq!(store.block_data(relay_parent, hash).unwrap(), pov_block.block_data); assert!(store.extrinsic(relay_parent, hash).is_some()); } @@ -815,7 +812,7 @@ mod tests { let mut groups = HashMap::new(); let para_id = ParaId::from(1); - let block_data = BlockData(vec![1, 2, 3]); + let pov_block = pov_block_with_data(vec![1, 2, 3]); let extrinsic = Extrinsic { outgoing_messages: Vec::new() }; let parent_hash = Default::default(); @@ -851,7 +848,7 @@ mod tests { let hash = candidate.hash(); let signed_statement = shared_table.import_validated(Validated::collated_local( candidate, - block_data, + pov_block, extrinsic, ));