// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see .
use std::collections::HashMap;
use std::io;
use std::sync::Arc;
use std::thread;
use log::{error, info, trace, warn};
use sp_blockchain::{Result as ClientResult};
use sp_runtime::traits::{Header as HeaderT, ProvideRuntimeApi, Block as BlockT};
use sp_api::ApiExt;
use client::{
BlockchainEvents, BlockBody,
blockchain::ProvideCache,
};
use consensus_common::{
self, BlockImport, BlockCheckParams, BlockImportParams, Error as ConsensusError,
ImportResult,
import_queue::CacheKeyId,
};
use polkadot_primitives::{Block, BlockId, Hash};
use polkadot_primitives::parachain::{
CandidateReceipt, ParachainHost, ValidatorId,
ValidatorPair, AvailableMessages, BlockData, ErasureChunk,
};
use futures::{prelude::*, future::select, channel::{mpsc, oneshot}, task::SpawnExt};
use keystore::KeyStorePtr;
use tokio::runtime::{Handle, Runtime as LocalRuntime};
use crate::{LOG_TARGET, Data, TaskExecutor, ProvideGossipMessages, erasure_coding_topic};
use crate::store::Store;
/// Errors that may occur.
#[derive(Debug, derive_more::Display, derive_more::From)]
pub(crate) enum Error {
#[from]
StoreError(io::Error),
#[display(fmt = "Validator's id and number of validators at block with parent {} not found", relay_parent)]
IdAndNValidatorsNotFound { relay_parent: Hash },
#[display(fmt = "Candidate receipt with hash {} not found", candidate_hash)]
CandidateNotFound { candidate_hash: Hash },
}
/// Messages sent to the `Worker`.
///
/// Messages are sent in a number of different scenarios,
/// for instance, when:
/// * importing blocks in `BlockImport` implementation,
/// * recieving finality notifications,
/// * when the `Store` api is used by outside code.
#[derive(Debug)]
pub(crate) enum WorkerMsg {
ErasureRoots(ErasureRoots),
ParachainBlocks(ParachainBlocks),
ListenForChunks(ListenForChunks),
Chunks(Chunks),
CandidatesFinalized(CandidatesFinalized),
MakeAvailable(MakeAvailable),
}
/// The erasure roots of the heads included in the block with a given parent.
#[derive(Debug)]
pub(crate) struct ErasureRoots {
/// The relay parent of the block these roots belong to.
pub relay_parent: Hash,
/// The roots themselves.
pub erasure_roots: Vec,
/// A sender to signal the result asynchronously.
pub result: oneshot::Sender>,
}
/// The receipts of the heads included into the block with a given parent.
#[derive(Debug)]
pub(crate) struct ParachainBlocks {
/// The relay parent of the block these parachain blocks belong to.
pub relay_parent: Hash,
/// The blocks themselves.
pub blocks: Vec<(CandidateReceipt, Option<(BlockData, AvailableMessages)>)>,
/// A sender to signal the result asynchronously.
pub result: oneshot::Sender>,
}
/// Listen gossip for these chunks.
#[derive(Debug)]
pub(crate) struct ListenForChunks {
/// The relay parent of the block the chunks from we want to listen to.
pub relay_parent: Hash,
/// The hash of the candidate chunk belongs to.
pub candidate_hash: Hash,
/// The index of the chunk we need.
pub index: u32,
/// A sender to signal the result asynchronously.
pub result: Option>>,
}
/// We have received some chunks.
#[derive(Debug)]
pub(crate) struct Chunks {
/// The relay parent of the block these chunks belong to.
pub relay_parent: Hash,
/// The hash of the parachain candidate these chunks belong to.
pub candidate_hash: Hash,
/// The chunks.
pub chunks: Vec,
/// A sender to signal the result asynchronously.
pub result: oneshot::Sender>,
}
/// These candidates have been finalized, so unneded availability may be now pruned
#[derive(Debug)]
pub(crate) struct CandidatesFinalized {
/// The relay parent of the block that was finalized.
relay_parent: Hash,
/// The parachain heads that were finalized in this block.
candidate_hashes: Vec,
}
/// The message that corresponds to `make_available` call of the crate API.
#[derive(Debug)]
pub(crate) struct MakeAvailable {
/// The data being made available.
pub data: Data,
/// A sender to signal the result asynchronously.
pub result: oneshot::Sender>,
}
/// An availability worker with it's inner state.
pub(super) struct Worker {
availability_store: Store,
provide_gossip_messages: PGM,
registered_gossip_streams: HashMap,
sender: mpsc::UnboundedSender,
}
/// The handle to the `Worker`.
pub(super) struct WorkerHandle {
exit_signal: Option,
thread: Option>>,
sender: mpsc::UnboundedSender,
}
impl WorkerHandle {
pub(crate) fn to_worker(&self) -> &mpsc::UnboundedSender {
&self.sender
}
}
impl Drop for WorkerHandle {
fn drop(&mut self) {
if let Some(signal) = self.exit_signal.take() {
let _ = signal.fire();
}
if let Some(thread) = self.thread.take() {
if let Err(_) = thread.join() {
error!(target: LOG_TARGET, "Errored stopping the thread");
}
}
}
}
async fn listen_for_chunks(
p: PGM,
topic: Hash,
mut sender: S
)
where
PGM: ProvideGossipMessages,
S: Sink + Unpin,
{
trace!(target: LOG_TARGET, "Registering gossip listener for topic {}", topic);
let mut chunks_stream = p.gossip_messages_for(topic);
while let Some(item) = chunks_stream.next().await {
let (s, _) = oneshot::channel();
trace!(target: LOG_TARGET, "Received for {:?}", item);
let chunks = Chunks {
relay_parent: item.0,
candidate_hash: item.1,
chunks: vec![item.2],
result: s,
};
if let Err(_) = sender.send(WorkerMsg::Chunks(chunks)).await {
break;
}
}
}
fn fetch_candidates