Unverified Commit 75e827c0 authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

Introduce a Proof-of-Validation block type and use that in place of BlockData (#227)



* validators expect collators to give them parachain messages

* mostly port network to use pov_block

* network tests pass

* verify ingress when fetching pov block

* fix runtime compilation

* all tests build

* fix some grumbles

* Update validation/src/collation.rs
Co-Authored-By: asynchronous rob's avatarrphmeier <rphmeier@gmail.com>

* Update primitives/src/parachain.rs
Co-Authored-By: asynchronous rob's avatarrphmeier <rphmeier@gmail.com>

* Update network/src/lib.rs
Co-Authored-By: asynchronous rob's avatarrphmeier <rphmeier@gmail.com>
parent e7fbcfe4
Pipeline #36204 passed with stages
in 15 minutes and 5 seconds
......@@ -71,7 +71,10 @@ use futures::{future, Stream, Future, IntoFuture};
use client::BlockchainEvents;
use primitives::{ed25519, Pair};
use polkadot_primitives::{BlockId, SessionKey, Hash, Block};
use polkadot_primitives::parachain::{self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId, Extrinsic};
use polkadot_primitives::parachain::{
self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId, Extrinsic,
PoVBlock,
};
use polkadot_cli::{PolkadotService, CustomConfiguration, CoreApi, ParachainHost};
use polkadot_cli::{Worker, IntoExit, ProvideRuntimeApi, TaskExecutor};
use polkadot_network::validation::{ValidationNetwork, SessionParams};
......@@ -148,10 +151,10 @@ pub fn collate<'a, R, P>(
P: ParachainContext + 'a,
{
let ingress = relay_context.unrouted_egress(local_id).into_future().map_err(Error::Polkadot);
ingress.and_then(move |ConsolidatedIngress(ingress)| {
ingress.and_then(move |ingress| {
let (block_data, head_data, mut extrinsic) = para_context.produce_candidate(
last_head,
ingress.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
).map_err(Error::Collator)?;
let block_data_hash = block_data.hash();
......@@ -170,10 +173,12 @@ pub fn collate<'a, R, P>(
block_data_hash,
};
// not necessary to send extrinsic because it is recomputed from execution.
Ok(parachain::Collation {
receipt,
block_data,
pov: PoVBlock {
block_data,
ingress,
},
})
})
}
......
......@@ -220,9 +220,18 @@ impl CollatorPool {
mod tests {
use super::*;
use substrate_primitives::crypto::UncheckedInto;
use polkadot_primitives::parachain::{CandidateReceipt, BlockData, HeadData};
use polkadot_primitives::parachain::{
CandidateReceipt, BlockData, PoVBlock, HeadData, ConsolidatedIngress,
};
use futures::Future;
fn make_pov(block_data: Vec<u8>) -> PoVBlock {
PoVBlock {
block_data: BlockData(block_data),
ingress: ConsolidatedIngress(Vec::new()),
}
}
#[test]
fn disconnect_primary_gives_new_primary() {
let mut pool = CollatorPool::new();
......@@ -272,7 +281,7 @@ mod tests {
fees: 0,
block_data_hash: [3; 32].into(),
},
block_data: BlockData(vec![4, 5, 6]),
pov: make_pov(vec![4, 5, 6]),
});
rx1.wait().unwrap();
......@@ -299,7 +308,7 @@ mod tests {
fees: 0,
block_data_hash: [3; 32].into(),
},
block_data: BlockData(vec![4, 5, 6]),
pov: make_pov(vec![4, 5, 6]),
});
let (tx, rx) = oneshot::channel();
......
......@@ -56,7 +56,10 @@ pub mod gossip;
use codec::{Decode, Encode};
use futures::sync::oneshot;
use polkadot_primitives::{Block, SessionKey, Hash, Header};
use polkadot_primitives::parachain::{Id as ParaId, CollatorId, BlockData, CandidateReceipt, Collation};
use polkadot_primitives::parachain::{
Id as ParaId, BlockData, CollatorId, CandidateReceipt, Collation, PoVBlock,
ConsolidatedIngressRoots,
};
use substrate_network::{PeerId, RequestId, Context, Severity};
use substrate_network::{message, generic_message};
use substrate_network::specialization::NetworkSpecialization as Specialization;
......@@ -84,12 +87,33 @@ pub struct Status {
collating_for: Option<(CollatorId, ParaId)>,
}
struct BlockDataRequest {
struct PoVBlockRequest {
attempted_peers: HashSet<SessionKey>,
validation_session_parent: Hash,
candidate_hash: Hash,
block_data_hash: Hash,
sender: oneshot::Sender<BlockData>,
sender: oneshot::Sender<PoVBlock>,
canon_roots: ConsolidatedIngressRoots,
}
impl PoVBlockRequest {
// Attempt to process a response. If the provided block is invalid,
// this returns an error result containing the unmodified request.
//
// If `Ok(())` is returned, that indicates that the request has been processed.
fn process_response(self, pov_block: PoVBlock) -> Result<(), Self> {
if pov_block.block_data.hash() != self.block_data_hash {
return Err(self);
}
match polkadot_validation::validate_incoming(&self.canon_roots, &pov_block.ingress) {
Ok(()) => {
let _ = self.sender.send(pov_block);
Ok(())
}
Err(_) => Err(self)
}
}
}
// ensures collator-protocol messages are sent in correct order.
......@@ -147,9 +171,13 @@ pub enum Message {
// TODO: do this with a cryptographic proof of some kind
// https://github.com/paritytech/polkadot/issues/47
SessionKey(SessionKey),
/// Requesting parachain block data by (relay_parent, candidate_hash).
/// Requesting parachain proof-of-validation block (relay_parent, candidate_hash).
RequestPovBlock(RequestId, Hash, Hash),
/// Provide requested proof-of-validation block data by candidate hash or nothing if unknown.
PovBlock(RequestId, Option<PoVBlock>),
/// Request block data (relay_parent, candidate_hash)
RequestBlockData(RequestId, Hash, Hash),
/// Provide block data by candidate hash or nothing if unknown.
/// Provide requested block data by candidate hash or nothing.
BlockData(RequestId, Option<BlockData>),
/// Tell a collator their role.
CollatorRole(Role),
......@@ -171,8 +199,8 @@ pub struct PolkadotProtocol {
validators: HashMap<SessionKey, PeerId>,
local_collations: LocalCollations<Collation>,
live_validation_sessions: LiveValidationSessions,
in_flight: HashMap<(RequestId, PeerId), BlockDataRequest>,
pending: Vec<BlockDataRequest>,
in_flight: HashMap<(RequestId, PeerId), PoVBlockRequest>,
pending: Vec<PoVBlockRequest>,
extrinsic_store: Option<::av_store::Store>,
next_req_id: u64,
}
......@@ -195,15 +223,22 @@ impl PolkadotProtocol {
}
/// Fetch block data by candidate receipt.
fn fetch_block_data(&mut self, ctx: &mut Context<Block>, candidate: &CandidateReceipt, relay_parent: Hash) -> oneshot::Receiver<BlockData> {
fn fetch_pov_block(
&mut self,
ctx: &mut Context<Block>,
candidate: &CandidateReceipt,
relay_parent: Hash,
canon_roots: ConsolidatedIngressRoots,
) -> oneshot::Receiver<PoVBlock> {
let (tx, rx) = oneshot::channel();
self.pending.push(BlockDataRequest {
self.pending.push(PoVBlockRequest {
attempted_peers: Default::default(),
validation_session_parent: relay_parent,
candidate_hash: candidate.hash(),
block_data_hash: candidate.block_data_hash,
sender: tx,
canon_roots,
});
self.dispatch_pending_requests(ctx);
......@@ -250,7 +285,7 @@ impl PolkadotProtocol {
let parent = pending.validation_session_parent;
let c_hash = pending.candidate_hash;
let still_pending = self.live_validation_sessions.with_block_data(&parent, &c_hash, |x| match x {
let still_pending = self.live_validation_sessions.with_pov_block(&parent, &c_hash, |x| match x {
Ok(data @ &_) => {
// answer locally.
let _ = pending.sender.send(data.clone());
......@@ -270,7 +305,7 @@ impl PolkadotProtocol {
send_polkadot_message(
ctx,
who.clone(),
Message::RequestBlockData(req_id, parent, c_hash),
Message::RequestPovBlock(req_id, parent, c_hash),
);
in_flight.insert((req_id, who), pending);
......@@ -295,12 +330,21 @@ impl PolkadotProtocol {
trace!(target: "p_net", "Polkadot message from {}: {:?}", who, msg);
match msg {
Message::SessionKey(key) => self.on_session_key(ctx, who, key),
Message::RequestPovBlock(req_id, relay_parent, candidate_hash) => {
let pov_block = self.live_validation_sessions.with_pov_block(
&relay_parent,
&candidate_hash,
|res| res.ok().map(|b| b.clone()),
);
send_polkadot_message(ctx, who, Message::PovBlock(req_id, pov_block));
}
Message::RequestBlockData(req_id, relay_parent, candidate_hash) => {
let block_data = self.live_validation_sessions
.with_block_data(
.with_pov_block(
&relay_parent,
&candidate_hash,
|res| res.ok().map(|b| b.clone()),
|res| res.ok().map(|b| b.block_data.clone()),
)
.or_else(|| self.extrinsic_store.as_ref()
.and_then(|s| s.block_data(relay_parent, candidate_hash))
......@@ -308,7 +352,11 @@ impl PolkadotProtocol {
send_polkadot_message(ctx, who, Message::BlockData(req_id, block_data));
}
Message::BlockData(req_id, data) => self.on_block_data(ctx, who, req_id, data),
Message::PovBlock(req_id, data) => self.on_pov_block(ctx, who, req_id, data),
Message::BlockData(_req_id, _data) => {
// current block data is never requested bare by the node.
ctx.report_peer(who, Severity::Bad("Peer sent un-requested block data".to_string()));
}
Message::Collation(relay_parent, collation) => self.on_collation(ctx, who, relay_parent, collation),
Message::CollatorRole(role) => self.on_new_role(ctx, who, role),
}
......@@ -355,13 +403,19 @@ impl PolkadotProtocol {
self.dispatch_pending_requests(ctx);
}
fn on_block_data(&mut self, ctx: &mut Context<Block>, who: PeerId, req_id: RequestId, data: Option<BlockData>) {
fn on_pov_block(
&mut self,
ctx: &mut Context<Block>,
who: PeerId,
req_id: RequestId,
pov_block: Option<PoVBlock>,
) {
match self.in_flight.remove(&(req_id, who.clone())) {
Some(req) => {
if let Some(data) = data {
if data.hash() == req.block_data_hash {
let _ = req.sender.send(data);
return
Some(mut req) => {
if let Some(pov_block) = pov_block {
match req.process_response(pov_block) {
Ok(()) => return,
Err(r) => { req = r; }
}
}
......@@ -486,12 +540,14 @@ impl Specialization<Block> for PolkadotProtocol {
self.in_flight.retain(|&(_, ref peer), val| {
let retain = peer != &who;
if !retain {
// swap with a dummy value which will be dropped immediately.
let (sender, _) = oneshot::channel();
pending.push(::std::mem::replace(val, BlockDataRequest {
pending.push(::std::mem::replace(val, PoVBlockRequest {
attempted_peers: Default::default(),
validation_session_parent: Default::default(),
candidate_hash: Default::default(),
block_data_hash: Default::default(),
canon_roots: ConsolidatedIngressRoots(Vec::new()),
sender,
}));
}
......
......@@ -29,7 +29,8 @@ use polkadot_validation::{
};
use polkadot_primitives::{Block, Hash, SessionKey};
use polkadot_primitives::parachain::{
BlockData, Extrinsic, CandidateReceipt, ParachainHost, Id as ParaId, Message
Extrinsic, CandidateReceipt, ParachainHost, Id as ParaId, Message,
Collation, PoVBlock,
};
use gossip::RegisteredMessageValidator;
......@@ -41,7 +42,7 @@ use std::collections::{HashMap, HashSet};
use std::io;
use std::sync::Arc;
use validation::{self, SessionDataFetcher, NetworkService, Executor, Incoming};
use validation::{self, SessionDataFetcher, NetworkService, Executor};
type IngressPairRef<'a> = (ParaId, &'a [Message]);
......@@ -92,6 +93,12 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
.map(|msg| msg.statement)
}
/// Get access to the session data fetcher.
#[cfg(test)]
pub(crate) fn fetcher(&self) -> &SessionDataFetcher<P, E, N, T> {
&self.fetcher
}
fn parent_hash(&self) -> Hash {
self.fetcher.parent_hash()
}
......@@ -201,7 +208,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
fn create_work<D>(&self, candidate_hash: Hash, producer: ParachainWork<D>)
-> impl Future<Item=(),Error=()> + Send + 'static
where
D: Future<Item=(BlockData, Incoming),Error=io::Error> + Send + 'static,
D: Future<Item=PoVBlock,Error=io::Error> + Send + 'static,
{
let table = self.table.clone();
let network = self.network().clone();
......@@ -213,7 +220,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
// store the data before broadcasting statements, so other peers can fetch.
knowledge.lock().note_candidate(
candidate_hash,
Some(validated.block_data().clone()),
Some(validated.pov_block().clone()),
validated.extrinsic().cloned(),
);
......@@ -234,26 +241,21 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh
E: Future<Item=(),Error=()> + Clone + Send + 'static,
{
type Error = io::Error;
type FetchCandidate = validation::BlockDataReceiver;
type FetchIncoming = validation::IncomingReceiver;
type FetchValidationProof = validation::PoVReceiver;
fn local_candidate(&self, receipt: CandidateReceipt, block_data: BlockData, extrinsic: Extrinsic) {
fn local_collation(&self, collation: Collation, extrinsic: Extrinsic) {
// produce a signed statement
let hash = receipt.hash();
let validated = Validated::collated_local(receipt, block_data.clone(), extrinsic.clone());
let hash = collation.receipt.hash();
let validated = Validated::collated_local(collation.receipt, collation.pov.clone(), extrinsic.clone());
let statement = self.table.import_validated(validated);
// give to network to make available.
self.fetcher.knowledge().lock().note_candidate(hash, Some(block_data), Some(extrinsic));
self.fetcher.knowledge().lock().note_candidate(hash, Some(collation.pov), Some(extrinsic));
self.network().gossip_message(self.attestation_topic, statement.encode());
}
fn fetch_block_data(&self, candidate: &CandidateReceipt) -> Self::FetchCandidate {
self.fetcher.fetch_block_data(candidate)
}
fn fetch_incoming(&self, parachain: ParaId) -> Self::FetchIncoming {
self.fetcher.fetch_incoming(parachain)
fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> Self::FetchValidationProof {
self.fetcher.fetch_pov_block(candidate)
}
}
......
......@@ -21,7 +21,10 @@ use validation::SessionParams;
use polkadot_validation::GenericStatement;
use polkadot_primitives::{Block, Hash, SessionKey};
use polkadot_primitives::parachain::{CandidateReceipt, HeadData, BlockData, CollatorId, ValidatorId};
use polkadot_primitives::parachain::{
CandidateReceipt, HeadData, PoVBlock, BlockData, CollatorId, ValidatorId,
ConsolidatedIngressRoots,
};
use substrate_primitives::crypto::UncheckedInto;
use codec::Encode;
use substrate_network::{
......@@ -74,6 +77,14 @@ impl TestContext {
}
}
fn make_pov(block_data: Vec<u8>) -> PoVBlock {
PoVBlock {
block_data: BlockData(block_data),
ingress: polkadot_primitives::parachain::ConsolidatedIngress(Vec::new()),
}
}
fn make_status(status: &Status, roles: Roles) -> FullStatus {
FullStatus {
version: 1,
......@@ -164,7 +175,13 @@ fn fetches_from_those_with_knowledge() {
let knowledge = session.knowledge();
knowledge.lock().note_statement(a_key.clone(), &GenericStatement::Valid(candidate_hash));
let recv = protocol.fetch_block_data(&mut TestContext::default(), &candidate_receipt, parent_hash);
let canon_roots = ConsolidatedIngressRoots(Vec::new());
let recv = protocol.fetch_pov_block(
&mut TestContext::default(),
&candidate_receipt,
parent_hash,
canon_roots,
);
// connect peer A
{
......@@ -178,7 +195,7 @@ fn fetches_from_those_with_knowledge() {
let mut ctx = TestContext::default();
on_message(&mut protocol, &mut ctx, peer_a.clone(), Message::SessionKey(a_key.clone()));
assert!(protocol.validators.contains_key(&a_key));
assert!(ctx.has_message(peer_a.clone(), Message::RequestBlockData(1, parent_hash, candidate_hash)));
assert!(ctx.has_message(peer_a.clone(), Message::RequestPovBlock(1, parent_hash, candidate_hash)));
}
knowledge.lock().note_statement(b_key.clone(), &GenericStatement::Valid(candidate_hash));
......@@ -188,7 +205,7 @@ fn fetches_from_those_with_knowledge() {
let mut ctx = TestContext::default();
protocol.on_connect(&mut ctx, peer_b.clone(), make_status(&status, Roles::AUTHORITY));
on_message(&mut protocol, &mut ctx, peer_b.clone(), Message::SessionKey(b_key.clone()));
assert!(!ctx.has_message(peer_b.clone(), Message::RequestBlockData(2, parent_hash, candidate_hash)));
assert!(!ctx.has_message(peer_b.clone(), Message::RequestPovBlock(2, parent_hash, candidate_hash)));
}
......@@ -197,15 +214,16 @@ fn fetches_from_those_with_knowledge() {
let mut ctx = TestContext::default();
protocol.on_disconnect(&mut ctx, peer_a.clone());
assert!(!protocol.validators.contains_key(&a_key));
assert!(ctx.has_message(peer_b.clone(), Message::RequestBlockData(2, parent_hash, candidate_hash)));
assert!(ctx.has_message(peer_b.clone(), Message::RequestPovBlock(2, parent_hash, candidate_hash)));
}
// peer B comes back with block data.
{
let mut ctx = TestContext::default();
on_message(&mut protocol, &mut ctx, peer_b, Message::BlockData(2, Some(block_data.clone())));
let pov_block = make_pov(block_data.0);
on_message(&mut protocol, &mut ctx, peer_b, Message::PovBlock(2, Some(pov_block.clone())));
drop(protocol);
assert_eq!(recv.wait().unwrap(), block_data);
assert_eq!(recv.wait().unwrap(), pov_block);
}
}
......
......@@ -22,10 +22,12 @@ use substrate_primitives::{NativeOrEncoded, ExecutionContext};
use substrate_keyring::AuthorityKeyring;
use {PolkadotProtocol};
use polkadot_validation::{SharedTable, MessagesFrom, Network, TableRouter};
use polkadot_validation::{SharedTable, MessagesFrom, Network};
use polkadot_primitives::{SessionKey, Block, Hash, Header, BlockId};
use polkadot_primitives::parachain::{Id as ParaId, Chain, DutyRoster, ParachainHost, OutgoingMessage,
ValidatorId};
use polkadot_primitives::parachain::{
Id as ParaId, Chain, DutyRoster, ParachainHost, OutgoingMessage,
ValidatorId, ConsolidatedIngressRoots,
};
use parking_lot::Mutex;
use substrate_client::error::Result as ClientResult;
use substrate_client::runtime_api::{Core, RuntimeVersion, ApiExt};
......@@ -158,7 +160,7 @@ struct ApiData {
validators: Vec<ValidatorId>,
duties: Vec<Chain>,
active_parachains: Vec<ParaId>,
ingress: HashMap<ParaId, Vec<(ParaId, Hash)>>,
ingress: HashMap<ParaId, ConsolidatedIngressRoots>,
}
#[derive(Default, Clone)]
......@@ -293,7 +295,7 @@ impl ParachainHost<Block> for RuntimeApi {
_: ExecutionContext,
id: Option<ParaId>,
_: Vec<u8>,
) -> ClientResult<NativeOrEncoded<Option<Vec<(ParaId, Hash)>>>> {
) -> ClientResult<NativeOrEncoded<Option<ConsolidatedIngressRoots>>> {
let id = id.unwrap();
Ok(NativeOrEncoded::Native(self.data.lock().ingress.get(&id).cloned()))
}
......@@ -358,7 +360,7 @@ impl IngressBuilder {
}
}
fn build(self) -> HashMap<ParaId, Vec<(ParaId, Hash)>> {
fn build(self) -> HashMap<ParaId, ConsolidatedIngressRoots> {
let mut map = HashMap::new();
for ((source, target), messages) in self.egress {
map.entry(target).or_insert_with(Vec::new)
......@@ -369,7 +371,7 @@ impl IngressBuilder {
roots.sort_by_key(|&(para_id, _)| para_id);
}
map
map.into_iter().map(|(k, v)| (k, ConsolidatedIngressRoots(v))).collect()
}
}
......@@ -471,11 +473,11 @@ fn ingress_fetch_works() {
};
// make sure everyone can get ingress for their own parachain.
let fetch_a = router_a.then(move |r| r.unwrap()
let fetch_a = router_a.then(move |r| r.unwrap().fetcher()
.fetch_incoming(id_a).map_err(|_| format!("Could not fetch ingress_a")));
let fetch_b = router_b.then(move |r| r.unwrap()
let fetch_b = router_b.then(move |r| r.unwrap().fetcher()
.fetch_incoming(id_b).map_err(|_| format!("Could not fetch ingress_b")));
let fetch_c = router_c.then(move |r| r.unwrap()
let fetch_c = router_c.then(move |r| r.unwrap().fetcher()
.fetch_incoming(id_c).map_err(|_| format!("Could not fetch ingress_c")));
let work = fetch_a.join3(fetch_b, fetch_c);
......
......@@ -22,10 +22,10 @@
use sr_primitives::traits::{BlakeTwo256, ProvideRuntimeApi, Hash as HashT};
use substrate_network::Context as NetContext;
use polkadot_validation::{Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement};
use polkadot_primitives::{Block, Hash, SessionKey};
use polkadot_primitives::{Block, BlockId, Hash, SessionKey};
use polkadot_primitives::parachain::{
Id as ParaId, Collation, Extrinsic, ParachainHost, BlockData, Message, CandidateReceipt,
CollatorId, ValidatorId,
Id as ParaId, Collation, Extrinsic, ParachainHost, Message, CandidateReceipt,
CollatorId, ValidatorId, PoVBlock,
};
use codec::{Encode, Decode};
......@@ -325,7 +325,7 @@ impl<P, E: Clone, N, T: Clone> Collators for ValidationNetwork<P, E, N, T> where
struct KnowledgeEntry {
knows_block_data: Vec<ValidatorId>,
knows_extrinsic: Vec<ValidatorId>,
block_data: Option<BlockData>,
pov: Option<PoVBlock>,
extrinsic: Option<Extrinsic>,
}
......@@ -366,9 +366,9 @@ impl Knowledge {
}
/// Note a candidate collated or seen locally.
pub(crate) fn note_candidate(&mut self, hash: Hash, block_data: Option<BlockData>, extrinsic: Option<Extrinsic>) {
pub(crate) fn note_candidate(&mut self, hash: Hash, pov: Option<PoVBlock>, extrinsic: Option<Extrinsic>) {
let entry = self.candidates.entry(hash).or_insert_with(Default::default);
entry.block_data = entry.block_data.take().or(block_data);
entry.pov = entry.pov.take().or(pov);
entry.extrinsic = entry.extrinsic.take().or(extrinsic);
}
}
......@@ -436,15 +436,15 @@ impl ValidationSession {
&self.fetch_incoming
}
// execute a closure with locally stored block data for a candidate, or a slice of session identities
// execute a closure with locally stored proof-of-validation for a candidate, or a slice of session identities
// we believe should have the data.
fn with_block_data<F, U>(&self, hash: &Hash, f: F) -> U
where F: FnOnce(Result<&BlockData, &[ValidatorId]>) -> U
fn with_pov_block<F, U>(&self, hash: &Hash, f: F) -> U
where F: FnOnce(Result<&PoVBlock, &[ValidatorId]>) -> U
{
let knowledge = self.knowledge.lock();
let res = knowledge.candidates.get(hash)
.ok_or(&[] as &_)
.and_then(|entry| entry.block_data.as_ref().ok_or(&entry.knows_block_data[..]));
.and_then(|entry| entry.pov.as_ref().ok_or(&entry.knows_block_data[..]));
f(res)
}
......@@ -590,32 +590,33 @@ impl LiveValidationSessions {
self.recent.as_slice()
}
/// Call a closure with block data from validation session at parent hash.
/// Call a closure with pov-data from validation session at parent hash for a given
/// candidate-receipt hash.
///
/// This calls the closure with `Some(data)` where the session and data are live,
/// `Err(Some(keys))` when the session is live but the data unknown, with a list of keys
/// who have the data, and `Err(None)` where the session is unknown.
pub(crate) fn with_block_data<F, U>(&self, parent_hash: &Hash, c_hash: &Hash, f: F) -> U
where F: FnOnce(Result<&BlockData, Option<&[ValidatorId]>>) -> U
pub(crate) fn with_pov_block<F, U>(&self, parent_hash: &Hash, c_hash: &Hash, f: F) -> U
where F: FnOnce(Result<&PoVBlock, Option<&[ValidatorId]>>) -> U
{
match self.live_instances.get(parent_hash) {
Some(c) => c.1.with_block_data(c_hash, |res| f(res.map_err(Some))),
Some(c) => c.1.with_pov_block(c_hash, |res| f(res.map_err(Some))),
None => f(Err(None))
}
}
}
/// Receiver for block data.
pub struct BlockDataReceiver {
outer: Receiver<Receiver<BlockData>>,
inner: Option<Receiver<BlockData>>
pub struct PoVReceiver {
outer: Receiver<Receiver<PoVBlock>>,
inner: Option<Receiver<PoVBlock>>
}
impl Future for BlockDataReceiver {
type Item = BlockData;
impl Future for PoVReceiver {
type Item = PoVBlock;
type Error = io::Error;
fn poll(&mut self) -> Poll<BlockData, io::Error> {
fn poll(&mut self) -> Poll<PoVBlock, io::Error> {
let map_err = |_| io::Error::new(
io::ErrorKind::Other,
"Sending end of channel hung up",
......@@ -746,22 +747,34 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> SessionDataFetcher<P, E, N, T> where
T: Clone + Executor + Send + 'static,
E: