(network: EN) -> Self
- where EN: ErasureNetworking + Send + Sync + Clone + 'static
- {
- let inner = InnerStore::new_in_memory();
- let worker = Arc::new(Worker::start(inner.clone(), network));
- let to_worker = worker.to_worker().clone();
-
- Self {
- inner,
- worker,
- to_worker,
- }
- }
-
- /// Obtain a [`BlockImport`] implementation to import blocks into this store.
- ///
- /// This block import will act upon all newly imported blocks sending information
- /// about parachain heads included in them to this `Store`'s background worker.
- /// The user may create multiple instances of [`BlockImport`]s with this call.
- ///
- /// [`BlockImport`]: https://substrate.dev/rustdocs/v1.0/substrate_consensus_common/trait.BlockImport.html
- pub fn block_import(
- &self,
- wrapped_block_import: I,
- client: Arc ,
- spawner: impl Spawn,
- keystore: KeyStorePtr,
- ) -> ClientResult>
- where
- P: ProvideRuntimeApi + BlockchainEvents + BlockBackend + Send + Sync + 'static,
- P::Api: ParachainHost,
- P::Api: ApiExt,
- // Rust bug: https://github.com/rust-lang/rust/issues/24159
- sp_api::StateBackendFor: sp_api::StateBackend>,
- {
- let to_worker = self.to_worker.clone();
-
- let import = AvailabilityBlockImport::new(
- client,
- wrapped_block_import,
- spawner,
- keystore,
- to_worker,
- );
-
- Ok(import)
- }
-
- /// Make some data available provisionally.
- ///
- /// Validators with the responsibility of maintaining availability
- /// for a block or collators collating a block will call this function
- /// in order to persist that data to disk and so it can be queried and provided
- /// to other nodes in the network.
- ///
- /// Determination of invalidity is beyond the scope of this function.
- ///
- /// This method will send the data to the background worker, allowing the caller to
- /// asynchronously wait for the result.
- pub async fn make_available(&self, candidate_hash: Hash, available_data: AvailableData)
- -> io::Result<()>
- {
- let (s, r) = oneshot::channel();
- let msg = WorkerMsg::MakeAvailable(MakeAvailable {
- candidate_hash,
- available_data,
- result: s,
- });
-
- let _ = self.to_worker.unbounded_send(msg);
-
- if let Ok(Ok(())) = r.await {
- Ok(())
- } else {
- Err(io::Error::new(io::ErrorKind::Other, format!("adding erasure chunks failed")))
- }
-
- }
-
- /// Get a set of all chunks we are waiting for.
- pub fn awaited_chunks(&self) -> Option> {
- self.inner.awaited_chunks()
- }
-
- /// Adds an erasure chunk to storage.
- ///
- /// The chunk should be checked for validity against the root of encoding
- /// and its proof prior to calling this.
- ///
- /// This method will send the chunk to the background worker, allowing the caller to
- /// asynchronously wait for the result.
- pub async fn add_erasure_chunk(
- &self,
- candidate: AbridgedCandidateReceipt,
- n_validators: u32,
- chunk: ErasureChunk,
- ) -> io::Result<()> {
- self.add_erasure_chunks(candidate, n_validators, std::iter::once(chunk)).await
- }
-
- /// Adds a set of erasure chunks to storage.
- ///
- /// The chunks should be checked for validity against the root of encoding
- /// and its proof prior to calling this.
- ///
- /// This method will send the chunks to the background worker, allowing the caller to
- /// asynchronously wait for the result.
- pub async fn add_erasure_chunks(
- &self,
- candidate: AbridgedCandidateReceipt,
- n_validators: u32,
- chunks: I,
- ) -> io::Result<()>
- where I: IntoIterator-
- {
- let candidate_hash = candidate.hash();
-
- self.add_candidate(candidate).await?;
-
- let (s, r) = oneshot::channel();
- let chunks = chunks.into_iter().collect();
-
- let msg = WorkerMsg::Chunks(Chunks {
- candidate_hash,
- chunks,
- n_validators,
- result: s,
- });
-
- let _ = self.to_worker.unbounded_send(msg);
-
- if let Ok(Ok(())) = r.await {
- Ok(())
- } else {
- Err(io::Error::new(io::ErrorKind::Other, format!("adding erasure chunks failed")))
- }
- }
-
- /// Queries an erasure chunk by the candidate hash and validator index.
- pub fn get_erasure_chunk(
- &self,
- candidate_hash: &Hash,
- validator_index: usize,
- ) -> Option
{
- self.inner.get_erasure_chunk(candidate_hash, validator_index)
- }
-
- /// Note a validator's index and a number of validators at a relay parent in the
- /// store.
- ///
- /// This should be done before adding erasure chunks with this relay parent.
- pub fn note_validator_index_and_n_validators(
- &self,
- relay_parent: &Hash,
- validator_index: u32,
- n_validators: u32,
- ) -> io::Result<()> {
- self.inner.note_validator_index_and_n_validators(
- relay_parent,
- validator_index,
- n_validators,
- )
- }
-
- // Stores a candidate receipt.
- async fn add_candidate(
- &self,
- candidate: AbridgedCandidateReceipt,
- ) -> io::Result<()> {
- let (s, r) = oneshot::channel();
-
- let msg = WorkerMsg::IncludedParachainBlocks(IncludedParachainBlocks {
- blocks: vec![crate::worker::IncludedParachainBlock {
- candidate,
- available_data: None,
- }],
- result: s,
- });
-
- let _ = self.to_worker.unbounded_send(msg);
-
- if let Ok(Ok(())) = r.await {
- Ok(())
- } else {
- Err(io::Error::new(io::ErrorKind::Other, format!("adding erasure chunks failed")))
- }
- }
-
- /// Queries a candidate receipt by its hash.
- pub fn get_candidate(&self, candidate_hash: &Hash)
- -> Option
- {
- self.inner.get_candidate(candidate_hash)
- }
-
- /// Query execution data by pov-block hash.
- pub fn execution_data(&self, candidate_hash: &Hash)
- -> Option
- {
- self.inner.execution_data(candidate_hash)
- }
-}
diff --git a/availability-store/src/store.rs b/availability-store/src/store.rs
deleted file mode 100644
index e3b1e35929795211130d09e84454c28a34faeea4..0000000000000000000000000000000000000000
--- a/availability-store/src/store.rs
+++ /dev/null
@@ -1,623 +0,0 @@
-// Copyright 2018-2020 Parity Technologies (UK) Ltd.
-// This file is part of Polkadot.
-
-// Polkadot is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// Polkadot is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with Polkadot. If not, see .
-
-#[cfg(not(target_os = "unknown"))]
-use kvdb_rocksdb::{Database, DatabaseConfig};
-use kvdb::{KeyValueDB, DBTransaction};
-use codec::{Encode, Decode};
-use polkadot_erasure_coding::{self as erasure};
-use polkadot_primitives::{
- Hash,
- parachain::{
- ErasureChunk, AvailableData, AbridgedCandidateReceipt,
- },
-};
-use parking_lot::Mutex;
-
-use log::{trace, warn};
-use std::collections::HashSet;
-use std::sync::Arc;
-use std::iter::FromIterator;
-use std::io;
-
-use crate::{LOG_TARGET, Config, ExecutionData};
-
-mod columns {
- pub const DATA: u32 = 0;
- pub const META: u32 = 1;
- pub const NUM_COLUMNS: u32 = 2;
-}
-
-#[derive(Clone)]
-pub struct Store {
- inner: Arc,
- candidate_descendents_lock: Arc>
-}
-
-// data keys
-fn execution_data_key(candidate_hash: &Hash) -> Vec {
- (candidate_hash, 0i8).encode()
-}
-
-fn erasure_chunks_key(candidate_hash: &Hash) -> Vec {
- (candidate_hash, 1i8).encode()
-}
-
-fn candidate_key(candidate_hash: &Hash) -> Vec {
- (candidate_hash, 2i8).encode()
-}
-
-fn candidates_with_relay_parent_key(relay_block: &Hash) -> Vec {
- (relay_block, 4i8).encode()
-}
-
-// meta keys
-const AWAITED_CHUNKS_KEY: [u8; 14] = *b"awaited_chunks";
-
-fn validator_index_and_n_validators_key(relay_parent: &Hash) -> Vec {
- (relay_parent, 1i8).encode()
-}
-
-fn available_chunks_key(candidate_hash: &Hash) -> Vec {
- (candidate_hash, 2i8).encode()
-}
-
-/// An entry in the awaited frontier of chunks we are interested in.
-#[derive(Encode, Decode, Debug, Hash, PartialEq, Eq, Clone)]
-pub struct AwaitedFrontierEntry {
- /// The hash of the candidate for which we want to fetch a chunk for.
- /// There will be duplicate entries in the case of multiple candidates with
- /// the same erasure-root, but this is unlikely.
- pub candidate_hash: Hash,
- /// Although the relay-parent is implicitly referenced by the candidate hash,
- /// we include it here as well for convenience in pruning the set.
- pub relay_parent: Hash,
- /// The index of the validator we represent.
- pub validator_index: u32,
-}
-
-impl Store {
- /// Create a new `Store` with given condig on disk.
- #[cfg(not(target_os = "unknown"))]
- pub(super) fn new(config: Config) -> io::Result {
- let mut db_config = DatabaseConfig::with_columns(columns::NUM_COLUMNS);
-
- if let Some(cache_size) = config.cache_size {
- let mut memory_budget = std::collections::HashMap::new();
- for i in 0..columns::NUM_COLUMNS {
- memory_budget.insert(i, cache_size / columns::NUM_COLUMNS as usize);
- }
-
- db_config.memory_budget = memory_budget;
- }
-
- let path = config.path.to_str().ok_or_else(|| io::Error::new(
- io::ErrorKind::Other,
- format!("Bad database path: {:?}", config.path),
- ))?;
-
- let db = Database::open(&db_config, &path)?;
-
- Ok(Store {
- inner: Arc::new(db),
- candidate_descendents_lock: Arc::new(Mutex::new(())),
- })
- }
-
- /// Create a new `Store` in-memory. Useful for tests.
- pub(super) fn new_in_memory() -> Self {
- Store {
- inner: Arc::new(::kvdb_memorydb::create(columns::NUM_COLUMNS)),
- candidate_descendents_lock: Arc::new(Mutex::new(())),
- }
- }
-
- /// Make some data available provisionally.
- pub(crate) fn make_available(&self, candidate_hash: Hash, available_data: AvailableData)
- -> io::Result<()>
- {
- let mut tx = DBTransaction::new();
-
- // at the moment, these structs are identical. later, we will also
- // keep outgoing message queues available, and these are not needed
- // for execution.
- let AvailableData { pov_block, omitted_validation } = available_data;
- let execution_data = ExecutionData {
- pov_block,
- omitted_validation,
- };
-
- tx.put_vec(
- columns::DATA,
- execution_data_key(&candidate_hash).as_slice(),
- execution_data.encode(),
- );
-
- self.inner.write(tx)
- }
-
- /// Get a set of all chunks we are waiting for.
- pub fn awaited_chunks(&self) -> Option> {
- self.query_inner(columns::META, &AWAITED_CHUNKS_KEY).map(|vec: Vec| {
- HashSet::from_iter(vec.into_iter())
- })
- }
-
- /// Adds a set of candidates hashes that were included in a relay block by the block's parent.
- ///
- /// If we already possess the receipts for these candidates _and_ our position at the specified
- /// relay chain the awaited frontier of the erasure chunks will also be extended.
- ///
- /// This method modifies the erasure chunks awaited frontier by adding this validator's
- /// chunks from `candidates` to it. In order to do so the information about this validator's
- /// position at parent `relay_parent` should be known to the store prior to calling this
- /// method, in other words `note_validator_index_and_n_validators` should be called for
- /// the given `relay_parent` before calling this function.
- pub(crate) fn note_candidates_with_relay_parent(
- &self,
- relay_parent: &Hash,
- candidates: &[Hash],
- ) -> io::Result<()> {
- let mut tx = DBTransaction::new();
- let dbkey = candidates_with_relay_parent_key(relay_parent);
-
- // This call can race against another call to `note_candidates_with_relay_parent`
- // with a different set of descendents.
- let _lock = self.candidate_descendents_lock.lock();
-
- if let Some((validator_index, _)) = self.get_validator_index_and_n_validators(relay_parent) {
- let candidates = candidates.clone();
- let awaited_frontier: Vec = self
- .query_inner(columns::META, &AWAITED_CHUNKS_KEY)
- .unwrap_or_else(|| Vec::new());
-
- let mut awaited_frontier: HashSet =
- HashSet::from_iter(awaited_frontier.into_iter());
-
- awaited_frontier.extend(candidates.iter().cloned().map(|candidate_hash| {
- AwaitedFrontierEntry {
- relay_parent: relay_parent.clone(),
- candidate_hash,
- validator_index,
- }
- }));
- let awaited_frontier = Vec::from_iter(awaited_frontier.into_iter());
- tx.put_vec(columns::META, &AWAITED_CHUNKS_KEY, awaited_frontier.encode());
- }
-
- let mut descendent_candidates = self.get_candidates_with_relay_parent(relay_parent);
- descendent_candidates.extend(candidates.iter().cloned());
- tx.put_vec(columns::DATA, &dbkey, descendent_candidates.encode());
-
- self.inner.write(tx)
- }
-
- /// Make a validator's index and a number of validators at a relay parent available.
- pub(crate) fn note_validator_index_and_n_validators(
- &self,
- relay_parent: &Hash,
- validator_index: u32,
- n_validators: u32,
- ) -> io::Result<()> {
- let mut tx = DBTransaction::new();
- let dbkey = validator_index_and_n_validators_key(relay_parent);
-
- tx.put_vec(columns::META, &dbkey, (validator_index, n_validators).encode());
-
- self.inner.write(tx)
- }
-
- /// Query a validator's index and n_validators by relay parent.
- pub(crate) fn get_validator_index_and_n_validators(&self, relay_parent: &Hash) -> Option<(u32, u32)> {
- let dbkey = validator_index_and_n_validators_key(relay_parent);
-
- self.query_inner(columns::META, &dbkey)
- }
-
- /// Add a set of chunks.
- ///
- /// The same as `add_erasure_chunk` but adds a set of chunks in one atomic transaction.
- pub fn add_erasure_chunks(
- &self,
- n_validators: u32,
- candidate_hash: &Hash,
- chunks: I,
- ) -> io::Result<()>
- where I: IntoIterator-
- {
- if let Some(receipt) = self.get_candidate(candidate_hash) {
- let mut tx = DBTransaction::new();
- let dbkey = erasure_chunks_key(candidate_hash);
-
- let mut v = self.query_inner(columns::DATA, &dbkey).unwrap_or(Vec::new());
-
- let av_chunks_key = available_chunks_key(candidate_hash);
- let mut have_chunks = self.query_inner(columns::META, &av_chunks_key).unwrap_or(Vec::new());
-
- let awaited_frontier: Option
> = self.query_inner(
- columns::META,
- &AWAITED_CHUNKS_KEY,
- );
-
- for chunk in chunks.into_iter() {
- if !have_chunks.contains(&chunk.index) {
- have_chunks.push(chunk.index);
- }
- v.push(chunk);
- }
-
- if let Some(mut awaited_frontier) = awaited_frontier {
- awaited_frontier.retain(|entry| {
- !(
- entry.relay_parent == receipt.relay_parent &&
- &entry.candidate_hash == candidate_hash &&
- have_chunks.contains(&entry.validator_index)
- )
- });
- tx.put_vec(columns::META, &AWAITED_CHUNKS_KEY, awaited_frontier.encode());
- }
-
- // If there are no block data in the store at this point,
- // check that they can be reconstructed now and add them to store if they can.
- if self.execution_data(&candidate_hash).is_none() {
- if let Ok(available_data) = erasure::reconstruct(
- n_validators as usize,
- v.iter().map(|chunk| (chunk.chunk.as_ref(), chunk.index as usize)),
- )
- {
- self.make_available(*candidate_hash, available_data)?;
- }
- }
-
- tx.put_vec(columns::DATA, &dbkey, v.encode());
- tx.put_vec(columns::META, &av_chunks_key, have_chunks.encode());
-
- self.inner.write(tx)
- } else {
- trace!(target: LOG_TARGET, "Candidate with hash {} not found", candidate_hash);
- Ok(())
- }
- }
-
- /// Queries an erasure chunk by its block's relay-parent, the candidate hash, and index.
- pub fn get_erasure_chunk(
- &self,
- candidate_hash: &Hash,
- index: usize,
- ) -> Option {
- self.query_inner(columns::DATA, &erasure_chunks_key(candidate_hash))
- .and_then(|chunks: Vec| {
- chunks.iter()
- .find(|chunk: &&ErasureChunk| chunk.index == index as u32)
- .map(|chunk| chunk.clone())
- })
- }
-
- /// Stores a candidate receipt.
- pub fn add_candidate(
- &self,
- receipt: &AbridgedCandidateReceipt,
- ) -> io::Result<()> {
- let candidate_hash = receipt.hash();
- let dbkey = candidate_key(&candidate_hash);
- let mut tx = DBTransaction::new();
-
- tx.put_vec(columns::DATA, &dbkey, receipt.encode());
-
- self.inner.write(tx)
- }
-
- /// Queries a candidate receipt by the relay parent hash and its hash.
- pub(crate) fn get_candidate(&self, candidate_hash: &Hash)
- -> Option
- {
- self.query_inner(columns::DATA, &candidate_key(candidate_hash))
- }
-
- /// Note that a set of candidates have been included in a finalized block with given hash and parent hash.
- pub(crate) fn candidates_finalized(
- &self,
- relay_parent: Hash,
- finalized_candidates: HashSet,
- ) -> io::Result<()> {
- let mut tx = DBTransaction::new();
-
- let awaited_frontier: Option> = self
- .query_inner(columns::META, &AWAITED_CHUNKS_KEY);
-
- if let Some(mut awaited_frontier) = awaited_frontier {
- awaited_frontier.retain(|entry| entry.relay_parent != relay_parent);
- tx.put_vec(columns::META, &AWAITED_CHUNKS_KEY, awaited_frontier.encode());
- }
-
- let candidates = self.get_candidates_with_relay_parent(&relay_parent);
-
- for candidate in candidates.into_iter().filter(|c| !finalized_candidates.contains(c)) {
- // we only delete this data for candidates which were not finalized.
- // we keep all data for the finalized chain forever at the moment.
- tx.delete(columns::DATA, execution_data_key(&candidate).as_slice());
- tx.delete(columns::DATA, &erasure_chunks_key(&candidate));
- tx.delete(columns::DATA, &candidate_key(&candidate));
-
- tx.delete(columns::META, &available_chunks_key(&candidate));
- }
-
- self.inner.write(tx)
- }
-
- /// Query execution data by relay parent and candidate hash.
- pub(crate) fn execution_data(&self, candidate_hash: &Hash) -> Option {
- self.query_inner(columns::DATA, &execution_data_key(candidate_hash))
- }
-
- /// Get candidates which pinned to the environment of the given relay parent.
- /// Note that this is not necessarily the same as candidates that were included in a direct
- /// descendent of the given relay-parent.
- fn get_candidates_with_relay_parent(&self, relay_parent: &Hash) -> Vec {
- let key = candidates_with_relay_parent_key(relay_parent);
- self.query_inner(columns::DATA, &key[..]).unwrap_or_default()
- }
-
- fn query_inner(&self, column: u32, key: &[u8]) -> Option {
- match self.inner.get(column, key) {
- Ok(Some(raw)) => {
- let res = T::decode(&mut &raw[..]).expect("all stored data serialized correctly; qed");
- Some(res)
- }
- Ok(None) => None,
- Err(e) => {
- warn!(target: LOG_TARGET, "Error reading from the availability store: {:?}", e);
- None
- }
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use polkadot_erasure_coding::{self as erasure};
- use polkadot_primitives::parachain::{
- Id as ParaId, BlockData, AvailableData, PoVBlock, OmittedValidationData,
- };
-
- fn available_data(block_data: &[u8]) -> AvailableData {
- AvailableData {
- pov_block: PoVBlock {
- block_data: BlockData(block_data.to_vec()),
- },
- omitted_validation: OmittedValidationData {
- global_validation: Default::default(),
- local_validation: Default::default(),
- }
- }
- }
-
- fn execution_data(available: &AvailableData) -> ExecutionData {
- let AvailableData { pov_block, omitted_validation } = available.clone();
- ExecutionData { pov_block, omitted_validation }
- }
-
- #[test]
- fn finalization_removes_unneeded() {
- let relay_parent = [1; 32].into();
-
- let para_id_1 = 5.into();
- let para_id_2 = 6.into();
-
- let mut candidate_1 = AbridgedCandidateReceipt::default();
- let mut candidate_2 = AbridgedCandidateReceipt::default();
-
- candidate_1.parachain_index = para_id_1;
- candidate_1.commitments.erasure_root = [6; 32].into();
- candidate_1.relay_parent = relay_parent;
-
- candidate_2.parachain_index = para_id_2;
- candidate_2.commitments.erasure_root = [6; 32].into();
- candidate_2.relay_parent = relay_parent;
-
-
- let candidate_1_hash = candidate_1.hash();
- let candidate_2_hash = candidate_2.hash();
-
- let available_data_1 = available_data(&[1, 2, 3]);
- let available_data_2 = available_data(&[4, 5, 6]);
-
- let erasure_chunk_1 = ErasureChunk {
- chunk: vec![10, 20, 30],
- index: 1,
- proof: vec![],
- };
-
- let erasure_chunk_2 = ErasureChunk {
- chunk: vec![40, 50, 60],
- index: 1,
- proof: vec![],
- };
-
- let store = Store::new_in_memory();
- store.make_available(candidate_1_hash, available_data_1.clone()).unwrap();
-
- store.make_available(candidate_2_hash, available_data_2.clone()).unwrap();
-
- store.add_candidate(&candidate_1).unwrap();
- store.add_candidate(&candidate_2).unwrap();
-
- store.note_candidates_with_relay_parent(&relay_parent, &[candidate_1_hash, candidate_2_hash]).unwrap();
-
- assert!(store.add_erasure_chunks(3, &candidate_1_hash, vec![erasure_chunk_1.clone()]).is_ok());
- assert!(store.add_erasure_chunks(3, &candidate_2_hash, vec![erasure_chunk_2.clone()]).is_ok());
-
- assert_eq!(store.execution_data(&candidate_1_hash).unwrap(), execution_data(&available_data_1));
- assert_eq!(store.execution_data(&candidate_2_hash).unwrap(), execution_data(&available_data_2));
-
- assert_eq!(store.get_erasure_chunk(&candidate_1_hash, 1).as_ref(), Some(&erasure_chunk_1));
- assert_eq!(store.get_erasure_chunk(&candidate_2_hash, 1), Some(erasure_chunk_2));
-
- assert_eq!(store.get_candidate(&candidate_1_hash), Some(candidate_1.clone()));
- assert_eq!(store.get_candidate(&candidate_2_hash), Some(candidate_2.clone()));
-
- store.candidates_finalized(relay_parent, [candidate_1_hash].iter().cloned().collect()).unwrap();
-
- assert_eq!(store.get_erasure_chunk(&candidate_1_hash, 1).as_ref(), Some(&erasure_chunk_1));
- assert!(store.get_erasure_chunk(&candidate_2_hash, 1).is_none());
-
- assert_eq!(store.get_candidate(&candidate_1_hash), Some(candidate_1));
- assert_eq!(store.get_candidate(&candidate_2_hash), None);
-
- assert_eq!(store.execution_data(&candidate_1_hash).unwrap(), execution_data(&available_data_1));
- assert!(store.execution_data(&candidate_2_hash).is_none());
- }
-
- #[test]
- fn erasure_coding() {
- let relay_parent: Hash = [1; 32].into();
- let para_id: ParaId = 5.into();
- let available_data = available_data(&[42; 8]);
- let n_validators = 5;
-
- let erasure_chunks = erasure::obtain_chunks(
- n_validators,
- &available_data,
- ).unwrap();
-
- let branches = erasure::branches(erasure_chunks.as_ref());
-
- let mut candidate = AbridgedCandidateReceipt::default();
- candidate.parachain_index = para_id;
- candidate.commitments.erasure_root = [6; 32].into();
- candidate.relay_parent = relay_parent;
-
- let candidate_hash = candidate.hash();
-
- let chunks: Vec<_> = erasure_chunks
- .iter()
- .zip(branches.map(|(proof, _)| proof))
- .enumerate()
- .map(|(index, (chunk, proof))| ErasureChunk {
- chunk: chunk.clone(),
- proof,
- index: index as u32,
- })
- .collect();
-
- let store = Store::new_in_memory();
-
- store.add_candidate(&candidate).unwrap();
- store.add_erasure_chunks(n_validators as u32, &candidate_hash, vec![chunks[0].clone()]).unwrap();
- assert_eq!(store.get_erasure_chunk(&candidate_hash, 0), Some(chunks[0].clone()));
-
- assert!(store.execution_data(&candidate_hash).is_none());
-
- store.add_erasure_chunks(n_validators as u32, &candidate_hash, chunks).unwrap();
- assert_eq!(store.execution_data(&candidate_hash), Some(execution_data(&available_data)));
- }
-
- #[test]
- fn add_validator_index_works() {
- let relay_parent = [42; 32].into();
- let store = Store::new_in_memory();
-
- store.note_validator_index_and_n_validators(&relay_parent, 42, 24).unwrap();
- assert_eq!(store.get_validator_index_and_n_validators(&relay_parent).unwrap(), (42, 24));
- }
-
- #[test]
- fn add_candidates_in_relay_block_works() {
- let relay_parent = [42; 32].into();
- let store = Store::new_in_memory();
-
- let candidates = vec![[1; 32].into(), [2; 32].into(), [3; 32].into()];
-
- store.note_candidates_with_relay_parent(&relay_parent, &candidates).unwrap();
- assert_eq!(store.get_candidates_with_relay_parent(&relay_parent), candidates);
- }
-
- #[test]
- fn awaited_chunks_works() {
- use std::iter::FromIterator;
- let validator_index = 3;
- let n_validators = 10;
- let relay_parent = [42; 32].into();
- let erasure_root_1 = [11; 32].into();
- let erasure_root_2 = [12; 32].into();
- let mut receipt_1 = AbridgedCandidateReceipt::default();
- let mut receipt_2 = AbridgedCandidateReceipt::default();
-
-
- receipt_1.parachain_index = 1.into();
- receipt_1.commitments.erasure_root = erasure_root_1;
- receipt_1.relay_parent = relay_parent;
-
- receipt_2.parachain_index = 2.into();
- receipt_2.commitments.erasure_root = erasure_root_2;
- receipt_2.relay_parent = relay_parent;
-
- let receipt_1_hash = receipt_1.hash();
- let receipt_2_hash = receipt_2.hash();
-
- let chunk = ErasureChunk {
- chunk: vec![1, 2, 3],
- index: validator_index,
- proof: Vec::new(),
- };
- let candidates = vec![receipt_1_hash, receipt_2_hash];
-
- let store = Store::new_in_memory();
-
- store.note_validator_index_and_n_validators(
- &relay_parent,
- validator_index,
- n_validators
- ).unwrap();
- store.add_candidate(&receipt_1).unwrap();
- store.add_candidate(&receipt_2).unwrap();
-
- // We are waiting for chunks from two candidates.
- store.note_candidates_with_relay_parent(&relay_parent, &candidates).unwrap();
-
- let awaited_frontier = store.awaited_chunks().unwrap();
- warn!(target: "availability", "awaited {:?}", awaited_frontier);
- let expected: HashSet<_> = candidates
- .clone()
- .into_iter()
- .map(|c| AwaitedFrontierEntry {
- relay_parent,
- candidate_hash: c,
- validator_index,
- })
- .collect();
- assert_eq!(awaited_frontier, expected);
-
- // We add chunk from one of the candidates.
- store.add_erasure_chunks(n_validators, &receipt_1_hash, vec![chunk]).unwrap();
-
- let awaited_frontier = store.awaited_chunks().unwrap();
- // Now we wait for the other chunk that we haven't received yet.
- let expected: HashSet<_> = vec![AwaitedFrontierEntry {
- relay_parent,
- candidate_hash: receipt_2_hash,
- validator_index,
- }].into_iter().collect();
-
- assert_eq!(awaited_frontier, expected);
-
- // Finalizing removes awaited candidates from frontier.
- store.candidates_finalized(relay_parent, HashSet::from_iter(candidates.into_iter())).unwrap();
-
- assert_eq!(store.awaited_chunks().unwrap().len(), 0);
- }
-}
diff --git a/availability-store/src/worker.rs b/availability-store/src/worker.rs
deleted file mode 100644
index 0ff59a9fca38377bd30c1fd405184f4796cce7f1..0000000000000000000000000000000000000000
--- a/availability-store/src/worker.rs
+++ /dev/null
@@ -1,942 +0,0 @@
-// Copyright 2018-2020 Parity Technologies (UK) Ltd.
-// This file is part of Polkadot.
-
-// Polkadot is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// Polkadot is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with Polkadot. If not, see .
-
-use std::collections::{HashMap, HashSet};
-use std::io;
-use std::sync::Arc;
-use std::thread;
-
-use log::{error, info, trace, warn};
-use sp_blockchain::{Result as ClientResult};
-use sp_runtime::traits::{Header as HeaderT, Block as BlockT, HashFor, BlakeTwo256};
-use sp_api::{ApiExt, ProvideRuntimeApi};
-use client::{
- BlockchainEvents, BlockBackend,
- blockchain::ProvideCache,
-};
-use consensus_common::{
- self, BlockImport, BlockCheckParams, BlockImportParams, Error as ConsensusError,
- ImportResult,
- import_queue::CacheKeyId,
-};
-use polkadot_primitives::{Block, BlockId, Hash};
-use polkadot_primitives::parachain::{
- ParachainHost, ValidatorId, AbridgedCandidateReceipt, AvailableData,
- ValidatorPair, ErasureChunk,
-};
-use futures::{prelude::*, future::select, channel::{mpsc, oneshot}, task::{Spawn, SpawnExt}};
-use futures::future::AbortHandle;
-use keystore::KeyStorePtr;
-
-use tokio::runtime::{Handle, Runtime as LocalRuntime};
-
-use crate::{LOG_TARGET, ErasureNetworking};
-use crate::store::Store;
-
-/// Errors that may occur.
-#[derive(Debug, derive_more::Display, derive_more::From)]
-pub(crate) enum Error {
- #[from]
- StoreError(io::Error),
- #[display(fmt = "Validator's id and number of validators at block with parent {} not found", relay_parent)]
- IdAndNValidatorsNotFound { relay_parent: Hash },
-}
-
-/// Used in testing to interact with the worker thread.
-#[cfg(test)]
-pub(crate) struct WithWorker(Box);
-
-#[cfg(test)]
-impl std::fmt::Debug for WithWorker {
- fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- write!(f, "")
- }
-}
-
-/// Messages sent to the `Worker`.
-///
-/// Messages are sent in a number of different scenarios,
-/// for instance, when:
-/// * importing blocks in `BlockImport` implementation,
-/// * recieving finality notifications,
-/// * when the `Store` api is used by outside code.
-#[derive(Debug)]
-pub(crate) enum WorkerMsg {
- IncludedParachainBlocks(IncludedParachainBlocks),
- Chunks(Chunks),
- CandidatesFinalized(CandidatesFinalized),
- MakeAvailable(MakeAvailable),
- #[cfg(test)]
- WithWorker(WithWorker),
-}
-
-/// A notification of a parachain block included in the relay chain.
-#[derive(Debug)]
-pub(crate) struct IncludedParachainBlock {
- /// The abridged candidate receipt, extracted from a relay-chain block.
- pub candidate: AbridgedCandidateReceipt,
- /// The data to keep available from the candidate, if known.
- pub available_data: Option,
-}
-
-/// The receipts of the heads included into the block with a given parent.
-#[derive(Debug)]
-pub(crate) struct IncludedParachainBlocks {
- /// The blocks themselves.
- pub blocks: Vec,
- /// A sender to signal the result asynchronously.
- pub result: oneshot::Sender>,
-}
-
-/// We have received chunks we requested.
-#[derive(Debug)]
-pub(crate) struct Chunks {
- /// The hash of the parachain candidate these chunks belong to.
- pub candidate_hash: Hash,
- /// The chunks
- pub chunks: Vec,
- /// The number of validators present at the candidate's relay-parent.
- pub n_validators: u32,
- /// A sender to signal the result asynchronously.
- pub result: oneshot::Sender>,
-}
-
-/// These candidates have been finalized, so unneded availability may be now pruned
-#[derive(Debug)]
-pub(crate) struct CandidatesFinalized {
- /// The relay parent of the block that was finalized.
- relay_parent: Hash,
- /// The hashes of candidates that were finalized in this block.
- included_candidates: HashSet,
-}
-
-/// The message that corresponds to `make_available` call of the crate API.
-#[derive(Debug)]
-pub(crate) struct MakeAvailable {
- /// The hash of the candidate for which we are publishing data.
- pub candidate_hash: Hash,
- /// The data to make available.
- pub available_data: AvailableData,
- /// A sender to signal the result asynchronously.
- pub result: oneshot::Sender>,
-}
-
-/// Description of a chunk we are listening for.
-#[derive(Hash, Debug, PartialEq, Eq)]
-struct ListeningKey {
- candidate_hash: Hash,
- index: u32,
-}
-
-/// An availability worker with it's inner state.
-pub(super) struct Worker {
- availability_store: Store,
- listening_for: HashMap,
-
- sender: mpsc::UnboundedSender,
-}
-
-/// The handle to the `Worker`.
-pub(super) struct WorkerHandle {
- thread: Option>>,
- sender: mpsc::UnboundedSender,
- exit_signal: Option,
-}
-
-impl WorkerHandle {
- pub(crate) fn to_worker(&self) -> &mpsc::UnboundedSender {
- &self.sender
- }
-}
-
-impl Drop for WorkerHandle {
- fn drop(&mut self) {
- if let Some(signal) = self.exit_signal.take() {
- let _ = signal.fire();
- }
-
- if let Some(thread) = self.thread.take() {
- if let Err(_) = thread.join() {
- error!(target: LOG_TARGET, "Errored stopping the thread");
- }
- }
- }
-}
-
-
-fn fetch_candidates(client: &P, extrinsics: Vec<::Extrinsic>, parent: &BlockId)
- -> ClientResult>>
-where
- P: ProvideRuntimeApi,
- P::Api: ParachainHost,
- // Rust bug: https://github.com/rust-lang/rust/issues/24159
- sp_api::StateBackendFor: sp_api::StateBackend>,
-{
- let api = client.runtime_api();
-
- let candidates = if api.has_api_with::, _>(
- parent,
- |version| version >= 2,
- ).map_err(|e| ConsensusError::ChainLookup(e.to_string()))? {
- api.get_heads(&parent, extrinsics)
- .map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
- } else {
- None
- };
-
- Ok(candidates)
-}
-
-/// Creates a task to prune entries in availability store upon block finalization.
-async fn prune_unneeded_availability(client: Arc
, mut sender: S)
-where
- P: ProvideRuntimeApi + BlockchainEvents + BlockBackend + Send + Sync + 'static,
- P::Api: ParachainHost + ApiExt,
- S: Sink + Clone + Send + Sync + Unpin,
- // Rust bug: https://github.com/rust-lang/rust/issues/24159
- sp_api::StateBackendFor: sp_api::StateBackend>,
-{
- let mut finality_notification_stream = client.finality_notification_stream();
-
- while let Some(notification) = finality_notification_stream.next().await {
- let hash = notification.hash;
- let parent_hash = notification.header.parent_hash;
- let extrinsics = match client.block_body(&BlockId::hash(hash)) {
- Ok(Some(extrinsics)) => extrinsics,
- Ok(None) => {
- error!(
- target: LOG_TARGET,
- "No block body found for imported block {:?}",
- hash,
- );
- continue;
- }
- Err(e) => {
- error!(
- target: LOG_TARGET,
- "Failed to get block body for imported block {:?}: {:?}",
- hash,
- e,
- );
- continue;
- }
- };
-
- let included_candidates = match fetch_candidates(
- &*client,
- extrinsics,
- &BlockId::hash(parent_hash),
- ) {
- Ok(Some(candidates)) => candidates
- .into_iter()
- .map(|c| c.hash())
- .collect(),
- Ok(None) => {
- warn!(
- target: LOG_TARGET,
- "Failed to extract candidates from block body of imported block {:?}", hash
- );
- continue;
- }
- Err(e) => {
- warn!(
- target: LOG_TARGET,
- "Failed to fetch block body for imported block {:?}: {:?}", hash, e
- );
- continue;
- }
- };
-
- let msg = WorkerMsg::CandidatesFinalized(CandidatesFinalized {
- relay_parent: parent_hash,
- included_candidates
- });
-
- if let Err(_) = sender.send(msg).await {
- break;
- }
- }
-}
-
-impl Worker {
-
- // Called on startup of the worker to initiate fetch from network for all awaited chunks.
- fn initiate_all_fetches(
- &mut self,
- runtime_handle: &Handle,
- erasure_network: &EN,
- sender: &mut mpsc::UnboundedSender,
- ) {
- if let Some(awaited_chunks) = self.availability_store.awaited_chunks() {
- for awaited_chunk in awaited_chunks {
- if let Err(e) = self.initiate_fetch(
- runtime_handle,
- erasure_network,
- sender,
- awaited_chunk.relay_parent,
- awaited_chunk.candidate_hash,
- ) {
- warn!(target: LOG_TARGET, "Failed to register network listener: {}", e);
- }
- }
- }
- }
-
- // initiates a fetch from network for the described chunk, with our local index.
- fn initiate_fetch(
- &mut self,
- runtime_handle: &Handle,
- erasure_network: &EN,
- sender: &mut mpsc::UnboundedSender,
- relay_parent: Hash,
- candidate_hash: Hash,
- ) -> Result<(), Error> {
- let (local_id, n_validators) = self.availability_store
- .get_validator_index_and_n_validators(&relay_parent)
- .ok_or(Error::IdAndNValidatorsNotFound { relay_parent })?;
-
- // fast exit for if we already have the chunk.
- if self.availability_store.get_erasure_chunk(&candidate_hash, local_id as _).is_some() {
- return Ok(())
- }
-
- trace!(
- target: LOG_TARGET,
- "Initiating fetch for erasure-chunk at parent {} with candidate-hash {}",
- relay_parent,
- candidate_hash,
- );
-
- let fut = erasure_network.fetch_erasure_chunk(&candidate_hash, local_id);
- let mut sender = sender.clone();
- let (fut, signal) = future::abortable(async move {
- let chunk = match fut.await {
- Ok(chunk) => chunk,
- Err(e) => {
- warn!(target: LOG_TARGET, "Unable to fetch erasure-chunk from network: {:?}", e);
- return
- }
- };
- let (s, _) = oneshot::channel();
- let _ = sender.send(WorkerMsg::Chunks(Chunks {
- candidate_hash,
- chunks: vec![chunk],
- n_validators,
- result: s,
- })).await;
- }.map(drop).boxed());
-
-
- let key = ListeningKey {
- candidate_hash,
- index: local_id,
- };
-
- self.listening_for.insert(key, signal);
- let _ = runtime_handle.spawn(fut);
-
- Ok(())
- }
-
- fn on_parachain_blocks_received(
- &mut self,
- runtime_handle: &Handle,
- erasure_network: &EN,
- sender: &mut mpsc::UnboundedSender,
- blocks: Vec,
- ) -> Result<(), Error> {
- // First we have to add the receipts themselves.
- for IncludedParachainBlock { candidate, available_data }
- in blocks.into_iter()
- {
- let _ = self.availability_store.add_candidate(&candidate);
-
- if let Some(_available_data) = available_data {
- // Should we be breaking block into chunks here and gossiping it and so on?
- }
-
- // This leans on the codebase-wide assumption that the `relay_parent`
- // of all candidates in a block matches the parent hash of that block.
- //
- // In the future this will not always be true.
- let candidate_hash = candidate.hash();
- let _ = self.availability_store.note_candidates_with_relay_parent(
- &candidate.relay_parent,
- &[candidate_hash],
- );
-
- if let Err(e) = self.initiate_fetch(
- runtime_handle,
- erasure_network,
- sender,
- candidate.relay_parent,
- candidate_hash,
- ) {
- warn!(target: LOG_TARGET, "Failed to register chunk listener: {}", e);
- }
- }
-
- Ok(())
- }
-
- // Handles chunks that were required.
- fn on_chunks(
- &mut self,
- candidate_hash: Hash,
- chunks: Vec,
- n_validators: u32,
- ) -> Result<(), Error> {
- for c in &chunks {
- let key = ListeningKey {
- candidate_hash,
- index: c.index,
- };
-
- // remove bookkeeping so network does not attempt to fetch
- // any longer.
- if let Some(exit_signal) = self.listening_for.remove(&key) {
- exit_signal.abort();
- }
- }
-
- self.availability_store.add_erasure_chunks(
- n_validators,
- &candidate_hash,
- chunks,
- )?;
-
- Ok(())
- }
-
- /// Starts a worker with a given availability store and a gossip messages provider.
- pub fn start(
- availability_store: Store,
- erasure_network: EN,
- ) -> WorkerHandle {
- let (sender, mut receiver) = mpsc::unbounded();
-
- let mut worker = Worker {
- availability_store,
- listening_for: HashMap::new(),
- sender: sender.clone(),
- };
-
- let sender = sender.clone();
- let (signal, exit) = exit_future::signal();
-
- let handle = thread::spawn(move || -> io::Result<()> {
- let mut runtime = LocalRuntime::new()?;
- let mut sender = worker.sender.clone();
-
- let runtime_handle = runtime.handle().clone();
-
- // On startup, initiates fetch from network for all
- // entries in the awaited frontier.
- worker.initiate_all_fetches(runtime.handle(), &erasure_network, &mut sender);
-
- let process_notification = async move {
- while let Some(msg) = receiver.next().await {
- trace!(target: LOG_TARGET, "Received message {:?}", msg);
-
- let res = match msg {
- WorkerMsg::IncludedParachainBlocks(msg) => {
- let IncludedParachainBlocks {
- blocks,
- result,
- } = msg;
-
- let res = worker.on_parachain_blocks_received(
- &runtime_handle,
- &erasure_network,
- &mut sender,
- blocks,
- );
-
- let _ = result.send(res);
- Ok(())
- }
- WorkerMsg::Chunks(msg) => {
- let Chunks {
- candidate_hash,
- chunks,
- n_validators,
- result,
- } = msg;
-
- let res = worker.on_chunks(
- candidate_hash,
- chunks,
- n_validators,
- );
-
- let _ = result.send(res);
- Ok(())
- }
- WorkerMsg::CandidatesFinalized(msg) => {
- let CandidatesFinalized { relay_parent, included_candidates } = msg;
-
- worker.availability_store.candidates_finalized(
- relay_parent,
- included_candidates,
- )
- }
- WorkerMsg::MakeAvailable(msg) => {
- let MakeAvailable { candidate_hash, available_data, result } = msg;
- let res = worker.availability_store
- .make_available(candidate_hash, available_data)
- .map_err(|e| e.into());
- let _ = result.send(res);
- Ok(())
- }
- #[cfg(test)]
- WorkerMsg::WithWorker(with_worker) => {
- (with_worker.0)(&mut worker);
- Ok(())
- }
- };
-
- if let Err(_) = res {
- warn!(target: LOG_TARGET, "An error occured while processing a message");
- }
- }
-
- };
-
- runtime.spawn(select(process_notification.boxed(), exit.clone()).map(drop));
- runtime.block_on(exit);
-
- info!(target: LOG_TARGET, "Availability worker exiting");
-
- Ok(())
- });
-
- WorkerHandle {
- thread: Some(handle),
- sender,
- exit_signal: Some(signal),
- }
- }
-}
-
-/// Implementor of the [`BlockImport`] trait.
-///
-/// Used to embed `availability-store` logic into the block imporing pipeline.
-///
-/// [`BlockImport`]: https://substrate.dev/rustdocs/v1.0/substrate_consensus_common/trait.BlockImport.html
-pub struct AvailabilityBlockImport {
- inner: I,
- client: Arc ,
- keystore: KeyStorePtr,
- to_worker: mpsc::UnboundedSender,
- exit_signal: AbortHandle,
-}
-
-impl Drop for AvailabilityBlockImport {
- fn drop(&mut self) {
- self.exit_signal.abort();
- }
-}
-
-impl BlockImport for AvailabilityBlockImport where
- I: BlockImport> + Send + Sync,
- I::Error: Into,
- P: ProvideRuntimeApi + ProvideCache,
- P::Api: ParachainHost,
- // Rust bug: https://github.com/rust-lang/rust/issues/24159
- sp_api::StateBackendFor: sp_api::StateBackend
-{
- type Error = ConsensusError;
- type Transaction = sp_api::TransactionFor;
-
- fn import_block(
- &mut self,
- block: BlockImportParams,
- new_cache: HashMap>,
- ) -> Result {
- trace!(
- target: LOG_TARGET,
- "Importing block #{}, ({})",
- block.header.number(),
- block.post_hash(),
- );
-
- if let Some(ref extrinsics) = block.body {
- let parent_id = BlockId::hash(*block.header.parent_hash());
- // Extract our local position i from the validator set of the parent.
- let validators = self.client.runtime_api().validators(&parent_id)
- .map_err(|e| ConsensusError::ChainLookup(e.to_string()))?;
-
- let our_id = self.our_id(&validators);
-
- // Use a runtime API to extract all included erasure-roots from the imported block.
- let candidates = fetch_candidates(&*self.client, extrinsics.clone(), &parent_id)
- .map_err(|e| ConsensusError::ChainLookup(e.to_string()))?;
-
- match candidates {
- Some(candidates) => {
- match our_id {
- Some(our_id) => {
- trace!(
- target: LOG_TARGET,
- "Our validator id is {}, the candidates included are {:?}",
- our_id,
- candidates,
- );
-
- let (s, _) = oneshot::channel();
-
- // Inform the worker about the included parachain blocks.
- let blocks = candidates
- .into_iter()
- .map(|c| IncludedParachainBlock {
- candidate: c,
- available_data: None,
- })
- .collect();
-
- let msg = WorkerMsg::IncludedParachainBlocks(IncludedParachainBlocks {
- blocks,
- result: s,
- });
-
- let _ = self.to_worker.unbounded_send(msg);
- }
- None => (),
- }
- }
- None => {
- trace!(
- target: LOG_TARGET,
- "No parachain heads were included in block {}", block.header.hash()
- );
- },
- }
- }
-
- self.inner.import_block(block, new_cache).map_err(Into::into)
- }
-
- fn check_block(
- &mut self,
- block: BlockCheckParams,
- ) -> Result {
- self.inner.check_block(block).map_err(Into::into)
- }
-}
-
-impl AvailabilityBlockImport {
- pub(crate) fn new(
- client: Arc ,
- block_import: I,
- spawner: impl Spawn,
- keystore: KeyStorePtr,
- to_worker: mpsc::UnboundedSender,
- ) -> Self
- where
- P: ProvideRuntimeApi + BlockBackend + BlockchainEvents + Send + Sync + 'static,
- P::Api: ParachainHost,
- P::Api: ApiExt,
- // Rust bug: https://github.com/rust-lang/rust/issues/24159
- sp_api::StateBackendFor: sp_api::StateBackend>,
- {
- // This is not the right place to spawn the finality future,
- // it would be more appropriate to spawn it in the `start` method of the `Worker`.
- // However, this would make the type of the `Worker` and the `Store` itself
- // dependent on the types of client and executor, which would prove
- // not not so handy in the testing code.
- let (prune_available, exit_signal) = future::abortable(prune_unneeded_availability(
- client.clone(),
- to_worker.clone(),
- ));
-
- if let Err(_) = spawner.spawn(prune_available.map(drop)) {
- error!(target: LOG_TARGET, "Failed to spawn availability pruning task");
- }
-
- AvailabilityBlockImport {
- client,
- inner: block_import,
- to_worker,
- keystore,
- exit_signal,
- }
- }
-
- fn our_id(&self, validators: &[ValidatorId]) -> Option {
- let keystore = self.keystore.read();
- validators
- .iter()
- .enumerate()
- .find_map(|(i, v)| {
- keystore.key_pair::(&v).map(|_| i as u32).ok()
- })
- }
-}
-
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use futures::channel::oneshot;
- use std::sync::Arc;
- use std::pin::Pin;
- use tokio::runtime::Runtime;
- use parking_lot::Mutex;
- use crate::store::AwaitedFrontierEntry;
-
- #[derive(Default, Clone)]
- struct TestErasureNetwork {
- chunk_receivers: Arc
- >>>,
- }
-
- impl TestErasureNetwork {
- // adds a receiver. this returns a sender for the erasure-chunk
- // along with an exit future that fires when the erasure chunk has
- // been fully-processed
- fn add_receiver(&self, candidate_hash: Hash, index: u32)
- -> oneshot::Sender
- {
- let (sender, receiver) = oneshot::channel();
- self.chunk_receivers.lock().insert((candidate_hash, index), receiver);
- sender
- }
- }
-
- impl ErasureNetworking for TestErasureNetwork {
- type Error = String;
-
- fn fetch_erasure_chunk(&self, candidate_hash: &Hash, index: u32)
- -> Pin> + Send>>
- {
- match self.chunk_receivers.lock().remove(&(*candidate_hash, index)) {
- Some(receiver) => receiver.then(|x| match x {
- Ok(x) => future::ready(Ok(x)).left_future(),
- Err(_) => future::pending().right_future(),
- }).boxed(),
- None => future::pending().boxed(),
- }
- }
-
- fn distribute_erasure_chunk(
- &self,
- _candidate_hash: Hash,
- _chunk: ErasureChunk
- ) {}
- }
-
- // This test tests that as soon as the worker receives info about new parachain blocks
- // included it registers gossip listeners for it's own chunks. Upon receiving the awaited
- // chunk messages the corresponding listeners are deregistered and these chunks are removed
- // from the awaited chunks set.
- #[test]
- fn receiving_gossip_chunk_removes_from_frontier() {
- let mut runtime = Runtime::new().unwrap();
- let relay_parent = [1; 32].into();
- let local_id = 2;
- let n_validators = 4;
-
- let store = Store::new_in_memory();
-
- let mut candidate = AbridgedCandidateReceipt::default();
-
- candidate.relay_parent = relay_parent;
- let candidate_hash = candidate.hash();
-
- // Tell the store our validator's position and the number of validators at given point.
- store.note_validator_index_and_n_validators(&relay_parent, local_id, n_validators).unwrap();
-
- let network = TestErasureNetwork::default();
- let chunk_sender = network.add_receiver(candidate_hash, local_id);
-
- // At this point we shouldn't be waiting for any chunks.
- assert!(store.awaited_chunks().is_none());
-
- let (s, r) = oneshot::channel();
-
- let msg = WorkerMsg::IncludedParachainBlocks(IncludedParachainBlocks {
- blocks: vec![IncludedParachainBlock {
- candidate,
- available_data: None,
- }],
- result: s,
- });
-
- let handle = Worker::start(store.clone(), network);
-
- // Tell the worker that the new blocks have been included into the relay chain.
- // This should trigger the registration of gossip message listeners for the
- // chunk topics.
- handle.sender.unbounded_send(msg).unwrap();
-
- runtime.block_on(r).unwrap().unwrap();
-
- // Make sure that at this point we are waiting for the appropriate chunk.
- assert_eq!(
- store.awaited_chunks().unwrap(),
- vec![AwaitedFrontierEntry {
- relay_parent,
- candidate_hash,
- validator_index: local_id,
- }].into_iter().collect()
- );
-
- // Complete the chunk request.
- chunk_sender.send(ErasureChunk {
- chunk: vec![1, 2, 3],
- index: local_id as u32,
- proof: vec![],
- }).unwrap();
-
- // wait until worker thread has de-registered the listener for a
- // particular chunk.
- loop {
- let (s, r) = oneshot::channel();
- handle.sender.unbounded_send(WorkerMsg::WithWorker(WithWorker(Box::new(move |worker| {
- let key = ListeningKey {
- candidate_hash,
- index: local_id,
- };
-
- let is_waiting = worker.listening_for.contains_key(&key);
-
- s.send(!is_waiting).unwrap(); // tell the test thread `true` if we are not waiting.
- })))).unwrap();
-
- if runtime.block_on(r).unwrap() {
- break
- }
- }
-
- // The awaited chunk has been received so at this point we no longer wait for any chunks.
- assert_eq!(store.awaited_chunks().unwrap().len(), 0);
- }
-
- #[test]
- fn included_parachain_blocks_registers_listener() {
- let mut runtime = Runtime::new().unwrap();
- let relay_parent = [1; 32].into();
- let erasure_root_1 = [2; 32].into();
- let erasure_root_2 = [3; 32].into();
- let pov_block_hash_1 = [4; 32].into();
- let pov_block_hash_2 = [5; 32].into();
- let local_id = 2;
- let n_validators = 4;
-
- let mut candidate_1 = AbridgedCandidateReceipt::default();
- candidate_1.commitments.erasure_root = erasure_root_1;
- candidate_1.pov_block_hash = pov_block_hash_1;
- candidate_1.relay_parent = relay_parent;
- let candidate_1_hash = candidate_1.hash();
-
- let mut candidate_2 = AbridgedCandidateReceipt::default();
- candidate_2.commitments.erasure_root = erasure_root_2;
- candidate_2.pov_block_hash = pov_block_hash_2;
- candidate_2.relay_parent = relay_parent;
- let candidate_2_hash = candidate_2.hash();
-
- let store = Store::new_in_memory();
-
- // Tell the store our validator's position and the number of validators at given point.
- store.note_validator_index_and_n_validators(&relay_parent, local_id, n_validators).unwrap();
-
- // Let the store know about the candidates
- store.add_candidate(&candidate_1).unwrap();
- store.add_candidate(&candidate_2).unwrap();
-
- // And let the store know about the chunk from the second candidate.
- store.add_erasure_chunks(
- n_validators,
- &candidate_2_hash,
- vec![ErasureChunk {
- chunk: vec![1, 2, 3],
- index: local_id,
- proof: Vec::default(),
- }],
- ).unwrap();
-
- let network = TestErasureNetwork::default();
- let _ = network.add_receiver(candidate_1_hash, local_id);
- let _ = network.add_receiver(candidate_2_hash, local_id);
-
- let handle = Worker::start(store.clone(), network.clone());
-
- {
- let (s, r) = oneshot::channel();
- // Tell the worker to listen for chunks from candidate 2 (we alredy have a chunk from it).
- let listen_msg_2 = WorkerMsg::IncludedParachainBlocks(IncludedParachainBlocks {
- blocks: vec![IncludedParachainBlock {
- candidate: candidate_2,
- available_data: None,
- }],
- result: s,
- });
-
- handle.sender.unbounded_send(listen_msg_2).unwrap();
-
- runtime.block_on(r).unwrap().unwrap();
- // The receiver for this chunk left intact => listener not registered.
- assert!(network.chunk_receivers.lock().contains_key(&(candidate_2_hash, local_id)));
-
- // more directly:
- let (s, r) = oneshot::channel();
- handle.sender.unbounded_send(WorkerMsg::WithWorker(WithWorker(Box::new(move |worker| {
- let key = ListeningKey {
- candidate_hash: candidate_2_hash,
- index: local_id,
- };
- let _ = s.send(worker.listening_for.contains_key(&key));
- })))).unwrap();
-
- assert!(!runtime.block_on(r).unwrap());
- }
-
- {
- let (s, r) = oneshot::channel();
-
- // Tell the worker to listen for chunks from candidate 1.
- // (we don't have a chunk from it yet).
- let listen_msg_1 = WorkerMsg::IncludedParachainBlocks(IncludedParachainBlocks {
- blocks: vec![IncludedParachainBlock {
- candidate: candidate_1,
- available_data: None,
- }],
- result: s,
- });
-
- handle.sender.unbounded_send(listen_msg_1).unwrap();
- runtime.block_on(r).unwrap().unwrap();
-
- // The receiver taken => listener registered.
- assert!(!network.chunk_receivers.lock().contains_key(&(candidate_1_hash, local_id)));
-
-
- // more directly:
- let (s, r) = oneshot::channel();
- handle.sender.unbounded_send(WorkerMsg::WithWorker(WithWorker(Box::new(move |worker| {
- let key = ListeningKey {
- candidate_hash: candidate_1_hash,
- index: local_id,
- };
- let _ = s.send(worker.listening_for.contains_key(&key));
- })))).unwrap();
-
- assert!(runtime.block_on(r).unwrap());
- }
- }
-}
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index ff67b236c547a586fe52b7861d1e6dd5bb94b52b..99eb04e9dd6377a9286bf3cf918d796ad9ea36e1 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "polkadot-cli"
-version = "0.8.12"
+version = "0.8.22"
authors = ["Parity Technologies "]
description = "Polkadot Relay-chain Client Node"
edition = "2018"
@@ -34,12 +34,15 @@ sc-service = { git = "https://github.com/paritytech/substrate", branch = "master
wasm-bindgen = { version = "0.2.57", optional = true }
wasm-bindgen-futures = { version = "0.4.7", optional = true }
browser-utils = { package = "substrate-browser-utils", git = "https://github.com/paritytech/substrate", branch = "master", optional = true }
+# this crate is used only to enable `trie-memory-tracker` feature
+# see https://github.com/paritytech/substrate/pull/6745
+sp-trie = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
[build-dependencies]
substrate-build-script-utils = { git = "https://github.com/paritytech/substrate", branch = "master" }
[features]
-default = [ "wasmtime", "db", "cli", "service-old" ]
+default = [ "wasmtime", "db", "cli", "service-old", "trie-memory-tracker" ]
wasmtime = [ "sc-cli/wasmtime" ]
db = [ "service/db" ]
cli = [
@@ -57,3 +60,4 @@ browser = [
]
runtime-benchmarks = [ "service/runtime-benchmarks" ]
service-rewr = [ "service-new/full-node" ]
+trie-memory-tracker = [ "sp-trie/memory-tracker" ]
diff --git a/cli/src/browser.rs b/cli/src/browser.rs
index 6f3a4000843ae1c2394681f32d8789469bf6cba2..d3523e92a60005ff305963cf7bff516e1a67341a 100644
--- a/cli/src/browser.rs
+++ b/cli/src/browser.rs
@@ -46,8 +46,7 @@ async fn start_inner(chain_spec: String, log_level: String) -> Result,
- #[allow(missing_docs)]
#[structopt(flatten)]
pub run: RunCmd,
}
diff --git a/cli/src/command.rs b/cli/src/command.rs
index 714e3c2ddacd14f0ab33f20cec42b52459cac8d8..964e13e6d3fd8a6e722cfc5ef342f635c30b5d18 100644
--- a/cli/src/command.rs
+++ b/cli/src/command.rs
@@ -19,8 +19,7 @@ use log::info;
use service::{IdentifyVariant, self};
#[cfg(feature = "service-rewr")]
use service_new::{IdentifyVariant, self as service};
-use sc_executor::NativeExecutionDispatch;
-use sc_cli::{SubstrateCli, Result};
+use sc_cli::{SubstrateCli, Result, RuntimeVersion, Role};
use crate::cli::{Cli, Subcommand};
fn get_exec_name() -> Option {
@@ -31,19 +30,19 @@ fn get_exec_name() -> Option {
}
impl SubstrateCli for Cli {
- fn impl_name() -> &'static str { "Parity Polkadot" }
+ fn impl_name() -> String { "Parity Polkadot".into() }
- fn impl_version() -> &'static str { env!("SUBSTRATE_CLI_IMPL_VERSION") }
+ fn impl_version() -> String { env!("SUBSTRATE_CLI_IMPL_VERSION").into() }
- fn description() -> &'static str { env!("CARGO_PKG_DESCRIPTION") }
+ fn description() -> String { env!("CARGO_PKG_DESCRIPTION").into() }
- fn author() -> &'static str { env!("CARGO_PKG_AUTHORS") }
+ fn author() -> String { env!("CARGO_PKG_AUTHORS").into() }
- fn support_url() -> &'static str { "https://github.com/paritytech/polkadot/issues/new" }
+ fn support_url() -> String { "https://github.com/paritytech/polkadot/issues/new".into() }
fn copyright_start_year() -> i32 { 2017 }
- fn executable_name() -> &'static str { "polkadot" }
+ fn executable_name() -> String { "polkadot".into() }
fn load_spec(&self, id: &str) -> std::result::Result, String> {
let id = if id == "" {
@@ -54,27 +53,47 @@ impl SubstrateCli for Cli {
.unwrap_or("polkadot")
} else { id };
Ok(match id {
- "polkadot-dev" | "dev" => Box::new(service::chain_spec::polkadot_development_config()),
- "polkadot-local" => Box::new(service::chain_spec::polkadot_local_testnet_config()),
- "polkadot-staging" => Box::new(service::chain_spec::polkadot_staging_testnet_config()),
- "kusama-dev" => Box::new(service::chain_spec::kusama_development_config()),
- "kusama-local" => Box::new(service::chain_spec::kusama_local_testnet_config()),
- "kusama-staging" => Box::new(service::chain_spec::kusama_staging_testnet_config()),
+ "polkadot-dev" | "dev" => Box::new(service::chain_spec::polkadot_development_config()?),
+ "polkadot-local" => Box::new(service::chain_spec::polkadot_local_testnet_config()?),
+ "polkadot-staging" => Box::new(service::chain_spec::polkadot_staging_testnet_config()?),
+ "kusama-dev" => Box::new(service::chain_spec::kusama_development_config()?),
+ "kusama-local" => Box::new(service::chain_spec::kusama_local_testnet_config()?),
+ "kusama-staging" => Box::new(service::chain_spec::kusama_staging_testnet_config()?),
"polkadot" => Box::new(service::chain_spec::polkadot_config()?),
"westend" => Box::new(service::chain_spec::westend_config()?),
"kusama" => Box::new(service::chain_spec::kusama_config()?),
- "westend-dev" => Box::new(service::chain_spec::westend_development_config()),
- "westend-local" => Box::new(service::chain_spec::westend_local_testnet_config()),
- "westend-staging" => Box::new(service::chain_spec::westend_staging_testnet_config()),
- path if self.run.force_kusama => {
- Box::new(service::KusamaChainSpec::from_json_file(std::path::PathBuf::from(path))?)
+ "westend-dev" => Box::new(service::chain_spec::westend_development_config()?),
+ "westend-local" => Box::new(service::chain_spec::westend_local_testnet_config()?),
+ "westend-staging" => Box::new(service::chain_spec::westend_staging_testnet_config()?),
+ path => {
+ let path = std::path::PathBuf::from(path);
+
+ let starts_with = |prefix: &str| {
+ path.file_name().map(|f| f.to_str().map(|s| s.starts_with(&prefix))).flatten().unwrap_or(false)
+ };
+
+ // When `force_*` is given or the file name starts with the name of one of the known chains,
+ // we use the chain spec for the specific chain.
+ if self.run.force_kusama || starts_with("kusama") {
+ Box::new(service::KusamaChainSpec::from_json_file(path)?)
+ } else if self.run.force_westend || starts_with("westend") {
+ Box::new(service::WestendChainSpec::from_json_file(path)?)
+ } else {
+ Box::new(service::PolkadotChainSpec::from_json_file(path)?)
+ }
},
- path if self.run.force_westend => {
- Box::new(service::WestendChainSpec::from_json_file(std::path::PathBuf::from(path))?)
- },
- path => Box::new(service::PolkadotChainSpec::from_json_file(std::path::PathBuf::from(path))?),
})
}
+
+ fn native_runtime_version(spec: &Box) -> &'static RuntimeVersion {
+ if spec.is_kusama() {
+ &service::kusama_runtime::VERSION
+ } else if spec.is_westend() {
+ &service::westend_runtime::VERSION
+ } else {
+ &service::polkadot_runtime::VERSION
+ }
+ }
}
/// Parses polkadot specific CLI arguments and run the service.
@@ -97,8 +116,8 @@ pub fn run() -> Result<()> {
match &cli.subcommand {
None => {
- let runtime = cli.create_runner(&cli.run.base)?;
- let chain_spec = &runtime.config().chain_spec;
+ let runner = cli.create_runner(&cli.run.base)?;
+ let chain_spec = &runner.config().chain_spec;
set_default_ss58_version(chain_spec);
@@ -115,87 +134,47 @@ pub fn run() -> Result<()> {
info!(" endorsed by the ");
info!(" KUSAMA FOUNDATION ");
info!("----------------------------");
-
- runtime.run_node(
- |config| {
- service::kusama_new_light(config)
- },
- |config| {
- service::kusama_new_full(
- config,
- None,
- None,
- authority_discovery_enabled,
- 6000,
- grandpa_pause,
- ).map(|(s, _, _)| s)
- },
- service::KusamaExecutor::native_version().runtime_version
- )
- } else if chain_spec.is_westend() {
- runtime.run_node(
- |config| {
- service::westend_new_light(config)
- },
- |config| {
- service::westend_new_full(
- config,
- None,
- None,
- authority_discovery_enabled,
- 6000,
- grandpa_pause,
- ).map(|(s, _, _)| s)
- },
- service::WestendExecutor::native_version().runtime_version
- )
- } else {
- runtime.run_node(
- |config| {
- service::polkadot_new_light(config)
- },
- |config| {
- service::polkadot_new_full(
- config,
- None,
- None,
- authority_discovery_enabled,
- 6000,
- grandpa_pause,
- ).map(|(s, _, _)| s)
- },
- service::PolkadotExecutor::native_version().runtime_version
- )
}
+
+ runner.run_node_until_exit(|config| {
+ let role = config.role.clone();
+
+ match role {
+ Role::Light => service::build_light(config).map(|(task_manager, _)| task_manager),
+ _ => service::build_full(
+ config,
+ None,
+ authority_discovery_enabled,
+ grandpa_pause,
+ ).map(|r| r.0),
+ }
+ })
},
Some(Subcommand::Base(subcommand)) => {
- let runtime = cli.create_runner(subcommand)?;
- let chain_spec = &runtime.config().chain_spec;
+ let runner = cli.create_runner(subcommand)?;
+ let chain_spec = &runner.config().chain_spec;
set_default_ss58_version(chain_spec);
if chain_spec.is_kusama() {
- runtime.run_subcommand(subcommand, |config|
+ runner.run_subcommand(subcommand, |config|
service::new_chain_ops::<
service::kusama_runtime::RuntimeApi,
service::KusamaExecutor,
- service::kusama_runtime::UncheckedExtrinsic,
>(config)
)
} else if chain_spec.is_westend() {
- runtime.run_subcommand(subcommand, |config|
+ runner.run_subcommand(subcommand, |config|
service::new_chain_ops::<
service::westend_runtime::RuntimeApi,
service::WestendExecutor,
- service::westend_runtime::UncheckedExtrinsic,
>(config)
)
} else {
- runtime.run_subcommand(subcommand, |config|
+ runner.run_subcommand(subcommand, |config|
service::new_chain_ops::<
service::polkadot_runtime::RuntimeApi,
service::PolkadotExecutor,
- service::polkadot_runtime::UncheckedExtrinsic,
>(config)
)
}
@@ -212,21 +191,21 @@ pub fn run() -> Result<()> {
}
},
Some(Subcommand::Benchmark(cmd)) => {
- let runtime = cli.create_runner(cmd)?;
- let chain_spec = &runtime.config().chain_spec;
+ let runner = cli.create_runner(cmd)?;
+ let chain_spec = &runner.config().chain_spec;
set_default_ss58_version(chain_spec);
if chain_spec.is_kusama() {
- runtime.sync_run(|config| {
+ runner.sync_run(|config| {
cmd.run::(config)
})
} else if chain_spec.is_westend() {
- runtime.sync_run(|config| {
+ runner.sync_run(|config| {
cmd.run::(config)
})
} else {
- runtime.sync_run(|config| {
+ runner.sync_run(|config| {
cmd.run::(config)
})
}
diff --git a/cli/src/lib.rs b/cli/src/lib.rs
index be2f3c6cd646444761e813d005ace4282ef52e19..385a24d364c8582065827054dcd6d52b88da26d8 100644
--- a/cli/src/lib.rs
+++ b/cli/src/lib.rs
@@ -28,14 +28,14 @@ mod command;
#[cfg(not(feature = "service-rewr"))]
pub use service::{
- AbstractService, ProvideRuntimeApi, CoreApi, ParachainHost, IdentifyVariant,
+ ProvideRuntimeApi, CoreApi, ParachainHost, IdentifyVariant,
Block, self, RuntimeApiCollection, TFullClient
};
#[cfg(feature = "service-rewr")]
pub use service_new::{
self as service,
- AbstractService, ProvideRuntimeApi, CoreApi, ParachainHost, IdentifyVariant,
+ ProvideRuntimeApi, CoreApi, ParachainHost, IdentifyVariant,
Block, self, RuntimeApiCollection, TFullClient
};
diff --git a/collator/Cargo.toml b/collator/Cargo.toml
deleted file mode 100644
index 2c47f20dbb8f96a2fa1fb50f2b0331011633f225..0000000000000000000000000000000000000000
--- a/collator/Cargo.toml
+++ /dev/null
@@ -1,37 +0,0 @@
-[package]
-name = "polkadot-collator"
-version = "0.8.12"
-authors = ["Parity Technologies "]
-description = "Collator node implementation"
-edition = "2018"
-
-[dependencies]
-futures = "0.3.4"
-sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" }
-sc-executor = { git = "https://github.com/paritytech/substrate", branch = "master" }
-sc-cli = { git = "https://github.com/paritytech/substrate", branch = "master" }
-sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
-sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
-sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
-sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
-sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
-sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
-consensus_common = { package = "sp-consensus", git = "https://github.com/paritytech/substrate", branch = "master" }
-polkadot-primitives = { path = "../primitives" }
-polkadot-cli = { path = "../cli" }
-polkadot-network = { path = "../network" }
-polkadot-validation = { path = "../validation" }
-polkadot-service = { path = "../service", optional = true}
-polkadot-service-new = { path = "../node/service", optional = true }
-log = "0.4.8"
-tokio = "0.2.13"
-futures-timer = "2.0"
-codec = { package = "parity-scale-codec", version = "1.3.0" }
-
-[dev-dependencies]
-keyring = { package = "sp-keyring", git = "https://github.com/paritytech/substrate", branch = "master" }
-
-[features]
-default = ["service-old"]
-service-old = [ "polkadot-service" ]
-service-rewr = [ "polkadot-service-new" ]
diff --git a/collator/README.adoc b/collator/README.adoc
deleted file mode 100644
index d302cd2af0fe4941a6e24c136bb0df9b2be07f35..0000000000000000000000000000000000000000
--- a/collator/README.adoc
+++ /dev/null
@@ -1,5 +0,0 @@
-
-= Polkadot Collator
-
-placeholder
-//TODO Write content :) (https://github.com/paritytech/polkadot/issues/159)
diff --git a/collator/src/lib.rs b/collator/src/lib.rs
deleted file mode 100644
index 063bdb1735c8acfb5037192dd0b98453eea077e2..0000000000000000000000000000000000000000
--- a/collator/src/lib.rs
+++ /dev/null
@@ -1,513 +0,0 @@
-// Copyright 2017-2020 Parity Technologies (UK) Ltd.
-// This file is part of Polkadot.
-
-// Polkadot is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// Polkadot is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with Polkadot. If not, see .
-
-//! Collation node logic.
-//!
-//! A collator node lives on a distinct parachain and submits a proposal for
-//! a state transition, along with a proof for its validity
-//! (what we might call a witness or block data).
-//!
-//! One of collators' other roles is to route messages between chains.
-//! Each parachain produces a list of "egress" posts of messages for each other
-//! parachain on each block, for a total of N^2 lists all together.
-//!
-//! We will refer to the egress list at relay chain block X of parachain A with
-//! destination B as egress(X)[A -> B]
-//!
-//! On every block, each parachain will be intended to route messages from some
-//! subset of all the other parachains. (NOTE: in practice this is not done until PoC-3)
-//!
-//! Since the egress information is unique to every block, when routing from a
-//! parachain a collator must gather all egress posts from that parachain
-//! up to the last point in history that messages were successfully routed
-//! from that parachain, accounting for relay chain blocks where no candidate
-//! from the collator's parachain was produced.
-//!
-//! In the case that all parachains route to each other and a candidate for the
-//! collator's parachain was included in the last relay chain block, the collator
-//! only has to gather egress posts from other parachains one block back in relay
-//! chain history.
-//!
-//! This crate defines traits which provide context necessary for collation logic
-//! to be performed, as the collation logic itself.
-
-use std::collections::HashSet;
-use std::fmt;
-use std::sync::Arc;
-use std::time::Duration;
-use std::pin::Pin;
-
-use futures::{future, Future, Stream, FutureExt, StreamExt, task::Spawn};
-use log::warn;
-use sc_client_api::{StateBackend, BlockchainEvents};
-use sp_blockchain::HeaderBackend;
-use sp_core::Pair;
-use polkadot_primitives::{
- BlockId, Hash, Block,
- parachain::{
- self, BlockData, DutyRoster, HeadData, Id as ParaId,
- PoVBlock, ValidatorId, CollatorPair, LocalValidationData, GlobalValidationSchedule,
- }
-};
-use polkadot_cli::{
- ProvideRuntimeApi, AbstractService, ParachainHost, IdentifyVariant,
- service::{self, Role}
-};
-pub use polkadot_cli::service::Configuration;
-pub use polkadot_cli::Cli;
-pub use polkadot_validation::SignedStatement;
-pub use polkadot_primitives::parachain::CollatorId;
-pub use sc_network::PeerId;
-pub use service::RuntimeApiCollection;
-pub use sc_cli::SubstrateCli;
-use sp_api::{ConstructRuntimeApi, ApiExt, HashFor};
-#[cfg(not(feature = "service-rewr"))]
-use polkadot_service::{FullNodeHandles, PolkadotClient};
-#[cfg(feature = "service-rewr")]
-use polkadot_service_new::{
- self as polkadot_service,
- Error as ServiceError, FullNodeHandles, PolkadotClient,
-};
-
-const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
-
-/// An abstraction over the `Network` with useful functions for a `Collator`.
-pub trait Network: Send + Sync {
- /// Create a `Stream` of checked statements for the given `relay_parent`.
- ///
- /// The returned stream will not terminate, so it is required to make sure that the stream is
- /// dropped when it is not required anymore. Otherwise, it will stick around in memory
- /// infinitely.
- fn checked_statements(&self, relay_parent: Hash) -> Pin + Send>>;
-}
-
-impl Network for polkadot_network::protocol::Service {
- fn checked_statements(&self, relay_parent: Hash) -> Pin + Send>> {
- polkadot_network::protocol::Service::checked_statements(self, relay_parent).boxed()
- }
-}
-
-/// Collation errors.
-#[derive(Debug)]
-pub enum Error {
- /// Error on the relay-chain side of things.
- Polkadot(String),
-}
-
-impl fmt::Display for Error {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- match *self {
- Error::Polkadot(ref err) => write!(f, "Polkadot node error: {}", err),
- }
- }
-}
-
-/// Something that can build a `ParachainContext`.
-pub trait BuildParachainContext {
- /// The parachain context produced by the `build` function.
- type ParachainContext: self::ParachainContext;
-
- /// Build the `ParachainContext`.
- fn build(
- self,
- client: Arc,
- spawner: SP,
- network: impl Network + Clone + 'static,
- ) -> Result
- where
- Client: ProvideRuntimeApi + HeaderBackend + BlockchainEvents + Send + Sync + 'static,
- Client::Api: RuntimeApiCollection,
- >::StateBackend: StateBackend>,
- Extrinsic: codec::Codec + Send + Sync + 'static,
- SP: Spawn + Clone + Send + Sync + 'static;
-}
-
-/// Parachain context needed for collation.
-///
-/// This can be implemented through an externally attached service or a stub.
-/// This is expected to be a lightweight, shared type like an Arc.
-pub trait ParachainContext: Clone {
- type ProduceCandidate: Future>;
-
- /// Produce a candidate, given the relay parent hash, the latest ingress queue information
- /// and the last parachain head.
- fn produce_candidate(
- &mut self,
- relay_parent: Hash,
- global_validation: GlobalValidationSchedule,
- local_validation: LocalValidationData,
- ) -> Self::ProduceCandidate;
-}
-
-/// Produce a candidate for the parachain, with given contexts, parent head, and signing key.
-pub async fn collate(
- relay_parent: Hash,
- local_id: ParaId,
- global_validation: GlobalValidationSchedule,
- local_validation_data: LocalValidationData,
- mut para_context: P,
- key: Arc,
-) -> Option
- where
- P: ParachainContext,
- P::ProduceCandidate: Send,
-{
- let (block_data, head_data) = para_context.produce_candidate(
- relay_parent,
- global_validation,
- local_validation_data,
- ).await?;
-
- let pov_block = PoVBlock {
- block_data,
- };
-
- let pov_block_hash = pov_block.hash();
- let signature = key.sign(¶chain::collator_signature_payload(
- &relay_parent,
- &local_id,
- &pov_block_hash,
- ));
-
- let info = parachain::CollationInfo {
- parachain_index: local_id,
- relay_parent,
- collator: key.public(),
- signature,
- head_data,
- pov_block_hash,
- };
-
- let collation = parachain::Collation {
- info,
- pov: pov_block,
- };
-
- Some(collation)
-}
-
-#[cfg(feature = "service-rewr")]
-fn build_collator_service(
- _spawner: SP,
- _handles: FullNodeHandles,
- _client: Arc,
- _para_id: ParaId,
- _key: Arc,
- _build_parachain_context: P,
-) -> Result, polkadot_service::Error>
- where
- C: PolkadotClient<
- service::Block,
- service::TFullBackend,
- R
- > + 'static,
- R: ConstructRuntimeApi + Sync + Send,
- >::RuntimeApi:
- sp_api::ApiExt<
- service::Block,
- StateBackend = as service::Backend>::State,
- >
- + RuntimeApiCollection<
- Extrinsic,
- StateBackend = as service::Backend>::State,
- >
- + Sync + Send,
- P: BuildParachainContext,
- P::ParachainContext: Send + 'static,
- ::ProduceCandidate: Send,
- Extrinsic: service::Codec + Send + Sync + 'static,
- SP: Spawn + Clone + Send + Sync + 'static,
-{
- Err("Collator is not functional with the new service yet".into())
-}
-
-
-#[cfg(not(feature = "service-rewr"))]
-fn build_collator_service(
- spawner: SP,
- handles: FullNodeHandles,
- client: Arc,
- para_id: ParaId,
- key: Arc,
- build_parachain_context: P,
-) -> Result + Send + 'static, polkadot_service::Error>
- where
- C: PolkadotClient<
- service::Block,
- service::TFullBackend,
- R
- > + 'static,
- R: ConstructRuntimeApi + Sync + Send,
- >::RuntimeApi:
- sp_api::ApiExt<
- service::Block,
- StateBackend = as service::Backend>::State,
- >
- + RuntimeApiCollection<
- Extrinsic,
- StateBackend = as service::Backend>::State,
- >
- + Sync + Send,
- P: BuildParachainContext,
- P::ParachainContext: Send + 'static,
- ::ProduceCandidate: Send,
- Extrinsic: service::Codec + Send + Sync + 'static,
- SP: Spawn + Clone + Send + Sync + 'static,
-{
- let polkadot_network = handles.polkadot_network
- .ok_or_else(|| "Collator cannot run when Polkadot-specific networking has not been started")?;
-
- // We don't require this here, but we need to make sure that the validation service is started.
- // This service makes sure the collator is joining the correct gossip topics and receives the appropiate
- // messages.
- handles.validation_service_handle
- .ok_or_else(|| "Collator cannot run when validation networking has not been started")?;
-
- let parachain_context = match build_parachain_context.build(
- client.clone(),
- spawner,
- polkadot_network.clone(),
- ) {
- Ok(ctx) => ctx,
- Err(()) => {
- return Err("Could not build the parachain context!".into())
- }
- };
-
- let work = async move {
- let mut notification_stream = client.import_notification_stream();
-
- while let Some(notification) = notification_stream.next().await {
- macro_rules! try_fr {
- ($e:expr) => {
- match $e {
- Ok(x) => x,
- Err(e) => return future::Either::Left(future::err(Error::Polkadot(
- format!("{:?}", e)
- ))),
- }
- }
- }
-
- let relay_parent = notification.hash;
- let id = BlockId::hash(relay_parent);
-
- let network = polkadot_network.clone();
- let client = client.clone();
- let key = key.clone();
- let parachain_context = parachain_context.clone();
-
- let work = future::lazy(move |_| {
- let api = client.runtime_api();
- let global_validation = try_fr!(api.global_validation_schedule(&id));
- let local_validation = match try_fr!(api.local_validation_data(&id, para_id)) {
- Some(local_validation) => local_validation,
- None => return future::Either::Left(future::ok(())),
- };
-
- let validators = try_fr!(api.validators(&id));
-
- let targets = compute_targets(
- para_id,
- validators.as_slice(),
- try_fr!(api.duty_roster(&id)),
- );
-
- let collation_work = collate(
- relay_parent,
- para_id,
- global_validation,
- local_validation,
- parachain_context,
- key,
- ).map(move |collation| {
- match collation {
- Some(collation) => network.distribute_collation(targets, collation),
- None => log::trace!("Skipping collation as `collate` returned `None`"),
- }
-
- Ok(())
- });
-
- future::Either::Right(collation_work)
- });
-
- let deadlined = future::select(
- work.then(|f| f).boxed(),
- futures_timer::Delay::new(COLLATION_TIMEOUT)
- );
-
- let silenced = deadlined
- .map(|either| {
- if let future::Either::Right(_) = either {
- warn!("Collation failure: timeout");
- }
- });
-
- let future = silenced.map(drop);
-
- tokio::spawn(future);
- }
- }.boxed();
-
- Ok(work)
-}
-
-/// Async function that will run the collator node with the given `RelayChainContext` and `ParachainContext`
-/// built by the given `BuildParachainContext` and arguments to the underlying polkadot node.
-pub async fn start_collator(
- build_parachain_context: P,
- para_id: ParaId,
- key: Arc,
- config: Configuration,
-) -> Result<(), polkadot_service::Error>
-where
- P: 'static + BuildParachainContext,
- P::ParachainContext: Send + 'static,
- ::ProduceCandidate: Send,
-{
- if matches!(config.role, Role::Light) {
- return Err(
- polkadot_service::Error::Other("light nodes are unsupported as collator".into())
- .into());
- }
-
- if config.chain_spec.is_kusama() {
- let (service, client, handlers) = service::kusama_new_full(
- config,
- Some((key.public(), para_id)),
- None,
- false,
- 6000,
- None,
- )?;
- let spawn_handle = service.spawn_task_handle();
- build_collator_service(
- spawn_handle,
- handlers,
- client,
- para_id,
- key,
- build_parachain_context
- )?.await;
- } else if config.chain_spec.is_westend() {
- let (service, client, handlers) = service::westend_new_full(
- config,
- Some((key.public(), para_id)),
- None,
- false,
- 6000,
- None,
- )?;
- let spawn_handle = service.spawn_task_handle();
- build_collator_service(
- spawn_handle,
- handlers,
- client,
- para_id,
- key,
- build_parachain_context
- )?.await;
- } else {
- let (service, client, handles) = service::polkadot_new_full(
- config,
- Some((key.public(), para_id)),
- None,
- false,
- 6000,
- None,
- )?;
- let spawn_handle = service.spawn_task_handle();
- build_collator_service(
- spawn_handle,
- handles,
- client,
- para_id,
- key,
- build_parachain_context,
- )?.await;
- }
-
- Ok(())
-}
-
-#[cfg(not(feature = "service-rewr"))]
-fn compute_targets(para_id: ParaId, session_keys: &[ValidatorId], roster: DutyRoster) -> HashSet {
- use polkadot_primitives::parachain::Chain;
-
- roster.validator_duty.iter().enumerate()
- .filter(|&(_, c)| c == &Chain::Parachain(para_id))
- .filter_map(|(i, _)| session_keys.get(i))
- .cloned()
- .collect()
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[derive(Clone)]
- struct DummyParachainContext;
-
- impl ParachainContext for DummyParachainContext {
- type ProduceCandidate = future::Ready>;
-
- fn produce_candidate(
- &mut self,
- _relay_parent: Hash,
- _global: GlobalValidationSchedule,
- _local_validation: LocalValidationData,
- ) -> Self::ProduceCandidate {
- // send messages right back.
- future::ready(Some((
- BlockData(vec![1, 2, 3, 4, 5,]),
- HeadData(vec![9, 9, 9]),
- )))
- }
- }
-
- struct BuildDummyParachainContext;
-
- impl BuildParachainContext for BuildDummyParachainContext {
- type ParachainContext = DummyParachainContext;
-
- fn build(
- self,
- _: Arc,
- _: SP,
- _: impl Network + Clone + 'static,
- ) -> Result {
- Ok(DummyParachainContext)
- }
- }
-
- // Make sure that the future returned by `start_collator` implements `Send`.
- #[test]
- fn start_collator_is_send() {
- fn check_send(_: T) {}
-
- let cli = Cli::from_iter(&["-dev"]);
- let task_executor = |_, _| unimplemented!();
- let config = cli.create_configuration(&cli.run.base, task_executor.into()).unwrap();
-
- check_send(start_collator(
- BuildDummyParachainContext,
- 0.into(),
- Arc::new(CollatorPair::generate().0),
- config,
- ));
- }
-}
diff --git a/core-primitives/Cargo.toml b/core-primitives/Cargo.toml
new file mode 100644
index 0000000000000000000000000000000000000000..cd4dac0f187d1cf4ced8ba938b48521fc3bf2669
--- /dev/null
+++ b/core-primitives/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "polkadot-core-primitives"
+version = "0.7.30"
+authors = ["Parity Technologies "]
+edition = "2018"
+
+[dependencies]
+sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
+sp-std = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
+sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
+codec = { package = "parity-scale-codec", version = "1.3.4", default-features = false, features = [ "derive" ] }
+
+[features]
+default = [ "std" ]
+std = [
+ "sp-core/std",
+ "sp-runtime/std",
+ "sp-std/std",
+ "codec/std",
+]
diff --git a/core-primitives/src/lib.rs b/core-primitives/src/lib.rs
new file mode 100644
index 0000000000000000000000000000000000000000..ffb346467d9e53c0c64ebca4438a9260492fdd8e
--- /dev/null
+++ b/core-primitives/src/lib.rs
@@ -0,0 +1,101 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+#![cfg_attr(not(feature = "std"), no_std)]
+
+//! Core Polkadot types.
+//!
+//! These core Polkadot types are used by the relay chain and the Parachains.
+
+use sp_runtime::{generic, MultiSignature, traits::{Verify, BlakeTwo256, IdentifyAccount}};
+
+/// The block number type used by Polkadot.
+/// 32-bits will allow for 136 years of blocks assuming 1 block per second.
+pub type BlockNumber = u32;
+
+/// An instant or duration in time.
+pub type Moment = u64;
+
+/// Alias to type for a signature for a transaction on the relay chain. This allows one of several
+/// kinds of underlying crypto to be used, so isn't a fixed size when encoded.
+pub type Signature = MultiSignature;
+
+/// Alias to the public key used for this chain, actually a `MultiSigner`. Like the signature, this
+/// also isn't a fixed size when encoded, as different cryptos have different size public keys.
+pub type AccountPublic = ::Signer;
+
+/// Alias to the opaque account ID type for this chain, actually a `AccountId32`. This is always
+/// 32 bytes.
+pub type AccountId = ::AccountId;
+
+/// The type for looking up accounts. We don't expect more than 4 billion of them.
+pub type AccountIndex = u32;
+
+/// Identifier for a chain. 32-bit should be plenty.
+pub type ChainId = u32;
+
+/// A hash of some data used by the relay chain.
+pub type Hash = sp_core::H256;
+
+/// Index of a transaction in the relay chain. 32-bit should be plenty.
+pub type Nonce = u32;
+
+/// The balance of an account.
+/// 128-bits (or 38 significant decimal figures) will allow for 10m currency (10^7) at a resolution
+/// to all for one second's worth of an annualised 50% reward be paid to a unit holder (10^11 unit
+/// denomination), or 10^18 total atomic units, to grow at 50%/year for 51 years (10^9 multiplier)
+/// for an eventual total of 10^27 units (27 significant decimal figures).
+/// We round denomination to 10^12 (12 sdf), and leave the other redundancy at the upper end so
+/// that 32 bits may be multiplied with a balance in 128 bits without worrying about overflow.
+pub type Balance = u128;
+
+/// Header type.
+pub type Header = generic::Header;
+/// Block type.
+pub type Block = generic::Block;
+/// Block ID.
+pub type BlockId = generic::BlockId;
+
+/// Opaque, encoded, unchecked extrinsic.
+pub use sp_runtime::OpaqueExtrinsic as UncheckedExtrinsic;
+
+/// The information that goes alongside a transfer_into_parachain operation. Entirely opaque, it
+/// will generally be used for identifying the reason for the transfer. Typically it will hold the
+/// destination account to which the transfer should be credited. If still more information is
+/// needed, then this should be a hash with the pre-image presented via an off-chain mechanism on
+/// the parachain.
+pub type Remark = [u8; 32];
+
+/// These are special "control" messages that can be passed from the Relaychain to a parachain.
+/// They should be handled by all parachains.
+#[derive(codec::Encode, codec::Decode, Clone, sp_runtime::RuntimeDebug, PartialEq)]
+pub enum DownwardMessage {
+ /// Some funds were transferred into the parachain's account. The hash is the identifier that
+ /// was given with the transfer.
+ TransferInto(AccountId, Balance, Remark),
+ /// An opaque blob of data. The relay chain must somehow know how to form this so that the
+ /// destination parachain does something sensible.
+ ///
+ /// NOTE: Be very careful not to allow users to place arbitrary size information in here.
+ Opaque(sp_std::vec::Vec),
+ /// XCMP message for the Parachain.
+ XCMPMessage(sp_std::vec::Vec),
+}
+
+/// V1 primitives.
+pub mod v1 {
+ pub use super::*;
+}
diff --git a/erasure-coding/Cargo.toml b/erasure-coding/Cargo.toml
index 6368a1787b11d71ce77dfb34e2358c388c03402f..1965150ca67abef411509179466d50f7ffb964d1 100644
--- a/erasure-coding/Cargo.toml
+++ b/erasure-coding/Cargo.toml
@@ -1,13 +1,13 @@
[package]
name = "polkadot-erasure-coding"
-version = "0.8.12"
+version = "0.8.22"
authors = ["Parity Technologies "]
edition = "2018"
[dependencies]
primitives = { package = "polkadot-primitives", path = "../primitives" }
reed_solomon = { package = "reed-solomon-erasure", version = "4.0.2"}
-codec = { package = "parity-scale-codec", version = "1.3.0", default-features = false, features = ["derive"] }
+codec = { package = "parity-scale-codec", version = "1.3.4", default-features = false, features = ["derive"] }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
trie = { package = "sp-trie", git = "https://github.com/paritytech/substrate", branch = "master" }
derive_more = "0.15.0"
diff --git a/erasure-coding/src/lib.rs b/erasure-coding/src/lib.rs
index 98a2776d8848a543b31e170e9cb3f56ff4906929..708a167d627675abaf3ec13befed01400bce219c 100644
--- a/erasure-coding/src/lib.rs
+++ b/erasure-coding/src/lib.rs
@@ -26,8 +26,8 @@
use codec::{Encode, Decode};
use reed_solomon::galois_16::{self, ReedSolomon};
-use primitives::{Hash as H256, BlakeTwo256, HashT};
-use primitives::parachain::AvailableData;
+use primitives::v0::{self, Hash as H256, BlakeTwo256, HashT};
+use primitives::v1;
use sp_core::Blake2Hasher;
use trie::{EMPTY_PREFIX, MemoryDB, Trie, TrieMut, trie_types::{TrieDBMut, TrieDB}};
@@ -124,14 +124,32 @@ fn code_params(n_validators: usize) -> Result {
})
}
+/// Obtain erasure-coded chunks for v0 `AvailableData`, one for each validator.
+///
+/// Works only up to 65536 validators, and `n_validators` must be non-zero.
+pub fn obtain_chunks_v0(n_validators: usize, data: &v0::AvailableData)
+ -> Result>, Error>
+{
+ obtain_chunks(n_validators, data)
+}
+
+/// Obtain erasure-coded chunks for v1 `AvailableData`, one for each validator.
+///
+/// Works only up to 65536 validators, and `n_validators` must be non-zero.
+pub fn obtain_chunks_v1(n_validators: usize, data: &v1::AvailableData)
+ -> Result>, Error>
+{
+ obtain_chunks(n_validators, data)
+}
+
/// Obtain erasure-coded chunks, one for each validator.
///
/// Works only up to 65536 validators, and `n_validators` must be non-zero.
-pub fn obtain_chunks(n_validators: usize, available_data: &AvailableData)
+fn obtain_chunks(n_validators: usize, data: &T)
-> Result>, Error>
{
let params = code_params(n_validators)?;
- let encoded = available_data.encode();
+ let encoded = data.encode();
if encoded.is_empty() {
return Err(Error::BadPayload);
@@ -145,15 +163,42 @@ pub fn obtain_chunks(n_validators: usize, available_data: &AvailableData)
Ok(shards.into_iter().map(|w| w.into_inner()).collect())
}
-/// Reconstruct the block data from a set of chunks.
+/// Reconstruct the v0 available data from a set of chunks.
+///
+/// Provide an iterator containing chunk data and the corresponding index.
+/// The indices of the present chunks must be indicated. If too few chunks
+/// are provided, recovery is not possible.
+///
+/// Works only up to 65536 validators, and `n_validators` must be non-zero.
+pub fn reconstruct_v0<'a, I: 'a>(n_validators: usize, chunks: I)
+ -> Result
+ where I: IntoIterator-
+{
+ reconstruct(n_validators, chunks)
+}
+
+/// Reconstruct the v1 available data from a set of chunks.
+///
+/// Provide an iterator containing chunk data and the corresponding index.
+/// The indices of the present chunks must be indicated. If too few chunks
+/// are provided, recovery is not possible.
+///
+/// Works only up to 65536 validators, and `n_validators` must be non-zero.
+pub fn reconstruct_v1<'a, I: 'a>(n_validators: usize, chunks: I)
+ -> Result
+ where I: IntoIterator-
+{
+ reconstruct(n_validators, chunks)
+}
+
+/// Reconstruct decodable data from a set of chunks.
///
/// Provide an iterator containing chunk data and the corresponding index.
/// The indices of the present chunks must be indicated. If too few chunks
/// are provided, recovery is not possible.
///
/// Works only up to 65536 validators, and `n_validators` must be non-zero.
-pub fn reconstruct<'a, I: 'a>(n_validators: usize, chunks: I)
- -> Result
+fn reconstruct<'a, I: 'a, T: Decode>(n_validators: usize, chunks: I) -> Result
where I: IntoIterator-
{
let params = code_params(n_validators)?;
@@ -343,7 +388,7 @@ impl<'a, I: Iterator
- > codec::Input for ShardInput<'a, I> {
#[cfg(test)]
mod tests {
use super::*;
- use primitives::parachain::{BlockData, PoVBlock};
+ use primitives::v0::{AvailableData, BlockData, PoVBlock};
#[test]
fn field_order_is_right_size() {
@@ -420,7 +465,7 @@ mod tests {
assert_eq!(chunks.len(), 10);
// any 4 chunks should work.
- let reconstructed = reconstruct(
+ let reconstructed: AvailableData = reconstruct(
10,
[
(&*chunks[1], 1),
diff --git a/network/Cargo.toml b/network/Cargo.toml
deleted file mode 100644
index f3ca97dcdadceb5da30f881b714dcce675a256c5..0000000000000000000000000000000000000000
--- a/network/Cargo.toml
+++ /dev/null
@@ -1,32 +0,0 @@
-[package]
-name = "polkadot-network"
-version = "0.8.12"
-authors = ["Parity Technologies
"]
-description = "Polkadot-specific networking protocol"
-edition = "2018"
-
-[dependencies]
-arrayvec = "0.4.12"
-bytes = "0.5"
-parking_lot = "0.9.0"
-derive_more = "0.14.1"
-av_store = { package = "polkadot-availability-store", path = "../availability-store" }
-polkadot-validation = { path = "../validation" }
-polkadot-primitives = { path = "../primitives" }
-polkadot-erasure-coding = { path = "../erasure-coding" }
-codec = { package = "parity-scale-codec", version = "1.3.0", default-features = false, features = ["derive"] }
-sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
-sc-network-gossip = { git = "https://github.com/paritytech/substrate", branch = "master" }
-sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
-sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
-futures = "0.3.4"
-log = "0.4.8"
-exit-future = "0.2.0"
-futures-timer = "2.0"
-sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
-sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
-wasm-timer = "0.2.4"
-
-[dev-dependencies]
-sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
-sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" }
diff --git a/network/README.adoc b/network/README.adoc
deleted file mode 100644
index 25b3e003d5b0a4688ddb73a2f59324802c5cc5b7..0000000000000000000000000000000000000000
--- a/network/README.adoc
+++ /dev/null
@@ -1,5 +0,0 @@
-
-= Polkadot Network
-
-placeholder
-//TODO Write content :) (https://github.com/paritytech/polkadot/issues/159)
diff --git a/network/src/legacy/collator_pool.rs b/network/src/legacy/collator_pool.rs
deleted file mode 100644
index a0c0a0458e908eeb396883aa80304a29a47095ab..0000000000000000000000000000000000000000
--- a/network/src/legacy/collator_pool.rs
+++ /dev/null
@@ -1,332 +0,0 @@
-// Copyright 2018-2020 Parity Technologies (UK) Ltd.
-// This file is part of Polkadot.
-
-// Polkadot is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// Polkadot is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with Polkadot. If not, see .
-
-//! Bridge between the network and consensus service for getting collations to it.
-
-use codec::{Encode, Decode};
-use polkadot_primitives::Hash;
-use polkadot_primitives::parachain::{CollatorId, Id as ParaId, Collation};
-use sc_network::PeerId;
-use futures::channel::oneshot;
-
-use std::collections::hash_map::{HashMap, Entry};
-use std::time::Duration;
-use wasm_timer::Instant;
-
-const COLLATION_LIFETIME: Duration = Duration::from_secs(60 * 5);
-
-/// The role of the collator. Whether they're the primary or backup for this parachain.
-#[derive(PartialEq, Debug, Clone, Copy, Encode, Decode)]
-pub enum Role {
- /// Primary collators should send collations whenever it's time.
- Primary = 0,
- /// Backup collators should not.
- Backup = 1,
-}
-
-/// A maintenance action for the collator set.
-#[derive(PartialEq, Debug)]
-#[allow(dead_code)]
-pub enum Action {
- /// Disconnect the given collator.
- Disconnect(CollatorId),
- /// Give the collator a new role.
- NewRole(CollatorId, Role),
-}
-
-struct CollationSlot {
- live_at: Instant,
- entries: SlotEntries,
-}
-
-impl CollationSlot {
- fn blank_now() -> Self {
- CollationSlot {
- live_at: Instant::now(),
- entries: SlotEntries::Blank,
- }
- }
-
- fn stay_alive(&self, now: Instant) -> bool {
- self.live_at + COLLATION_LIFETIME > now
- }
-}
-
-#[derive(Debug)]
-enum SlotEntries {
- Blank,
- // not queried yet
- Pending(Vec),
- // waiting for next to arrive.
- Awaiting(Vec>),
-}
-
-impl SlotEntries {
- fn received_collation(&mut self, collation: Collation) {
- *self = match std::mem::replace(self, SlotEntries::Blank) {
- SlotEntries::Blank => SlotEntries::Pending(vec![collation]),
- SlotEntries::Pending(mut cs) => {
- cs.push(collation);
- SlotEntries::Pending(cs)
- }
- SlotEntries::Awaiting(senders) => {
- for sender in senders {
- let _ = sender.send(collation.clone());
- }
-
- SlotEntries::Blank
- }
- };
- }
-
- fn await_with(&mut self, sender: oneshot::Sender) {
- *self = match ::std::mem::replace(self, SlotEntries::Blank) {
- SlotEntries::Blank => SlotEntries::Awaiting(vec![sender]),
- SlotEntries::Awaiting(mut senders) => {
- senders.push(sender);
- SlotEntries::Awaiting(senders)
- }
- SlotEntries::Pending(mut cs) => {
- let next_collation = cs.pop().expect("empty variant is always `Blank`; qed");
- let _ = sender.send(next_collation);
-
- if cs.is_empty() {
- SlotEntries::Blank
- } else {
- SlotEntries::Pending(cs)
- }
- }
- };
- }
-}
-
-struct ParachainCollators {
- primary: CollatorId,
- backup: Vec,
-}
-
-/// Manages connected collators and role assignments from the perspective of a validator.
-#[derive(Default)]
-pub struct CollatorPool {
- collators: HashMap,
- parachain_collators: HashMap,
- collations: HashMap<(Hash, ParaId), CollationSlot>,
-}
-
-impl CollatorPool {
- /// Create a new `CollatorPool` object.
- pub fn new() -> Self {
- CollatorPool {
- collators: HashMap::new(),
- parachain_collators: HashMap::new(),
- collations: HashMap::new(),
- }
- }
-
- /// Call when a new collator is authenticated. Returns the role.
- pub fn on_new_collator(&mut self, collator_id: CollatorId, para_id: ParaId, peer_id: PeerId) -> Role {
- self.collators.insert(collator_id.clone(), (para_id, peer_id));
- match self.parachain_collators.entry(para_id) {
- Entry::Vacant(vacant) => {
- vacant.insert(ParachainCollators {
- primary: collator_id,
- backup: Vec::new(),
- });
-
- Role::Primary
- },
- Entry::Occupied(mut occupied) => {
- occupied.get_mut().backup.push(collator_id);
-
- Role::Backup
- }
- }
- }
-
- /// Called when a collator disconnects. If it was the primary, returns a new primary for that
- /// parachain.
- pub fn on_disconnect(&mut self, collator_id: CollatorId) -> Option {
- self.collators.remove(&collator_id).and_then(|(para_id, _)| match self.parachain_collators.entry(para_id) {
- Entry::Vacant(_) => None,
- Entry::Occupied(mut occ) => {
- if occ.get().primary == collator_id {
- if occ.get().backup.is_empty() {
- occ.remove();
- None
- } else {
- let mut collators = occ.get_mut();
- collators.primary = collators.backup.pop().expect("backup non-empty; qed");
- Some(collators.primary.clone())
- }
- } else {
- let pos = occ.get().backup.iter().position(|a| a == &collator_id)
- .expect("registered collator always present in backup if not primary; qed");
-
- occ.get_mut().backup.remove(pos);
- None
- }
- }
- })
- }
-
- /// Called when a collation is received.
- /// The collator should be registered for the parachain of the collation as a precondition of this function.
- /// The collation should have been checked for integrity of signature before passing to this function.
- pub fn on_collation(&mut self, collator_id: CollatorId, relay_parent: Hash, collation: Collation) {
- log::debug!(
- target: "collator-pool", "On collation from collator {} for relay parent {}",
- collator_id,
- relay_parent,
- );
-
- if let Some((para_id, _)) = self.collators.get(&collator_id) {
- debug_assert_eq!(para_id, &collation.info.parachain_index);
-
- // TODO: punish if not primary? (https://github.com/paritytech/polkadot/issues/213)
-
- self.collations.entry((relay_parent, para_id.clone()))
- .or_insert_with(CollationSlot::blank_now)
- .entries
- .received_collation(collation);
- }
- }
-
- /// Wait for a collation from a parachain.
- pub fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId, sender: oneshot::Sender) {
- self.collations.entry((relay_parent, para_id))
- .or_insert_with(CollationSlot::blank_now)
- .entries
- .await_with(sender);
- }
-
- /// Call periodically to perform collator set maintenance.
- /// Returns a set of actions to perform on the network level.
- pub fn maintain_peers(&mut self) -> Vec {
- // TODO: rearrange periodically to new primary, evaluate based on latency etc.
- // https://github.com/paritytech/polkadot/issues/214
- Vec::new()
- }
-
- /// called when a block with given hash has been imported.
- pub fn collect_garbage(&mut self, chain_head: Option<&Hash>) {
- let now = Instant::now();
- self.collations.retain(|&(ref h, _), slot| chain_head != Some(h) && slot.stay_alive(now));
- }
-
- /// Convert the given `CollatorId` to a `PeerId`.
- pub fn collator_id_to_peer_id(&self, collator_id: &CollatorId) -> Option<&PeerId> {
- self.collators.get(collator_id).map(|ids| &ids.1)
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use sp_core::crypto::UncheckedInto;
- use polkadot_primitives::parachain::{CollationInfo, BlockData, PoVBlock};
- use futures::executor::block_on;
-
- fn make_pov(block_data: Vec) -> PoVBlock {
- PoVBlock {
- block_data: BlockData(block_data),
- }
- }
-
- #[test]
- fn disconnect_primary_gives_new_primary() {
- let mut pool = CollatorPool::new();
- let para_id: ParaId = 5.into();
- let bad_primary: CollatorId = [0; 32].unchecked_into();
- let good_backup: CollatorId = [1; 32].unchecked_into();
-
- assert_eq!(pool.on_new_collator(bad_primary.clone(), para_id.clone(), PeerId::random()), Role::Primary);
- assert_eq!(pool.on_new_collator(good_backup.clone(), para_id.clone(), PeerId::random()), Role::Backup);
- assert_eq!(pool.on_disconnect(bad_primary), Some(good_backup.clone()));
- assert_eq!(pool.on_disconnect(good_backup), None);
- }
-
- #[test]
- fn disconnect_backup_removes_from_pool() {
- let mut pool = CollatorPool::new();
- let para_id: ParaId = 5.into();
- let primary = [0; 32].unchecked_into();
- let backup: CollatorId = [1; 32].unchecked_into();
-
- assert_eq!(pool.on_new_collator(primary, para_id.clone(), PeerId::random()), Role::Primary);
- assert_eq!(pool.on_new_collator(backup.clone(), para_id.clone(), PeerId::random()), Role::Backup);
- assert_eq!(pool.on_disconnect(backup), None);
- assert!(pool.parachain_collators.get(¶_id).unwrap().backup.is_empty());
- }
-
- #[test]
- fn await_before_collation() {
- let mut pool = CollatorPool::new();
- let para_id: ParaId = 5.into();
- let peer_id = PeerId::random();
- let primary: CollatorId = [0; 32].unchecked_into();
- let relay_parent = [1; 32].into();
-
- assert_eq!(pool.on_new_collator(primary.clone(), para_id.clone(), peer_id.clone()), Role::Primary);
- let (tx1, rx1) = oneshot::channel();
- let (tx2, rx2) = oneshot::channel();
- pool.await_collation(relay_parent, para_id, tx1);
- pool.await_collation(relay_parent, para_id, tx2);
- let mut collation_info = CollationInfo::default();
- collation_info.parachain_index = para_id;
- collation_info.collator = primary.clone().into();
- pool.on_collation(primary.clone(), relay_parent, Collation {
- info: collation_info,
- pov: make_pov(vec![4, 5, 6]),
- });
-
- block_on(rx1).unwrap();
- block_on(rx2).unwrap();
- assert_eq!(pool.collators.get(&primary).map(|ids| &ids.1).unwrap(), &peer_id);
- }
-
- #[test]
- fn collate_before_await() {
- let mut pool = CollatorPool::new();
- let para_id: ParaId = 5.into();
- let primary: CollatorId = [0; 32].unchecked_into();
- let relay_parent = [1; 32].into();
-
- assert_eq!(pool.on_new_collator(primary.clone(), para_id.clone(), PeerId::random()), Role::Primary);
-
- let mut collation_info = CollationInfo::default();
- collation_info.parachain_index = para_id;
- collation_info.collator = primary.clone();
- pool.on_collation(primary.clone(), relay_parent, Collation {
- info: collation_info,
- pov: make_pov(vec![4, 5, 6]),
- });
-
- let (tx, rx) = oneshot::channel();
- pool.await_collation(relay_parent, para_id, tx);
- block_on(rx).unwrap();
- }
-
- #[test]
- fn slot_stay_alive() {
- let slot = CollationSlot::blank_now();
- let now = slot.live_at;
-
- assert!(slot.stay_alive(now));
- assert!(slot.stay_alive(now + Duration::from_secs(10)));
- assert!(!slot.stay_alive(now + COLLATION_LIFETIME));
- assert!(!slot.stay_alive(now + COLLATION_LIFETIME + Duration::from_secs(10)));
- }
-}
diff --git a/network/src/legacy/gossip/attestation.rs b/network/src/legacy/gossip/attestation.rs
deleted file mode 100644
index a47f75288bf40220bcc639003d30c38ab1e492fb..0000000000000000000000000000000000000000
--- a/network/src/legacy/gossip/attestation.rs
+++ /dev/null
@@ -1,347 +0,0 @@
-// Copyright 2019-2020 Parity Technologies (UK) Ltd.
-// This file is part of Polkadot.
-
-// Polkadot is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// Polkadot is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with Polkadot. If not, see .
-
-//! Gossip messages and structures for dealing with attestations (statements of
-//! validity of invalidity on parachain candidates).
-//!
-//! This follows the same principles as other gossip modules (see parent
-//! documentation for more details) by being aware of our current chain
-//! heads and accepting only information relative to them. Attestations are localized to
-//! relay chain head, so this is easily doable.
-//!
-//! This module also provides a filter, so we can only broadcast messages to
-//! peers that are relevant to chain heads they have advertised.
-//!
-//! Furthermore, since attestations are bottlenecked by the `Candidate` statement,
-//! we only accept attestations which are themselves `Candidate` messages, or reference
-//! a `Candidate` we are aware of. Otherwise, it is possible we could be forced to
-//! consider an infinite amount of attestations produced by a misbehaving validator.
-
-use sc_network_gossip::{ValidationResult as GossipValidationResult};
-use sc_network::ReputationChange;
-use polkadot_validation::GenericStatement;
-use polkadot_primitives::Hash;
-
-use std::collections::HashMap;
-
-use log::warn;
-
-use super::{
- cost, benefit, attestation_topic, MAX_CHAIN_HEADS, LeavesVec,
- ChainContext, Known, MessageValidationData, GossipStatement,
-};
-
-/// Meta-data that we keep about a candidate in the `Knowledge`.
-#[derive(Debug, Clone)]
-pub(super) struct CandidateMeta {
- /// The hash of the pov-block data.
- pub(super) pov_block_hash: Hash,
-}
-
-// knowledge about attestations on a single parent-hash.
-#[derive(Default)]
-pub(super) struct Knowledge {
- candidates: HashMap,
-}
-
-impl Knowledge {
- // whether the peer is aware of a candidate with given hash.
- fn is_aware_of(&self, candidate_hash: &Hash) -> bool {
- self.candidates.contains_key(candidate_hash)
- }
-
- // Get candidate meta data for a candidate by hash.
- fn candidate_meta(&self, candidate_hash: &Hash) -> Option<&CandidateMeta> {
- self.candidates.get(candidate_hash)
- }
-
- // note that the peer is aware of a candidate with given hash. this should
- // be done after observing an incoming candidate message via gossip.
- fn note_aware(&mut self, candidate_hash: Hash, candidate_meta: CandidateMeta) {
- self.candidates.insert(candidate_hash, candidate_meta);
- }
-}
-
-#[derive(Default)]
-pub(super) struct PeerData {
- live: HashMap,
-}
-
-impl PeerData {
- /// Update leaves, returning a list of which leaves are new.
- pub(super) fn update_leaves(&mut self, leaves: &LeavesVec) -> LeavesVec {
- let mut new = LeavesVec::new();
- self.live.retain(|k, _| leaves.contains(k));
- for &leaf in leaves {
- self.live.entry(leaf).or_insert_with(|| {
- new.push(leaf);
- Default::default()
- });
- }
-
- new
- }
-
- #[cfg(test)]
- pub(super) fn note_aware_under_leaf(
- &mut self,
- relay_chain_leaf: &Hash,
- candidate_hash: Hash,
- meta: CandidateMeta,
- ) {
- if let Some(knowledge) = self.live.get_mut(relay_chain_leaf) {
- knowledge.note_aware(candidate_hash, meta);
- }
- }
-
- pub(super) fn knowledge_at_mut(&mut self, parent_hash: &Hash) -> Option<&mut Knowledge> {
- self.live.get_mut(parent_hash)
- }
-}
-
-/// An impartial view of what topics and data are valid based on attestation session data.
-pub(super) struct View {
- leaf_work: Vec<(Hash, LeafView)>, // hashes of the best DAG-leaves paired with validation data.
- topics: HashMap, // maps topic hashes to block hashes.
-}
-
-impl Default for View {
- fn default() -> Self {
- View {
- leaf_work: Vec::with_capacity(MAX_CHAIN_HEADS),
- topics: Default::default(),
- }
- }
-}
-
-impl View {
- fn leaf_view(&self, relay_chain_leaf: &Hash) -> Option<&LeafView> {
- self.leaf_work.iter()
- .find_map(|&(ref h, ref leaf)| if h == relay_chain_leaf { Some(leaf) } else { None } )
- }
-
- fn leaf_view_mut(&mut self, relay_chain_leaf: &Hash) -> Option<&mut LeafView> {
- self.leaf_work.iter_mut()
- .find_map(|&mut (ref h, ref mut leaf)| if h == relay_chain_leaf { Some(leaf) } else { None } )
- }
-
- /// Get our leaves-set. Guaranteed to have length <= MAX_CHAIN_HEADS.
- pub(super) fn neighbor_info<'a>(&'a self) -> impl Iterator- + 'a + Clone {
- self.leaf_work.iter().take(MAX_CHAIN_HEADS).map(|(p, _)| p.clone())
- }
-
- /// Note new leaf in our local view and validation data necessary to check signatures
- /// of statements issued under this leaf.
- ///
- /// This will be pruned later on a call to `prune_old_leaves`, when this leaf
- /// is not a leaf anymore.
- pub(super) fn new_local_leaf(
- &mut self,
- validation_data: MessageValidationData,
- ) {
- let relay_chain_leaf = validation_data.signing_context.parent_hash.clone();
- self.leaf_work.push((
- validation_data.signing_context.parent_hash.clone(),
- LeafView {
- validation_data,
- knowledge: Default::default(),
- },
- ));
- self.topics.insert(attestation_topic(relay_chain_leaf), relay_chain_leaf);
- self.topics.insert(super::pov_block_topic(relay_chain_leaf), relay_chain_leaf);
- }
-
- /// Prune old leaf-work that fails the leaf predicate.
- pub(super) fn prune_old_leaves
bool>(&mut self, is_leaf: F) {
- let leaf_work = &mut self.leaf_work;
- leaf_work.retain(|&(ref relay_chain_leaf, _)| is_leaf(relay_chain_leaf));
- self.topics.retain(|_, v| leaf_work.iter().find(|(p, _)| p == v).is_some());
- }
-
- /// Whether a message topic is considered live relative to our view. non-live
- /// topics do not pertain to our perceived leaves, and are uninteresting to us.
- pub(super) fn is_topic_live(&self, topic: &Hash) -> bool {
- self.topics.contains_key(topic)
- }
-
- /// The relay-chain block hash corresponding to a topic.
- pub(super) fn topic_block(&self, topic: &Hash) -> Option<&Hash> {
- self.topics.get(topic)
- }
-
- #[cfg(test)]
- pub(super) fn note_aware_under_leaf(
- &mut self,
- relay_chain_leaf: &Hash,
- candidate_hash: Hash,
- meta: CandidateMeta,
- ) {
- if let Some(view) = self.leaf_view_mut(relay_chain_leaf) {
- view.knowledge.note_aware(candidate_hash, meta);
- }
- }
-
- /// Validate the signature on an attestation statement of some kind. Should be done before
- /// any repropagation of that statement.
- pub(super) fn validate_statement_signature(
- &mut self,
- message: GossipStatement,
- chain: &C,
- )
- -> (GossipValidationResult, ReputationChange)
- {
- // message must reference one of our chain heads and
- // if message is not a `Candidate` we should have the candidate available
- // in `attestation_view`.
- match self.leaf_view(&message.relay_chain_leaf) {
- None => {
- let cost = match chain.is_known(&message.relay_chain_leaf) {
- Some(Known::Leaf) => {
- warn!(
- target: "network",
- "Leaf block {} not considered live for attestation",
- message.relay_chain_leaf,
- );
- cost::NONE
- }
- Some(Known::Old) => cost::PAST_MESSAGE,
- _ => cost::FUTURE_MESSAGE,
- };
-
- (GossipValidationResult::Discard, cost)
- }
- Some(view) => {
- // first check that we are capable of receiving this message
- // in a DoS-proof manner.
- let benefit = match message.signed_statement.statement {
- GenericStatement::Candidate(_) => benefit::NEW_CANDIDATE,
- GenericStatement::Valid(ref h) | GenericStatement::Invalid(ref h) => {
- if !view.knowledge.is_aware_of(h) {
- let cost = cost::ATTESTATION_NO_CANDIDATE;
- return (GossipValidationResult::Discard, cost);
- }
-
- benefit::NEW_ATTESTATION
- }
- };
-
- // validate signature.
- let res = view.validation_data.check_statement(
- &message.signed_statement,
- );
-
- match res {
- Ok(()) => {
- let topic = attestation_topic(message.relay_chain_leaf);
- (GossipValidationResult::ProcessAndKeep(topic), benefit)
- }
- Err(()) => (GossipValidationResult::Discard, cost::BAD_SIGNATURE),
- }
- }
- }
- }
-
- /// Validate a pov-block message.
- pub(super) fn validate_pov_block_message(
- &mut self,
- message: &super::GossipPoVBlock,
- chain: &C,
- )
- -> (GossipValidationResult, ReputationChange)
- {
- match self.leaf_view(&message.relay_chain_leaf) {
- None => {
- let cost = match chain.is_known(&message.relay_chain_leaf) {
- Some(Known::Leaf) => {
- warn!(
- target: "network",
- "Leaf block {} not considered live for attestation",
- message.relay_chain_leaf,
- );
- cost::NONE
- }
- Some(Known::Old) => cost::POV_BLOCK_UNWANTED,
- _ => cost::FUTURE_MESSAGE,
- };
-
- (GossipValidationResult::Discard, cost)
- }
- Some(view) => {
- // we only accept pov-blocks for candidates that we have
- // and consider active.
- match view.knowledge.candidate_meta(&message.candidate_hash) {
- None => (GossipValidationResult::Discard, cost::POV_BLOCK_UNWANTED),
- Some(meta) => {
- // check that the pov-block hash is actually correct.
- if meta.pov_block_hash == message.pov_block.hash() {
- let topic = super::pov_block_topic(message.relay_chain_leaf);
- (GossipValidationResult::ProcessAndKeep(topic), benefit::NEW_POV_BLOCK)
- } else {
- (GossipValidationResult::Discard, cost::POV_BLOCK_BAD_DATA)
- }
- }
- }
- }
- }
- }
-
- /// whether it's allowed to send a statement to a peer with given knowledge
- /// about the relay parent the statement refers to.
- pub(super) fn statement_allowed(
- &mut self,
- statement: &GossipStatement,
- peer_knowledge: &mut Knowledge,
- ) -> bool {
- let signed = &statement.signed_statement;
- let relay_chain_leaf = &statement.relay_chain_leaf;
-
- match signed.statement {
- GenericStatement::Valid(ref h) | GenericStatement::Invalid(ref h) => {
- // `valid` and `invalid` statements can only be propagated after
- // a candidate message is known by that peer.
- peer_knowledge.is_aware_of(h)
- }
- GenericStatement::Candidate(ref c) => {
- // if we are sending a `Candidate` message we should make sure that
- // attestation_view and their_view reflects that we know about the candidate.
- let hash = c.hash();
- let meta = CandidateMeta { pov_block_hash: c.pov_block_hash };
- peer_knowledge.note_aware(hash, meta.clone());
- if let Some(attestation_view) = self.leaf_view_mut(&relay_chain_leaf) {
- attestation_view.knowledge.note_aware(hash, meta);
- }
-
- // at this point, the peer hasn't seen the message or the candidate
- // and has knowledge of the relevant relay-chain parent.
- true
- }
- }
- }
-
- /// whether it's allowed to send a pov-block to a peer.
- pub(super) fn pov_block_allowed(
- &mut self,
- statement: &super::GossipPoVBlock,
- peer_knowledge: &mut Knowledge,
- ) -> bool {
- peer_knowledge.is_aware_of(&statement.candidate_hash)
- }
-}
-
-struct LeafView {
- validation_data: MessageValidationData,
- knowledge: Knowledge,
-}
diff --git a/network/src/legacy/gossip/mod.rs b/network/src/legacy/gossip/mod.rs
deleted file mode 100644
index 7e97eb688b15cd8cac4d84c100628ef76e7a6a41..0000000000000000000000000000000000000000
--- a/network/src/legacy/gossip/mod.rs
+++ /dev/null
@@ -1,1254 +0,0 @@
-// Copyright 2019-2020 Parity Technologies (UK) Ltd.
-// This file is part of Polkadot.
-
-// Polkadot is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// Polkadot is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with Polkadot. If not, see .
-
-//! Gossip messages and the message validator.
-//!
-//! At the moment, this module houses 2 gossip protocols central to Polkadot.
-//!
-//! The first is the attestation-gossip system, which aims to circulate parachain
-//! candidate attestations by validators at leaves of the block-DAG.
-//!
-//! The second is the inter-chain message queue routing gossip, which aims to
-//! circulate message queues between parachains, which remain un-routed as of
-//! recent leaves.
-//!
-//! These gossip systems do not have any form of sybil-resistance in terms
-//! of the nodes which can participate. It could be imposed e.g. by limiting only to
-//! validators, but this would prevent message queues from getting into the hands
-//! of collators and of attestations from getting into the hands of fishermen.
-//! As such, we take certain precautions which allow arbitrary full nodes to
-//! join the gossip graph, as well as validators (who are likely to be well-connected
-//! amongst themselves).
-//!
-//! The first is the notion of a neighbor packet. This is a packet sent between
-//! neighbors of the gossip graph to inform each other of their current protocol
-//! state. As of this writing, for both attestation and message-routing gossip,
-//! the only necessary information here is a (length-limited) set of perceived
-//! leaves of the block-DAG.
-//!
-//! These leaves can be used to derive what information a node is willing to accept
-//! There is typically an unbounded amount of possible "future" information relative to
-//! any protocol state. For example, attestations or unrouted message queues from millions
-//! of blocks after a known protocol state. The neighbor packet is meant to avoid being
-//! spammed by illegitimate future information, while informing neighbors of when
-//! previously-future and now current gossip messages would be accepted.
-//!
-//! Peers who send information which was not allowed under a recent neighbor packet
-//! will be noted as non-beneficial to Substrate's peer-set management utility.
-
-use sp_runtime::traits::{BlakeTwo256, Hash as HashT};
-use sp_blockchain::Error as ClientError;
-use sc_network::{ObservedRole, PeerId, ReputationChange};
-use sc_network::NetworkService;
-use sc_network_gossip::{
- ValidationResult as GossipValidationResult,
- ValidatorContext, MessageIntent,
-};
-use polkadot_validation::{SignedStatement};
-use polkadot_primitives::{Block, Hash};
-use polkadot_primitives::parachain::{
- ParachainHost, ValidatorId, ErasureChunk as PrimitiveChunk, SigningContext, PoVBlock,
-};
-use polkadot_erasure_coding::{self as erasure};
-use codec::{Decode, Encode};
-use sp_api::ProvideRuntimeApi;
-
-use std::collections::HashMap;
-use std::sync::Arc;
-
-use arrayvec::ArrayVec;
-use futures::prelude::*;
-use parking_lot::{Mutex, RwLock};
-
-use crate::legacy::{GossipMessageStream, GossipService};
-
-use attestation::{View as AttestationView, PeerData as AttestationPeerData};
-
-mod attestation;
-
-/// The engine ID of the polkadot attestation system.
-pub const POLKADOT_ENGINE_ID: sp_runtime::ConsensusEngineId = *b"dot1";
-pub const POLKADOT_PROTOCOL_NAME: &[u8] = b"/polkadot/legacy/1";
-
-// arbitrary; in practice this should not be more than 2.
-pub(crate) const MAX_CHAIN_HEADS: usize = 5;
-
-/// Type alias for a bounded vector of leaves.
-pub type LeavesVec = ArrayVec<[Hash; MAX_CHAIN_HEADS]>;
-
-mod benefit {
- use sc_network::ReputationChange as Rep;
- /// When a peer sends us a previously-unknown candidate statement.
- pub const NEW_CANDIDATE: Rep = Rep::new(100, "Polkadot: New candidate");
- /// When a peer sends us a previously-unknown attestation.
- pub const NEW_ATTESTATION: Rep = Rep::new(50, "Polkadot: New attestation");
- /// When a peer sends us a previously-unknown pov-block
- pub const NEW_POV_BLOCK: Rep = Rep::new(150, "Polkadot: New PoV block");
- /// When a peer sends us a previously-unknown erasure chunk.
- pub const NEW_ERASURE_CHUNK: Rep = Rep::new(10, "Polkadot: New erasure chunk");
-}
-
-mod cost {
- use sc_network::ReputationChange as Rep;
- /// No cost. This will not be reported.
- pub const NONE: Rep = Rep::new(0, "");
- /// A peer sent us an attestation and we don't know the candidate.
- pub const ATTESTATION_NO_CANDIDATE: Rep = Rep::new(-100, "Polkadot: No candidate");
- /// A peer sent us a pov-block and we don't know the candidate or the leaf.
- pub const POV_BLOCK_UNWANTED: Rep = Rep::new(-500, "Polkadot: No candidate");
- /// A peer sent us a pov-block message with wrong data.
- pub const POV_BLOCK_BAD_DATA: Rep = Rep::new(-1000, "Polkadot: Bad PoV-block data");
- /// A peer sent us a statement we consider in the future.
- pub const FUTURE_MESSAGE: Rep = Rep::new(-100, "Polkadot: Future message");
- /// A peer sent us a statement from the past.
- pub const PAST_MESSAGE: Rep = Rep::new(-30, "Polkadot: Past message");
- /// A peer sent us a malformed message.
- pub const MALFORMED_MESSAGE: Rep = Rep::new(-500, "Polkadot: Malformed message");
- /// A peer sent us a wrongly signed message.
- pub const BAD_SIGNATURE: Rep = Rep::new(-500, "Polkadot: Bad signature");
- /// A peer sent us a bad neighbor packet.
- pub const BAD_NEIGHBOR_PACKET: Rep = Rep::new(-300, "Polkadot: Bad neighbor");
- /// A peer sent us an erasure chunk referring to a candidate that we are not aware of.
- pub const ORPHANED_ERASURE_CHUNK: Rep = Rep::new(-10, "An erasure chunk from unknown candidate");
- /// A peer sent us an erasure chunk that does not match candidate's erasure root.
- pub const ERASURE_CHUNK_WRONG_ROOT: Rep = Rep::new(-100, "Chunk doesn't match encoding root");
-}
-
-/// A gossip message.
-#[derive(Encode, Decode, Clone, PartialEq)]
-pub enum GossipMessage {
- /// A packet sent to a neighbor but not relayed.
- #[codec(index = "1")]
- Neighbor(VersionedNeighborPacket),
- /// An attestation-statement about the candidate.
- /// Non-candidate statements should only be sent to peers who are aware of the candidate.
- #[codec(index = "2")]
- Statement(GossipStatement),
- // TODO: https://github.com/paritytech/polkadot/issues/253
- /// A packet containing one of the erasure-coding chunks of one candidate.
- #[codec(index = "3")]
- ErasureChunk(ErasureChunkMessage),
- /// A PoV-block.
- #[codec(index = "255")]
- PoVBlock(GossipPoVBlock),
-}
-
-impl From for GossipMessage {
- fn from(packet: NeighborPacket) -> Self {
- GossipMessage::Neighbor(VersionedNeighborPacket::V1(packet))
- }
-}
-
-impl From for GossipMessage {
- fn from(stmt: GossipStatement) -> Self {
- GossipMessage::Statement(stmt)
- }
-}
-
-impl From for GossipMessage {
- fn from(pov: GossipPoVBlock) -> Self {
- GossipMessage::PoVBlock(pov)
- }
-}
-
-/// A gossip message containing a statement.
-#[derive(Encode, Decode, Clone, PartialEq)]
-pub struct GossipStatement {
- /// The block hash of the relay chain being referred to. In context, this should
- /// be a leaf.
- pub relay_chain_leaf: Hash,
- /// The signed statement being gossipped.
- pub signed_statement: SignedStatement,
-}
-
-impl GossipStatement {
- /// Create a new instance.
- pub fn new(relay_chain_leaf: Hash, signed_statement: SignedStatement) -> Self {
- Self {
- relay_chain_leaf,
- signed_statement,
- }
- }
-}
-
-/// A gossip message containing one erasure chunk of a candidate block.
-/// For each chunk of block erasure encoding one of this messages is constructed.
-#[derive(Encode, Decode, Clone, Debug, PartialEq)]
-pub struct ErasureChunkMessage {
- /// The chunk itself.
- pub chunk: PrimitiveChunk,
- /// The hash of the candidate receipt of the block this chunk belongs to.
- pub candidate_hash: Hash,
-}
-
-impl From for GossipMessage {
- fn from(chk: ErasureChunkMessage) -> Self {
- GossipMessage::ErasureChunk(chk)
- }
-}
-
-/// A pov-block being gossipped. Should only be sent to peers aware of the candidate
-/// referenced.
-#[derive(Encode, Decode, Clone, Debug, PartialEq)]
-pub struct GossipPoVBlock {
- /// The block hash of the relay chain being referred to. In context, this should
- /// be a leaf.
- pub relay_chain_leaf: Hash,
- /// The hash of some candidate localized to the same relay-chain leaf, whose
- /// pov-block is this block.
- pub candidate_hash: Hash,
- /// The pov-block itself.
- pub pov_block: PoVBlock,
-}
-
-/// A versioned neighbor message.
-#[derive(Encode, Decode, Clone, PartialEq)]
-pub enum VersionedNeighborPacket {
- #[codec(index = "1")]
- V1(NeighborPacket),
-}
-
-/// Contains information on which chain heads the peer is
-/// accepting messages for.
-#[derive(Encode, Decode, Clone, PartialEq)]
-pub struct NeighborPacket {
- chain_heads: Vec,
-}
-
-/// whether a block is known.
-#[derive(Clone, Copy, PartialEq)]
-pub enum Known {
- /// The block is a known leaf.
- Leaf,
- /// The block is known to be old.
- Old,
- /// The block is known to be bad.
- Bad,
-}
-
-/// Context to the underlying polkadot chain.
-pub trait ChainContext: Send + Sync {
- /// Provide a closure which is invoked for every unrouted queue hash at a given leaf.
- fn leaf_unrouted_roots(
- &self,
- leaf: &Hash,
- with_queue_root: &mut dyn FnMut(&Hash),
- ) -> Result<(), ClientError>;
-
- /// whether a block is known. If it's not, returns `None`.
- fn is_known(&self, block_hash: &Hash) -> Option;
-}
-
-impl ChainContext for (F, P) where
- F: Fn(&Hash) -> Option + Send + Sync,
- P: Send + Sync + std::ops::Deref,
- P::Target: ProvideRuntimeApi,
- >::Api: ParachainHost,
-{
- fn is_known(&self, block_hash: &Hash) -> Option {
- (self.0)(block_hash)
- }
-
- fn leaf_unrouted_roots(
- &self,
- _leaf: &Hash,
- _with_queue_root: &mut dyn FnMut(&Hash),
- ) -> Result<(), ClientError> {
- Ok(())
- }
-}
-
-
-/// Compute the gossip topic for attestations on the given parent hash.
-pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash {
- let mut v = parent_hash.as_ref().to_vec();
- v.extend(b"attestations");
-
- BlakeTwo256::hash(&v[..])
-}
-
-/// Compute the gossip topic for PoV blocks based on the given parent hash.
-pub(crate) fn pov_block_topic(parent_hash: Hash) -> Hash {
- let mut v = parent_hash.as_ref().to_vec();
- v.extend(b"pov-blocks");
-
- BlakeTwo256::hash(&v[..])
-}
-
-/// Register a gossip validator on the network service.
-// NOTE: since RegisteredMessageValidator is meant to be a type-safe proof
-// that we've actually done the registration, this should be the only way
-// to construct it outside of tests.
-pub fn register_validator(
- service: Arc>,
- chain: C,
- executor: &impl futures::task::Spawn,
-) -> RegisteredMessageValidator
-{
- let s = service.clone();
- let report_handle = Box::new(move |peer: &PeerId, cost_benefit: ReputationChange| {
- if cost_benefit.value != 0 {
- s.report_peer(peer.clone(), cost_benefit);
- }
- });
- let validator = Arc::new(MessageValidator {
- report_handle,
- inner: RwLock::new(Inner {
- peers: HashMap::new(),
- attestation_view: Default::default(),
- availability_store: None,
- chain,
- })
- });
-
- let gossip_side = validator.clone();
- let gossip_engine = Arc::new(Mutex::new(sc_network_gossip::GossipEngine::new(
- service.clone(),
- POLKADOT_ENGINE_ID,
- POLKADOT_PROTOCOL_NAME,
- gossip_side,
- )));
-
- // Spawn gossip engine.
- //
- // Ideally this would not be spawned as an orphaned task, but polled by
- // `RegisteredMessageValidator` which in turn would be polled by a `ValidationNetwork`.
- {
- let gossip_engine = gossip_engine.clone();
- let fut = futures::future::poll_fn(move |cx| {
- gossip_engine.lock().poll_unpin(cx)
- });
- let spawn_res = executor.spawn_obj(futures::task::FutureObj::from(Box::new(fut)));
-
- // Note: we consider the chances of an error to spawn a background task almost null.
- if spawn_res.is_err() {
- log::error!(target: "polkadot-gossip", "Failed to spawn background task");
- }
- }
-
- RegisteredMessageValidator {
- inner: validator as _,
- service: Some(service),
- gossip_engine: Some(gossip_engine),
- }
-}
-
-#[derive(PartialEq)]
-enum NewLeafAction {
- // (who, message)
- TargetedMessage(PeerId, GossipMessage),
-}
-
-/// Actions to take after noting a new block-DAG leaf.
-///
-/// This should be consumed by passing a consensus-gossip handle to `perform`.
-#[must_use = "New chain-head gossip actions must be performed"]
-pub struct NewLeafActions {
- actions: Vec,
-}
-
-impl NewLeafActions {
- #[cfg(test)]
- pub fn new() -> Self {
- NewLeafActions { actions: Vec::new() }
- }
-
- /// Perform the queued actions, feeding into gossip.
- pub fn perform(
- self,
- gossip: &dyn crate::legacy::GossipService,
- ) {
- for action in self.actions {
- match action {
- NewLeafAction::TargetedMessage(who, message)
- => gossip.send_message(who, message),
- }
- }
- }
-}
-
-/// A registered message validator.
-///
-/// Create this using `register_validator`.
-#[derive(Clone)]
-pub struct RegisteredMessageValidator {
- inner: Arc>,
- // Note: this is always `Some` in real code and `None` in tests.
- service: Option>>,
- // Note: this is always `Some` in real code and `None` in tests.
- gossip_engine: Option>>>,
-}
-
-impl RegisteredMessageValidator {
- /// Register an availabilty store the gossip service can query.
- pub(crate) fn register_availability_store(&self, availability_store: av_store::Store) {
- self.inner.inner.write().availability_store = Some(availability_store);
- }
-
- /// Note that we perceive a new leaf of the block-DAG. We will notify our neighbors that
- /// we now accept parachain candidate attestations and incoming message queues
- /// relevant to this leaf.
- pub(crate) fn new_local_leaf(
- &self,
- validation: MessageValidationData,
- ) -> NewLeafActions {
- // add an entry in attestation_view
- // prune any entries from attestation_view which are no longer leaves
- let mut inner = self.inner.inner.write();
- inner.attestation_view.new_local_leaf(validation);
-
- let mut actions = Vec::new();
-
- {
- let &mut Inner {
- ref chain,
- ref mut attestation_view,
- ..
- } = &mut *inner;
-
- attestation_view.prune_old_leaves(|hash| match chain.is_known(hash) {
- Some(Known::Leaf) => true,
- _ => false,
- });
- }
-
-
- // send neighbor packets to peers
- inner.multicast_neighbor_packet(
- |who, message| actions.push(NewLeafAction::TargetedMessage(who.clone(), message))
- );
-
- NewLeafActions { actions }
- }
-
- pub(crate) fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
- let topic_stream = if let Some(gossip_engine) = self.gossip_engine.as_ref() {
- gossip_engine.lock().messages_for(topic)
- } else {
- log::error!("Called gossip_messages_for on a test engine");
- futures::channel::mpsc::channel(0).1
- };
-
- GossipMessageStream::new(topic_stream.boxed())
- }
-
- pub(crate) fn gossip_message(&self, topic: Hash, message: GossipMessage) {
- if let Some(gossip_engine) = self.gossip_engine.as_ref() {
- gossip_engine.lock().gossip_message(
- topic,
- message.encode(),
- false,
- );
- } else {
- log::error!("Called gossip_message on a test engine");
- }
- }
-
- pub(crate) fn send_message(&self, who: PeerId, message: GossipMessage) {
- if let Some(gossip_engine) = self.gossip_engine.as_ref() {
- gossip_engine.lock().send_message(vec![who], message.encode());
- } else {
- log::error!("Called send_message on a test engine");
- }
- }
-}
-
-impl GossipService for RegisteredMessageValidator {
- fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
- RegisteredMessageValidator::gossip_messages_for(self, topic)
- }
-
- fn gossip_message(&self, topic: Hash, message: GossipMessage) {
- RegisteredMessageValidator::gossip_message(self, topic, message)
- }
-
- fn send_message(&self, who: PeerId, message: GossipMessage) {
- RegisteredMessageValidator::send_message(self, who, message)
- }
-}
-
-/// The data needed for validating gossip messages.
-#[derive(Default)]
-pub(crate) struct MessageValidationData {
- /// The authorities' parachain validation keys at a block.
- pub(crate) authorities: Vec,
- /// The signing context.
- pub(crate) signing_context: SigningContext,
-}
-
-impl MessageValidationData {
- // check a statement's signature.
- fn check_statement(&self, statement: &SignedStatement) -> Result<(), ()> {
- let sender = match self.authorities.get(statement.sender as usize) {
- Some(val) => val,
- None => return Err(()),
- };
-
- let good = self.authorities.contains(&sender) &&
- ::polkadot_validation::check_statement(
- &statement.statement,
- &statement.signature,
- sender.clone(),
- &self.signing_context,
- );
-
- if good {
- Ok(())
- } else {
- Err(())
- }
- }
-}
-
-#[derive(Default)]
-struct PeerData {
- attestation: AttestationPeerData,
-}
-
-struct Inner {
- peers: HashMap,
- attestation_view: AttestationView,
- availability_store: Option,
- chain: C,
-}
-
-impl Inner {
- fn validate_neighbor_packet(&mut self, sender: &PeerId, packet: NeighborPacket)
- -> (GossipValidationResult, ReputationChange, Vec)
- {
- let chain_heads = packet.chain_heads;
- if chain_heads.len() > MAX_CHAIN_HEADS {
- (GossipValidationResult::Discard, cost::BAD_NEIGHBOR_PACKET, Vec::new())
- } else {
- let chain_heads: LeavesVec = chain_heads.into_iter().collect();
- let new_topics = if let Some(ref mut peer) = self.peers.get_mut(sender) {
- let new_leaves = peer.attestation.update_leaves(&chain_heads);
- let new_attestation_topics = new_leaves.iter().cloned().map(attestation_topic);
- let new_pov_block_topics = new_leaves.iter().cloned().map(pov_block_topic);
-
- new_attestation_topics.chain(new_pov_block_topics).collect()
- } else {
- Vec::new()
- };
-
- (GossipValidationResult::Discard, cost::NONE, new_topics)
- }
- }
-
- fn validate_erasure_chunk_packet(&mut self, msg: ErasureChunkMessage)
- -> (GossipValidationResult, ReputationChange)
- {
- if let Some(store) = &self.availability_store {
- if let Some(receipt) = store.get_candidate(&msg.candidate_hash) {
- let chunk_hash = erasure::branch_hash(
- &receipt.commitments.erasure_root,
- &msg.chunk.proof,
- msg.chunk.index as usize
- );
-
- if chunk_hash != Ok(BlakeTwo256::hash(&msg.chunk.chunk)) {
- (
- GossipValidationResult::Discard,
- cost::ERASURE_CHUNK_WRONG_ROOT
- )
- } else {
- if let Some(awaited_chunks) = store.awaited_chunks() {
- let frontier_entry = av_store::AwaitedFrontierEntry {
- candidate_hash: msg.candidate_hash,
- relay_parent: receipt.relay_parent,
- validator_index: msg.chunk.index,
- };
- if awaited_chunks.contains(&frontier_entry) {
- let topic = crate::erasure_coding_topic(
- &msg.candidate_hash
- );
-
- return (
- GossipValidationResult::ProcessAndKeep(topic),
- benefit::NEW_ERASURE_CHUNK,
- );
- }
- }
- (GossipValidationResult::Discard, cost::NONE)
- }
- } else {
- (GossipValidationResult::Discard, cost::ORPHANED_ERASURE_CHUNK)
- }
- } else {
- (GossipValidationResult::Discard, cost::NONE)
- }
- }
-
- fn multicast_neighbor_packet