diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs
index f3872c6663b46821e513d0b07f0e90496128c737..40c052ef827e216e0bd8c7c4dfcbdf96d0198ccf 100644
--- a/polkadot/service/src/lib.rs
+++ b/polkadot/service/src/lib.rs
@@ -404,21 +404,28 @@ pub fn new_full<Runtime, Dispatch, Extrinsic>(config: Configuration)
 			service.client(),
 			WrappedExecutor(service.spawn_task_handle()),
 		);
+
+		let (validation_service_handle, validation_service) = consensus::ServiceBuilder {
+			client: client.clone(),
+			network: validation_network.clone(),
+			collators: validation_network,
+			task_executor: Arc::new(WrappedExecutor(service.spawn_task_handle())),
+			availability_store: availability_store.clone(),
+			select_chain: select_chain.clone(),
+			keystore: service.keystore(),
+			max_block_data_size,
+		}.build();
+
+		service.spawn_essential_task(Box::pin(validation_service));
+
 		let proposer = consensus::ProposerFactory::new(
 			client.clone(),
-			select_chain.clone(),
-			validation_network.clone(),
-			validation_network,
 			service.transaction_pool(),
-			Arc::new(WrappedExecutor(service.spawn_task_handle())),
-			service.keystore(),
-			availability_store.clone(),
+			validation_service_handle,
 			slot_duration,
-			max_block_data_size,
 			backend,
 		);
 
-		let client = service.client();
 		let select_chain = service.select_chain().ok_or(ServiceError::SelectChainRequired)?;
 		let can_author_with =
 			consensus_common::CanAuthorWithNativeVersion::new(client.executor().clone());
