Unverified Commit a10670c3 authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

Validation service refactoring (#773)

* add some more docs about statement import

* instantiate environment async

* move attestation service into subfolder

* refactor validation service architecture somewhat

* remove dependence on validation service in proposer

* fix a bunch of warnings

* improve docs

* introduce a builder for the validation service

* extract block production to its own file

* integrate new API into service

* address review grumbles
parent 65da53e7
......@@ -404,21 +404,28 @@ pub fn new_full<Runtime, Dispatch, Extrinsic>(config: Configuration)
service.client(),
WrappedExecutor(service.spawn_task_handle()),
);
let (validation_service_handle, validation_service) = consensus::ServiceBuilder {
client: client.clone(),
network: validation_network.clone(),
collators: validation_network,
task_executor: Arc::new(WrappedExecutor(service.spawn_task_handle())),
availability_store: availability_store.clone(),
select_chain: select_chain.clone(),
keystore: service.keystore(),
max_block_data_size,
}.build();
service.spawn_essential_task(Box::pin(validation_service));
let proposer = consensus::ProposerFactory::new(
client.clone(),
select_chain.clone(),
validation_network.clone(),
validation_network,
service.transaction_pool(),
Arc::new(WrappedExecutor(service.spawn_task_handle())),
service.keystore(),
availability_store.clone(),
validation_service_handle,
slot_duration,
max_block_data_size,
backend,
);
let client = service.client();
let select_chain = service.select_chain().ok_or(ServiceError::SelectChainRequired)?;
let can_author_with =
consensus_common::CanAuthorWithNativeVersion::new(client.executor().clone());
......
......@@ -354,6 +354,9 @@ impl<C: Context> Table<C> {
/// Import a signed statement. Signatures should be checked for validity, and the
/// sender should be checked to actually be an authority.
///
/// Validity and invalidity statements are only valid if the corresponding
/// candidate has already been imported.
///
/// If this returns `None`, the statement was either duplicate or invalid.
pub fn import_statement(
&mut self,
......
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Attestation service.
/// Attestation service. A long running service that creates and manages parachain attestation
/// instances.
///
/// This uses a handle to an underlying thread pool to dispatch heavy work
/// such as candidate verification while performing event-driven work
/// on a local event loop.
use std::{thread, time::Duration, sync::Arc};
use sc_client_api::{BlockchainEvents, BlockBody};
use sp_blockchain::HeaderBackend;
use block_builder::BlockBuilderApi;
use consensus::SelectChain;
use futures::prelude::*;
use futures::{future::{ready, select}, task::{Spawn, SpawnExt}};
use polkadot_primitives::Block;
use polkadot_primitives::parachain::ParachainHost;
use babe_primitives::BabeApi;
use keystore::KeyStorePtr;
use sp_api::{ApiExt, ProvideRuntimeApi};
use runtime_primitives::traits::HasherFor;
use tokio::{runtime::Runtime as LocalRuntime};
use log::{warn, error};
use super::{Network, Collators};
type TaskExecutor = Arc<dyn Spawn + Send + Sync>;
/// Parachain candidate attestation service handle.
pub(crate) struct ServiceHandle {
thread: Option<thread::JoinHandle<()>>,
exit_signal: Option<::exit_future::Signal>,
}
/// Create and start a new instance of the attestation service.
pub(crate) fn start<C, N, P, SC>(
client: Arc<P>,
select_chain: SC,
parachain_validation: Arc<crate::ParachainValidation<C, N, P>>,
thread_pool: TaskExecutor,
keystore: KeyStorePtr,
max_block_data_size: Option<u64>,
) -> ServiceHandle
where
C: Collators + Send + Sync + Unpin + 'static,
C::Collation: Send + Unpin + 'static,
P: BlockchainEvents<Block> + BlockBody<Block>,
P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block> +
BlockBuilderApi<Block> +
BabeApi<Block> +
ApiExt<Block, Error = sp_blockchain::Error>,
N: Network + Send + Sync + 'static,
N::TableRouter: Send + 'static,
N::BuildTableRouter: Send + Unpin + 'static,
SC: SelectChain<Block> + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HasherFor<Block>>,
{
const TIMER_INTERVAL: Duration = Duration::from_secs(30);
let (signal, exit) = ::exit_future::signal();
let thread = thread::spawn(move || {
let mut runtime = LocalRuntime::new().expect("Could not create local runtime");
let notifications = {
let client = client.clone();
let validation = parachain_validation.clone();
let keystore = keystore.clone();
let notifications = client.import_notification_stream()
.for_each(move |notification| {
let parent_hash = notification.hash;
if notification.is_new_best {
let res = validation.get_or_instantiate(
parent_hash,
&keystore,
max_block_data_size,
);
if let Err(e) = res {
warn!(
"Unable to start parachain validation on top of {:?}: {}",
parent_hash, e
);
}
}
ready(())
});
select(notifications, exit.clone())
};
let prune_old_sessions = {
let select_chain = select_chain.clone();
let interval = crate::interval(TIMER_INTERVAL)
.for_each(move |_| match select_chain.leaves() {
Ok(leaves) => {
parachain_validation.retain(|h| leaves.contains(h));
ready(())
}
Err(e) => {
warn!("Error fetching leaves from client: {:?}", e);
ready(())
}
});
select(interval, exit.clone()).map(|_| ())
};
runtime.spawn(notifications);
if let Err(_) = thread_pool.spawn(prune_old_sessions) {
error!("Failed to spawn old sessions pruning task");
}
runtime.block_on(exit);
});
ServiceHandle {
thread: Some(thread),
exit_signal: Some(signal),
}
}
impl Drop for ServiceHandle {
fn drop(&mut self) {
if let Some(signal) = self.exit_signal.take() {
let _ = signal.fire();
}
if let Some(thread) = self.thread.take() {
thread.join().expect("The service thread has panicked");
}
}
}
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The block production pipeline of Polkadot.
//!
//! The `ProposerFactory` exported by this module will be wrapped by some
//! consensus engine, and triggered when it is time to create a block.
use std::{
pin::Pin,
sync::Arc,
time::{self, Duration, Instant},
};
use sp_blockchain::HeaderBackend;
use block_builder::BlockBuilderApi;
use codec::Encode;
use consensus::{Proposal, RecordProof};
use polkadot_primitives::{Hash, Block, BlockId, BlockNumber, Header};
use polkadot_primitives::parachain::{
ParachainHost, AttestedCandidate, NEW_HEADS_IDENTIFIER,
};
use runtime_primitives::traits::{DigestFor, HasherFor};
use futures_timer::Delay;
use txpool_api::{TransactionPool, InPoolTransaction};
use futures::prelude::*;
use inherents::InherentData;
use sp_timestamp::TimestampInherentData;
use log::{info, debug, trace};
use sp_api::{ApiExt, ProvideRuntimeApi};
use crate::validation_service::ServiceHandle;
use crate::dynamic_inclusion::DynamicInclusion;
use crate::Error;
// Polkadot proposer factory.
pub struct ProposerFactory<Client, TxPool, Backend> {
client: Arc<Client>,
transaction_pool: Arc<TxPool>,
service_handle: ServiceHandle,
babe_slot_duration: u64,
backend: Arc<Backend>,
}
impl<Client, TxPool, Backend> ProposerFactory<Client, TxPool, Backend> {
/// Create a new proposer factory.
pub fn new(
client: Arc<Client>,
transaction_pool: Arc<TxPool>,
service_handle: ServiceHandle,
babe_slot_duration: u64,
backend: Arc<Backend>,
) -> Self {
ProposerFactory {
client,
transaction_pool,
service_handle: service_handle,
babe_slot_duration,
backend,
}
}
}
impl<Client, TxPool, Backend> consensus::Environment<Block>
for ProposerFactory<Client, TxPool, Backend>
where
TxPool: TransactionPool<Block=Block> + 'static,
Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
Client::Api: ParachainHost<Block> + BlockBuilderApi<Block>
+ ApiExt<Block, Error = sp_blockchain::Error>,
Backend: sc_client_api::Backend<
Block,
State = sp_api::StateBackendFor<Client, Block>
> + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HasherFor<Block>> + Send,
{
type CreateProposer = Pin<Box<
dyn Future<Output = Result<Self::Proposer, Self::Error>> + Send + 'static
>>;
type Proposer = Proposer<Client, TxPool, Backend>;
type Error = Error;
fn init(
&mut self,
parent_header: &Header,
) -> Self::CreateProposer {
let parent_hash = parent_header.hash();
let parent_number = parent_header.number;
let parent_id = BlockId::hash(parent_hash);
let client = self.client.clone();
let transaction_pool = self.transaction_pool.clone();
let backend = self.backend.clone();
let slot_duration = self.babe_slot_duration.clone();
let maybe_proposer = self.service_handle
.clone()
.get_validation_instance(parent_hash)
.and_then(move |tracker| future::ready(Ok(Proposer {
client,
tracker,
parent_hash,
parent_id,
parent_number,
transaction_pool,
slot_duration,
backend,
})));
Box::pin(maybe_proposer)
}
}
/// The Polkadot proposer logic.
pub struct Proposer<Client, TxPool, Backend> {
client: Arc<Client>,
parent_hash: Hash,
parent_id: BlockId,
parent_number: BlockNumber,
tracker: Arc<crate::validation_service::ValidationInstanceHandle>,
transaction_pool: Arc<TxPool>,
slot_duration: u64,
backend: Arc<Backend>,
}
impl<Client, TxPool, Backend> consensus::Proposer<Block> for Proposer<Client, TxPool, Backend> where
TxPool: TransactionPool<Block=Block> + 'static,
Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
Client::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
Backend: sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>> + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HasherFor<Block>> + Send,
{
type Error = Error;
type Transaction = sp_api::TransactionFor<Client, Block>;
type Proposal = Pin<
Box<
dyn Future<Output = Result<Proposal<Block, sp_api::TransactionFor<Client, Block>>, Error>>
+ Send
>
>;
fn propose(&mut self,
inherent_data: InherentData,
inherent_digests: DigestFor<Block>,
max_duration: Duration,
record_proof: RecordProof,
) -> Self::Proposal {
const SLOT_DURATION_DENOMINATOR: u64 = 3; // wait up to 1/3 of the slot for candidates.
let initial_included = self.tracker.table().includable_count();
let now = Instant::now();
let dynamic_inclusion = DynamicInclusion::new(
self.tracker.table().num_parachains(),
self.tracker.started(),
Duration::from_millis(self.slot_duration / SLOT_DURATION_DENOMINATOR),
);
let parent_hash = self.parent_hash.clone();
let parent_number = self.parent_number.clone();
let parent_id = self.parent_id.clone();
let client = self.client.clone();
let transaction_pool = self.transaction_pool.clone();
let table = self.tracker.table().clone();
let backend = self.backend.clone();
async move {
let enough_candidates = dynamic_inclusion.acceptable_in(
now,
initial_included,
).unwrap_or_else(|| Duration::from_millis(1));
let believed_timestamp = match inherent_data.timestamp_inherent_data() {
Ok(timestamp) => timestamp,
Err(e) => return Err(Error::InherentError(e)),
};
let deadline_diff = max_duration - max_duration / 3;
let deadline = match Instant::now().checked_add(deadline_diff) {
None => return Err(Error::DeadlineComputeFailure(deadline_diff)),
Some(d) => d,
};
let data = CreateProposalData {
parent_hash,
parent_number,
parent_id,
client,
transaction_pool,
table,
believed_minimum_timestamp: believed_timestamp,
inherent_data: Some(inherent_data),
inherent_digests,
// leave some time for the proposal finalisation
deadline,
record_proof,
backend,
};
// set up delay until next allowed timestamp.
let current_timestamp = current_timestamp();
if current_timestamp < believed_timestamp {
Delay::new(Duration::from_millis(current_timestamp - believed_timestamp))
.await;
}
Delay::new(enough_candidates).await;
tokio_executor::blocking::run(move || {
let proposed_candidates = data.table.proposed_set();
data.propose_with(proposed_candidates)
})
.await
}.boxed()
}
}
fn current_timestamp() -> u64 {
time::SystemTime::now().duration_since(time::UNIX_EPOCH)
.expect("now always later than unix epoch; qed")
.as_millis() as u64
}
/// Inner data of the create proposal.
struct CreateProposalData<Client, TxPool, Backend> {
parent_hash: Hash,
parent_number: BlockNumber,
parent_id: BlockId,
client: Arc<Client>,
transaction_pool: Arc<TxPool>,
table: Arc<crate::SharedTable>,
believed_minimum_timestamp: u64,
inherent_data: Option<InherentData>,
inherent_digests: DigestFor<Block>,
deadline: Instant,
record_proof: RecordProof,
backend: Arc<Backend>,
}
impl<Client, TxPool, Backend> CreateProposalData<Client, TxPool, Backend> where
TxPool: TransactionPool<Block=Block>,
Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync,
Client::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
Backend: sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>> + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HasherFor<Block>> + Send,
{
fn propose_with(
mut self,
candidates: Vec<AttestedCandidate>,
) -> Result<Proposal<Block, sp_api::TransactionFor<Client, Block>>, Error> {
use runtime_primitives::traits::{Hash as HashT, BlakeTwo256};
const MAX_TRANSACTIONS: usize = 40;
let mut inherent_data = self.inherent_data
.take()
.expect("CreateProposal is not polled after finishing; qed");
inherent_data.put_data(NEW_HEADS_IDENTIFIER, &candidates)
.map_err(Error::InherentError)?;
let runtime_api = self.client.runtime_api();
let mut block_builder = block_builder::BlockBuilder::new(
&*self.client,
self.client.expect_block_hash_from_id(&self.parent_id)?,
self.client.expect_block_number_from_id(&self.parent_id)?,
self.record_proof,
self.inherent_digests.clone(),
&*self.backend,
)?;
{
let inherents = runtime_api.inherent_extrinsics(&self.parent_id, inherent_data)?;
for inherent in inherents {
block_builder.push(inherent)?;
}
let mut unqueue_invalid = Vec::new();
let mut pending_size = 0;
let ready_iter = self.transaction_pool.ready();
for ready in ready_iter.take(MAX_TRANSACTIONS) {
let encoded_size = ready.data().encode().len();
if pending_size + encoded_size >= crate::evaluation::MAX_TRANSACTIONS_SIZE {
break;
}
if Instant::now() > self.deadline {
debug!("Consensus deadline reached when pushing block transactions, proceeding with proposing.");
break;
}
match block_builder.push(ready.data().clone()) {
Ok(()) => {
debug!("[{:?}] Pushed to the block.", ready.hash());
pending_size += encoded_size;
}
Err(sp_blockchain::Error::ApplyExtrinsicFailed(sp_blockchain::ApplyExtrinsicFailed::Validity(e)))
if e.exhausted_resources() =>
{
debug!("Block is full, proceed with proposing.");
break;
}
Err(e) => {
trace!(target: "transaction-pool", "Invalid transaction: {}", e);
unqueue_invalid.push(ready.hash().clone());
}
}
}
self.transaction_pool.remove_invalid(&unqueue_invalid);
}
let (new_block, storage_changes, proof) = block_builder.build()?.into_inner();
info!("Prepared block for proposing at {} [hash: {:?}; parent_hash: {}; extrinsics: [{}]]",
new_block.header.number,
Hash::from(new_block.header.hash()),
new_block.header.parent_hash,
new_block.extrinsics.iter()
.map(|xt| format!("{}", BlakeTwo256::hash_of(xt)))
.collect::<Vec<_>>()
.join(", ")
);
// TODO: full re-evaluation (https://github.com/paritytech/polkadot/issues/216)
let active_parachains = runtime_api.active_parachains(&self.parent_id)?;
assert!(crate::evaluation::evaluate_initial(
&new_block,
self.believed_minimum_timestamp,
&self.parent_hash,
self.parent_number,
&active_parachains[..],
).is_ok());
Ok(Proposal { block: new_block, storage_changes, proof })
}
}
......@@ -46,6 +46,8 @@ pub enum Error {
Timer(std::io::Error),
#[display(fmt = "Failed to compute deadline of now + {:?}", _0)]
DeadlineComputeFailure(std::time::Duration),
#[display(fmt = "Validation service is down.")]
ValidationServiceDown,
Join(tokio::task::JoinError)
}
......
......@@ -16,7 +16,8 @@
//! Polkadot block evaluation and evaluation errors.
use super::MAX_TRANSACTIONS_SIZE;
// block size limit.
pub(crate) const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024;
use codec::Encode;
use polkadot_primitives::{Block, Hash, BlockNumber};
......
This diff is collapsed.
......@@ -431,7 +431,10 @@ impl SharedTable {
/// Import a single statement with remote source, whose signature has already been checked.
///
/// The statement producer, if any, will produce only statements concerning the same candidate
/// Validity and invalidity statements are only valid if the corresponding
/// candidate has already been imported.
///
/// The ParachainWork, if any, will produce only statements concerning the same candidate
/// as the one just imported
pub fn import_remote_statement<R: TableRouter>(
&self,
......@@ -446,8 +449,10 @@ impl SharedTable {
/// Import many statements at once.
///
/// Provide an iterator yielding remote, pre-checked statements.
/// Validity and invalidity statements are only valid if the corresponding
/// candidate has already been imported.
///
/// The statement producer, if any, will produce only statements concerning the same candidate
/// The ParachainWork, if any, will produce only statements concerning the same candidate