Unverified Commit b94febb1 authored by asynchronous rob, committed by GitHub

A more comprehensive model for PoV-Blocks and Candidate receipts (#843)

* encode the candidate statement as only the hash

* refactor CandidateReceipt and CollationInfo

* introduce an abridged candidate receipt type

* erasure coding stores candidate receipt

* store omitted data instead and introduce AvailableData type

* refactor availability-store schema

* tweak schema and APIs a bit more

* get availability-store tests passing

* accept AbridgedCandidateReceipt in `set_heads`

* change statement type in primitives to be hash-only

* fix parachains runtime tests

* fix bad merge

* rewrite validation pipeline

* remove evaluation module

* use abridged candidate hash as canonical

* statement table uses abridged candidate receipts

* kill availability_store::Data struct

* port shared table to new validation pipelines

* extract full validation pipeline to helper

* remove old validation pipeline from collation module

* polkadot-validation compiles

* polkadot-validation tests compile

* make local collation available in validation service

* port legacy network code

* polkadot-network fully ported

* network: ensure fresh statement is propagated

* remove pov_block_hash from LocalValidationData

* remove candidate_hash field from AttestedCandidate and update runtime

* port runtimes to new ParachainHost definition

* port over polkadot-collator

* fix test compilation

* better fix

* remove unrelated validation work dispatch fix

* address grumbles

* fix equality check
parent e6dc44cd
Pipeline #80471 passed with stages in 22 minutes and 21 seconds
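
Before the diff, a brief orientation on how the new types fit together: the full candidate receipt is split into an `AbridgedCandidateReceipt` plus the `OmittedValidationData` it leaves out, and the availability subsystem bundles the PoV block with that omitted data as `AvailableData` (persisted internally as `ExecutionData`). The sketch below is illustrative only; the struct bodies are simplified assumptions inferred from the diff, not the actual `polkadot-primitives` definitions.

```rust
// Illustrative sketch only: simplified stand-ins for the primitives this
// commit introduces. The real types live in `polkadot_primitives::parachain`
// and carry more fields; the bodies here are assumptions for orientation.

/// Stand-in for the 32-byte relay-chain hash type.
pub type Hash = [u8; 32];

/// A collator-produced proof-of-validity block (placeholder body).
#[derive(Clone, Debug)]
pub struct PoVBlock {
    pub block_data: Vec<u8>,
}

/// Validation data stripped out of the abridged receipt (placeholder body).
#[derive(Clone, Debug)]
pub struct OmittedValidationData {
    pub global_validation: Vec<u8>,
    pub local_validation: Vec<u8>,
}

/// A candidate receipt with the heavy validation data omitted; only hashes
/// and commitments remain. In the diff below `erasure_root` actually sits
/// under a `commitments` sub-struct; it is flattened here for brevity.
#[derive(Clone, Debug)]
pub struct AbridgedCandidateReceipt {
    pub relay_parent: Hash,
    pub pov_block_hash: Hash,
    pub erasure_root: Hash,
}

/// Everything a validator must keep available for a candidate: the PoV block
/// plus the data the abridged receipt omitted. Combined with the
/// `AbridgedCandidateReceipt`, this is enough to re-execute the block.
#[derive(Clone, Debug)]
pub struct AvailableData {
    pub pov_block: PoVBlock,
    pub omitted_validation: OmittedValidationData,
}
```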
@@ -28,7 +28,8 @@ use keystore::KeyStorePtr;
use polkadot_primitives::{
Hash, Block,
parachain::{
Id as ParaId, BlockData, CandidateReceipt, ErasureChunk, ParachainHost
PoVBlock, AbridgedCandidateReceipt, ErasureChunk,
ParachainHost, AvailableData, OmittedValidationData,
},
};
use sp_runtime::traits::{BlakeTwo256, Hash as HashT, HasherFor};
@@ -37,6 +38,7 @@ use client::{
BlockchainEvents, BlockBody,
};
use sp_api::{ApiExt, ProvideRuntimeApi};
use codec::{Encode, Decode};
use log::warn;
@@ -50,9 +52,10 @@ mod worker;
mod store;
pub use worker::AvailabilityBlockImport;
pub use store::AwaitedFrontierEntry;
use worker::{
Worker, WorkerHandle, Chunks, ParachainBlocks, WorkerMsg, MakeAvailable,
Worker, WorkerHandle, Chunks, IncludedParachainBlocks, WorkerMsg, MakeAvailable,
};
use store::{Store as InnerStore};
@@ -116,15 +119,14 @@ pub trait ProvideGossipMessages {
);
}
/// Some data to keep available about a parachain block candidate.
#[derive(Debug)]
pub struct Data {
/// The relay chain parent hash this should be localized to.
pub relay_parent: Hash,
/// The parachain index for this candidate.
pub parachain_id: ParaId,
/// Block data.
pub block_data: BlockData,
/// Data which, when combined with an `AbridgedCandidateReceipt`, is enough
/// to fully re-execute a block.
#[derive(Debug, Encode, Decode, PartialEq)]
pub struct ExecutionData {
/// The `PoVBlock`.
pub pov_block: PoVBlock,
/// The data omitted from the `AbridgedCandidateReceipt`.
pub omitted_validation: OmittedValidationData,
}
/// Handle to the availability store.
@@ -220,17 +222,17 @@ impl Store {
/// in order to persist that data to disk and so it can be queried and provided
/// to other nodes in the network.
///
/// The message data of `Data` is optional but is expected
/// to be present with the exception of the case where there is no message data
/// due to the block's invalidity. Determination of invalidity is beyond the
/// scope of this function.
/// Determination of invalidity is beyond the scope of this function.
///
/// This method will send the `Data` to the background worker, allowing caller to
/// asynchrounously wait for the result.
pub async fn make_available(&self, data: Data) -> io::Result<()> {
/// This method will send the data to the background worker, allowing the caller to
/// asynchronously wait for the result.
pub async fn make_available(&self, candidate_hash: Hash, available_data: AvailableData)
-> io::Result<()>
{
let (s, r) = oneshot::channel();
let msg = WorkerMsg::MakeAvailable(MakeAvailable {
data,
candidate_hash,
available_data,
result: s,
});
@@ -244,41 +246,11 @@ impl Store {
}
/// Get a set of all chunks we are waiting for grouped by
/// `(relay_parent, erasure_root, candidate_hash, our_id)`.
pub fn awaited_chunks(&self) -> Option<HashSet<(Hash, Hash, Hash, u32)>> {
/// Get a set of all chunks we are waiting for.
pub fn awaited_chunks(&self) -> Option<HashSet<AwaitedFrontierEntry>> {
self.inner.awaited_chunks()
}
/// Query which candidates were included in the relay chain block by the block's parent.
pub fn get_candidates_in_relay_block(&self, relay_block: &Hash) -> Option<Vec<Hash>> {
self.inner.get_candidates_in_relay_block(relay_block)
}
/// Make a validator's index and a number of validators at a relay parent available.
///
/// This information is needed before the `add_candidates_in_relay_block` is called
/// since that call forms the awaited frontier of chunks.
/// In the current implementation this function is called in the `get_or_instantiate` at
/// the start of the parachain agreement process on top of some parent hash.
pub fn add_validator_index_and_n_validators(
&self,
relay_parent: &Hash,
validator_index: u32,
n_validators: u32,
) -> io::Result<()> {
self.inner.add_validator_index_and_n_validators(
relay_parent,
validator_index,
n_validators,
)
}
/// Query a validator's index and n_validators by relay parent.
pub fn get_validator_index_and_n_validators(&self, relay_parent: &Hash) -> Option<(u32, u32)> {
self.inner.get_validator_index_and_n_validators(relay_parent)
}
/// Adds an erasure chunk to storage.
///
/// The chunk should be checked for validity against the root of encoding
@@ -288,11 +260,10 @@ impl Store {
/// asynchronously wait for the result.
pub async fn add_erasure_chunk(
&self,
relay_parent: Hash,
receipt: CandidateReceipt,
candidate: AbridgedCandidateReceipt,
chunk: ErasureChunk,
) -> io::Result<()> {
self.add_erasure_chunks(relay_parent, receipt, vec![chunk]).await
self.add_erasure_chunks(candidate, vec![chunk]).await
}
/// Adds a set of erasure chunks to storage.
@@ -304,16 +275,17 @@ impl Store {
/// asynchronously waiting for the result.
pub async fn add_erasure_chunks<I>(
&self,
relay_parent: Hash,
receipt: CandidateReceipt,
candidate: AbridgedCandidateReceipt,
chunks: I,
) -> io::Result<()>
where I: IntoIterator<Item = ErasureChunk>
{
self.add_candidate(relay_parent, receipt.clone()).await?;
let candidate_hash = candidate.hash();
let relay_parent = candidate.relay_parent;
self.add_candidate(candidate).await?;
let (s, r) = oneshot::channel();
let chunks = chunks.into_iter().collect();
let candidate_hash = receipt.hash();
let msg = WorkerMsg::Chunks(Chunks {
relay_parent,
candidate_hash,
@@ -330,27 +302,44 @@ impl Store {
}
}
/// Queries an erasure chunk by its block's parent and hash and index.
/// Queries an erasure chunk by the candidate hash and validator index.
pub fn get_erasure_chunk(
&self,
relay_parent: &Hash,
block_data_hash: Hash,
index: usize,
candidate_hash: &Hash,
validator_index: usize,
) -> Option<ErasureChunk> {
self.inner.get_erasure_chunk(relay_parent, block_data_hash, index)
self.inner.get_erasure_chunk(candidate_hash, validator_index)
}
/// Stores a candidate receipt.
pub async fn add_candidate(
/// Note a validator's index and a number of validators at a relay parent in the
/// store.
///
/// This should be done before adding erasure chunks with this relay parent.
pub fn note_validator_index_and_n_validators(
&self,
relay_parent: Hash,
receipt: CandidateReceipt,
relay_parent: &Hash,
validator_index: u32,
n_validators: u32,
) -> io::Result<()> {
self.inner.note_validator_index_and_n_validators(
relay_parent,
validator_index,
n_validators,
)
}
// Stores a candidate receipt.
async fn add_candidate(
&self,
candidate: AbridgedCandidateReceipt,
) -> io::Result<()> {
let (s, r) = oneshot::channel();
let msg = WorkerMsg::ParachainBlocks(ParachainBlocks {
relay_parent,
blocks: vec![(receipt, None)],
let msg = WorkerMsg::IncludedParachainBlocks(IncludedParachainBlocks {
blocks: vec![crate::worker::IncludedParachainBlock {
candidate,
available_data: None,
}],
result: s,
});
@@ -363,20 +352,17 @@ impl Store {
}
}
/// Queries a candidate receipt by it's hash.
pub fn get_candidate(&self, candidate_hash: &Hash) -> Option<CandidateReceipt> {
/// Queries a candidate receipt by its hash.
pub fn get_candidate(&self, candidate_hash: &Hash)
-> Option<AbridgedCandidateReceipt>
{
self.inner.get_candidate(candidate_hash)
}
/// Query block data.
pub fn block_data(&self, relay_parent: Hash, block_data_hash: Hash) -> Option<BlockData> {
self.inner.block_data(relay_parent, block_data_hash)
}
/// Query block data by corresponding candidate receipt's hash.
pub fn block_data_by_candidate(&self, relay_parent: Hash, candidate_hash: Hash)
-> Option<BlockData>
/// Query execution data by pov-block hash.
pub fn execution_data(&self, candidate_hash: &Hash)
-> Option<ExecutionData>
{
self.inner.block_data_by_candidate(relay_parent, candidate_hash)
self.inner.execution_data(candidate_hash)
}
}
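
That ends the changes to the store's public interface; the worker-side changes follow below. As a quick orientation, here is a hedged sketch of how a caller might drive the reworked API, in which the candidate hash replaces the old `(relay_parent, block_data_hash)` keying. The crate path and the ready-made argument values are assumptions; only the method signatures are taken from the hunks above.

```rust
// Sketch only: signatures follow this diff, everything else is assumed.
use polkadot_primitives::parachain::{
    AbridgedCandidateReceipt, AvailableData, ErasureChunk,
};
use av_store::Store; // hypothetical crate/path name for the availability store

async fn publish_candidate_data(
    store: &Store,
    candidate: AbridgedCandidateReceipt,
    available_data: AvailableData,
    chunk: ErasureChunk,
    our_validator_index: u32,
    n_validators: u32,
) -> std::io::Result<()> {
    // The candidate hash is now the canonical key for all lookups.
    let candidate_hash = candidate.hash();

    // Record our validator index and the validator-set size first; the diff
    // notes this should happen before chunks for this relay parent are added.
    store.note_validator_index_and_n_validators(
        &candidate.relay_parent,
        our_validator_index,
        n_validators,
    )?;

    // Persist the data needed to fully re-execute the parachain block.
    store.make_available(candidate_hash, available_data).await?;

    // Adding a chunk also records the abridged receipt itself.
    store.add_erasure_chunk(candidate, chunk).await?;

    // Later queries take the candidate hash and a validator index, instead
    // of the old (relay_parent, block_data_hash) pair.
    let _chunk = store.get_erasure_chunk(&candidate_hash, our_validator_index as usize);
    let _receipt = store.get_candidate(&candidate_hash);
    Ok(())
}
```

The batch variant `add_erasure_chunks` works the same way for multiple chunks of one candidate.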
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::io;
use std::sync::Arc;
use std::thread;
@@ -34,15 +34,15 @@ use consensus_common::{
};
use polkadot_primitives::{Block, BlockId, Hash};
use polkadot_primitives::parachain::{
CandidateReceipt, ParachainHost, ValidatorId,
ValidatorPair, ErasureChunk, PoVBlock,
ParachainHost, ValidatorId, AbridgedCandidateReceipt, AvailableData,
ValidatorPair, ErasureChunk,
};
use futures::{prelude::*, future::select, channel::{mpsc, oneshot}, task::{Spawn, SpawnExt}};
use keystore::KeyStorePtr;
use tokio::runtime::{Handle, Runtime as LocalRuntime};
use crate::{LOG_TARGET, Data, ProvideGossipMessages, erasure_coding_topic};
use crate::{LOG_TARGET, ProvideGossipMessages, erasure_coding_topic};
use crate::store::Store;
/// Errors that may occur.
@@ -65,32 +65,27 @@ pub(crate) enum Error {
/// * when the `Store` api is used by outside code.
#[derive(Debug)]
pub(crate) enum WorkerMsg {
ErasureRoots(ErasureRoots),
ParachainBlocks(ParachainBlocks),
IncludedParachainBlocks(IncludedParachainBlocks),
ListenForChunks(ListenForChunks),
Chunks(Chunks),
CandidatesFinalized(CandidatesFinalized),
MakeAvailable(MakeAvailable),
}
/// The erasure roots of the heads included in the block with a given parent.
/// A notification of a parachain block included in the relay chain.
#[derive(Debug)]
pub(crate) struct ErasureRoots {
/// The relay parent of the block these roots belong to.
pub relay_parent: Hash,
/// The roots themselves.
pub erasure_roots: Vec<Hash>,
/// A sender to signal the result asynchronously.
pub result: oneshot::Sender<Result<(), Error>>,
pub(crate) struct IncludedParachainBlock {
/// The abridged candidate receipt, extracted from a relay-chain block.
pub candidate: AbridgedCandidateReceipt,
/// The data to keep available from the candidate, if known.
pub available_data: Option<AvailableData>,
}
/// The receipts of the heads included into the block with a given parent.
#[derive(Debug)]
pub(crate) struct ParachainBlocks {
/// The relay parent of the block these parachain blocks belong to.
pub relay_parent: Hash,
pub(crate) struct IncludedParachainBlocks {
/// The blocks themselves.
pub blocks: Vec<(CandidateReceipt, Option<PoVBlock>)>,
pub blocks: Vec<IncludedParachainBlock>,
/// A sender to signal the result asynchronously.
pub result: oneshot::Sender<Result<(), Error>>,
}
@@ -98,8 +93,6 @@ pub(crate) struct ParachainBlocks {
/// Listen gossip for these chunks.
#[derive(Debug)]
pub(crate) struct ListenForChunks {
/// The relay parent of the block the chunks from we want to listen to.
pub relay_parent: Hash,
/// The hash of the candidate chunk belongs to.
pub candidate_hash: Hash,
/// The index of the chunk we need.
@@ -126,15 +119,17 @@ pub(crate) struct Chunks {
pub(crate) struct CandidatesFinalized {
/// The relay parent of the block that was finalized.
relay_parent: Hash,
/// The parachain heads that were finalized in this block.
candidate_hashes: Vec<Hash>,
/// The hashes of candidates that were finalized in this block.
included_candidates: HashSet<Hash>,
}
/// The message that corresponds to `make_available` call of the crate API.
#[derive(Debug)]
pub(crate) struct MakeAvailable {
/// The data being made available.
pub data: Data,
/// The hash of the candidate for which we are publishing data.
pub candidate_hash: Hash,
/// The data to make available.
pub available_data: AvailableData,
/// A sender to signal the result asynchronously.
pub result: oneshot::Sender<Result<(), Error>>,
}
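
Every message above follows the same request/response shape: the public `Store` handle embeds a oneshot sender in the message, pushes it onto the worker's unbounded channel, and awaits the reply. A minimal sketch of that round trip for `MakeAvailable` follows; it assumes the surrounding module's `WorkerMsg`, `MakeAvailable`, `Hash`, and `AvailableData` types are in scope, and the free-standing function and error mapping are illustrative only.

```rust
// Sketch of the Store-to-worker round trip; names follow this diff, while
// the helper function itself and the error mapping are illustrative.
use futures::channel::{mpsc, oneshot};
use std::io;

async fn send_make_available(
    to_worker: &mpsc::UnboundedSender<WorkerMsg>,
    candidate_hash: Hash,
    available_data: AvailableData,
) -> io::Result<()> {
    // The reply comes back over a oneshot channel embedded in the message.
    let (result_tx, result_rx) = oneshot::channel();

    let msg = WorkerMsg::MakeAvailable(MakeAvailable {
        candidate_hash,
        available_data,
        result: result_tx,
    });

    to_worker
        .unbounded_send(msg)
        .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "availability worker is gone"))?;

    // `Err(Canceled)` means the worker dropped the sender without replying.
    match result_rx.await {
        Ok(Ok(())) => Ok(()),
        Ok(Err(_)) | Err(_) => Err(io::Error::new(
            io::ErrorKind::Other,
            "failed to make candidate data available",
        )),
    }
}
```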
@@ -205,7 +200,7 @@ where
fn fetch_candidates<P>(client: &P, extrinsics: Vec<<Block as BlockT>::Extrinsic>, parent: &BlockId)
-> ClientResult<Option<Vec<CandidateReceipt>>>
-> ClientResult<Option<Vec<AbridgedCandidateReceipt>>>
where
P: ProvideRuntimeApi<Block>,
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
@@ -262,12 +257,15 @@ where
}
};
let candidate_hashes = match fetch_candidates(
let included_candidates = match fetch_candidates(
&*client,
extrinsics,
&BlockId::hash(parent_hash)
&BlockId::hash(parent_hash),
) {
Ok(Some(candidates)) => candidates.into_iter().map(|c| c.hash()).collect(),
Ok(Some(candidates)) => candidates
.into_iter()
.map(|c| c.hash())
.collect(),
Ok(None) => {
warn!(
target: LOG_TARGET,
@@ -286,7 +284,7 @@ where
let msg = WorkerMsg::CandidatesFinalized(CandidatesFinalized {
relay_parent: parent_hash,
candidate_hashes
included_candidates
});
if let Err(_) = sender.send(msg).await {
@@ -315,12 +313,12 @@ where
sender: &mut mpsc::UnboundedSender<WorkerMsg>,
) {
if let Some(awaited_chunks) = self.availability_store.awaited_chunks() {
for chunk in awaited_chunks {
for awaited_chunk in awaited_chunks {
if let Err(e) = self.register_chunks_listener(
runtime_handle,
sender,
chunk.0,
chunk.1,
awaited_chunk.relay_parent,
awaited_chunk.erasure_root,
) {
warn!(target: LOG_TARGET, "Failed to register gossip listener: {}", e);
}
@@ -366,33 +364,36 @@ where
&mut self,
runtime_handle: &Handle,
sender: &mut mpsc::UnboundedSender<WorkerMsg>,
relay_parent: Hash,
blocks: Vec<(CandidateReceipt, Option<PoVBlock>)>,
blocks: Vec<IncludedParachainBlock>,
) -> Result<(), Error> {
let hashes: Vec<_> = blocks.iter().map(|(c, _)| c.hash()).collect();
// First we have to add the receipts themselves.
for (candidate, block) in blocks.into_iter() {
for IncludedParachainBlock { candidate, available_data }
in blocks.into_iter()
{
let _ = self.availability_store.add_candidate(&candidate);
if let Some(_block) = block {
if let Some(_available_data) = available_data {
// Should we be breaking block into chunks here and gossiping it and so on?
}
if let Err(e) = self.register_chunks_listener(
runtime_handle,
sender,
relay_parent,
candidate.erasure_root
candidate.relay_parent,
candidate.commitments.erasure_root,
) {
warn!(target: LOG_TARGET, "Failed to register chunk listener: {}", e);
}
}
let _ = self.availability_store.add_candidates_in_relay_block(
&relay_parent,
hashes
);
// This leans on the codebase-wide assumption that the `relay_parent`
// of all candidates in a block matches the parent hash of that block.
//
// In the future this will not always be true.
let _ = self.availability_store.note_candidates_with_relay_parent(
&candidate.relay_parent,
&[candidate.hash()],
);
}
Ok(())
}
@@ -415,7 +416,11 @@ where
.ok_or(Error::CandidateNotFound { candidate_hash })?;
for chunk in &chunks {
let topic = erasure_coding_topic(relay_parent, receipt.erasure_root, chunk.index);
let topic = erasure_coding_topic(
relay_parent,
receipt.commitments.erasure_root,
chunk.index,
);
// need to remove gossip listener and stop it.
if let Some(signal) = self.registered_gossip_streams.remove(&topic) {
let _ = signal.fire();
@@ -424,7 +429,6 @@ where
self.availability_store.add_erasure_chunks(
n_validators,
&relay_parent,
&candidate_hash,
chunks,
)?;
@@ -432,17 +436,6 @@ where
Ok(())
}
// Adds the erasure roots into the store.
fn on_erasure_roots_received(
&mut self,
relay_parent: Hash,
erasure_roots: Vec<Hash>
) -> Result<(), Error> {
self.availability_store.add_erasure_roots_in_relay_block(&relay_parent, erasure_roots)?;
Ok(())
}
// Processes the `ListenForChunks` message.
//
// When the worker receives a `ListenForChunk` message, it double-checks that
......@@ -451,7 +444,6 @@ where
&mut self,
runtime_handle: &Handle,
sender: &mut mpsc::UnboundedSender<WorkerMsg>,
relay_parent: Hash,
candidate_hash: Hash,
id: usize
) -> Result<(), Error> {
@@ -459,13 +451,13 @@ where
.ok_or(Error::CandidateNotFound { candidate_hash })?;
if self.availability_store
.get_erasure_chunk(&relay_parent, candidate.block_data_hash, id)
.get_erasure_chunk(&candidate_hash, id)
.is_none() {
if let Err(e) = self.register_chunks_listener(
runtime_handle,
sender,
relay_parent,
candidate.erasure_root
candidate.relay_parent,
candidate.commitments.erasure_root
) {
warn!(target: LOG_TARGET, "Failed to register a gossip listener: {}", e);
}
@@ -498,7 +490,7 @@ where
let runtime_handle = runtime.handle().clone();
// On startup, registers listeners (gossip streams) for all
// (relay_parent, erasure-root, i) in the awaited frontier.
// entries in the awaited frontier.
worker.register_listeners(runtime.handle(), &mut sender);
let process_notification = async move {
@@ -506,18 +498,8 @@ where
trace!(target: LOG_TARGET, "Received message {:?}", msg);
let res = match msg {
WorkerMsg::ErasureRoots(msg) => {
let ErasureRoots { relay_parent, erasure_roots, result} = msg;
let res = worker.on_erasure_roots_received(
relay_parent,
erasure_roots,
);
let _ = result.send(res);
Ok(())
}
WorkerMsg::ListenForChunks(msg) => {
let ListenForChunks {
relay_parent,
candidate_hash,
index,
result,
@@ -526,7 +508,6 @@ where
let res = worker.on_listen_for_chunks_received(
&runtime_handle,
&mut sender,
relay_parent,
candidate_hash,
index as usize,
);
@@ -536,9 +517,8 @@ where
}
Ok(())
}
WorkerMsg::ParachainBlocks(msg) => {
let ParachainBlocks {
relay_parent,
WorkerMsg::IncludedParachainBlocks(msg) => {
let IncludedParachainBlocks {
blocks,
result,
} = msg;
@@ -546,7 +526,6 @@ where
let res = worker.on_parachain_blocks_received(
&runtime_handle,
&mut sender,
relay_parent,
blocks,
);
@@ -565,16 +544,17 @@ where
Ok(())
}
WorkerMsg::CandidatesFinalized(msg) => {
let CandidatesFinalized { relay_parent, candidate_hashes } = msg;
let CandidatesFinalized { relay_parent, included_candidates } = msg;
worker.availability_store.candidates_finalized(
relay_parent,
candidate_hashes.into_iter().collect(),
included_candidates,
)
}
WorkerMsg::MakeAvailable(msg) => {
let MakeAvailable { data, result } = msg;
let res = worker.availability_store.make_available(data)
let MakeAvailable { candidate_hash, available_data, result } = msg;
let res = worker.availability_store
.make_available(candidate_hash, available_data)
.map_err(|e| e.into());
let _ = result.send(res);
Ok(())
@@ -651,7 +631,6 @@ impl<I, P> BlockImport<Block> for AvailabilityBlockImport<I, P> where
);
if let Some(ref extrinsics) = block.body {
let relay_parent = *block.header.parent_hash();
let parent_id = BlockId::hash(*block.header.parent_hash());
// Extract our local position i from the validator set of the parent.
let validators = self.client.runtime_api().validators(&parent_id)
@@ -675,16 +654,15 @@ impl<I, P> BlockImport<Block> for AvailabilityBlockImport<I, P> where
);
for candidate in &candidates {
let candidate_hash = candidate.hash();
// If we don't yet have our chunk of this candidate,
// tell the worker to listen for one.
if self.availability_store.get_erasure_chunk(
&relay_parent,
candidate.block_data_hash,
&candidate_hash,
our_id as usize,
).is_none() {
let msg = WorkerMsg::ListenForChunks(ListenForChunks {
relay_parent,
candidate_hash: candidate.hash(),
candidate_hash,
index: our_id as u32,
result: None,
});
@@ -693,27 +671,19 @@ impl<I, P> BlockImport<Block> for AvailabilityBlockImport<I, P> where
}
}
let erasure_roots: Vec<_> = candidates
.iter()
.map(|c| c.erasure_root)
.collect();
// Inform the worker about new (relay_parent, erasure_roots) pairs
let (s, _) = oneshot::channel();