// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashMap;
use std::io;
use std::sync::Arc;
use std::thread;

use log::{error, info, trace, warn};
use sp_blockchain::{Result as ClientResult};
use sp_runtime::traits::{Header as HeaderT, ProvideRuntimeApi, Block as BlockT};
use sp_api::ApiExt;
use client::{
	BlockchainEvents, BlockBody,
	blockchain::ProvideCache,
};
use consensus_common::{
	self, BlockImport, BlockCheckParams, BlockImportParams, Error as ConsensusError,
	ImportResult,
	import_queue::CacheKeyId,
};
use polkadot_primitives::{Block, BlockId, Hash};
use polkadot_primitives::parachain::{
	CandidateReceipt, ParachainHost, ValidatorId,
	ValidatorPair, AvailableMessages, BlockData, ErasureChunk,
};
use futures::channel::{mpsc, oneshot};
use futures::{FutureExt, Sink, SinkExt, StreamExt, future::select, task::SpawnExt};
use keystore::KeyStorePtr;

use tokio::runtime::{Handle, Runtime as LocalRuntime};

use crate::{LOG_TARGET, Data, TaskExecutor, ProvideGossipMessages, erasure_coding_topic};
use crate::store::Store;

/// Errors that may occur.
///
/// This is the error type carried by the `result` senders of the worker messages
/// declared in this file.
#[derive(Debug, derive_more::Display, derive_more::From)]
pub(crate) enum Error {
	/// An I/O error. The variant is marked `#[from]`, so an `io::Error` can be
	/// converted into it (for example via `?`).
	// NOTE(review): presumably raised by the underlying `Store` — confirm against callers.
	#[from]
	StoreError(io::Error),
	/// The validator's id and the number of validators could not be found for the
	/// block whose parent is `relay_parent`.
	#[display(fmt = "Validator's id and number of validators at block with parent {} not found", relay_parent)]
	IdAndNValidatorsNotFound { relay_parent: Hash },
	/// No candidate receipt with the hash `candidate_hash` could be found.
	#[display(fmt = "Candidate receipt with hash {} not found", candidate_hash)]
	CandidateNotFound { candidate_hash: Hash },
}

/// Messages sent to the `Worker`.
///
/// Messages are sent in a number of different scenarios,
/// for instance, when:
///   * importing blocks in the `BlockImport` implementation,
///   * receiving finality notifications,
///   * the `Store` API is used by outside code.
///
/// Each variant wraps a payload struct of the same name that carries the data and,
/// for most messages, a `oneshot` sender used to report the outcome.
#[derive(Debug)]
pub(crate) enum WorkerMsg {
	/// Erasure roots of the heads included in a block; see [`ErasureRoots`].
	ErasureRoots(ErasureRoots),
	/// Receipts of the heads included in a block; see [`ParachainBlocks`].
	ParachainBlocks(ParachainBlocks),
	/// Request to listen on gossip for a particular chunk; see [`ListenForChunks`].
	ListenForChunks(ListenForChunks),
	/// Some chunks have been received; see [`Chunks`].
	Chunks(Chunks),
	/// Candidates have been finalized; see [`CandidatesFinalized`].
	CandidatesFinalized(CandidatesFinalized),
	/// Counterpart of the crate's `make_available` API call; see [`MakeAvailable`].
	MakeAvailable(MakeAvailable),
}

/// The erasure roots of the heads included in the block with a given parent.
///
/// Payload of [`WorkerMsg::ErasureRoots`].
#[derive(Debug)]
pub(crate) struct ErasureRoots {
	/// The relay parent of the block these roots belong to.
	pub relay_parent: Hash,
	/// The roots themselves.
	pub erasure_roots: Vec<Hash>,
	/// A sender to signal the result asynchronously: `Ok(())` or an [`Error`].
	pub result: oneshot::Sender<Result<(), Error>>,
}

/// The receipts of the heads included into the block with a given parent.
///
/// Payload of [`WorkerMsg::ParachainBlocks`].
#[derive(Debug)]
pub(crate) struct ParachainBlocks {
	/// The relay parent of the block these parachain blocks belong to.
	pub relay_parent: Hash,
	/// The blocks themselves: each candidate receipt is paired with an optional
	/// tuple of its block data and available messages (`None` when absent).
	pub blocks: Vec<(CandidateReceipt, Option<(BlockData, AvailableMessages)>)>,
	/// A sender to signal the result asynchronously: `Ok(())` or an [`Error`].
	pub result: oneshot::Sender<Result<(), Error>>,
}