diff --git a/polkadot/statement-table/src/generic.rs b/polkadot/statement-table/src/generic.rs
index f429e59960ec3d9788cabe5b7699aabdad617f30..83cae9daf26b975526bd831f1fb0e8ec49d93399 100644
--- a/polkadot/statement-table/src/generic.rs
+++ b/polkadot/statement-table/src/generic.rs
@@ -354,6 +354,9 @@ impl<C: Context> Table<C> {
 	/// Import a signed statement. Signatures should be checked for validity, and the
 	/// sender should be checked to actually be an authority.
 	///
+	/// Validity and invalidity statements are only valid if the corresponding
+	/// candidate has already been imported.
+	///
 	/// If this returns `None`, the statement was either duplicate or invalid.
 	pub fn import_statement(
 		&mut self,
diff --git a/polkadot/validation/src/attestation_service.rs b/polkadot/validation/src/attestation_service.rs
deleted file mode 100644
index a5087f1cd40815c1871c76cb942e6c326acb55c3..0000000000000000000000000000000000000000
--- a/polkadot/validation/src/attestation_service.rs
+++ /dev/null
@@ -1,154 +0,0 @@
-// Copyright 2017-2020 Parity Technologies (UK) Ltd.
-// This file is part of Polkadot.
-
-// Polkadot is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// Polkadot is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
-
-//! Attestation service.
-
-/// Attestation service. A long running service that creates and manages parachain attestation
-/// instances.
-///
-/// This uses a handle to an underlying thread pool to dispatch heavy work
-/// such as candidate verification while performing event-driven work
-/// on a local event loop.
-
-use std::{thread, time::Duration, sync::Arc};
-
-use sc_client_api::{BlockchainEvents, BlockBody};
-use sp_blockchain::HeaderBackend;
-use block_builder::BlockBuilderApi;
-use consensus::SelectChain;
-use futures::prelude::*;
-use futures::{future::{ready, select}, task::{Spawn, SpawnExt}};
-use polkadot_primitives::Block;
-use polkadot_primitives::parachain::ParachainHost;
-use babe_primitives::BabeApi;
-use keystore::KeyStorePtr;
-use sp_api::{ApiExt, ProvideRuntimeApi};
-use runtime_primitives::traits::HasherFor;
-
-use tokio::{runtime::Runtime as LocalRuntime};
-use log::{warn, error};
-
-use super::{Network, Collators};
-
-type TaskExecutor = Arc<dyn Spawn + Send + Sync>;
-
-/// Parachain candidate attestation service handle.
-pub(crate) struct ServiceHandle {
-	thread: Option<thread::JoinHandle<()>>,
-	exit_signal: Option<::exit_future::Signal>,
-}
-
-/// Create and start a new instance of the attestation service.
-pub(crate) fn start<C, N, P, SC>(
-	client: Arc<P>,
-	select_chain: SC,
-	parachain_validation: Arc<crate::ParachainValidation<C, N, P>>,
-	thread_pool: TaskExecutor,
-	keystore: KeyStorePtr,
-	max_block_data_size: Option<u64>,
-) -> ServiceHandle
-	where
-		C: Collators + Send + Sync + Unpin + 'static,
-		C::Collation: Send + Unpin + 'static,
-		P: BlockchainEvents<Block> + BlockBody<Block>,
-		P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
-		P::Api: ParachainHost<Block> +
-			BlockBuilderApi<Block> +
-			BabeApi<Block> +
-			ApiExt<Block, Error = sp_blockchain::Error>,
-		N: Network + Send + Sync + 'static,
-		N::TableRouter: Send + 'static,
-		N::BuildTableRouter: Send + Unpin + 'static,
-		SC: SelectChain<Block> + 'static,
-		// Rust bug: https://github.com/rust-lang/rust/issues/24159
-		sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HasherFor<Block>>,
-{
-	const TIMER_INTERVAL: Duration = Duration::from_secs(30);
-
-	let (signal, exit) = ::exit_future::signal();
-	let thread = thread::spawn(move || {
-		let mut runtime = LocalRuntime::new().expect("Could not create local runtime");
-		let notifications = {
-			let client = client.clone();
-			let validation = parachain_validation.clone();
-
-			let keystore = keystore.clone();
-
-			let notifications = client.import_notification_stream()
-				.for_each(move |notification| {
-					let parent_hash = notification.hash;
-					if notification.is_new_best {
-						let res = validation.get_or_instantiate(
-							parent_hash,
-							&keystore,
-							max_block_data_size,
-						);
-
-						if let Err(e) = res {
-							warn!(
-								"Unable to start parachain validation on top of {:?}: {}",
-								parent_hash, e
-							);
-						}
-					}
-					ready(())
-				});
-
-			select(notifications, exit.clone())
-		};
-
-		let prune_old_sessions = {
-			let select_chain = select_chain.clone();
-			let interval = crate::interval(TIMER_INTERVAL)
-				.for_each(move |_| match select_chain.leaves() {
-					Ok(leaves) => {
-						parachain_validation.retain(|h| leaves.contains(h));
-						ready(())
-					}
-					Err(e) => {
-						warn!("Error fetching leaves from client: {:?}", e);
-						ready(())
-					}
-				});
-
-			select(interval, exit.clone()).map(|_| ())
-		};
-
-		runtime.spawn(notifications);
-		if let Err(_) = thread_pool.spawn(prune_old_sessions) {
-			error!("Failed to spawn old sessions pruning task");
-		}
-
-		runtime.block_on(exit);
-	});
-
-	ServiceHandle {
-		thread: Some(thread),
-		exit_signal: Some(signal),
-	}
-}
-
-impl Drop for ServiceHandle {
-	fn drop(&mut self) {
-		if let Some(signal) = self.exit_signal.take() {
-			let _ = signal.fire();
-		}
-
-		if let Some(thread) = self.thread.take() {
-			thread.join().expect("The service thread has panicked");
-		}
-	}
-}
diff --git a/polkadot/validation/src/block_production.rs b/polkadot/validation/src/block_production.rs
new file mode 100644
index 0000000000000000000000000000000000000000..6d4e73638995a95cead8cdd83d0ab92b475f3db5
--- /dev/null
+++ b/polkadot/validation/src/block_production.rs
@@ -0,0 +1,354 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! The block production pipeline of Polkadot.
+//!
+//! The `ProposerFactory` exported by this module will be wrapped by some
+//! consensus engine, and triggered when it is time to create a block.
+
+use std::{
+	pin::Pin,
+	sync::Arc,
+	time::{self, Duration, Instant},
+};
+
+use sp_blockchain::HeaderBackend;
+use block_builder::BlockBuilderApi;
+use codec::Encode;
+use consensus::{Proposal, RecordProof};
+use polkadot_primitives::{Hash, Block, BlockId, BlockNumber, Header};
+use polkadot_primitives::parachain::{
+	ParachainHost, AttestedCandidate, NEW_HEADS_IDENTIFIER,
+};
+use runtime_primitives::traits::{DigestFor, HasherFor};
+use futures_timer::Delay;
+use txpool_api::{TransactionPool, InPoolTransaction};
+
+use futures::prelude::*;
+use inherents::InherentData;
+use sp_timestamp::TimestampInherentData;
+use log::{info, debug, trace};
+use sp_api::{ApiExt, ProvideRuntimeApi};
+
+use crate::validation_service::ServiceHandle;
+use crate::dynamic_inclusion::DynamicInclusion;
+use crate::Error;
+
+// Polkadot proposer factory.
+pub struct ProposerFactory<Client, TxPool, Backend> {
+	client: Arc<Client>,
+	transaction_pool: Arc<TxPool>,
+	service_handle: ServiceHandle,
+	babe_slot_duration: u64,
+	backend: Arc<Backend>,
+}
+
+impl<Client, TxPool, Backend> ProposerFactory<Client, TxPool, Backend> {
+	/// Create a new proposer factory.
+	pub fn new(
+		client: Arc<Client>,
+		transaction_pool: Arc<TxPool>,
+		service_handle: ServiceHandle,
+		babe_slot_duration: u64,
+		backend: Arc<Backend>,
+	) -> Self {
+		ProposerFactory {
+			client,
+			transaction_pool,
+			service_handle: service_handle,
+			babe_slot_duration,
+			backend,
+		}
+	}
+}
+
+impl<Client, TxPool, Backend> consensus::Environment<Block>
+	for ProposerFactory<Client, TxPool, Backend>
+where
+	TxPool: TransactionPool<Block=Block> + 'static,
+	Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
+	Client::Api: ParachainHost<Block> + BlockBuilderApi<Block>
+		+ ApiExt<Block, Error = sp_blockchain::Error>,
+	Backend: sc_client_api::Backend<
+		Block,
+		State = sp_api::StateBackendFor<Client, Block>
+	> + 'static,
+	// Rust bug: https://github.com/rust-lang/rust/issues/24159
+	sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HasherFor<Block>> + Send,
+{
+	type CreateProposer = Pin<Box<
+		dyn Future<Output = Result<Self::Proposer, Self::Error>> + Send + 'static
+	>>;
+	type Proposer = Proposer<Client, TxPool, Backend>;
+	type Error = Error;
+
+	fn init(
+		&mut self,
+		parent_header: &Header,
+	) -> Self::CreateProposer {
+		let parent_hash = parent_header.hash();
+		let parent_number = parent_header.number;
+		let parent_id = BlockId::hash(parent_hash);
+
+		let client = self.client.clone();
+		let transaction_pool = self.transaction_pool.clone();
+		let backend = self.backend.clone();
+		let slot_duration = self.babe_slot_duration.clone();
+
+		let maybe_proposer = self.service_handle
+			.clone()
+			.get_validation_instance(parent_hash)
+			.and_then(move |tracker| future::ready(Ok(Proposer {
+				client,
+				tracker,
+				parent_hash,
+				parent_id,
+				parent_number,
+				transaction_pool,
+				slot_duration,
+				backend,
+			})));
+
+		Box::pin(maybe_proposer)
+	}
+}
+
+/// The Polkadot proposer logic.
+pub struct Proposer<Client, TxPool, Backend> {
+	client: Arc<Client>,
+	parent_hash: Hash,
+	parent_id: BlockId,
+	parent_number: BlockNumber,
+	tracker: Arc<crate::validation_service::ValidationInstanceHandle>,
+	transaction_pool: Arc<TxPool>,
+	slot_duration: u64,
+	backend: Arc<Backend>,
+}
+
+impl<Client, TxPool, Backend> consensus::Proposer<Block> for Proposer<Client, TxPool, Backend> where
+	TxPool: TransactionPool<Block=Block> + 'static,
+	Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
+	Client::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
+	Backend: sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>> + 'static,
+	// Rust bug: https://github.com/rust-lang/rust/issues/24159
+	sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HasherFor<Block>> + Send,
+{
+	type Error = Error;
+	type Transaction = sp_api::TransactionFor<Client, Block>;
+	type Proposal = Pin<
+		Box<
+			dyn Future<Output = Result<Proposal<Block, sp_api::TransactionFor<Client, Block>>, Error>>
+				+ Send
+		>
+	>;
+
+	fn propose(&mut self,
+		inherent_data: InherentData,
+		inherent_digests: DigestFor<Block>,
+		max_duration: Duration,
+		record_proof: RecordProof,
+	) -> Self::Proposal {
+		const SLOT_DURATION_DENOMINATOR: u64 = 3; // wait up to 1/3 of the slot for candidates.
+
+		let initial_included = self.tracker.table().includable_count();
+		let now = Instant::now();
+
+		let dynamic_inclusion = DynamicInclusion::new(
+			self.tracker.table().num_parachains(),
+			self.tracker.started(),
+			Duration::from_millis(self.slot_duration / SLOT_DURATION_DENOMINATOR),
+		);
+
+		let parent_hash = self.parent_hash.clone();
+		let parent_number = self.parent_number.clone();
+		let parent_id = self.parent_id.clone();
+		let client = self.client.clone();
+		let transaction_pool = self.transaction_pool.clone();
+		let table = self.tracker.table().clone();
+		let backend = self.backend.clone();
+
+		async move {
+			let enough_candidates = dynamic_inclusion.acceptable_in(
+				now,
+				initial_included,
+			).unwrap_or_else(|| Duration::from_millis(1));
+
+			let believed_timestamp = match inherent_data.timestamp_inherent_data() {
+				Ok(timestamp) => timestamp,
+				Err(e) => return Err(Error::InherentError(e)),
+			};
+
+			let deadline_diff = max_duration - max_duration / 3;
+			let deadline = match Instant::now().checked_add(deadline_diff) {
+				None => return Err(Error::DeadlineComputeFailure(deadline_diff)),
+				Some(d) => d,
+			};
+
+			let data = CreateProposalData {
+				parent_hash,
+				parent_number,
+				parent_id,
+				client,
+				transaction_pool,
+				table,
+				believed_minimum_timestamp: believed_timestamp,
+				inherent_data: Some(inherent_data),
+				inherent_digests,
+				// leave some time for the proposal finalisation
+				deadline,
+				record_proof,
+				backend,
+			};
+
+			// set up delay until next allowed timestamp.
+			let current_timestamp = current_timestamp();
+			if current_timestamp < believed_timestamp {
+				Delay::new(Duration::from_millis(current_timestamp - believed_timestamp))
+					.await;
+			}
+
+			Delay::new(enough_candidates).await;
+
+			tokio_executor::blocking::run(move || {
+				let proposed_candidates = data.table.proposed_set();
+				data.propose_with(proposed_candidates)
+			})
+				.await
+		}.boxed()
+	}
+}
+
+fn current_timestamp() -> u64 {
+	time::SystemTime::now().duration_since(time::UNIX_EPOCH)
+		.expect("now always later than unix epoch; qed")
+		.as_millis() as u64
+}
+
+/// Inner data of the create proposal.
+struct CreateProposalData<Client, TxPool, Backend> {
+	parent_hash: Hash,
+	parent_number: BlockNumber,
+	parent_id: BlockId,
+	client: Arc<Client>,
+	transaction_pool: Arc<TxPool>,
+	table: Arc<crate::SharedTable>,
+	believed_minimum_timestamp: u64,
+	inherent_data: Option<InherentData>,
+	inherent_digests: DigestFor<Block>,
+	deadline: Instant,
+	record_proof: RecordProof,
+	backend: Arc<Backend>,
+}
+
+impl<Client, TxPool, Backend> CreateProposalData<Client, TxPool, Backend> where
+	TxPool: TransactionPool<Block=Block>,
+	Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync,
+	Client::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
+	Backend: sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>> + 'static,
+	// Rust bug: https://github.com/rust-lang/rust/issues/24159
+	sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HasherFor<Block>> + Send,
+{
+	fn propose_with(
+		mut self,
+		candidates: Vec<AttestedCandidate>,
+	) -> Result<Proposal<Block, sp_api::TransactionFor<Client, Block>>, Error> {
+		use runtime_primitives::traits::{Hash as HashT, BlakeTwo256};
+
+		const MAX_TRANSACTIONS: usize = 40;
+
+		let mut inherent_data = self.inherent_data
+			.take()
+			.expect("CreateProposal is not polled after finishing; qed");
+		inherent_data.put_data(NEW_HEADS_IDENTIFIER, &candidates)
+			.map_err(Error::InherentError)?;
+
+		let runtime_api = self.client.runtime_api();
+
+		let mut block_builder = block_builder::BlockBuilder::new(
+			&*self.client,
+			self.client.expect_block_hash_from_id(&self.parent_id)?,
+			self.client.expect_block_number_from_id(&self.parent_id)?,
+			self.record_proof,
+			self.inherent_digests.clone(),
+			&*self.backend,
+		)?;
+
+		{
+			let inherents = runtime_api.inherent_extrinsics(&self.parent_id, inherent_data)?;
+			for inherent in inherents {
+				block_builder.push(inherent)?;
+			}
+
+			let mut unqueue_invalid = Vec::new();
+			let mut pending_size = 0;
+
+			let ready_iter = self.transaction_pool.ready();
+			for ready in ready_iter.take(MAX_TRANSACTIONS) {
+				let encoded_size = ready.data().encode().len();
+				if pending_size + encoded_size >= crate::evaluation::MAX_TRANSACTIONS_SIZE {
+					break;
+				}
+				if Instant::now() > self.deadline {
+					debug!("Consensus deadline reached when pushing block transactions, proceeding with proposing.");
+					break;
+				}
+
+				match block_builder.push(ready.data().clone()) {
+					Ok(()) => {
+						debug!("[{:?}] Pushed to the block.", ready.hash());
+						pending_size += encoded_size;
+					}
+					Err(sp_blockchain::Error::ApplyExtrinsicFailed(sp_blockchain::ApplyExtrinsicFailed::Validity(e)))
+						if e.exhausted_resources() =>
+					{
+						debug!("Block is full, proceed with proposing.");
+						break;
+					}
+					Err(e) => {
+						trace!(target: "transaction-pool", "Invalid transaction: {}", e);
+						unqueue_invalid.push(ready.hash().clone());
+					}
+				}
+			}
+
+			self.transaction_pool.remove_invalid(&unqueue_invalid);
+		}
+
+		let (new_block, storage_changes, proof) = block_builder.build()?.into_inner();
+
+		info!("Prepared block for proposing at {} [hash: {:?}; parent_hash: {}; extrinsics: [{}]]",
+			new_block.header.number,
+			Hash::from(new_block.header.hash()),
+			new_block.header.parent_hash,
+			new_block.extrinsics.iter()
+				.map(|xt| format!("{}", BlakeTwo256::hash_of(xt)))
+				.collect::<Vec<_>>()
+				.join(", ")
+		);
+
+		// TODO: full re-evaluation (https://github.com/paritytech/polkadot/issues/216)
+		let active_parachains = runtime_api.active_parachains(&self.parent_id)?;
+		assert!(crate::evaluation::evaluate_initial(
+			&new_block,
+			self.believed_minimum_timestamp,
+			&self.parent_hash,
+			self.parent_number,
+			&active_parachains[..],
+		).is_ok());
+
+		Ok(Proposal { block: new_block, storage_changes, proof })
+	}
+}
diff --git a/polkadot/validation/src/error.rs b/polkadot/validation/src/error.rs
index 1890863a07760af88fbdad4b6892383ac66c949b..9ba4922eeb785314c35913d8f231951568a112fc 100644
--- a/polkadot/validation/src/error.rs
+++ b/polkadot/validation/src/error.rs
@@ -46,6 +46,8 @@ pub enum Error {
 	Timer(std::io::Error),
 	#[display(fmt = "Failed to compute deadline of now + {:?}", _0)]
 	DeadlineComputeFailure(std::time::Duration),
+	#[display(fmt = "Validation service is down.")]
+	ValidationServiceDown,
 	Join(tokio::task::JoinError)
 }
 
diff --git a/polkadot/validation/src/evaluation.rs b/polkadot/validation/src/evaluation.rs
index 18e0d9829afed07e3eddda81b760d6ff3d7aadc7..76e7e9f8fb5d57eac274a3812b53c19bcaa117f1 100644
--- a/polkadot/validation/src/evaluation.rs
+++ b/polkadot/validation/src/evaluation.rs
@@ -16,7 +16,8 @@
 
 //! Polkadot block evaluation and evaluation errors.
 
-use super::MAX_TRANSACTIONS_SIZE;
+// block size limit.
+pub(crate) const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024;
 
 use codec::Encode;
 use polkadot_primitives::{Block, Hash, BlockNumber};
diff --git a/polkadot/validation/src/lib.rs b/polkadot/validation/src/lib.rs
index 860c1a6f3659016626bd7ffead0041aa6c405290..41f2552aebdafc998f2bce3f8167d1da81a42fef 100644
--- a/polkadot/validation/src/lib.rs
+++ b/polkadot/validation/src/lib.rs
@@ -31,50 +31,22 @@
 
 use std::{
 	collections::{HashMap, HashSet},
-	pin::Pin,
 	sync::Arc,
-	time::{self, Duration, Instant},
 };
 
-use babe_primitives::BabeApi;
-use sc_client_api::{Backend, BlockchainEvents, BlockBody};
-use sp_blockchain::HeaderBackend;
-use block_builder::BlockBuilderApi;
 use codec::Encode;
-use consensus::{SelectChain, Proposal, RecordProof};
-use availability_store::Store as AvailabilityStore;
-use parking_lot::Mutex;
-use polkadot_primitives::{Hash, Block, BlockId, BlockNumber, Header};
+use polkadot_primitives::Hash;
 use polkadot_primitives::parachain::{
 	Id as ParaId, Chain, DutyRoster, CandidateReceipt,
-	ParachainHost, AttestedCandidate, Statement as PrimitiveStatement, Message, OutgoingMessages,
+	Statement as PrimitiveStatement, Message, OutgoingMessages,
 	Collation, PoVBlock, ErasureChunk, ValidatorSignature, ValidatorIndex,
-	ValidatorPair, ValidatorId, NEW_HEADS_IDENTIFIER,
+	ValidatorPair, ValidatorId,
 };
 use primitives::Pair;
-use runtime_primitives::traits::{DigestFor, HasherFor};
-use futures_timer::Delay;
-use txpool_api::{TransactionPool, InPoolTransaction};
 
-use attestation_service::ServiceHandle;
 use futures::prelude::*;
-use futures::{future::{select, ready}, stream::unfold, task::{Spawn, SpawnExt}};
-use collation::collation_fetch;
-use dynamic_inclusion::DynamicInclusion;
-use inherents::InherentData;
-use sp_timestamp::TimestampInherentData;
-use log::{info, debug, warn, trace, error};
-use keystore::KeyStorePtr;
-use sp_api::{ApiExt, ProvideRuntimeApi};
-
-type TaskExecutor = Arc<dyn Spawn + Send + Sync>;
-
-fn interval(duration: Duration) -> impl Stream<Item=()> + Send + Unpin {
-	unfold((), move |_| {
-		futures_timer::Delay::new(duration).map(|_| Some(((), ())))
-	}).map(drop)
-}
 
+pub use self::block_production::ProposerFactory;
 pub use self::collation::{
 	validate_collation, validate_incoming, message_queue_root, egress_roots, Collators,
 	produce_receipt_and_chunks,
@@ -84,20 +56,19 @@ pub use self::shared_table::{
 	SharedTable, ParachainWork, PrimedParachainWork, Validated, Statement, SignedStatement,
 	GenericStatement,
 };
+pub use self::validation_service::{ServiceHandle, ServiceBuilder};
 
 #[cfg(not(target_os = "unknown"))]
 pub use parachain::wasm_executor::{run_worker as run_validation_worker};
 
-mod attestation_service;
 mod dynamic_inclusion;
 mod evaluation;
 mod error;
 mod shared_table;
 
 pub mod collation;
-
-// block size limit.
-const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024;
+pub mod validation_service;
+pub mod block_production;
 
 /// Incoming messages; a series of sorted (ParaId, Message) pairs.
 pub type Incoming = Vec<(ParaId, Vec<Message>)>;
@@ -148,6 +119,13 @@ pub trait Network {
 	) -> Self::BuildTableRouter;
 }
 
+/// The local duty of a validator.
+#[derive(Debug)]
+pub struct LocalDuty {
+	validation: Chain,
+	index: ValidatorIndex,
+}
+
 /// Information about a specific group.
 #[derive(Debug, Clone, Default)]
 pub struct GroupInfo {
@@ -246,595 +224,6 @@ pub fn outgoing_queues(outgoing_targeted: &'_ OutgoingMessages)
 	})
 }
 
-// finds the first key we are capable of signing with out of the given set of validators,
-// if any.
-fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<Arc<ValidatorPair>> {
-	let keystore = keystore.read();
-	validators.iter()
-		.find_map(|v| {
-			keystore.key_pair::<ValidatorPair>(&v).ok()
-		})
-		.map(|pair| Arc::new(pair))
-}
-
-/// Constructs parachain-agreement instances.
-struct ParachainValidation<C, N, P> {
-	/// The client instance.
-	client: Arc<P>,
-	/// The backing network handle.
-	network: N,
-	/// Parachain collators.
-	collators: C,
-	/// handle to remote task executor
-	handle: TaskExecutor,
-	/// Store for extrinsic data.
-	availability_store: AvailabilityStore,
-	/// Live agreements. Maps relay chain parent hashes to attestation
-	/// instances.
-	live_instances: Mutex<HashMap<Hash, Arc<AttestationTracker>>>,
-}
-
-impl<C, N, P> ParachainValidation<C, N, P> where
-	C: Collators + Send + Unpin + 'static + Sync,
-	N: Network,
-	P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + BlockBody<Block> + Send + Sync + 'static,
-	P::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
-	C::Collation: Send + Unpin + 'static,
-	N::TableRouter: Send + 'static,
-	N::BuildTableRouter: Unpin + Send + 'static,
-	// Rust bug: https://github.com/rust-lang/rust/issues/24159
-	sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HasherFor<Block>>,
-{
-	/// Get an attestation table for given parent hash.
-	///
-	/// This starts a parachain agreement process on top of the parent hash if
-	/// one has not already started.
-	///
-	/// Additionally, this will trigger broadcast of data to the new block's duty
-	/// roster.
-	fn get_or_instantiate(
-		&self,
-		parent_hash: Hash,
-		keystore: &KeyStorePtr,
-		max_block_data_size: Option<u64>,
-	)
-		-> Result<Arc<AttestationTracker>, Error>
-	{
-		let mut live_instances = self.live_instances.lock();
-		if let Some(tracker) = live_instances.get(&parent_hash) {
-			return Ok(tracker.clone());
-		}
-
-		let id = BlockId::hash(parent_hash);
-
-		let validators = self.client.runtime_api().validators(&id)?;
-		let sign_with = signing_key(&validators[..], keystore);
-
-		let duty_roster = self.client.runtime_api().duty_roster(&id)?;
-
-		let (group_info, local_duty) = make_group_info(
-			duty_roster,
-			&validators,
-			sign_with.as_ref().map(|k| k.public()),
-		)?;
-
-		info!(
-			"Starting parachain attestation session on top of parent {:?}. Local parachain duty is {:?}",
-			parent_hash,
-			local_duty,
-		);
-
-		let active_parachains = self.client.runtime_api().active_parachains(&id)?;
-
-		debug!(target: "validation", "Active parachains: {:?}", active_parachains);
-
-		// If we are a validator, we need to store our index in this round in availability store.
-		// This will tell which erasure chunk we should store.
-		if let Some(ref local_duty) = local_duty {
-			if let Err(e) = self.availability_store.add_validator_index_and_n_validators(
-				&parent_hash,
-				local_duty.index,
-				validators.len() as u32,
-			) {
-				warn!(
-					target: "validation",
-					"Failed to add validator index and n_validators to the availability-store: {:?}", e
-				)
-			}
-		}
-
-		let table = Arc::new(SharedTable::new(
-			validators.clone(),
-			group_info,
-			sign_with,
-			parent_hash,
-			self.availability_store.clone(),
-			max_block_data_size,
-		));
-
-		let (_drop_signal, exit) = exit_future::signal();
-
-		let router = self.network.communication_for(
-			table.clone(),
-			&validators,
-			exit.clone(),
-		);
-
-		if let Some((Chain::Parachain(id), index)) = local_duty.as_ref().map(|d| (d.validation, d.index)) {
-			self.launch_work(parent_hash, id, router, max_block_data_size, validators.len(), index, exit);
-		}
-
-		let tracker = Arc::new(AttestationTracker {
-			table,
-			started: Instant::now(),
-			_drop_signal,
-		});
-
-		live_instances.insert(parent_hash, tracker.clone());
-
-		Ok(tracker)
-	}
-
-	/// Retain validation sessions matching predicate.
-	fn retain<F: FnMut(&Hash) -> bool>(&self, mut pred: F) {
-		self.live_instances.lock().retain(|k, _| pred(k))
-	}
-
-	// launch parachain work asynchronously.
-	fn launch_work(
-		&self,
-		relay_parent: Hash,
-		validation_para: ParaId,
-		build_router: N::BuildTableRouter,
-		max_block_data_size: Option<u64>,
-		authorities_num: usize,
-		local_id: ValidatorIndex,
-		exit: exit_future::Exit,
-	) {
-		let (collators, client) = (self.collators.clone(), self.client.clone());
-		let availability_store = self.availability_store.clone();
-
-		let with_router = move |router: N::TableRouter| {
-			// fetch a local collation from connected collators.
-			let collation_work = collation_fetch(
-				validation_para,
-				relay_parent,
-				collators,
-				client.clone(),
-				max_block_data_size,
-			);
-
-			collation_work.map(move |result| match result {
-				Ok((collation, outgoing_targeted, fees_charged)) => {
-					match produce_receipt_and_chunks(
-						authorities_num,
-						&collation.pov,
-						&outgoing_targeted,
-						fees_charged,
-						&collation.info,
-					) {
-						Ok((receipt, chunks)) => {
-							// Apparently the `async move` block is the only way to convince
-							// the compiler that we are not moving values out of borrowed context.
-							let av_clone = availability_store.clone();
-							let chunks_clone = chunks.clone();
-							let receipt_clone = receipt.clone();
-
-							let res = async move {
-								if let Err(e) = av_clone.clone().add_erasure_chunks(
-									relay_parent.clone(),
-									receipt_clone,
-									chunks_clone,
-								).await {
-									warn!(target: "validation", "Failed to add erasure chunks: {}", e);
-								}
-							}
-							.unit_error()
-							.boxed()
-							.then(move |_| {
-								router.local_collation(collation, receipt, outgoing_targeted, (local_id, &chunks));
-								ready(())
-							});
-
-							Ok(Some(res))
-						}
-						Err(e) => {
-							warn!(target: "validation", "Failed to produce a receipt: {:?}", e);
-							Ok(None)
-						}
-					}
-				}
-				Err(e) => {
-					warn!(target: "validation", "Failed to collate candidate: {:?}", e);
-					Ok(None)
-				}
-			})
-		};
-
-		let router = build_router
-			.map_ok(with_router)
-			.map_err(|e| {
-				warn!(target: "validation" , "Failed to build table router: {:?}", e);
-			})
-			.and_then(|f| f)
-			.and_then(|f| match f {
-				Some(f) => f.map(Ok).boxed(),
-				None => ready(Ok(())).boxed(),
-			}).boxed();
-
-		let cancellable_work = select(exit, router).map(drop);
-
-		// spawn onto thread pool.
-		if self.handle.spawn(cancellable_work).is_err() {
-			error!("Failed to spawn cancellable work task");
-		}
-	}
-}
-
-/// Parachain validation for a single block.
-struct AttestationTracker {
-	_drop_signal: exit_future::Signal,
-	table: Arc<SharedTable>,
-	started: Instant,
-}
-
-/// Polkadot proposer factory.
-pub struct ProposerFactory<C, N, P, SC, TxPool, B> {
-	parachain_validation: Arc<ParachainValidation<C, N, P>>,
-	transaction_pool: Arc<TxPool>,
-	keystore: KeyStorePtr,
-	_service_handle: ServiceHandle,
-	babe_slot_duration: u64,
-	_select_chain: SC,
-	max_block_data_size: Option<u64>,
-	backend: Arc<B>,
-}
-
-impl<C, N, P, SC, TxPool, B> ProposerFactory<C, N, P, SC, TxPool, B> where
-	C: Collators + Send + Sync + Unpin + 'static,
-	C::Collation: Send + Unpin + 'static,
-	P: BlockchainEvents<Block> + BlockBody<Block>,
-	P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
-	P::Api: ParachainHost<Block> +
-		BlockBuilderApi<Block> +
-		BabeApi<Block> +
-		ApiExt<Block, Error = sp_blockchain::Error>,
-	N: Network + Send + Sync + 'static,
-	N::TableRouter: Send + 'static,
-	N::BuildTableRouter: Send + Unpin + 'static,
-	TxPool: TransactionPool,
-	SC: SelectChain<Block> + 'static,
-	// Rust bug: https://github.com/rust-lang/rust/issues/24159
-	sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HasherFor<Block>>,
-{
-	/// Create a new proposer factory.
-	pub fn new(
-		client: Arc<P>,
-		_select_chain: SC,
-		network: N,
-		collators: C,
-		transaction_pool: Arc<TxPool>,
-		thread_pool: TaskExecutor,
-		keystore: KeyStorePtr,
-		availability_store: AvailabilityStore,
-		babe_slot_duration: u64,
-		max_block_data_size: Option<u64>,
-		backend: Arc<B>,
-	) -> Self {
-		let parachain_validation = Arc::new(ParachainValidation {
-			client: client.clone(),
-			network,
-			collators,
-			handle: thread_pool.clone(),
-			availability_store: availability_store.clone(),
-			live_instances: Mutex::new(HashMap::new()),
-		});
-
-		let service_handle = crate::attestation_service::start(
-			client,
-			_select_chain.clone(),
-			parachain_validation.clone(),
-			thread_pool,
-			keystore.clone(),
-			max_block_data_size,
-		);
-
-		ProposerFactory {
-			parachain_validation,
-			transaction_pool,
-			keystore,
-			_service_handle: service_handle,
-			babe_slot_duration,
-			_select_chain,
-			max_block_data_size,
-			backend,
-		}
-	}
-}
-
-impl<C, N, P, SC, TxPool, B> consensus::Environment<Block> for ProposerFactory<C, N, P, SC, TxPool, B> where
-	C: Collators + Send + Unpin + 'static + Sync,
-	N: Network,
-	TxPool: TransactionPool<Block=Block> + 'static,
-	P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + BlockBody<Block> + Send + Sync + 'static,
-	P::Api: ParachainHost<Block> +
-		BlockBuilderApi<Block> +
-		BabeApi<Block> +
-		ApiExt<Block, Error = sp_blockchain::Error>,
-	C::Collation: Send + Unpin + 'static,
-	N::TableRouter: Send + 'static,
-	N::BuildTableRouter: Send + Unpin + 'static,
-	SC: SelectChain<Block>,
-	B: Backend<Block, State = sp_api::StateBackendFor<P, Block>> + 'static,
-	// Rust bug: https://github.com/rust-lang/rust/issues/24159
-	sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HasherFor<Block>> + Send,
-{
-	type CreateProposer = Pin<Box<
-		dyn Future<Output = Result<Self::Proposer, Self::Error>> + Send + Unpin + 'static
-	>>;
-	type Proposer = Proposer<P, TxPool, B>;
-	type Error = Error;
-
-	fn init(
-		&mut self,
-		parent_header: &Header,
-	) -> Self::CreateProposer {
-		let parent_hash = parent_header.hash();
-		let parent_id = BlockId::hash(parent_hash);
-
-		let maybe_proposer = self.parachain_validation.get_or_instantiate(
-			parent_hash,
-			&self.keystore,
-			self.max_block_data_size,
-		).map(|tracker| Proposer {
-			client: self.parachain_validation.client.clone(),
-			tracker,
-			parent_hash,
-			parent_id,
-			parent_number: parent_header.number,
-			transaction_pool: self.transaction_pool.clone(),
-			slot_duration: self.babe_slot_duration,
-			backend: self.backend.clone(),
-		});
-
-		Box::pin(future::ready(maybe_proposer))
-	}
-}
-
-/// The local duty of a validator.
-#[derive(Debug)]
-pub struct LocalDuty {
-	validation: Chain,
-	index: ValidatorIndex,
-}
-
-/// The Polkadot proposer logic.
-pub struct Proposer<Client, TxPool, Backend> {
-	client: Arc<Client>,
-	parent_hash: Hash,
-	parent_id: BlockId,
-	parent_number: BlockNumber,
-	tracker: Arc<AttestationTracker>,
-	transaction_pool: Arc<TxPool>,
-	slot_duration: u64,
-	backend: Arc<Backend>,
-}
-
-impl<Client, TxPool, Backend> consensus::Proposer<Block> for Proposer<Client, TxPool, Backend> where
-	TxPool: TransactionPool<Block=Block> + 'static,
-	Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
-	Client::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
-	Backend: sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>> + 'static,
-	// Rust bug: https://github.com/rust-lang/rust/issues/24159
-	sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HasherFor<Block>> + Send,
-{
-	type Error = Error;
-	type Transaction = sp_api::TransactionFor<Client, Block>;
-	type Proposal = Pin<
-		Box<
-			dyn Future<Output = Result<Proposal<Block, sp_api::TransactionFor<Client, Block>>, Error>>
-				+ Send
-		>
-	>;
-
-	fn propose(&mut self,
-		inherent_data: InherentData,
-		inherent_digests: DigestFor<Block>,
-		max_duration: Duration,
-		record_proof: RecordProof,
-	) -> Self::Proposal {
-		const SLOT_DURATION_DENOMINATOR: u64 = 3; // wait up to 1/3 of the slot for candidates.
-
-		let initial_included = self.tracker.table.includable_count();
-		let now = Instant::now();
-
-		let dynamic_inclusion = DynamicInclusion::new(
-			self.tracker.table.num_parachains(),
-			self.tracker.started,
-			Duration::from_millis(self.slot_duration / SLOT_DURATION_DENOMINATOR),
-		);
-
-		let parent_hash = self.parent_hash.clone();
-		let parent_number = self.parent_number.clone();
-		let parent_id = self.parent_id.clone();
-		let client = self.client.clone();
-		let transaction_pool = self.transaction_pool.clone();
-		let table = self.tracker.table.clone();
-		let backend = self.backend.clone();
-
-		async move {
-			let enough_candidates = dynamic_inclusion.acceptable_in(
-				now,
-				initial_included,
-			).unwrap_or_else(|| Duration::from_millis(1));
-
-			let believed_timestamp = match inherent_data.timestamp_inherent_data() {
-				Ok(timestamp) => timestamp,
-				Err(e) => return Err(Error::InherentError(e)),
-			};
-
-			let deadline_diff = max_duration - max_duration / 3;
-			let deadline = match Instant::now().checked_add(deadline_diff) {
-				None => return Err(Error::DeadlineComputeFailure(deadline_diff)),
-				Some(d) => d,
-			};
-
-			let data = CreateProposalData {
-				parent_hash,
-				parent_number,
-				parent_id,
-				client,
-				transaction_pool,
-				table,
-				believed_minimum_timestamp: believed_timestamp,
-				inherent_data: Some(inherent_data),
-				inherent_digests,
-				// leave some time for the proposal finalisation
-				deadline,
-				record_proof,
-				backend,
-			};
-
-			// set up delay until next allowed timestamp.
-			let current_timestamp = current_timestamp();
-			if current_timestamp < believed_timestamp {
-				Delay::new(Duration::from_millis(current_timestamp - believed_timestamp))
-					.await;
-			}
-
-			Delay::new(enough_candidates).await;
-
-			tokio_executor::blocking::run(move || {
-				let proposed_candidates = data.table.proposed_set();
-				data.propose_with(proposed_candidates)
-			})
-				.await
-		}.boxed()
-	}
-}
-
-fn current_timestamp() -> u64 {
-	time::SystemTime::now().duration_since(time::UNIX_EPOCH)
-		.expect("now always later than unix epoch; qed")
-		.as_millis() as u64
-}
-
-/// Inner data of the create proposal.
-struct CreateProposalData<Client, TxPool, Backend> {
-	parent_hash: Hash,
-	parent_number: BlockNumber,
-	parent_id: BlockId,
-	client: Arc<Client>,
-	transaction_pool: Arc<TxPool>,
-	table: Arc<SharedTable>,
-	believed_minimum_timestamp: u64,
-	inherent_data: Option<InherentData>,
-	inherent_digests: DigestFor<Block>,
-	deadline: Instant,
-	record_proof: RecordProof,
-	backend: Arc<Backend>,
-}
-
-impl<Client, TxPool, Backend> CreateProposalData<Client, TxPool, Backend> where
-	TxPool: TransactionPool<Block=Block>,
-	Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync,
-	Client::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
-	Backend: sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>> + 'static,
-	// Rust bug: https://github.com/rust-lang/rust/issues/24159
-	sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HasherFor<Block>> + Send,
-{
-	fn propose_with(
-		mut self,
-		candidates: Vec<AttestedCandidate>,
-	) -> Result<Proposal<Block, sp_api::TransactionFor<Client, Block>>, Error> {
-		use runtime_primitives::traits::{Hash as HashT, BlakeTwo256};
-
-		const MAX_TRANSACTIONS: usize = 40;
-
-		let mut inherent_data = self.inherent_data
-			.take()
-			.expect("CreateProposal is not polled after finishing; qed");
-		inherent_data.put_data(NEW_HEADS_IDENTIFIER, &candidates)
-			.map_err(Error::InherentError)?;
-
-		let runtime_api = self.client.runtime_api();
-
-		let mut block_builder = block_builder::BlockBuilder::new(
-			&*self.client,
-			self.client.expect_block_hash_from_id(&self.parent_id)?,
-			self.client.expect_block_number_from_id(&self.parent_id)?,
-			self.record_proof,
-			self.inherent_digests.clone(),
-			&*self.backend,
-		)?;
-
-		{
-			let inherents = runtime_api.inherent_extrinsics(&self.parent_id, inherent_data)?;
-			for inherent in inherents {
-				block_builder.push(inherent)?;
-			}
-
-			let mut unqueue_invalid = Vec::new();
-			let mut pending_size = 0;
-
-			let ready_iter = self.transaction_pool.ready();
-			for ready in ready_iter.take(MAX_TRANSACTIONS) {
-				let encoded_size = ready.data().encode().len();
-				if pending_size + encoded_size >= MAX_TRANSACTIONS_SIZE {
-					break;
-				}
-				if Instant::now() > self.deadline {
-					debug!("Consensus deadline reached when pushing block transactions, proceeding with proposing.");
-					break;
-				}
-
-				match block_builder.push(ready.data().clone()) {
-					Ok(()) => {
-						debug!("[{:?}] Pushed to the block.", ready.hash());
-						pending_size += encoded_size;
-					}
-					Err(sp_blockchain::Error::ApplyExtrinsicFailed(sp_blockchain::ApplyExtrinsicFailed::Validity(e)))
-						if e.exhausted_resources() =>
-					{
-						debug!("Block is full, proceed with proposing.");
-						break;
-					}
-					Err(e) => {
-						trace!(target: "transaction-pool", "Invalid transaction: {}", e);
-						unqueue_invalid.push(ready.hash().clone());
-					}
-				}
-			}
-
-			self.transaction_pool.remove_invalid(&unqueue_invalid);
-		}
-
-		let (new_block, storage_changes, proof) = block_builder.build()?.into_inner();
-
-		info!("Prepared block for proposing at {} [hash: {:?}; parent_hash: {}; extrinsics: [{}]]",
-			new_block.header.number,
-			Hash::from(new_block.header.hash()),
-			new_block.header.parent_hash,
-			new_block.extrinsics.iter()
-				.map(|xt| format!("{}", BlakeTwo256::hash_of(xt)))
-				.collect::<Vec<_>>()
-				.join(", ")
-		);
-
-		// TODO: full re-evaluation (https://github.com/paritytech/polkadot/issues/216)
-		let active_parachains = runtime_api.active_parachains(&self.parent_id)?;
-		assert!(evaluation::evaluate_initial(
-			&new_block,
-			self.believed_minimum_timestamp,
-			&self.parent_hash,
-			self.parent_number,
-			&active_parachains[..],
-		).is_ok());
-
-		Ok(Proposal { block: new_block, storage_changes, proof })
-	}
-}
-
 #[cfg(test)]
 mod tests {
 	use super::*;
diff --git a/polkadot/validation/src/shared_table/mod.rs b/polkadot/validation/src/shared_table/mod.rs
index ad5eca61b3daed831394afb47048fefa7d7b7536..a6c22d627451e3f1d307a11d745222a198ad617b 100644
--- a/polkadot/validation/src/shared_table/mod.rs
+++ b/polkadot/validation/src/shared_table/mod.rs
@@ -431,7 +431,10 @@ impl SharedTable {
 
 	/// Import a single statement with remote source, whose signature has already been checked.
 	///
-	/// The statement producer, if any, will produce only statements concerning the same candidate
+	/// Validity and invalidity statements are only valid if the corresponding
+	/// candidate has already been imported.
+	///
+	/// The ParachainWork, if any, will produce only statements concerning the same candidate
 	/// as the one just imported
 	pub fn import_remote_statement<R: TableRouter>(
 		&self,
@@ -446,8 +449,10 @@ impl SharedTable {
 	/// Import many statements at once.
 	///
 	/// Provide an iterator yielding remote, pre-checked statements.
+	/// Validity and invalidity statements are only valid if the corresponding
+	/// candidate has already been imported.
 	///
-	/// The statement producer, if any, will produce only statements concerning the same candidate
+	/// The ParachainWork, if any, will produce only statements concerning the same candidate
 	/// as the one just imported
 	pub fn import_remote_statements<R, I, U>(&self, router: &R, iterable: I) -> U
 		where
diff --git a/polkadot/validation/src/validation_service/mod.rs b/polkadot/validation/src/validation_service/mod.rs
new file mode 100644
index 0000000000000000000000000000000000000000..14dd85c72eeda9276fca5d39740e8f1e7338159c
--- /dev/null
+++ b/polkadot/validation/src/validation_service/mod.rs
@@ -0,0 +1,463 @@
+// Copyright 2017-2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! The validation service is a long-running future that creates and manages parachain attestation
+//! instances.
+//!
+//! As soon as we import a new chain head, we start a parachain attestation session on top of it.
+//! The block authorship service may want access to the attestation session, and for that reason
+//! we expose a `ServiceHandle` which can be used to request a copy of it.
+//!
+//! In fact, the import notification and request from the block production pipeline may race to be
+//! the first one to create the instant, but the import notification will usually win.
+//!
+//! These attestation sessions are kept live until they are periodically garbage-collected.
+
+use std::{time::{Duration, Instant}, sync::Arc};
+use std::collections::HashMap;
+
+use sc_client_api::{BlockchainEvents, BlockBody};
+use sp_blockchain::HeaderBackend;
+use block_builder::BlockBuilderApi;
+use consensus::SelectChain;
+use futures::prelude::*;
+use futures::{future::{ready, select}, task::{Spawn, SpawnExt}};
+use polkadot_primitives::{Block, Hash, BlockId};
+use polkadot_primitives::parachain::{
+	Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair,
+};
+use babe_primitives::BabeApi;
+use keystore::KeyStorePtr;
+use sp_api::{ApiExt, ProvideRuntimeApi};
+use runtime_primitives::traits::HasherFor;
+use availability_store::Store as AvailabilityStore;
+
+use log::{warn, error, info, debug};
+
+use super::{Network, Collators, SharedTable, TableRouter};
+use crate::Error;
+
+/// A handle to spawn background tasks onto.
+pub type TaskExecutor = Arc<dyn Spawn + Send + Sync>;
+
+// Remote processes may request for a validation instance to be cloned or instantiated.
+// They send a oneshot channel.
+type ValidationInstanceRequest = (
+	Hash,
+	futures::channel::oneshot::Sender<Result<Arc<ValidationInstanceHandle>, Error>>,
+);
+
+/// A handle to a single instance of parachain validation, which is pinned to
+/// a specific relay-chain block. This is the instance that should be used when
+/// constructing any
+pub(crate) struct ValidationInstanceHandle {
+	_drop_signal: exit_future::Signal,
+	table: Arc<SharedTable>,
+	started: Instant,
+}
+
+impl ValidationInstanceHandle {
+	/// Access the underlying table of attestations on parachain candidates.
+	pub(crate) fn table(&self) -> &Arc<SharedTable> {
+		&self.table
+	}
+
+	/// The moment we started this validation instance.
+	pub(crate) fn started(&self) -> Instant {
+		self.started.clone()
+	}
+}
+
+/// A handle to the service. This can be used to create a block-production environment.
+#[derive(Clone)]
+pub struct ServiceHandle {
+	sender: futures::channel::mpsc::Sender<ValidationInstanceRequest>,
+}
+
+impl ServiceHandle {
+	/// Requests instantiation or cloning of a validation instance from the service.
+	///
+	/// This can fail if the service task has shut down for some reason.
+	pub(crate) async fn get_validation_instance(self, relay_parent: Hash)
+		-> Result<Arc<ValidationInstanceHandle>, Error>
+	{
+		let mut sender = self.sender;
+		let instance_rx = loop {
+			let (instance_tx, instance_rx) = futures::channel::oneshot::channel();
+			match sender.send((relay_parent, instance_tx)).await {
+				Ok(()) => break instance_rx,
+				Err(e) => if !e.is_full() {
+					// Sink::send should be doing `poll_ready` before start-send,
+					// so this should only happen when there is a race.
+					return Err(Error::ValidationServiceDown)
+				},
+			}
+		};
+
+		instance_rx.map_err(|_| Error::ValidationServiceDown).await.and_then(|x| x)
+	}
+}
+
+fn interval(duration: Duration) -> impl Stream<Item=()> + Send + Unpin {
+	stream::unfold((), move |_| {
+		futures_timer::Delay::new(duration).map(|_| Some(((), ())))
+	}).map(drop)
+}
+
+/// A builder for the validation service.
+pub struct ServiceBuilder<C, N, P, SC> {
+	/// The underlying blockchain client.
+	pub client: Arc<P>,
+	/// A handle to the network object used to communicate.
+	pub network: N,
+	/// A handle to the collator pool we are using.
+	pub collators: C,
+	/// A handle to a background executor.
+	pub task_executor: TaskExecutor,
+	/// A handle to the availability store.
+	pub availability_store: AvailabilityStore,
+	/// A chain selector for determining active leaves in the block-DAG.
+	pub select_chain: SC,
+	/// The keystore which holds the signing keys.
+	pub keystore: KeyStorePtr,
+	/// The maximum block-data size in bytes.
+	pub max_block_data_size: Option<u64>,
+}
+
+impl<C, N, P, SC> ServiceBuilder<C, N, P, SC> where
+	C: Collators + Send + Sync + Unpin + 'static,
+	C::Collation: Send + Unpin + 'static,
+	P: BlockchainEvents<Block> + BlockBody<Block>,
+	P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
+	P::Api: ParachainHost<Block> +
+		BlockBuilderApi<Block> +
+		BabeApi<Block> +
+		ApiExt<Block, Error = sp_blockchain::Error>,
+	N: Network + Send + Sync + 'static,
+	N::TableRouter: Send + 'static,
+	N::BuildTableRouter: Send + Unpin + 'static,
+	SC: SelectChain<Block> + 'static,
+	// Rust bug: https://github.com/rust-lang/rust/issues/24159
+	sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HasherFor<Block>>,
+{
+	/// Build the service - this consists of a handle to it, as well as a background
+	/// future to be run to completion.
+	pub fn build(self) -> (ServiceHandle, impl Future<Output = ()> + Send + 'static) {
+		const TIMER_INTERVAL: Duration = Duration::from_secs(30);
+		const CHAN_BUFFER: usize = 10;
+
+		enum Message {
+			CollectGarbage,
+			// relay-parent, receiver for instance.
+			RequestInstance(ValidationInstanceRequest),
+			// new chain heads - import notification.
+			NotifyImport(sc_client_api::BlockImportNotification<Block>),
+		}
+
+		let mut parachain_validation = ParachainValidationInstances {
+			client: self.client.clone(),
+			network: self.network,
+			collators: self.collators,
+			handle: self.task_executor,
+			availability_store: self.availability_store,
+			live_instances: HashMap::new(),
+		};
+
+		let client = self.client;
+		let select_chain = self.select_chain;
+		let keystore = self.keystore;
+		let max_block_data_size = self.max_block_data_size;
+
+		let (tx, rx) = futures::channel::mpsc::channel(CHAN_BUFFER);
+		let interval = interval(TIMER_INTERVAL).map(|_| Message::CollectGarbage);
+		let import_notifications = client.import_notification_stream().map(Message::NotifyImport);
+		let instance_requests = rx.map(Message::RequestInstance);
+		let service = ServiceHandle { sender: tx };
+
+		let background_work = async move {
+			let message_stream = futures::stream::select(interval, instance_requests);
+			let mut message_stream = futures::stream::select(import_notifications, message_stream);
+			while let Some(message) = message_stream.next().await {
+				match message {
+					Message::CollectGarbage => {
+						match select_chain.leaves() {
+							Ok(leaves) => {
+								parachain_validation.retain(|h| leaves.contains(h));
+							}
+							Err(e) => {
+								warn!("Error fetching leaves from client: {:?}", e);
+							}
+						}
+					}
+					Message::RequestInstance((relay_parent, sender)) => {
+						// Upstream will handle the failure case.
+						let _ = sender.send(parachain_validation.get_or_instantiate(
+							relay_parent,
+							&keystore,
+							max_block_data_size,
+						));
+					}
+					Message::NotifyImport(notification) => {
+						let relay_parent = notification.hash;
+						if notification.is_new_best {
+							let res = parachain_validation.get_or_instantiate(
+								relay_parent,
+								&keystore,
+								max_block_data_size,
+							);
+
+							if let Err(e) = res {
+								warn!(
+									"Unable to start parachain validation on top of {:?}: {}",
+									relay_parent, e
+								);
+							}
+						}
+					}
+				}
+			}
+		};
+
+		(service, background_work)
+	}
+}
+
+// finds the first key we are capable of signing with out of the given set of validators,
+// if any.
+fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<Arc<ValidatorPair>> {
+	let keystore = keystore.read();
+	validators.iter()
+		.find_map(|v| {
+			keystore.key_pair::<ValidatorPair>(&v).ok()
+		})
+		.map(|pair| Arc::new(pair))
+}
+
+/// Constructs parachain-agreement instances.
+pub(crate) struct ParachainValidationInstances<C, N, P> {
+	/// The client instance.
+	client: Arc<P>,
+	/// The backing network handle.
+	network: N,
+	/// Parachain collators.
+	collators: C,
+	/// handle to remote task executor
+	handle: TaskExecutor,
+	/// Store for extrinsic data.
+	availability_store: AvailabilityStore,
+	/// Live agreements. Maps relay chain parent hashes to attestation
+	/// instances.
+	live_instances: HashMap<Hash, Arc<ValidationInstanceHandle>>,
+}
+
+impl<C, N, P> ParachainValidationInstances<C, N, P> where
+	C: Collators + Send + Unpin + 'static,
+	N: Network,
+	P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + BlockBody<Block> + Send + Sync + 'static,
+	P::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
+	C::Collation: Send + Unpin + 'static,
+	N::TableRouter: Send + 'static,
+	N::BuildTableRouter: Unpin + Send + 'static,
+	// Rust bug: https://github.com/rust-lang/rust/issues/24159
+	sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HasherFor<Block>>,
+{
+	/// Get an attestation table for given parent hash.
+	///
+	/// This starts a parachain agreement process on top of the parent hash if
+	/// one has not already started.
+	///
+	/// Additionally, this will trigger broadcast of data to the new block's duty
+	/// roster.
+	fn get_or_instantiate(
+		&mut self,
+		parent_hash: Hash,
+		keystore: &KeyStorePtr,
+		max_block_data_size: Option<u64>,
+	)
+		-> Result<Arc<ValidationInstanceHandle>, Error>
+	{
+		use primitives::Pair;
+
+		if let Some(tracker) = self.live_instances.get(&parent_hash) {
+			return Ok(tracker.clone());
+		}
+
+		let id = BlockId::hash(parent_hash);
+
+		let validators = self.client.runtime_api().validators(&id)?;
+		let sign_with = signing_key(&validators[..], keystore);
+
+		let duty_roster = self.client.runtime_api().duty_roster(&id)?;
+
+		let (group_info, local_duty) = crate::make_group_info(
+			duty_roster,
+			&validators,
+			sign_with.as_ref().map(|k| k.public()),
+		)?;
+
+		info!(
+			"Starting parachain attestation session on top of parent {:?}. Local parachain duty is {:?}",
+			parent_hash,
+			local_duty,
+		);
+
+		let active_parachains = self.client.runtime_api().active_parachains(&id)?;
+
+		debug!(target: "validation", "Active parachains: {:?}", active_parachains);
+
+		// If we are a validator, we need to store our index in this round in availability store.
+		// This will tell which erasure chunk we should store.
+		if let Some(ref local_duty) = local_duty {
+			if let Err(e) = self.availability_store.add_validator_index_and_n_validators(
+				&parent_hash,
+				local_duty.index,
+				validators.len() as u32,
+			) {
+				warn!(
+					target: "validation",
+					"Failed to add validator index and n_validators to the availability-store: {:?}", e
+				)
+			}
+		}
+
+		let table = Arc::new(SharedTable::new(
+			validators.clone(),
+			group_info,
+			sign_with,
+			parent_hash,
+			self.availability_store.clone(),
+			max_block_data_size,
+		));
+
+		let (_drop_signal, exit) = exit_future::signal();
+
+		let router = self.network.communication_for(
+			table.clone(),
+			&validators,
+			exit.clone(),
+		);
+
+		if let Some((Chain::Parachain(id), index)) = local_duty.as_ref().map(|d| (d.validation, d.index)) {
+			self.launch_work(parent_hash, id, router, max_block_data_size, validators.len(), index, exit);
+		}
+
+		let tracker = Arc::new(ValidationInstanceHandle {
+			table,
+			started: Instant::now(),
+			_drop_signal,
+		});
+
+		self.live_instances.insert(parent_hash, tracker.clone());
+
+		Ok(tracker)
+	}
+
+	/// Retain validation sessions matching predicate.
+	fn retain<F: FnMut(&Hash) -> bool>(&mut self, mut pred: F) {
+		self.live_instances.retain(|k, _| pred(k))
+	}
+
+	// launch parachain work asynchronously.
+	fn launch_work(
+		&self,
+		relay_parent: Hash,
+		validation_para: ParaId,
+		build_router: N::BuildTableRouter,
+		max_block_data_size: Option<u64>,
+		authorities_num: usize,
+		local_id: ValidatorIndex,
+		exit: exit_future::Exit,
+	) {
+		let (collators, client) = (self.collators.clone(), self.client.clone());
+		let availability_store = self.availability_store.clone();
+
+		let with_router = move |router: N::TableRouter| {
+			// fetch a local collation from connected collators.
+			let collation_work = crate::collation::collation_fetch(
+				validation_para,
+				relay_parent,
+				collators,
+				client.clone(),
+				max_block_data_size,
+			);
+
+			collation_work.map(move |result| match result {
+				Ok((collation, outgoing_targeted, fees_charged)) => {
+					match crate::collation::produce_receipt_and_chunks(
+						authorities_num,
+						&collation.pov,
+						&outgoing_targeted,
+						fees_charged,
+						&collation.info,
+					) {
+						Ok((receipt, chunks)) => {
+							// Apparently the `async move` block is the only way to convince
+							// the compiler that we are not moving values out of borrowed context.
+							let av_clone = availability_store.clone();
+							let chunks_clone = chunks.clone();
+							let receipt_clone = receipt.clone();
+
+							let res = async move {
+								if let Err(e) = av_clone.clone().add_erasure_chunks(
+									relay_parent.clone(),
+									receipt_clone,
+									chunks_clone,
+								).await {
+									warn!(target: "validation", "Failed to add erasure chunks: {}", e);
+								}
+							}
+							.unit_error()
+							.boxed()
+							.then(move |_| {
+								router.local_collation(
+									collation,
+									receipt,
+									outgoing_targeted,
+									(local_id, &chunks),
+								);
+								ready(())
+							});
+
+
+							Some(res)
+						}
+						Err(e) => {
+							warn!(target: "validation", "Failed to produce a receipt: {:?}", e);
+							None
+						}
+					}
+				}
+				Err(e) => {
+					warn!(target: "validation", "Failed to collate candidate: {:?}", e);
+					None
+				}
+			})
+		};
+
+		let router = build_router
+			.map_ok(with_router)
+			.map_err(|e| {
+				warn!(target: "validation" , "Failed to build table router: {:?}", e);
+			});
+
+		let cancellable_work = select(exit, router).map(drop);
+
+		// spawn onto thread pool.
+		if self.handle.spawn(cancellable_work).is_err() {
+			error!("Failed to spawn cancellable work task");
+		}
+	}
+}