From 3e17fcfb3d1f76498a3d2507d72aa4a2c47ee4ed Mon Sep 17 00:00:00 2001 From: Robert Habermeier <rphmeier@gmail.com> Date: Fri, 17 Jan 2020 10:28:19 +0100 Subject: [PATCH] Validation service refactoring (#773) * add some more docs about statement import * instantiate environment async * move attestation service into subfolder * refactor validation service architecture somewhat * remove dependence on validation service in proposer * fix a bunch of warnings * improve docs * introduce a builder for the validation service * extract block production to its own file * integrate new API into service * address review grumbles --- polkadot/service/src/lib.rs | 23 +- polkadot/statement-table/src/generic.rs | 3 + .../validation/src/attestation_service.rs | 154 ----- polkadot/validation/src/block_production.rs | 354 ++++++++++ polkadot/validation/src/error.rs | 2 + polkadot/validation/src/evaluation.rs | 3 +- polkadot/validation/src/lib.rs | 639 +----------------- polkadot/validation/src/shared_table/mod.rs | 9 +- .../validation/src/validation_service/mod.rs | 463 +++++++++++++ 9 files changed, 860 insertions(+), 790 deletions(-) delete mode 100644 polkadot/validation/src/attestation_service.rs create mode 100644 polkadot/validation/src/block_production.rs create mode 100644 polkadot/validation/src/validation_service/mod.rs diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index f3872c6663b..40c052ef827 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -404,21 +404,28 @@ pub fn new_full<Runtime, Dispatch, Extrinsic>(config: Configuration) service.client(), WrappedExecutor(service.spawn_task_handle()), ); + + let (validation_service_handle, validation_service) = consensus::ServiceBuilder { + client: client.clone(), + network: validation_network.clone(), + collators: validation_network, + task_executor: Arc::new(WrappedExecutor(service.spawn_task_handle())), + availability_store: availability_store.clone(), + select_chain: select_chain.clone(), + keystore: service.keystore(), + max_block_data_size, + }.build(); + + service.spawn_essential_task(Box::pin(validation_service)); + let proposer = consensus::ProposerFactory::new( client.clone(), - select_chain.clone(), - validation_network.clone(), - validation_network, service.transaction_pool(), - Arc::new(WrappedExecutor(service.spawn_task_handle())), - service.keystore(), - availability_store.clone(), + validation_service_handle, slot_duration, - max_block_data_size, backend, ); - let client = service.client(); let select_chain = service.select_chain().ok_or(ServiceError::SelectChainRequired)?; let can_author_with = consensus_common::CanAuthorWithNativeVersion::new(client.executor().clone()); diff --git a/polkadot/statement-table/src/generic.rs b/polkadot/statement-table/src/generic.rs index f429e59960e..83cae9daf26 100644 --- a/polkadot/statement-table/src/generic.rs +++ b/polkadot/statement-table/src/generic.rs @@ -354,6 +354,9 @@ impl<C: Context> Table<C> { /// Import a signed statement. Signatures should be checked for validity, and the /// sender should be checked to actually be an authority. /// + /// Validity and invalidity statements are only valid if the corresponding + /// candidate has already been imported. + /// /// If this returns `None`, the statement was either duplicate or invalid. 
pub fn import_statement( &mut self, diff --git a/polkadot/validation/src/attestation_service.rs b/polkadot/validation/src/attestation_service.rs deleted file mode 100644 index a5087f1cd40..00000000000 --- a/polkadot/validation/src/attestation_service.rs +++ /dev/null @@ -1,154 +0,0 @@ -// Copyright 2017-2020 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. - -//! Attestation service. - -/// Attestation service. A long running service that creates and manages parachain attestation -/// instances. -/// -/// This uses a handle to an underlying thread pool to dispatch heavy work -/// such as candidate verification while performing event-driven work -/// on a local event loop. - -use std::{thread, time::Duration, sync::Arc}; - -use sc_client_api::{BlockchainEvents, BlockBody}; -use sp_blockchain::HeaderBackend; -use block_builder::BlockBuilderApi; -use consensus::SelectChain; -use futures::prelude::*; -use futures::{future::{ready, select}, task::{Spawn, SpawnExt}}; -use polkadot_primitives::Block; -use polkadot_primitives::parachain::ParachainHost; -use babe_primitives::BabeApi; -use keystore::KeyStorePtr; -use sp_api::{ApiExt, ProvideRuntimeApi}; -use runtime_primitives::traits::HasherFor; - -use tokio::{runtime::Runtime as LocalRuntime}; -use log::{warn, error}; - -use super::{Network, Collators}; - -type TaskExecutor = Arc<dyn Spawn + Send + Sync>; - -/// Parachain candidate attestation service handle. -pub(crate) struct ServiceHandle { - thread: Option<thread::JoinHandle<()>>, - exit_signal: Option<::exit_future::Signal>, -} - -/// Create and start a new instance of the attestation service. 
-pub(crate) fn start<C, N, P, SC>( - client: Arc<P>, - select_chain: SC, - parachain_validation: Arc<crate::ParachainValidation<C, N, P>>, - thread_pool: TaskExecutor, - keystore: KeyStorePtr, - max_block_data_size: Option<u64>, -) -> ServiceHandle - where - C: Collators + Send + Sync + Unpin + 'static, - C::Collation: Send + Unpin + 'static, - P: BlockchainEvents<Block> + BlockBody<Block>, - P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static, - P::Api: ParachainHost<Block> + - BlockBuilderApi<Block> + - BabeApi<Block> + - ApiExt<Block, Error = sp_blockchain::Error>, - N: Network + Send + Sync + 'static, - N::TableRouter: Send + 'static, - N::BuildTableRouter: Send + Unpin + 'static, - SC: SelectChain<Block> + 'static, - // Rust bug: https://github.com/rust-lang/rust/issues/24159 - sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HasherFor<Block>>, -{ - const TIMER_INTERVAL: Duration = Duration::from_secs(30); - - let (signal, exit) = ::exit_future::signal(); - let thread = thread::spawn(move || { - let mut runtime = LocalRuntime::new().expect("Could not create local runtime"); - let notifications = { - let client = client.clone(); - let validation = parachain_validation.clone(); - - let keystore = keystore.clone(); - - let notifications = client.import_notification_stream() - .for_each(move |notification| { - let parent_hash = notification.hash; - if notification.is_new_best { - let res = validation.get_or_instantiate( - parent_hash, - &keystore, - max_block_data_size, - ); - - if let Err(e) = res { - warn!( - "Unable to start parachain validation on top of {:?}: {}", - parent_hash, e - ); - } - } - ready(()) - }); - - select(notifications, exit.clone()) - }; - - let prune_old_sessions = { - let select_chain = select_chain.clone(); - let interval = crate::interval(TIMER_INTERVAL) - .for_each(move |_| match select_chain.leaves() { - Ok(leaves) => { - parachain_validation.retain(|h| leaves.contains(h)); - ready(()) - } - Err(e) => { - warn!("Error fetching leaves from client: {:?}", e); - ready(()) - } - }); - - select(interval, exit.clone()).map(|_| ()) - }; - - runtime.spawn(notifications); - if let Err(_) = thread_pool.spawn(prune_old_sessions) { - error!("Failed to spawn old sessions pruning task"); - } - - runtime.block_on(exit); - }); - - ServiceHandle { - thread: Some(thread), - exit_signal: Some(signal), - } -} - -impl Drop for ServiceHandle { - fn drop(&mut self) { - if let Some(signal) = self.exit_signal.take() { - let _ = signal.fire(); - } - - if let Some(thread) = self.thread.take() { - thread.join().expect("The service thread has panicked"); - } - } -} diff --git a/polkadot/validation/src/block_production.rs b/polkadot/validation/src/block_production.rs new file mode 100644 index 00000000000..6d4e7363899 --- /dev/null +++ b/polkadot/validation/src/block_production.rs @@ -0,0 +1,354 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. 
+ +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +//! The block production pipeline of Polkadot. +//! +//! The `ProposerFactory` exported by this module will be wrapped by some +//! consensus engine, and triggered when it is time to create a block. + +use std::{ + pin::Pin, + sync::Arc, + time::{self, Duration, Instant}, +}; + +use sp_blockchain::HeaderBackend; +use block_builder::BlockBuilderApi; +use codec::Encode; +use consensus::{Proposal, RecordProof}; +use polkadot_primitives::{Hash, Block, BlockId, BlockNumber, Header}; +use polkadot_primitives::parachain::{ + ParachainHost, AttestedCandidate, NEW_HEADS_IDENTIFIER, +}; +use runtime_primitives::traits::{DigestFor, HasherFor}; +use futures_timer::Delay; +use txpool_api::{TransactionPool, InPoolTransaction}; + +use futures::prelude::*; +use inherents::InherentData; +use sp_timestamp::TimestampInherentData; +use log::{info, debug, trace}; +use sp_api::{ApiExt, ProvideRuntimeApi}; + +use crate::validation_service::ServiceHandle; +use crate::dynamic_inclusion::DynamicInclusion; +use crate::Error; + +// Polkadot proposer factory. +pub struct ProposerFactory<Client, TxPool, Backend> { + client: Arc<Client>, + transaction_pool: Arc<TxPool>, + service_handle: ServiceHandle, + babe_slot_duration: u64, + backend: Arc<Backend>, +} + +impl<Client, TxPool, Backend> ProposerFactory<Client, TxPool, Backend> { + /// Create a new proposer factory. + pub fn new( + client: Arc<Client>, + transaction_pool: Arc<TxPool>, + service_handle: ServiceHandle, + babe_slot_duration: u64, + backend: Arc<Backend>, + ) -> Self { + ProposerFactory { + client, + transaction_pool, + service_handle: service_handle, + babe_slot_duration, + backend, + } + } +} + +impl<Client, TxPool, Backend> consensus::Environment<Block> + for ProposerFactory<Client, TxPool, Backend> +where + TxPool: TransactionPool<Block=Block> + 'static, + Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static, + Client::Api: ParachainHost<Block> + BlockBuilderApi<Block> + + ApiExt<Block, Error = sp_blockchain::Error>, + Backend: sc_client_api::Backend< + Block, + State = sp_api::StateBackendFor<Client, Block> + > + 'static, + // Rust bug: https://github.com/rust-lang/rust/issues/24159 + sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HasherFor<Block>> + Send, +{ + type CreateProposer = Pin<Box< + dyn Future<Output = Result<Self::Proposer, Self::Error>> + Send + 'static + >>; + type Proposer = Proposer<Client, TxPool, Backend>; + type Error = Error; + + fn init( + &mut self, + parent_header: &Header, + ) -> Self::CreateProposer { + let parent_hash = parent_header.hash(); + let parent_number = parent_header.number; + let parent_id = BlockId::hash(parent_hash); + + let client = self.client.clone(); + let transaction_pool = self.transaction_pool.clone(); + let backend = self.backend.clone(); + let slot_duration = self.babe_slot_duration.clone(); + + let maybe_proposer = self.service_handle + .clone() + .get_validation_instance(parent_hash) + .and_then(move |tracker| future::ready(Ok(Proposer { + client, + tracker, + parent_hash, + parent_id, + parent_number, + transaction_pool, + slot_duration, + backend, + }))); + + Box::pin(maybe_proposer) + } +} + +/// The Polkadot proposer logic. 
+pub struct Proposer<Client, TxPool, Backend> { + client: Arc<Client>, + parent_hash: Hash, + parent_id: BlockId, + parent_number: BlockNumber, + tracker: Arc<crate::validation_service::ValidationInstanceHandle>, + transaction_pool: Arc<TxPool>, + slot_duration: u64, + backend: Arc<Backend>, +} + +impl<Client, TxPool, Backend> consensus::Proposer<Block> for Proposer<Client, TxPool, Backend> where + TxPool: TransactionPool<Block=Block> + 'static, + Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static, + Client::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>, + Backend: sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>> + 'static, + // Rust bug: https://github.com/rust-lang/rust/issues/24159 + sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HasherFor<Block>> + Send, +{ + type Error = Error; + type Transaction = sp_api::TransactionFor<Client, Block>; + type Proposal = Pin< + Box< + dyn Future<Output = Result<Proposal<Block, sp_api::TransactionFor<Client, Block>>, Error>> + + Send + > + >; + + fn propose(&mut self, + inherent_data: InherentData, + inherent_digests: DigestFor<Block>, + max_duration: Duration, + record_proof: RecordProof, + ) -> Self::Proposal { + const SLOT_DURATION_DENOMINATOR: u64 = 3; // wait up to 1/3 of the slot for candidates. + + let initial_included = self.tracker.table().includable_count(); + let now = Instant::now(); + + let dynamic_inclusion = DynamicInclusion::new( + self.tracker.table().num_parachains(), + self.tracker.started(), + Duration::from_millis(self.slot_duration / SLOT_DURATION_DENOMINATOR), + ); + + let parent_hash = self.parent_hash.clone(); + let parent_number = self.parent_number.clone(); + let parent_id = self.parent_id.clone(); + let client = self.client.clone(); + let transaction_pool = self.transaction_pool.clone(); + let table = self.tracker.table().clone(); + let backend = self.backend.clone(); + + async move { + let enough_candidates = dynamic_inclusion.acceptable_in( + now, + initial_included, + ).unwrap_or_else(|| Duration::from_millis(1)); + + let believed_timestamp = match inherent_data.timestamp_inherent_data() { + Ok(timestamp) => timestamp, + Err(e) => return Err(Error::InherentError(e)), + }; + + let deadline_diff = max_duration - max_duration / 3; + let deadline = match Instant::now().checked_add(deadline_diff) { + None => return Err(Error::DeadlineComputeFailure(deadline_diff)), + Some(d) => d, + }; + + let data = CreateProposalData { + parent_hash, + parent_number, + parent_id, + client, + transaction_pool, + table, + believed_minimum_timestamp: believed_timestamp, + inherent_data: Some(inherent_data), + inherent_digests, + // leave some time for the proposal finalisation + deadline, + record_proof, + backend, + }; + + // set up delay until next allowed timestamp. + let current_timestamp = current_timestamp(); + if current_timestamp < believed_timestamp { + Delay::new(Duration::from_millis(current_timestamp - believed_timestamp)) + .await; + } + + Delay::new(enough_candidates).await; + + tokio_executor::blocking::run(move || { + let proposed_candidates = data.table.proposed_set(); + data.propose_with(proposed_candidates) + }) + .await + }.boxed() + } +} + +fn current_timestamp() -> u64 { + time::SystemTime::now().duration_since(time::UNIX_EPOCH) + .expect("now always later than unix epoch; qed") + .as_millis() as u64 +} + +/// Inner data of the create proposal. 
+struct CreateProposalData<Client, TxPool, Backend> { + parent_hash: Hash, + parent_number: BlockNumber, + parent_id: BlockId, + client: Arc<Client>, + transaction_pool: Arc<TxPool>, + table: Arc<crate::SharedTable>, + believed_minimum_timestamp: u64, + inherent_data: Option<InherentData>, + inherent_digests: DigestFor<Block>, + deadline: Instant, + record_proof: RecordProof, + backend: Arc<Backend>, +} + +impl<Client, TxPool, Backend> CreateProposalData<Client, TxPool, Backend> where + TxPool: TransactionPool<Block=Block>, + Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync, + Client::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>, + Backend: sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>> + 'static, + // Rust bug: https://github.com/rust-lang/rust/issues/24159 + sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HasherFor<Block>> + Send, +{ + fn propose_with( + mut self, + candidates: Vec<AttestedCandidate>, + ) -> Result<Proposal<Block, sp_api::TransactionFor<Client, Block>>, Error> { + use runtime_primitives::traits::{Hash as HashT, BlakeTwo256}; + + const MAX_TRANSACTIONS: usize = 40; + + let mut inherent_data = self.inherent_data + .take() + .expect("CreateProposal is not polled after finishing; qed"); + inherent_data.put_data(NEW_HEADS_IDENTIFIER, &candidates) + .map_err(Error::InherentError)?; + + let runtime_api = self.client.runtime_api(); + + let mut block_builder = block_builder::BlockBuilder::new( + &*self.client, + self.client.expect_block_hash_from_id(&self.parent_id)?, + self.client.expect_block_number_from_id(&self.parent_id)?, + self.record_proof, + self.inherent_digests.clone(), + &*self.backend, + )?; + + { + let inherents = runtime_api.inherent_extrinsics(&self.parent_id, inherent_data)?; + for inherent in inherents { + block_builder.push(inherent)?; + } + + let mut unqueue_invalid = Vec::new(); + let mut pending_size = 0; + + let ready_iter = self.transaction_pool.ready(); + for ready in ready_iter.take(MAX_TRANSACTIONS) { + let encoded_size = ready.data().encode().len(); + if pending_size + encoded_size >= crate::evaluation::MAX_TRANSACTIONS_SIZE { + break; + } + if Instant::now() > self.deadline { + debug!("Consensus deadline reached when pushing block transactions, proceeding with proposing."); + break; + } + + match block_builder.push(ready.data().clone()) { + Ok(()) => { + debug!("[{:?}] Pushed to the block.", ready.hash()); + pending_size += encoded_size; + } + Err(sp_blockchain::Error::ApplyExtrinsicFailed(sp_blockchain::ApplyExtrinsicFailed::Validity(e))) + if e.exhausted_resources() => + { + debug!("Block is full, proceed with proposing."); + break; + } + Err(e) => { + trace!(target: "transaction-pool", "Invalid transaction: {}", e); + unqueue_invalid.push(ready.hash().clone()); + } + } + } + + self.transaction_pool.remove_invalid(&unqueue_invalid); + } + + let (new_block, storage_changes, proof) = block_builder.build()?.into_inner(); + + info!("Prepared block for proposing at {} [hash: {:?}; parent_hash: {}; extrinsics: [{}]]", + new_block.header.number, + Hash::from(new_block.header.hash()), + new_block.header.parent_hash, + new_block.extrinsics.iter() + .map(|xt| format!("{}", BlakeTwo256::hash_of(xt))) + .collect::<Vec<_>>() + .join(", ") + ); + + // TODO: full re-evaluation (https://github.com/paritytech/polkadot/issues/216) + let active_parachains = runtime_api.active_parachains(&self.parent_id)?; + assert!(crate::evaluation::evaluate_initial( + 
&new_block, + self.believed_minimum_timestamp, + &self.parent_hash, + self.parent_number, + &active_parachains[..], + ).is_ok()); + + Ok(Proposal { block: new_block, storage_changes, proof }) + } +} diff --git a/polkadot/validation/src/error.rs b/polkadot/validation/src/error.rs index 1890863a077..9ba4922eeb7 100644 --- a/polkadot/validation/src/error.rs +++ b/polkadot/validation/src/error.rs @@ -46,6 +46,8 @@ pub enum Error { Timer(std::io::Error), #[display(fmt = "Failed to compute deadline of now + {:?}", _0)] DeadlineComputeFailure(std::time::Duration), + #[display(fmt = "Validation service is down.")] + ValidationServiceDown, Join(tokio::task::JoinError) } diff --git a/polkadot/validation/src/evaluation.rs b/polkadot/validation/src/evaluation.rs index 18e0d9829af..76e7e9f8fb5 100644 --- a/polkadot/validation/src/evaluation.rs +++ b/polkadot/validation/src/evaluation.rs @@ -16,7 +16,8 @@ //! Polkadot block evaluation and evaluation errors. -use super::MAX_TRANSACTIONS_SIZE; +// block size limit. +pub(crate) const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024; use codec::Encode; use polkadot_primitives::{Block, Hash, BlockNumber}; diff --git a/polkadot/validation/src/lib.rs b/polkadot/validation/src/lib.rs index 860c1a6f365..41f2552aebd 100644 --- a/polkadot/validation/src/lib.rs +++ b/polkadot/validation/src/lib.rs @@ -31,50 +31,22 @@ use std::{ collections::{HashMap, HashSet}, - pin::Pin, sync::Arc, - time::{self, Duration, Instant}, }; -use babe_primitives::BabeApi; -use sc_client_api::{Backend, BlockchainEvents, BlockBody}; -use sp_blockchain::HeaderBackend; -use block_builder::BlockBuilderApi; use codec::Encode; -use consensus::{SelectChain, Proposal, RecordProof}; -use availability_store::Store as AvailabilityStore; -use parking_lot::Mutex; -use polkadot_primitives::{Hash, Block, BlockId, BlockNumber, Header}; +use polkadot_primitives::Hash; use polkadot_primitives::parachain::{ Id as ParaId, Chain, DutyRoster, CandidateReceipt, - ParachainHost, AttestedCandidate, Statement as PrimitiveStatement, Message, OutgoingMessages, + Statement as PrimitiveStatement, Message, OutgoingMessages, Collation, PoVBlock, ErasureChunk, ValidatorSignature, ValidatorIndex, - ValidatorPair, ValidatorId, NEW_HEADS_IDENTIFIER, + ValidatorPair, ValidatorId, }; use primitives::Pair; -use runtime_primitives::traits::{DigestFor, HasherFor}; -use futures_timer::Delay; -use txpool_api::{TransactionPool, InPoolTransaction}; -use attestation_service::ServiceHandle; use futures::prelude::*; -use futures::{future::{select, ready}, stream::unfold, task::{Spawn, SpawnExt}}; -use collation::collation_fetch; -use dynamic_inclusion::DynamicInclusion; -use inherents::InherentData; -use sp_timestamp::TimestampInherentData; -use log::{info, debug, warn, trace, error}; -use keystore::KeyStorePtr; -use sp_api::{ApiExt, ProvideRuntimeApi}; - -type TaskExecutor = Arc<dyn Spawn + Send + Sync>; - -fn interval(duration: Duration) -> impl Stream<Item=()> + Send + Unpin { - unfold((), move |_| { - futures_timer::Delay::new(duration).map(|_| Some(((), ()))) - }).map(drop) -} +pub use self::block_production::ProposerFactory; pub use self::collation::{ validate_collation, validate_incoming, message_queue_root, egress_roots, Collators, produce_receipt_and_chunks, @@ -84,20 +56,19 @@ pub use self::shared_table::{ SharedTable, ParachainWork, PrimedParachainWork, Validated, Statement, SignedStatement, GenericStatement, }; +pub use self::validation_service::{ServiceHandle, ServiceBuilder}; #[cfg(not(target_os = "unknown"))] pub use 
parachain::wasm_executor::{run_worker as run_validation_worker}; -mod attestation_service; mod dynamic_inclusion; mod evaluation; mod error; mod shared_table; pub mod collation; - -// block size limit. -const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024; +pub mod validation_service; +pub mod block_production; /// Incoming messages; a series of sorted (ParaId, Message) pairs. pub type Incoming = Vec<(ParaId, Vec<Message>)>; @@ -148,6 +119,13 @@ pub trait Network { ) -> Self::BuildTableRouter; } +/// The local duty of a validator. +#[derive(Debug)] +pub struct LocalDuty { + validation: Chain, + index: ValidatorIndex, +} + /// Information about a specific group. #[derive(Debug, Clone, Default)] pub struct GroupInfo { @@ -246,595 +224,6 @@ pub fn outgoing_queues(outgoing_targeted: &'_ OutgoingMessages) }) } -// finds the first key we are capable of signing with out of the given set of validators, -// if any. -fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<Arc<ValidatorPair>> { - let keystore = keystore.read(); - validators.iter() - .find_map(|v| { - keystore.key_pair::<ValidatorPair>(&v).ok() - }) - .map(|pair| Arc::new(pair)) -} - -/// Constructs parachain-agreement instances. -struct ParachainValidation<C, N, P> { - /// The client instance. - client: Arc<P>, - /// The backing network handle. - network: N, - /// Parachain collators. - collators: C, - /// handle to remote task executor - handle: TaskExecutor, - /// Store for extrinsic data. - availability_store: AvailabilityStore, - /// Live agreements. Maps relay chain parent hashes to attestation - /// instances. - live_instances: Mutex<HashMap<Hash, Arc<AttestationTracker>>>, -} - -impl<C, N, P> ParachainValidation<C, N, P> where - C: Collators + Send + Unpin + 'static + Sync, - N: Network, - P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + BlockBody<Block> + Send + Sync + 'static, - P::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>, - C::Collation: Send + Unpin + 'static, - N::TableRouter: Send + 'static, - N::BuildTableRouter: Unpin + Send + 'static, - // Rust bug: https://github.com/rust-lang/rust/issues/24159 - sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HasherFor<Block>>, -{ - /// Get an attestation table for given parent hash. - /// - /// This starts a parachain agreement process on top of the parent hash if - /// one has not already started. - /// - /// Additionally, this will trigger broadcast of data to the new block's duty - /// roster. - fn get_or_instantiate( - &self, - parent_hash: Hash, - keystore: &KeyStorePtr, - max_block_data_size: Option<u64>, - ) - -> Result<Arc<AttestationTracker>, Error> - { - let mut live_instances = self.live_instances.lock(); - if let Some(tracker) = live_instances.get(&parent_hash) { - return Ok(tracker.clone()); - } - - let id = BlockId::hash(parent_hash); - - let validators = self.client.runtime_api().validators(&id)?; - let sign_with = signing_key(&validators[..], keystore); - - let duty_roster = self.client.runtime_api().duty_roster(&id)?; - - let (group_info, local_duty) = make_group_info( - duty_roster, - &validators, - sign_with.as_ref().map(|k| k.public()), - )?; - - info!( - "Starting parachain attestation session on top of parent {:?}. 
Local parachain duty is {:?}", - parent_hash, - local_duty, - ); - - let active_parachains = self.client.runtime_api().active_parachains(&id)?; - - debug!(target: "validation", "Active parachains: {:?}", active_parachains); - - // If we are a validator, we need to store our index in this round in availability store. - // This will tell which erasure chunk we should store. - if let Some(ref local_duty) = local_duty { - if let Err(e) = self.availability_store.add_validator_index_and_n_validators( - &parent_hash, - local_duty.index, - validators.len() as u32, - ) { - warn!( - target: "validation", - "Failed to add validator index and n_validators to the availability-store: {:?}", e - ) - } - } - - let table = Arc::new(SharedTable::new( - validators.clone(), - group_info, - sign_with, - parent_hash, - self.availability_store.clone(), - max_block_data_size, - )); - - let (_drop_signal, exit) = exit_future::signal(); - - let router = self.network.communication_for( - table.clone(), - &validators, - exit.clone(), - ); - - if let Some((Chain::Parachain(id), index)) = local_duty.as_ref().map(|d| (d.validation, d.index)) { - self.launch_work(parent_hash, id, router, max_block_data_size, validators.len(), index, exit); - } - - let tracker = Arc::new(AttestationTracker { - table, - started: Instant::now(), - _drop_signal, - }); - - live_instances.insert(parent_hash, tracker.clone()); - - Ok(tracker) - } - - /// Retain validation sessions matching predicate. - fn retain<F: FnMut(&Hash) -> bool>(&self, mut pred: F) { - self.live_instances.lock().retain(|k, _| pred(k)) - } - - // launch parachain work asynchronously. - fn launch_work( - &self, - relay_parent: Hash, - validation_para: ParaId, - build_router: N::BuildTableRouter, - max_block_data_size: Option<u64>, - authorities_num: usize, - local_id: ValidatorIndex, - exit: exit_future::Exit, - ) { - let (collators, client) = (self.collators.clone(), self.client.clone()); - let availability_store = self.availability_store.clone(); - - let with_router = move |router: N::TableRouter| { - // fetch a local collation from connected collators. - let collation_work = collation_fetch( - validation_para, - relay_parent, - collators, - client.clone(), - max_block_data_size, - ); - - collation_work.map(move |result| match result { - Ok((collation, outgoing_targeted, fees_charged)) => { - match produce_receipt_and_chunks( - authorities_num, - &collation.pov, - &outgoing_targeted, - fees_charged, - &collation.info, - ) { - Ok((receipt, chunks)) => { - // Apparently the `async move` block is the only way to convince - // the compiler that we are not moving values out of borrowed context. 
- let av_clone = availability_store.clone(); - let chunks_clone = chunks.clone(); - let receipt_clone = receipt.clone(); - - let res = async move { - if let Err(e) = av_clone.clone().add_erasure_chunks( - relay_parent.clone(), - receipt_clone, - chunks_clone, - ).await { - warn!(target: "validation", "Failed to add erasure chunks: {}", e); - } - } - .unit_error() - .boxed() - .then(move |_| { - router.local_collation(collation, receipt, outgoing_targeted, (local_id, &chunks)); - ready(()) - }); - - Ok(Some(res)) - } - Err(e) => { - warn!(target: "validation", "Failed to produce a receipt: {:?}", e); - Ok(None) - } - } - } - Err(e) => { - warn!(target: "validation", "Failed to collate candidate: {:?}", e); - Ok(None) - } - }) - }; - - let router = build_router - .map_ok(with_router) - .map_err(|e| { - warn!(target: "validation" , "Failed to build table router: {:?}", e); - }) - .and_then(|f| f) - .and_then(|f| match f { - Some(f) => f.map(Ok).boxed(), - None => ready(Ok(())).boxed(), - }).boxed(); - - let cancellable_work = select(exit, router).map(drop); - - // spawn onto thread pool. - if self.handle.spawn(cancellable_work).is_err() { - error!("Failed to spawn cancellable work task"); - } - } -} - -/// Parachain validation for a single block. -struct AttestationTracker { - _drop_signal: exit_future::Signal, - table: Arc<SharedTable>, - started: Instant, -} - -/// Polkadot proposer factory. -pub struct ProposerFactory<C, N, P, SC, TxPool, B> { - parachain_validation: Arc<ParachainValidation<C, N, P>>, - transaction_pool: Arc<TxPool>, - keystore: KeyStorePtr, - _service_handle: ServiceHandle, - babe_slot_duration: u64, - _select_chain: SC, - max_block_data_size: Option<u64>, - backend: Arc<B>, -} - -impl<C, N, P, SC, TxPool, B> ProposerFactory<C, N, P, SC, TxPool, B> where - C: Collators + Send + Sync + Unpin + 'static, - C::Collation: Send + Unpin + 'static, - P: BlockchainEvents<Block> + BlockBody<Block>, - P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static, - P::Api: ParachainHost<Block> + - BlockBuilderApi<Block> + - BabeApi<Block> + - ApiExt<Block, Error = sp_blockchain::Error>, - N: Network + Send + Sync + 'static, - N::TableRouter: Send + 'static, - N::BuildTableRouter: Send + Unpin + 'static, - TxPool: TransactionPool, - SC: SelectChain<Block> + 'static, - // Rust bug: https://github.com/rust-lang/rust/issues/24159 - sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HasherFor<Block>>, -{ - /// Create a new proposer factory. 
- pub fn new( - client: Arc<P>, - _select_chain: SC, - network: N, - collators: C, - transaction_pool: Arc<TxPool>, - thread_pool: TaskExecutor, - keystore: KeyStorePtr, - availability_store: AvailabilityStore, - babe_slot_duration: u64, - max_block_data_size: Option<u64>, - backend: Arc<B>, - ) -> Self { - let parachain_validation = Arc::new(ParachainValidation { - client: client.clone(), - network, - collators, - handle: thread_pool.clone(), - availability_store: availability_store.clone(), - live_instances: Mutex::new(HashMap::new()), - }); - - let service_handle = crate::attestation_service::start( - client, - _select_chain.clone(), - parachain_validation.clone(), - thread_pool, - keystore.clone(), - max_block_data_size, - ); - - ProposerFactory { - parachain_validation, - transaction_pool, - keystore, - _service_handle: service_handle, - babe_slot_duration, - _select_chain, - max_block_data_size, - backend, - } - } -} - -impl<C, N, P, SC, TxPool, B> consensus::Environment<Block> for ProposerFactory<C, N, P, SC, TxPool, B> where - C: Collators + Send + Unpin + 'static + Sync, - N: Network, - TxPool: TransactionPool<Block=Block> + 'static, - P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + BlockBody<Block> + Send + Sync + 'static, - P::Api: ParachainHost<Block> + - BlockBuilderApi<Block> + - BabeApi<Block> + - ApiExt<Block, Error = sp_blockchain::Error>, - C::Collation: Send + Unpin + 'static, - N::TableRouter: Send + 'static, - N::BuildTableRouter: Send + Unpin + 'static, - SC: SelectChain<Block>, - B: Backend<Block, State = sp_api::StateBackendFor<P, Block>> + 'static, - // Rust bug: https://github.com/rust-lang/rust/issues/24159 - sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HasherFor<Block>> + Send, -{ - type CreateProposer = Pin<Box< - dyn Future<Output = Result<Self::Proposer, Self::Error>> + Send + Unpin + 'static - >>; - type Proposer = Proposer<P, TxPool, B>; - type Error = Error; - - fn init( - &mut self, - parent_header: &Header, - ) -> Self::CreateProposer { - let parent_hash = parent_header.hash(); - let parent_id = BlockId::hash(parent_hash); - - let maybe_proposer = self.parachain_validation.get_or_instantiate( - parent_hash, - &self.keystore, - self.max_block_data_size, - ).map(|tracker| Proposer { - client: self.parachain_validation.client.clone(), - tracker, - parent_hash, - parent_id, - parent_number: parent_header.number, - transaction_pool: self.transaction_pool.clone(), - slot_duration: self.babe_slot_duration, - backend: self.backend.clone(), - }); - - Box::pin(future::ready(maybe_proposer)) - } -} - -/// The local duty of a validator. -#[derive(Debug)] -pub struct LocalDuty { - validation: Chain, - index: ValidatorIndex, -} - -/// The Polkadot proposer logic. 
-pub struct Proposer<Client, TxPool, Backend> { - client: Arc<Client>, - parent_hash: Hash, - parent_id: BlockId, - parent_number: BlockNumber, - tracker: Arc<AttestationTracker>, - transaction_pool: Arc<TxPool>, - slot_duration: u64, - backend: Arc<Backend>, -} - -impl<Client, TxPool, Backend> consensus::Proposer<Block> for Proposer<Client, TxPool, Backend> where - TxPool: TransactionPool<Block=Block> + 'static, - Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static, - Client::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>, - Backend: sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>> + 'static, - // Rust bug: https://github.com/rust-lang/rust/issues/24159 - sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HasherFor<Block>> + Send, -{ - type Error = Error; - type Transaction = sp_api::TransactionFor<Client, Block>; - type Proposal = Pin< - Box< - dyn Future<Output = Result<Proposal<Block, sp_api::TransactionFor<Client, Block>>, Error>> - + Send - > - >; - - fn propose(&mut self, - inherent_data: InherentData, - inherent_digests: DigestFor<Block>, - max_duration: Duration, - record_proof: RecordProof, - ) -> Self::Proposal { - const SLOT_DURATION_DENOMINATOR: u64 = 3; // wait up to 1/3 of the slot for candidates. - - let initial_included = self.tracker.table.includable_count(); - let now = Instant::now(); - - let dynamic_inclusion = DynamicInclusion::new( - self.tracker.table.num_parachains(), - self.tracker.started, - Duration::from_millis(self.slot_duration / SLOT_DURATION_DENOMINATOR), - ); - - let parent_hash = self.parent_hash.clone(); - let parent_number = self.parent_number.clone(); - let parent_id = self.parent_id.clone(); - let client = self.client.clone(); - let transaction_pool = self.transaction_pool.clone(); - let table = self.tracker.table.clone(); - let backend = self.backend.clone(); - - async move { - let enough_candidates = dynamic_inclusion.acceptable_in( - now, - initial_included, - ).unwrap_or_else(|| Duration::from_millis(1)); - - let believed_timestamp = match inherent_data.timestamp_inherent_data() { - Ok(timestamp) => timestamp, - Err(e) => return Err(Error::InherentError(e)), - }; - - let deadline_diff = max_duration - max_duration / 3; - let deadline = match Instant::now().checked_add(deadline_diff) { - None => return Err(Error::DeadlineComputeFailure(deadline_diff)), - Some(d) => d, - }; - - let data = CreateProposalData { - parent_hash, - parent_number, - parent_id, - client, - transaction_pool, - table, - believed_minimum_timestamp: believed_timestamp, - inherent_data: Some(inherent_data), - inherent_digests, - // leave some time for the proposal finalisation - deadline, - record_proof, - backend, - }; - - // set up delay until next allowed timestamp. - let current_timestamp = current_timestamp(); - if current_timestamp < believed_timestamp { - Delay::new(Duration::from_millis(current_timestamp - believed_timestamp)) - .await; - } - - Delay::new(enough_candidates).await; - - tokio_executor::blocking::run(move || { - let proposed_candidates = data.table.proposed_set(); - data.propose_with(proposed_candidates) - }) - .await - }.boxed() - } -} - -fn current_timestamp() -> u64 { - time::SystemTime::now().duration_since(time::UNIX_EPOCH) - .expect("now always later than unix epoch; qed") - .as_millis() as u64 -} - -/// Inner data of the create proposal. 
-struct CreateProposalData<Client, TxPool, Backend> { - parent_hash: Hash, - parent_number: BlockNumber, - parent_id: BlockId, - client: Arc<Client>, - transaction_pool: Arc<TxPool>, - table: Arc<SharedTable>, - believed_minimum_timestamp: u64, - inherent_data: Option<InherentData>, - inherent_digests: DigestFor<Block>, - deadline: Instant, - record_proof: RecordProof, - backend: Arc<Backend>, -} - -impl<Client, TxPool, Backend> CreateProposalData<Client, TxPool, Backend> where - TxPool: TransactionPool<Block=Block>, - Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync, - Client::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>, - Backend: sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>> + 'static, - // Rust bug: https://github.com/rust-lang/rust/issues/24159 - sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HasherFor<Block>> + Send, -{ - fn propose_with( - mut self, - candidates: Vec<AttestedCandidate>, - ) -> Result<Proposal<Block, sp_api::TransactionFor<Client, Block>>, Error> { - use runtime_primitives::traits::{Hash as HashT, BlakeTwo256}; - - const MAX_TRANSACTIONS: usize = 40; - - let mut inherent_data = self.inherent_data - .take() - .expect("CreateProposal is not polled after finishing; qed"); - inherent_data.put_data(NEW_HEADS_IDENTIFIER, &candidates) - .map_err(Error::InherentError)?; - - let runtime_api = self.client.runtime_api(); - - let mut block_builder = block_builder::BlockBuilder::new( - &*self.client, - self.client.expect_block_hash_from_id(&self.parent_id)?, - self.client.expect_block_number_from_id(&self.parent_id)?, - self.record_proof, - self.inherent_digests.clone(), - &*self.backend, - )?; - - { - let inherents = runtime_api.inherent_extrinsics(&self.parent_id, inherent_data)?; - for inherent in inherents { - block_builder.push(inherent)?; - } - - let mut unqueue_invalid = Vec::new(); - let mut pending_size = 0; - - let ready_iter = self.transaction_pool.ready(); - for ready in ready_iter.take(MAX_TRANSACTIONS) { - let encoded_size = ready.data().encode().len(); - if pending_size + encoded_size >= MAX_TRANSACTIONS_SIZE { - break; - } - if Instant::now() > self.deadline { - debug!("Consensus deadline reached when pushing block transactions, proceeding with proposing."); - break; - } - - match block_builder.push(ready.data().clone()) { - Ok(()) => { - debug!("[{:?}] Pushed to the block.", ready.hash()); - pending_size += encoded_size; - } - Err(sp_blockchain::Error::ApplyExtrinsicFailed(sp_blockchain::ApplyExtrinsicFailed::Validity(e))) - if e.exhausted_resources() => - { - debug!("Block is full, proceed with proposing."); - break; - } - Err(e) => { - trace!(target: "transaction-pool", "Invalid transaction: {}", e); - unqueue_invalid.push(ready.hash().clone()); - } - } - } - - self.transaction_pool.remove_invalid(&unqueue_invalid); - } - - let (new_block, storage_changes, proof) = block_builder.build()?.into_inner(); - - info!("Prepared block for proposing at {} [hash: {:?}; parent_hash: {}; extrinsics: [{}]]", - new_block.header.number, - Hash::from(new_block.header.hash()), - new_block.header.parent_hash, - new_block.extrinsics.iter() - .map(|xt| format!("{}", BlakeTwo256::hash_of(xt))) - .collect::<Vec<_>>() - .join(", ") - ); - - // TODO: full re-evaluation (https://github.com/paritytech/polkadot/issues/216) - let active_parachains = runtime_api.active_parachains(&self.parent_id)?; - assert!(evaluation::evaluate_initial( - &new_block, - 
self.believed_minimum_timestamp, - &self.parent_hash, - self.parent_number, - &active_parachains[..], - ).is_ok()); - - Ok(Proposal { block: new_block, storage_changes, proof }) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/polkadot/validation/src/shared_table/mod.rs b/polkadot/validation/src/shared_table/mod.rs index ad5eca61b3d..a6c22d62745 100644 --- a/polkadot/validation/src/shared_table/mod.rs +++ b/polkadot/validation/src/shared_table/mod.rs @@ -431,7 +431,10 @@ impl SharedTable { /// Import a single statement with remote source, whose signature has already been checked. /// - /// The statement producer, if any, will produce only statements concerning the same candidate + /// Validity and invalidity statements are only valid if the corresponding + /// candidate has already been imported. + /// + /// The ParachainWork, if any, will produce only statements concerning the same candidate /// as the one just imported pub fn import_remote_statement<R: TableRouter>( &self, @@ -446,8 +449,10 @@ impl SharedTable { /// Import many statements at once. /// /// Provide an iterator yielding remote, pre-checked statements. + /// Validity and invalidity statements are only valid if the corresponding + /// candidate has already been imported. /// - /// The statement producer, if any, will produce only statements concerning the same candidate + /// The ParachainWork, if any, will produce only statements concerning the same candidate /// as the one just imported pub fn import_remote_statements<R, I, U>(&self, router: &R, iterable: I) -> U where diff --git a/polkadot/validation/src/validation_service/mod.rs b/polkadot/validation/src/validation_service/mod.rs new file mode 100644 index 00000000000..14dd85c72ee --- /dev/null +++ b/polkadot/validation/src/validation_service/mod.rs @@ -0,0 +1,463 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +//! The validation service is a long-running future that creates and manages parachain attestation +//! instances. +//! +//! As soon as we import a new chain head, we start a parachain attestation session on top of it. +//! The block authorship service may want access to the attestation session, and for that reason +//! we expose a `ServiceHandle` which can be used to request a copy of it. +//! +//! In fact, the import notification and request from the block production pipeline may race to be +//! the first one to create the instance, but the import notification will usually win. +//! +//! These attestation sessions are kept live until they are periodically garbage-collected.
+ +use std::{time::{Duration, Instant}, sync::Arc}; +use std::collections::HashMap; + +use sc_client_api::{BlockchainEvents, BlockBody}; +use sp_blockchain::HeaderBackend; +use block_builder::BlockBuilderApi; +use consensus::SelectChain; +use futures::prelude::*; +use futures::{future::{ready, select}, task::{Spawn, SpawnExt}}; +use polkadot_primitives::{Block, Hash, BlockId}; +use polkadot_primitives::parachain::{ + Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair, +}; +use babe_primitives::BabeApi; +use keystore::KeyStorePtr; +use sp_api::{ApiExt, ProvideRuntimeApi}; +use runtime_primitives::traits::HasherFor; +use availability_store::Store as AvailabilityStore; + +use log::{warn, error, info, debug}; + +use super::{Network, Collators, SharedTable, TableRouter}; +use crate::Error; + +/// A handle to spawn background tasks onto. +pub type TaskExecutor = Arc<dyn Spawn + Send + Sync>; + +// Remote processes may request for a validation instance to be cloned or instantiated. +// They send a oneshot channel. +type ValidationInstanceRequest = ( + Hash, + futures::channel::oneshot::Sender<Result<Arc<ValidationInstanceHandle>, Error>>, +); + +/// A handle to a single instance of parachain validation, which is pinned to +/// a specific relay-chain block. This is the instance that should be used when +/// constructing any +pub(crate) struct ValidationInstanceHandle { + _drop_signal: exit_future::Signal, + table: Arc<SharedTable>, + started: Instant, +} + +impl ValidationInstanceHandle { + /// Access the underlying table of attestations on parachain candidates. + pub(crate) fn table(&self) -> &Arc<SharedTable> { + &self.table + } + + /// The moment we started this validation instance. + pub(crate) fn started(&self) -> Instant { + self.started.clone() + } +} + +/// A handle to the service. This can be used to create a block-production environment. +#[derive(Clone)] +pub struct ServiceHandle { + sender: futures::channel::mpsc::Sender<ValidationInstanceRequest>, +} + +impl ServiceHandle { + /// Requests instantiation or cloning of a validation instance from the service. + /// + /// This can fail if the service task has shut down for some reason. + pub(crate) async fn get_validation_instance(self, relay_parent: Hash) + -> Result<Arc<ValidationInstanceHandle>, Error> + { + let mut sender = self.sender; + let instance_rx = loop { + let (instance_tx, instance_rx) = futures::channel::oneshot::channel(); + match sender.send((relay_parent, instance_tx)).await { + Ok(()) => break instance_rx, + Err(e) => if !e.is_full() { + // Sink::send should be doing `poll_ready` before start-send, + // so this should only happen when there is a race. + return Err(Error::ValidationServiceDown) + }, + } + }; + + instance_rx.map_err(|_| Error::ValidationServiceDown).await.and_then(|x| x) + } +} + +fn interval(duration: Duration) -> impl Stream<Item=()> + Send + Unpin { + stream::unfold((), move |_| { + futures_timer::Delay::new(duration).map(|_| Some(((), ()))) + }).map(drop) +} + +/// A builder for the validation service. +pub struct ServiceBuilder<C, N, P, SC> { + /// The underlying blockchain client. + pub client: Arc<P>, + /// A handle to the network object used to communicate. + pub network: N, + /// A handle to the collator pool we are using. + pub collators: C, + /// A handle to a background executor. + pub task_executor: TaskExecutor, + /// A handle to the availability store. 
+ pub availability_store: AvailabilityStore, + /// A chain selector for determining active leaves in the block-DAG. + pub select_chain: SC, + /// The keystore which holds the signing keys. + pub keystore: KeyStorePtr, + /// The maximum block-data size in bytes. + pub max_block_data_size: Option<u64>, +} + +impl<C, N, P, SC> ServiceBuilder<C, N, P, SC> where + C: Collators + Send + Sync + Unpin + 'static, + C::Collation: Send + Unpin + 'static, + P: BlockchainEvents<Block> + BlockBody<Block>, + P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static, + P::Api: ParachainHost<Block> + + BlockBuilderApi<Block> + + BabeApi<Block> + + ApiExt<Block, Error = sp_blockchain::Error>, + N: Network + Send + Sync + 'static, + N::TableRouter: Send + 'static, + N::BuildTableRouter: Send + Unpin + 'static, + SC: SelectChain<Block> + 'static, + // Rust bug: https://github.com/rust-lang/rust/issues/24159 + sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HasherFor<Block>>, +{ + /// Build the service - this consists of a handle to it, as well as a background + /// future to be run to completion. + pub fn build(self) -> (ServiceHandle, impl Future<Output = ()> + Send + 'static) { + const TIMER_INTERVAL: Duration = Duration::from_secs(30); + const CHAN_BUFFER: usize = 10; + + enum Message { + CollectGarbage, + // relay-parent, receiver for instance. + RequestInstance(ValidationInstanceRequest), + // new chain heads - import notification. + NotifyImport(sc_client_api::BlockImportNotification<Block>), + } + + let mut parachain_validation = ParachainValidationInstances { + client: self.client.clone(), + network: self.network, + collators: self.collators, + handle: self.task_executor, + availability_store: self.availability_store, + live_instances: HashMap::new(), + }; + + let client = self.client; + let select_chain = self.select_chain; + let keystore = self.keystore; + let max_block_data_size = self.max_block_data_size; + + let (tx, rx) = futures::channel::mpsc::channel(CHAN_BUFFER); + let interval = interval(TIMER_INTERVAL).map(|_| Message::CollectGarbage); + let import_notifications = client.import_notification_stream().map(Message::NotifyImport); + let instance_requests = rx.map(Message::RequestInstance); + let service = ServiceHandle { sender: tx }; + + let background_work = async move { + let message_stream = futures::stream::select(interval, instance_requests); + let mut message_stream = futures::stream::select(import_notifications, message_stream); + while let Some(message) = message_stream.next().await { + match message { + Message::CollectGarbage => { + match select_chain.leaves() { + Ok(leaves) => { + parachain_validation.retain(|h| leaves.contains(h)); + } + Err(e) => { + warn!("Error fetching leaves from client: {:?}", e); + } + } + } + Message::RequestInstance((relay_parent, sender)) => { + // Upstream will handle the failure case. + let _ = sender.send(parachain_validation.get_or_instantiate( + relay_parent, + &keystore, + max_block_data_size, + )); + } + Message::NotifyImport(notification) => { + let relay_parent = notification.hash; + if notification.is_new_best { + let res = parachain_validation.get_or_instantiate( + relay_parent, + &keystore, + max_block_data_size, + ); + + if let Err(e) = res { + warn!( + "Unable to start parachain validation on top of {:?}: {}", + relay_parent, e + ); + } + } + } + } + } + }; + + (service, background_work) + } +} + +// finds the first key we are capable of signing with out of the given set of validators, +// if any. 
+fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<Arc<ValidatorPair>> { + let keystore = keystore.read(); + validators.iter() + .find_map(|v| { + keystore.key_pair::<ValidatorPair>(&v).ok() + }) + .map(|pair| Arc::new(pair)) +} + +/// Constructs parachain-agreement instances. +pub(crate) struct ParachainValidationInstances<C, N, P> { + /// The client instance. + client: Arc<P>, + /// The backing network handle. + network: N, + /// Parachain collators. + collators: C, + /// handle to remote task executor + handle: TaskExecutor, + /// Store for extrinsic data. + availability_store: AvailabilityStore, + /// Live agreements. Maps relay chain parent hashes to attestation + /// instances. + live_instances: HashMap<Hash, Arc<ValidationInstanceHandle>>, +} + +impl<C, N, P> ParachainValidationInstances<C, N, P> where + C: Collators + Send + Unpin + 'static, + N: Network, + P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + BlockBody<Block> + Send + Sync + 'static, + P::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>, + C::Collation: Send + Unpin + 'static, + N::TableRouter: Send + 'static, + N::BuildTableRouter: Unpin + Send + 'static, + // Rust bug: https://github.com/rust-lang/rust/issues/24159 + sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HasherFor<Block>>, +{ + /// Get an attestation table for given parent hash. + /// + /// This starts a parachain agreement process on top of the parent hash if + /// one has not already started. + /// + /// Additionally, this will trigger broadcast of data to the new block's duty + /// roster. + fn get_or_instantiate( + &mut self, + parent_hash: Hash, + keystore: &KeyStorePtr, + max_block_data_size: Option<u64>, + ) + -> Result<Arc<ValidationInstanceHandle>, Error> + { + use primitives::Pair; + + if let Some(tracker) = self.live_instances.get(&parent_hash) { + return Ok(tracker.clone()); + } + + let id = BlockId::hash(parent_hash); + + let validators = self.client.runtime_api().validators(&id)?; + let sign_with = signing_key(&validators[..], keystore); + + let duty_roster = self.client.runtime_api().duty_roster(&id)?; + + let (group_info, local_duty) = crate::make_group_info( + duty_roster, + &validators, + sign_with.as_ref().map(|k| k.public()), + )?; + + info!( + "Starting parachain attestation session on top of parent {:?}. Local parachain duty is {:?}", + parent_hash, + local_duty, + ); + + let active_parachains = self.client.runtime_api().active_parachains(&id)?; + + debug!(target: "validation", "Active parachains: {:?}", active_parachains); + + // If we are a validator, we need to store our index in this round in availability store. + // This will tell which erasure chunk we should store. 
+ if let Some(ref local_duty) = local_duty { + if let Err(e) = self.availability_store.add_validator_index_and_n_validators( + &parent_hash, + local_duty.index, + validators.len() as u32, + ) { + warn!( + target: "validation", + "Failed to add validator index and n_validators to the availability-store: {:?}", e + ) + } + } + + let table = Arc::new(SharedTable::new( + validators.clone(), + group_info, + sign_with, + parent_hash, + self.availability_store.clone(), + max_block_data_size, + )); + + let (_drop_signal, exit) = exit_future::signal(); + + let router = self.network.communication_for( + table.clone(), + &validators, + exit.clone(), + ); + + if let Some((Chain::Parachain(id), index)) = local_duty.as_ref().map(|d| (d.validation, d.index)) { + self.launch_work(parent_hash, id, router, max_block_data_size, validators.len(), index, exit); + } + + let tracker = Arc::new(ValidationInstanceHandle { + table, + started: Instant::now(), + _drop_signal, + }); + + self.live_instances.insert(parent_hash, tracker.clone()); + + Ok(tracker) + } + + /// Retain validation sessions matching predicate. + fn retain<F: FnMut(&Hash) -> bool>(&mut self, mut pred: F) { + self.live_instances.retain(|k, _| pred(k)) + } + + // launch parachain work asynchronously. + fn launch_work( + &self, + relay_parent: Hash, + validation_para: ParaId, + build_router: N::BuildTableRouter, + max_block_data_size: Option<u64>, + authorities_num: usize, + local_id: ValidatorIndex, + exit: exit_future::Exit, + ) { + let (collators, client) = (self.collators.clone(), self.client.clone()); + let availability_store = self.availability_store.clone(); + + let with_router = move |router: N::TableRouter| { + // fetch a local collation from connected collators. + let collation_work = crate::collation::collation_fetch( + validation_para, + relay_parent, + collators, + client.clone(), + max_block_data_size, + ); + + collation_work.map(move |result| match result { + Ok((collation, outgoing_targeted, fees_charged)) => { + match crate::collation::produce_receipt_and_chunks( + authorities_num, + &collation.pov, + &outgoing_targeted, + fees_charged, + &collation.info, + ) { + Ok((receipt, chunks)) => { + // Apparently the `async move` block is the only way to convince + // the compiler that we are not moving values out of borrowed context. + let av_clone = availability_store.clone(); + let chunks_clone = chunks.clone(); + let receipt_clone = receipt.clone(); + + let res = async move { + if let Err(e) = av_clone.clone().add_erasure_chunks( + relay_parent.clone(), + receipt_clone, + chunks_clone, + ).await { + warn!(target: "validation", "Failed to add erasure chunks: {}", e); + } + } + .unit_error() + .boxed() + .then(move |_| { + router.local_collation( + collation, + receipt, + outgoing_targeted, + (local_id, &chunks), + ); + ready(()) + }); + + + Some(res) + } + Err(e) => { + warn!(target: "validation", "Failed to produce a receipt: {:?}", e); + None + } + } + } + Err(e) => { + warn!(target: "validation", "Failed to collate candidate: {:?}", e); + None + } + }) + }; + + let router = build_router + .map_ok(with_router) + .map_err(|e| { + warn!(target: "validation" , "Failed to build table router: {:?}", e); + }); + + let cancellable_work = select(exit, router).map(drop); + + // spawn onto thread pool. + if self.handle.spawn(cancellable_work).is_err() { + error!("Failed to spawn cancellable work task"); + } + } +} -- GitLab
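The heart of the new `ServiceHandle::get_validation_instance` API added in validation_service/mod.rs above is a simple request/response protocol: callers push a `(relay_parent, oneshot::Sender)` pair into a bounded mpsc channel, and the long-running service task, which owns the map of live validation instances, replies with a shared handle, creating the instance on demand if an import notification has not already done so. The sketch below is a minimal, self-contained model of that pattern using only the `futures` (0.3) crate; the names `Instance`, `service_task` and `get_instance` are illustrative stand-ins rather than the actual Polkadot types, and the real implementation keys by block `Hash`, replies with `Result<Arc<ValidationInstanceHandle>, Error>`, and retries sending when the channel reports it is full.

// A minimal, self-contained model of the request/response pattern behind
// `ServiceHandle::get_validation_instance`. Names here (`Instance`,
// `service_task`, `get_instance`) are illustrative stand-ins, not the actual
// Polkadot types. Requires the `futures` crate (0.3).
use std::{collections::HashMap, sync::Arc};
use futures::{
    channel::{mpsc, oneshot},
    executor::block_on,
    SinkExt, StreamExt,
};

type Key = u64;                 // stands in for the relay-parent `Hash`
type Instance = Arc<String>;    // stands in for `Arc<ValidationInstanceHandle>`
type Request = (Key, oneshot::Sender<Instance>);

// The long-running service task: owns the live instances and answers requests,
// creating an instance on demand (compare `get_or_instantiate` in the patch).
async fn service_task(mut rx: mpsc::Receiver<Request>) {
    let mut live: HashMap<Key, Instance> = HashMap::new();
    while let Some((key, reply)) = rx.next().await {
        let instance = live
            .entry(key)
            .or_insert_with(|| Arc::new(format!("instance for {}", key)))
            .clone();
        let _ = reply.send(instance);
    }
}

// The clonable handle side: send a request and wait for the oneshot reply
// (compare `ServiceHandle::get_validation_instance`).
async fn get_instance(mut tx: mpsc::Sender<Request>, key: Key) -> Option<Instance> {
    let (reply_tx, reply_rx) = oneshot::channel();
    tx.send((key, reply_tx)).await.ok()?;
    reply_rx.await.ok()
}

fn main() {
    let (tx, rx) = mpsc::channel(10);
    block_on(async {
        let service = service_task(rx);
        let client = async {
            let a = get_instance(tx.clone(), 1).await.unwrap();
            let b = get_instance(tx.clone(), 1).await.unwrap();
            // Both requests resolve to the same shared instance.
            println!("same instance: {}", Arc::ptr_eq(&a, &b));
            drop(tx); // dropping the last sender lets the service task finish
        };
        futures::join!(service, client);
    });
}

In the patch itself this shape appears as the `ValidationInstanceRequest` alias, and the service's background future multiplexes these requests with import notifications and a thirty-second garbage-collection interval via `futures::stream::select`. The service/src/lib.rs hunk shows the intended wiring: `ServiceBuilder::build()` returns the `ServiceHandle` together with that background future, the future is spawned as an essential task, and the handle is passed to `consensus::ProposerFactory::new`, whose `init()` requests the validation instance for its parent hash before building a block.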