/// Listen on gossip for these chunks.
///
/// Payload of [`WorkerMsg::ListenForChunks`].
#[derive(Debug)]
pub(crate) struct ListenForChunks {
	/// The relay parent of the block whose chunks we want to listen for.
	pub relay_parent: Hash,
	/// The hash of the candidate the chunk belongs to.
	pub candidate_hash: Hash,
	/// The index of the chunk we need.
	pub index: u32,
	/// An optional sender to signal the result asynchronously; unlike the other
	/// messages, the sender may be omitted (`None`).
	pub result: Option<oneshot::Sender<Result<(), Error>>>,
}

/// We have received some chunks.
///
/// Payload of [`WorkerMsg::Chunks`].
#[derive(Debug)]
pub(crate) struct Chunks {
	/// The relay parent of the block these chunks belong to.
	pub relay_parent: Hash,
	/// The hash of the parachain candidate these chunks belong to.
	pub candidate_hash: Hash,
	/// The received erasure chunks.
	pub chunks: Vec<ErasureChunk>,
	/// A sender to signal the result asynchronously: `Ok(())` or an [`Error`].
	pub result: oneshot::Sender<Result<(), Error>>,
}

/// These candidates have been finalized, so unneeded availability data may now be pruned.
///
/// Payload of [`WorkerMsg::CandidatesFinalized`]. Unlike the other payloads, the
/// fields are private to this module and there is no `result` sender.
#[derive(Debug)]
pub(crate) struct CandidatesFinalized {
	/// The relay parent of the block that was finalized.
	relay_parent: Hash,
	/// The parachain heads that were finalized in this block.
	candidate_hashes: Vec<Hash>,
}

/// The message that corresponds to the `make_available` call of the crate API.
///
/// Payload of [`WorkerMsg::MakeAvailable`].
#[derive(Debug)]
pub(crate) struct MakeAvailable {
	/// The data being made available.
	pub data: Data,
	/// A sender to signal the result asynchronously: `Ok(())` or an [`Error`].
	pub result: oneshot::Sender<Result<(), Error>>,
}

/// An availability worker with its inner state.
///
/// `PGM` is the type of the gossip-message provider held by the worker.
pub(super) struct Worker<PGM> {
	/// The availability store the worker operates on.
	availability_store: Store,
	/// The provider of gossip messages.
	provide_gossip_messages: PGM,
	/// Exit signals of the registered gossip streams, keyed by a hash.
	// NOTE(review): the key is presumably the gossip topic or relay parent — confirm.
	registered_gossip_streams: HashMap<Hash, exit_future::Signal>,

	/// A sender of `WorkerMsg`s kept by the worker.
	// NOTE(review): presumably lets spawned tasks feed messages back to the worker — confirm.
	sender: mpsc::UnboundedSender<WorkerMsg>,
}

/// The handle to the `Worker`.
///
/// Dropping the handle fires the exit signal and joins the worker thread
/// (see the `Drop` implementation).
pub(super) struct WorkerHandle {
	/// The exit signal; taken and fired when the handle is dropped.
	exit_signal: Option<exit_future::Signal>,
	/// The join handle of the worker thread; taken and joined when the handle is dropped.
	thread: Option<thread::JoinHandle<io::Result<()>>>,
	/// The sender of `WorkerMsg`s, exposed through `to_worker`.
	sender: mpsc::UnboundedSender<WorkerMsg>,
}

impl WorkerHandle {
	/// Borrows the channel endpoint held by this handle, through which
	/// [`WorkerMsg`]s can be sent.
	pub(crate) fn to_worker(&self) -> &mpsc::UnboundedSender<WorkerMsg> {
		// Destructuring through `&self` only borrows the field; nothing is moved.
		let Self { sender, .. } = self;
		sender
	}
}

impl Drop for WorkerHandle {
	fn drop(&mut self) {
		if let Some(signal) = self.exit_signal.take() {
Ashley's avatar
Ashley committed
			let _ = signal.fire();
		}

		if let Some(thread) = self.thread.take() {
			if let Err(_) = thread.join() {
				error!(target: LOG_TARGET, "Errored stopping the thread");
			}
		}
	}
}

async fn listen_for_chunks<PGM, S>(
	p: PGM,
	topic: Hash,
	mut sender: S
)
where
	PGM: ProvideGossipMessages,
	S: Sink<WorkerMsg> + Unpin,
{
	trace!(target: LOG_TARGET, "Registering gossip listener for topic {}", topic);
	let mut chunks_stream = p.gossip_messages_for(topic);

	while let Some(item) = chunks_stream.next().await {
		let (s, _) = oneshot::channel();
		trace!(target: LOG_TARGET, "Received for {:?}", item);
		let chunks = Chunks {
			relay_parent: item.0,
			candidate_hash: item.1,
			chunks: vec![item.2],
			result: s,
		};

Loading full blame...