diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index 105b143acfbc2aa8e27f74c20b45e707699c5189..e0ddadbe1ec5b79a86dbd27bc652548a63c3988c 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -4653,6 +4653,22 @@ dependencies = [
  "sp-runtime",
 ]
 
+[[package]]
+name = "polkadot-node-collation-generation"
+version = "0.1.0"
+dependencies = [
+ "derive_more 0.99.9",
+ "futures 0.3.5",
+ "log 0.4.11",
+ "polkadot-erasure-coding",
+ "polkadot-node-primitives",
+ "polkadot-node-subsystem",
+ "polkadot-node-subsystem-test-helpers",
+ "polkadot-node-subsystem-util",
+ "polkadot-primitives",
+ "sp-core",
+]
+
 [[package]]
 name = "polkadot-node-core-av-store"
 version = "0.1.0"
@@ -4816,6 +4832,7 @@ dependencies = [
 name = "polkadot-node-primitives"
 version = "0.1.0"
 dependencies = [
+ "futures 0.3.5",
  "parity-scale-codec",
  "polkadot-primitives",
  "polkadot-statement-table",
diff --git a/polkadot/Cargo.toml b/polkadot/Cargo.toml
index c36511f813cf34eef813b02a2a1b1b61ba626c3d..1c98ca8d0b85715ce85444b180ef8e4f63e9aedd 100644
--- a/polkadot/Cargo.toml
+++ b/polkadot/Cargo.toml
@@ -44,6 +44,7 @@ members = [
 	"service",
 	"validation",
 
+	"node/collation-generation",
 	"node/core/av-store",
 	"node/core/backing",
 	"node/core/bitfield-signing",
diff --git a/polkadot/node/collation-generation/Cargo.toml b/polkadot/node/collation-generation/Cargo.toml
new file mode 100644
index 0000000000000000000000000000000000000000..f7d5e7f162efc0c8230cd8d6714035a97b2fc33c
--- /dev/null
+++ b/polkadot/node/collation-generation/Cargo.toml
@@ -0,0 +1,19 @@
+[package]
+name = "polkadot-node-collation-generation"
+version = "0.1.0"
+authors = ["Parity Technologies <admin@parity.io>"]
+edition = "2018"
+
+[dependencies]
+derive_more = "0.99.9"
+futures = "0.3.5"
+log = "0.4.8"
+polkadot-erasure-coding = { path = "../../erasure-coding" }
+polkadot-node-primitives = { path = "../primitives" }
+polkadot-node-subsystem = { path = "../subsystem" }
+polkadot-node-subsystem-util = { path = "../subsystem-util" }
+polkadot-primitives = { path = "../../primitives" }
+sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
+
+[dev-dependencies]
+polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" }
diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs
new file mode 100644
index 0000000000000000000000000000000000000000..3ad76ff7f76dae1b903be313c6874b97247ff065
--- /dev/null
+++ b/polkadot/node/collation-generation/src/lib.rs
@@ -0,0 +1,652 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! The collation generation subsystem is the interface between polkadot and the collators.
+
+#![deny(missing_docs)]
+
+use futures::{
+	channel::{mpsc, oneshot},
+	future::FutureExt,
+	join,
+	select,
+	sink::SinkExt,
+	stream::StreamExt,
+};
+use polkadot_node_primitives::CollationGenerationConfig;
+use polkadot_node_subsystem::{
+	errors::RuntimeApiError,
+	messages::{AllMessages, CollationGenerationMessage, CollatorProtocolMessage},
+	FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
+};
+use polkadot_node_subsystem_util::{
+	self as util, request_availability_cores_ctx, request_global_validation_data_ctx,
+	request_local_validation_data_ctx, request_validators_ctx,
+};
+use polkadot_primitives::v1::{
+	collator_signature_payload, validation_data_hash, AvailableData, CandidateCommitments,
+	CandidateDescriptor, CandidateReceipt, CoreState, GlobalValidationData, Hash,
+	LocalValidationData, OccupiedCoreAssumption, PoV,
+};
+use sp_core::crypto::Pair;
+use std::sync::Arc;
+
+/// Collation Generation Subsystem
+pub struct CollationGenerationSubsystem {
+	config: Option<Arc<CollationGenerationConfig>>,
+}
+
+impl CollationGenerationSubsystem {
+	/// Run this subsystem
+	///
+	/// Conceptually, this is very simple: it just loops forever.
+	///
+	/// - On incoming overseer messages, it starts or stops jobs as appropriate.
+	/// - On other incoming messages, if they can be converted into Job::ToJob and
+	///   include a hash, then they're forwarded to the appropriate individual job.
+	/// - On outgoing messages from the jobs, it forwards them to the overseer.
+	///
+	/// If `err_tx` is not `None`, errors are forwarded onto that channel as they occur.
+	/// Otherwise, most are logged and then discarded.
+	async fn run<Context>(mut self, mut ctx: Context)
+	where
+		Context: SubsystemContext<Message = CollationGenerationMessage>,
+	{
+		// when we activate new leaves, we spawn a bunch of sub-tasks, each of which is
+		// expected to generate precisely one message. We don't want to block the main loop
+		// at any point waiting for them all, so instead, we create a channel on which they can
+		// send those messages. We can then just monitor the channel and forward messages on it
+		// to the overseer here, via the context.
+		let (sender, mut receiver) = mpsc::channel(0);
+
+		loop {
+			select! {
+				incoming = ctx.recv().fuse() => {
+					if self.handle_incoming::<Context>(incoming, &mut ctx, &sender).await {
+						break;
+					}
+				},
+				msg = receiver.next().fuse() => {
+					if let Some(msg) = msg {
+						if let Err(err) = ctx.send_message(msg).await {
+							log::warn!(target: "collation_generation", "failed to forward message to overseer: {:?}", err);
+							break;
+						}
+					}
+				},
+			}
+		}
+	}
+
+	// handle an incoming message. return true if we should break afterwards.
+	// note: this doesn't strictly need to be a separate function; it's more an administrative function
+	// so that we don't clutter the run loop. It could in principle be inlined directly into there.
+	// it should hopefully therefore be ok that it's an async function mutably borrowing self.
+	async fn handle_incoming<Context>(
+		&mut self,
+		incoming: SubsystemResult<FromOverseer<Context::Message>>,
+		ctx: &mut Context,
+		sender: &mpsc::Sender<AllMessages>,
+	) -> bool
+	where
+		Context: SubsystemContext<Message = CollationGenerationMessage>,
+	{
+		use polkadot_node_subsystem::ActiveLeavesUpdate;
+		use polkadot_node_subsystem::FromOverseer::{Communication, Signal};
+		use polkadot_node_subsystem::OverseerSignal::{ActiveLeaves, BlockFinalized, Conclude};
+
+		match incoming {
+			Ok(Signal(ActiveLeaves(ActiveLeavesUpdate { activated, .. }))) => {
+				// follow the procedure from the guide
+				if let Some(config) = &self.config {
+					if let Err(err) =
+						handle_new_activations(config.clone(), &activated, ctx, sender).await
+					{
+						log::warn!(target: "collation_generation", "failed to handle new activations: {:?}", err);
+						return true;
+					};
+				}
+				false
+			}
+			Ok(Signal(Conclude)) => true,
+			Ok(Communication {
+				msg: CollationGenerationMessage::Initialize(config),
+			}) => {
+				if self.config.is_some() {
+					log::warn!(target: "collation_generation", "double initialization");
+					true
+				} else {
+					self.config = Some(Arc::new(config));
+					false
+				}
+			}
+			Ok(Signal(BlockFinalized(_))) => false,
+			Err(err) => {
+				log::error!(target: "collation_generation", "error receiving message from subsystem context: {:?}", err);
+				true
+			}
+		}
+	}
+}
+
+impl<Context> Subsystem<Context> for CollationGenerationSubsystem
+where
+	Context: SubsystemContext<Message = CollationGenerationMessage>,
+{
+	fn start(self, ctx: Context) -> SpawnedSubsystem {
+		let subsystem = CollationGenerationSubsystem { config: None };
+
+		let future = Box::pin(subsystem.run(ctx));
+
+		SpawnedSubsystem {
+			name: "CollationGenerationSubsystem",
+			future,
+		}
+	}
+}
+
+#[derive(Debug, derive_more::From)]
+enum Error {
+	#[from]
+	Subsystem(SubsystemError),
+	#[from]
+	OneshotRecv(oneshot::Canceled),
+	#[from]
+	Runtime(RuntimeApiError),
+	#[from]
+	Util(util::Error),
+	#[from]
+	Erasure(polkadot_erasure_coding::Error),
+}
+
+type Result<T> = std::result::Result<T, Error>;
+
+async fn handle_new_activations<Context: SubsystemContext>(
+	config: Arc<CollationGenerationConfig>,
+	activated: &[Hash],
+	ctx: &mut Context,
+	sender: &mpsc::Sender<AllMessages>,
+) -> Result<()> {
+	// follow the procedure from the guide:
+	// https://w3f.github.io/parachain-implementers-guide/node/collators/collation-generation.html
+
+	for relay_parent in activated.iter().copied() {
+		// double-future magic happens here: the first layer of requests takes a mutable borrow of the context, and
+		// returns a receiver. The second layer of requests actually polls those receivers to completion.
+		let (global_validation_data, availability_cores, validators) = join!(
+			request_global_validation_data_ctx(relay_parent, ctx).await?,
+			request_availability_cores_ctx(relay_parent, ctx).await?,
+			request_validators_ctx(relay_parent, ctx).await?,
+		);
+
+		let global_validation_data = global_validation_data??;
+		let availability_cores = availability_cores??;
+		let n_validators = validators??.len();
+
+		for core in availability_cores {
+			let (scheduled_core, assumption) = match core {
+				CoreState::Scheduled(scheduled_core) => {
+					(scheduled_core, OccupiedCoreAssumption::Free)
+				}
+				CoreState::Occupied(_occupied_core) => {
+					// TODO: https://github.com/paritytech/polkadot/issues/1573
+					continue;
+				}
+				_ => continue,
+			};
+
+			if scheduled_core.para_id != config.para_id {
+				continue;
+			}
+
+			// we get local validation data synchronously for each core instead of within the subtask loop,
+			// because we have only a single mutable handle to the context, so the work can't really be distributed
+			let local_validation_data = match request_local_validation_data_ctx(
+				relay_parent,
+				scheduled_core.para_id,
+				assumption,
+				ctx,
+			)
+			.await?
+			.await??
+			{
+				Some(local_validation_data) => local_validation_data,
+				None => continue,
+			};
+
+			let task_global_validation_data = global_validation_data.clone();
+			let task_config = config.clone();
+			let mut task_sender = sender.clone();
+			ctx.spawn("collation generation collation builder", Box::pin(async move {
+				let validation_data_hash =
+					validation_data_hash(&task_global_validation_data, &local_validation_data);
+
+				let collation = (task_config.collator)(&task_global_validation_data, &local_validation_data).await;
+
+				let pov_hash = collation.proof_of_validity.hash();
+
+				let signature_payload = collator_signature_payload(
+					&relay_parent,
+					&scheduled_core.para_id,
+					&validation_data_hash,
+					&pov_hash,
+				);
+
+				let erasure_root = match erasure_root(n_validators, local_validation_data, task_global_validation_data, collation.proof_of_validity.clone()) {
+					Ok(erasure_root) => erasure_root,
+					Err(err) => {
+						log::error!(target: "collation_generation", "failed to calculate erasure root for para_id {}: {:?}", scheduled_core.para_id, err);
+						return
+					}
+				};
+
+				let commitments = CandidateCommitments {
+					fees: collation.fees,
+					upward_messages: collation.upward_messages,
+					new_validation_code: collation.new_validation_code,
+					head_data: collation.head_data,
+					erasure_root,
+				};
+
+				let ccr = CandidateReceipt {
+					commitments_hash: commitments.hash(),
+					descriptor: CandidateDescriptor {
+						signature: task_config.key.sign(&signature_payload),
+						para_id: scheduled_core.para_id,
+						relay_parent,
+						collator: task_config.key.public(),
+						validation_data_hash,
+						pov_hash,
+					},
+				};
+
+				if let Err(err) = task_sender.send(AllMessages::CollatorProtocol(
+					CollatorProtocolMessage::DistributeCollation(ccr, collation.proof_of_validity)
+				)).await {
+					log::warn!(target: "collation_generation", "failed to send collation result for para_id {}: {:?}", scheduled_core.para_id, err);
+				}
+			})).await?;
+		}
+	}
+
+	Ok(())
+}
+
+fn erasure_root(
+	n_validators: usize,
+	local_validation_data: LocalValidationData,
+	global_validation_data: GlobalValidationData,
+	pov: PoV,
+) -> Result<Hash> {
+	let omitted_validation = polkadot_primitives::v1::OmittedValidationData {
+		global_validation: global_validation_data,
+		local_validation: local_validation_data,
+	};
+
+	let available_data = AvailableData {
+		omitted_validation,
+		pov,
+	};
+
+	let chunks = polkadot_erasure_coding::obtain_chunks_v1(n_validators, &available_data)?;
+	Ok(polkadot_erasure_coding::branches(&chunks).root())
+}
+
+#[cfg(test)]
+mod tests {
+	mod handle_new_activations {
+		use super::super::*;
+		use futures::{
+			lock::Mutex,
+			task::{Context as FuturesContext, Poll},
+			Future,
+		};
+		use polkadot_node_primitives::Collation;
+		use polkadot_node_subsystem::messages::{
+			AllMessages, RuntimeApiMessage, RuntimeApiRequest,
+		};
+		use polkadot_node_subsystem_test_helpers::{
+			subsystem_test_harness, TestSubsystemContextHandle,
+		};
+		use polkadot_primitives::v1::{
+			BlockData, BlockNumber, CollatorPair, GlobalValidationData, Id as ParaId,
+			LocalValidationData, PoV, ScheduledCore,
+		};
+		use std::pin::Pin;
+
+		fn test_collation() -> Collation {
+			Collation {
+				fees: Default::default(),
+				upward_messages: Default::default(),
+				new_validation_code: Default::default(),
+				head_data: Default::default(),
+				proof_of_validity: PoV {
+					block_data: BlockData(Vec::new()),
+				},
+			}
+		}
+
+		// Box<dyn Future<Output = Collation> + Unpin + Send
+		struct TestCollator;
+
+		impl Future for TestCollator {
+			type Output = Collation;
+
+			fn poll(self: Pin<&mut Self>, _cx: &mut FuturesContext) -> Poll<Self::Output> {
+				Poll::Ready(test_collation())
+			}
+		}
+
+		impl Unpin for TestCollator {}
+
+		fn test_config<Id: Into<ParaId>>(para_id: Id) -> Arc<CollationGenerationConfig> {
+			Arc::new(CollationGenerationConfig {
+				key: CollatorPair::generate().0,
+				collator: Box::new(|_gvd: &GlobalValidationData, _lvd: &LocalValidationData| {
+					Box::new(TestCollator)
+				}),
+				para_id: para_id.into(),
+			})
+		}
+
+		fn scheduled_core_for<Id: Into<ParaId>>(para_id: Id) -> ScheduledCore {
+			ScheduledCore {
+				para_id: para_id.into(),
+				collator: None,
+			}
+		}
+
+		#[test]
+		fn requests_validation_and_availability_per_relay_parent() {
+			let activated_hashes: Vec<Hash> = vec![
+				[1; 32].into(),
+				[4; 32].into(),
+				[9; 32].into(),
+				[16; 32].into(),
+			];
+
+			let requested_validation_data = Arc::new(Mutex::new(Vec::new()));
+			let requested_availability_cores = Arc::new(Mutex::new(Vec::new()));
+
+			let overseer_requested_validation_data = requested_validation_data.clone();
+			let overseer_requested_availability_cores = requested_availability_cores.clone();
+			let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
+				loop {
+					match handle.try_recv().await {
+						None => break,
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::GlobalValidationData(tx)))) => {
+							overseer_requested_validation_data.lock().await.push(hash);
+							tx.send(Ok(Default::default())).unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::AvailabilityCores(tx)))) => {
+							overseer_requested_availability_cores.lock().await.push(hash);
+							tx.send(Ok(vec![])).unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(_hash, RuntimeApiRequest::Validators(tx)))) => {
+							tx.send(Ok(vec![Default::default(); 3])).unwrap();
+						}
+						Some(msg) => panic!("didn't expect any other overseer requests given no availability cores; got {:?}", msg),
+					}
+				}
+			};
+
+			let (tx, _rx) = mpsc::channel(0);
+
+			let subsystem_activated_hashes = activated_hashes.clone();
+			subsystem_test_harness(overseer, |mut ctx| async move {
+				handle_new_activations(
+					test_config(123),
+					&subsystem_activated_hashes,
+					&mut ctx,
+					&tx,
+				)
+				.await
+				.unwrap();
+			});
+
+			let mut requested_validation_data = Arc::try_unwrap(requested_validation_data)
+				.expect("overseer should have shut down by now")
+				.into_inner();
+			requested_validation_data.sort();
+			let mut requested_availability_cores = Arc::try_unwrap(requested_availability_cores)
+				.expect("overseer should have shut down by now")
+				.into_inner();
+			requested_availability_cores.sort();
+
+			assert_eq!(requested_validation_data, activated_hashes);
+			assert_eq!(requested_availability_cores, activated_hashes);
+		}
+
+		#[test]
+		fn requests_local_validation_for_scheduled_matches() {
+			let activated_hashes: Vec<Hash> = vec![
+				Hash::repeat_byte(1),
+				Hash::repeat_byte(4),
+				Hash::repeat_byte(9),
+				Hash::repeat_byte(16),
+			];
+
+			let requested_local_validation_data = Arc::new(Mutex::new(Vec::new()));
+
+			let overseer_requested_local_validation_data = requested_local_validation_data.clone();
+			let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
+				loop {
+					match handle.try_recv().await {
+						None => break,
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							_hash,
+							RuntimeApiRequest::GlobalValidationData(tx),
+						))) => {
+							tx.send(Ok(Default::default())).unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							hash,
+							RuntimeApiRequest::AvailabilityCores(tx),
+						))) => {
+							tx.send(Ok(vec![
+								CoreState::Free,
+								// this is weird, see explanation below
+								CoreState::Scheduled(scheduled_core_for(
+									(hash.as_fixed_bytes()[0] * 4) as u32,
+								)),
+								CoreState::Scheduled(scheduled_core_for(
+									(hash.as_fixed_bytes()[0] * 5) as u32,
+								)),
+							]))
+							.unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							hash,
+							RuntimeApiRequest::LocalValidationData(
+								_para_id,
+								_occupied_core_assumption,
+								tx,
+							),
+						))) => {
+							overseer_requested_local_validation_data
+								.lock()
+								.await
+								.push(hash);
+							tx.send(Ok(Default::default())).unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							_hash,
+							RuntimeApiRequest::Validators(tx),
+						))) => {
+							tx.send(Ok(vec![Default::default(); 3])).unwrap();
+						}
+						Some(msg) => {
+							panic!("didn't expect any other overseer requests; got {:?}", msg)
+						}
+					}
+				}
+			};
+
+			let (tx, _rx) = mpsc::channel(0);
+
+			subsystem_test_harness(overseer, |mut ctx| async move {
+				handle_new_activations(test_config(16), &activated_hashes, &mut ctx, &tx)
+					.await
+					.unwrap();
+			});
+
+			let requested_local_validation_data = Arc::try_unwrap(requested_local_validation_data)
+				.expect("overseer should have shut down by now")
+				.into_inner();
+
+			// the only activated hash should be from the 4 hash:
+			// each activated hash generates two scheduled cores: one with its value * 4, one with its value * 5
+			// given that the test configuration has a para_id of 16, there's only one way to get that value: with the 4
+			// hash.
+			assert_eq!(requested_local_validation_data, vec![[4; 32].into()]);
+		}
+
+		#[test]
+		fn sends_distribute_collation_message() {
+			let activated_hashes: Vec<Hash> = vec![
+				Hash::repeat_byte(1),
+				Hash::repeat_byte(4),
+				Hash::repeat_byte(9),
+				Hash::repeat_byte(16),
+			];
+
+			let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
+				loop {
+					match handle.try_recv().await {
+						None => break,
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							_hash,
+							RuntimeApiRequest::GlobalValidationData(tx),
+						))) => {
+							tx.send(Ok(Default::default())).unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							hash,
+							RuntimeApiRequest::AvailabilityCores(tx),
+						))) => {
+							tx.send(Ok(vec![
+								CoreState::Free,
+								// this is weird, see explanation below
+								CoreState::Scheduled(scheduled_core_for(
+									(hash.as_fixed_bytes()[0] * 4) as u32,
+								)),
+								CoreState::Scheduled(scheduled_core_for(
+									(hash.as_fixed_bytes()[0] * 5) as u32,
+								)),
+							]))
+							.unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							_hash,
+							RuntimeApiRequest::LocalValidationData(
+								_para_id,
+								_occupied_core_assumption,
+								tx,
+							),
+						))) => {
+							tx.send(Ok(Some(Default::default()))).unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							_hash,
+							RuntimeApiRequest::Validators(tx),
+						))) => {
+							tx.send(Ok(vec![Default::default(); 3])).unwrap();
+						}
+						Some(msg) => {
+							panic!("didn't expect any other overseer requests; got {:?}", msg)
+						}
+					}
+				}
+			};
+
+			let config = test_config(16);
+			let subsystem_config = config.clone();
+
+			let (tx, rx) = mpsc::channel(0);
+
+			// empty vec doesn't allocate on the heap, so it's ok we throw it away
+			let sent_messages = Arc::new(Mutex::new(Vec::new()));
+			let subsystem_sent_messages = sent_messages.clone();
+			subsystem_test_harness(overseer, |mut ctx| async move {
+				handle_new_activations(subsystem_config, &activated_hashes, &mut ctx, &tx)
+					.await
+					.unwrap();
+
+				std::mem::drop(tx);
+
+				// collect all sent messages
+				*subsystem_sent_messages.lock().await = rx.collect().await;
+			});
+
+			let sent_messages = Arc::try_unwrap(sent_messages)
+				.expect("subsystem should have shut down by now")
+				.into_inner();
+
+			// we expect a single message to be sent, containing a candidate receipt.
+			// we don't care too much about the commitments_hash right now, but let's ensure that we've calculated the
+			// correct descriptor
+			let expect_pov_hash = test_collation().proof_of_validity.hash();
+			let expect_validation_data_hash =
+				validation_data_hash::<BlockNumber>(&Default::default(), &Default::default());
+			let expect_relay_parent = Hash::repeat_byte(4);
+			let expect_payload = collator_signature_payload(
+				&expect_relay_parent,
+				&config.para_id,
+				&expect_validation_data_hash,
+				&expect_pov_hash,
+			);
+			let expect_descriptor = CandidateDescriptor {
+				signature: config.key.sign(&expect_payload),
+				para_id: config.para_id,
+				relay_parent: expect_relay_parent,
+				collator: config.key.public(),
+				validation_data_hash: expect_validation_data_hash,
+				pov_hash: expect_pov_hash,
+			};
+
+			assert_eq!(sent_messages.len(), 1);
+			match &sent_messages[0] {
+				AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation(
+					CandidateReceipt { descriptor, .. },
+					_pov,
+				)) => {
+					// signature generation is non-deterministic, so we can't just assert that the
+					// expected descriptor is correct. What we can do is validate that the produced
+					// descriptor has a valid signature, then just copy in the generated signature
+					// and check the rest of the fields for equality.
+					assert!(CollatorPair::verify(
+						&descriptor.signature,
+						&collator_signature_payload(
+							&descriptor.relay_parent,
+							&descriptor.para_id,
+							&descriptor.validation_data_hash,
+							&descriptor.pov_hash,
+						)
+						.as_ref(),
+						&descriptor.collator,
+					));
+					let expect_descriptor = {
+						let mut expect_descriptor = expect_descriptor;
+						expect_descriptor.signature = descriptor.signature.clone();
+						expect_descriptor
+					};
+					assert_eq!(descriptor, &expect_descriptor);
+				}
+				_ => panic!("received wrong message type"),
+			}
+		}
+	}
+}
diff --git a/polkadot/node/overseer/examples/minimal-example.rs b/polkadot/node/overseer/examples/minimal-example.rs
index 3b12da323ba98e4e8c20eec31c142a16529e807f..3ebcb348255874a06a97dfe9c4d6d44874a082a9 100644
--- a/polkadot/node/overseer/examples/minimal-example.rs
+++ b/polkadot/node/overseer/examples/minimal-example.rs
@@ -145,7 +145,6 @@ fn main() {
 			candidate_validation: Subsystem2,
 			candidate_backing: Subsystem1,
 			candidate_selection: DummySubsystem,
-			collator_protocol: DummySubsystem,
 			statement_distribution: DummySubsystem,
 			availability_distribution: DummySubsystem,
 			bitfield_signing: DummySubsystem,
@@ -156,6 +155,8 @@ fn main() {
 			availability_store: DummySubsystem,
 			network_bridge: DummySubsystem,
 			chain_api: DummySubsystem,
+			collation_generation: DummySubsystem,
+			collator_protocol: DummySubsystem,
 		};
 		let (overseer, _handler) = Overseer::new(
 			vec![],
diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs
index 4db8209b8b6dbe30caf2471af91c452b8344d700..f37a398df470cb4629d4d6576dba746942516859 100644
--- a/polkadot/node/overseer/src/lib.rs
+++ b/polkadot/node/overseer/src/lib.rs
@@ -78,8 +78,8 @@ use polkadot_subsystem::messages::{
 	CandidateValidationMessage, CandidateBackingMessage,
 	CandidateSelectionMessage, ChainApiMessage, StatementDistributionMessage,
 	AvailabilityDistributionMessage, BitfieldSigningMessage, BitfieldDistributionMessage,
-	ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage, CollatorProtocolMessage,
-	AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages,
+	ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage,
+	AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages, CollationGenerationMessage, CollatorProtocolMessage,
 };
 pub use polkadot_subsystem::{
 	Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult,
@@ -352,9 +352,6 @@ pub struct Overseer<S: SpawnNamed> {
 	/// A candidate selection subsystem.
 	candidate_selection_subsystem: OverseenSubsystem<CandidateSelectionMessage>,
 
-	/// A collator protocol subsystem
-	collator_protocol_subsystem: OverseenSubsystem<CollatorProtocolMessage>,
-
 	/// A statement distribution subsystem.
 	statement_distribution_subsystem: OverseenSubsystem<StatementDistributionMessage>,
 
@@ -382,9 +379,15 @@ pub struct Overseer<S: SpawnNamed> {
 	/// A network bridge subsystem.
 	network_bridge_subsystem: OverseenSubsystem<NetworkBridgeMessage>,
 
-	/// A Chain API subsystem
+	/// A Chain API subsystem.
 	chain_api_subsystem: OverseenSubsystem<ChainApiMessage>,
 
+	/// A Collation Generation subsystem.
+	collation_generation_subsystem: OverseenSubsystem<CollationGenerationMessage>,
+
+	/// A Collator Protocol subsystem.
+	collator_protocol_subsystem: OverseenSubsystem<CollatorProtocolMessage>,
+
 	/// Spawner to spawn tasks to.
 	s: S,
 
@@ -417,15 +420,13 @@ pub struct Overseer<S: SpawnNamed> {
 ///
 /// [`Subsystem`]: trait.Subsystem.html
 /// [`DummySubsystem`]: struct.DummySubsystem.html
-pub struct AllSubsystems<CV, CB, CS, CP, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA> {
+pub struct AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
 	/// A candidate validation subsystem.
 	pub candidate_validation: CV,
 	/// A candidate backing subsystem.
 	pub candidate_backing: CB,
 	/// A candidate selection subsystem.
 	pub candidate_selection: CS,
-	/// A collator protocol subsystem.
-	pub collator_protocol: CP,
 	/// A statement distribution subsystem.
 	pub statement_distribution: SD,
 	/// An availability distribution subsystem.
@@ -446,6 +447,10 @@ pub struct AllSubsystems<CV, CB, CS, CP, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA
 	pub network_bridge: NB,
 	/// A Chain API subsystem.
 	pub chain_api: CA,
+	/// A Collation Generation subsystem.
+	pub collation_generation: CG,
+	/// A Collator Protocol subsystem.
+	pub collator_protocol: CP,
 }
 
 impl<S> Overseer<S>
@@ -518,7 +523,6 @@ where
 	///     candidate_validation: ValidationSubsystem,
 	///     candidate_backing: DummySubsystem,
 	///     candidate_selection: DummySubsystem,
-	///	    collator_protocol: DummySubsystem,
 	///     statement_distribution: DummySubsystem,
 	///     availability_distribution: DummySubsystem,
 	///     bitfield_signing: DummySubsystem,
@@ -529,6 +533,8 @@ where
 	///     availability_store: DummySubsystem,
 	///     network_bridge: DummySubsystem,
 	///     chain_api: DummySubsystem,
+	///     collation_generation: DummySubsystem,
+	///     collator_protocol: DummySubsystem,
 	/// };
 	/// let (overseer, _handler) = Overseer::new(
 	///     vec![],
@@ -549,16 +555,15 @@ where
 	/// #
 	/// # }); }
 	/// ```
-	pub fn new<CV, CB, CS, CP, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA>(
+	pub fn new<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>(
 		leaves: impl IntoIterator<Item = BlockInfo>,
-		all_subsystems: AllSubsystems<CV, CB, CS, CP, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA>,
+		all_subsystems: AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>,
 		mut s: S,
 	) -> SubsystemResult<(Self, OverseerHandler)>
 	where
 		CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>> + Send,
 		CB: Subsystem<OverseerSubsystemContext<CandidateBackingMessage>> + Send,
 		CS: Subsystem<OverseerSubsystemContext<CandidateSelectionMessage>> + Send,
-		CP: Subsystem<OverseerSubsystemContext<CollatorProtocolMessage>> + Send,
 		SD: Subsystem<OverseerSubsystemContext<StatementDistributionMessage>> + Send,
 		AD: Subsystem<OverseerSubsystemContext<AvailabilityDistributionMessage>> + Send,
 		BS: Subsystem<OverseerSubsystemContext<BitfieldSigningMessage>> + Send,
@@ -569,6 +574,8 @@ where
 		AS: Subsystem<OverseerSubsystemContext<AvailabilityStoreMessage>> + Send,
 		NB: Subsystem<OverseerSubsystemContext<NetworkBridgeMessage>> + Send,
 		CA: Subsystem<OverseerSubsystemContext<ChainApiMessage>> + Send,
+		CG: Subsystem<OverseerSubsystemContext<CollationGenerationMessage>> + Send,
+		CP: Subsystem<OverseerSubsystemContext<CollatorProtocolMessage>> + Send,
 	{
 		let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY);
 
@@ -600,13 +607,6 @@ where
 			all_subsystems.candidate_selection,
 		)?;
 
-		let collator_protocol_subsystem = spawn(
-			&mut s,
-			&mut running_subsystems,
-			&mut running_subsystems_rx,
-			all_subsystems.collator_protocol,
-		)?;
-
 		let statement_distribution_subsystem = spawn(
 			&mut s,
 			&mut running_subsystems,
@@ -677,6 +677,21 @@ where
 			all_subsystems.chain_api,
 		)?;
 
+		let collation_generation_subsystem = spawn(
+			&mut s,
+			&mut running_subsystems,
+			&mut running_subsystems_rx,
+			all_subsystems.collation_generation,
+		)?;
+
+
+		let collator_protocol_subsystem = spawn(
+			&mut s,
+			&mut running_subsystems,
+			&mut running_subsystems_rx,
+			all_subsystems.collator_protocol,
+		)?;
+
 		let active_leaves = HashSet::new();
 
 		let leaves = leaves
@@ -688,7 +703,6 @@ where
 			candidate_validation_subsystem,
 			candidate_backing_subsystem,
 			candidate_selection_subsystem,
-			collator_protocol_subsystem,
 			statement_distribution_subsystem,
 			availability_distribution_subsystem,
 			bitfield_signing_subsystem,
@@ -699,6 +713,8 @@ where
 			availability_store_subsystem,
 			network_bridge_subsystem,
 			chain_api_subsystem,
+			collation_generation_subsystem,
+			collator_protocol_subsystem,
 			s,
 			running_subsystems,
 			running_subsystems_rx,
@@ -724,10 +740,6 @@ where
 			let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
 		}
 
-		if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
-			let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
-		}
-
 		if let Some(ref mut s) = self.statement_distribution_subsystem.instance {
 			let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
 		}
@@ -768,6 +780,14 @@ where
 			let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
 		}
 
+		if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
+			let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
+		}
+
+		if let Some(ref mut s) = self.collation_generation_subsystem.instance {
+			let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
+		}
+
 		let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse();
 
 		loop {
@@ -889,10 +909,6 @@ where
 			s.tx.send(FromOverseer::Signal(signal.clone())).await?;
 		}
 
-		if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
-			s.tx.send(FromOverseer::Signal(signal.clone())).await?;
-		}
-
 		if let Some(ref mut s) = self.statement_distribution_subsystem.instance {
 			s.tx.send(FromOverseer::Signal(signal.clone())).await?;
 		}
@@ -930,7 +946,15 @@ where
 		}
 
 		if let Some(ref mut s) = self.chain_api_subsystem.instance {
-			s.tx.send(FromOverseer::Signal(signal)).await?;
+			s.tx.send(FromOverseer::Signal(signal.clone())).await?;
+		}
+
+		if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
+			s.tx.send(FromOverseer::Signal(signal.clone())).await?;
+		}
+
+		if let Some(ref mut s) = self.collation_generation_subsystem.instance {
+			s.tx.send(FromOverseer::Signal(signal.clone())).await?;
 		}
 
 		Ok(())
@@ -953,11 +977,6 @@ where
 					let _ = s.tx.send(FromOverseer::Communication { msg }).await;
 				}
 			}
-			AllMessages::CollatorProtocol(msg) => {
-				if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
-					let _ = s.tx.send(FromOverseer::Communication { msg }).await;
-				}
-			}
 			AllMessages::StatementDistribution(msg) => {
 				if let Some(ref mut s) = self.statement_distribution_subsystem.instance {
 					let _ = s.tx.send(FromOverseer::Communication { msg }).await;
@@ -1008,6 +1027,16 @@ where
 					let _ = s.tx.send(FromOverseer::Communication { msg }).await;
 				}
 			}
+			AllMessages::CollationGeneration(msg) => {
+				if let Some(ref mut s) = self.collation_generation_subsystem.instance {
+					let _ = s.tx.send(FromOverseer::Communication { msg }).await;
+				}
+			}
+			AllMessages::CollatorProtocol(msg) => {
+				if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
+					let _ = s.tx.send(FromOverseer::Communication { msg }).await;
+				}
+			}
 		}
 	}
 
@@ -1169,7 +1198,6 @@ mod tests {
 				candidate_validation: TestSubsystem1(s1_tx),
 				candidate_backing: TestSubsystem2(s2_tx),
 				candidate_selection: DummySubsystem,
-				collator_protocol: DummySubsystem,
 				statement_distribution: DummySubsystem,
 				availability_distribution: DummySubsystem,
 				bitfield_signing: DummySubsystem,
@@ -1180,6 +1208,8 @@ mod tests {
 				availability_store: DummySubsystem,
 				network_bridge: DummySubsystem,
 				chain_api: DummySubsystem,
+				collation_generation: DummySubsystem,
+				collator_protocol: DummySubsystem,
 			};
 			let (overseer, mut handler) = Overseer::new(
 				vec![],
@@ -1234,7 +1264,6 @@ mod tests {
 				candidate_validation: TestSubsystem1(s1_tx),
 				candidate_backing: TestSubsystem4,
 				candidate_selection: DummySubsystem,
-				collator_protocol: DummySubsystem,
 				statement_distribution: DummySubsystem,
 				availability_distribution: DummySubsystem,
 				bitfield_signing: DummySubsystem,
@@ -1245,6 +1274,8 @@ mod tests {
 				availability_store: DummySubsystem,
 				network_bridge: DummySubsystem,
 				chain_api: DummySubsystem,
+				collation_generation: DummySubsystem,
+				collator_protocol: DummySubsystem,
 			};
 			let (overseer, _handle) = Overseer::new(
 				vec![],
@@ -1352,7 +1383,6 @@ mod tests {
 				candidate_validation: TestSubsystem5(tx_5),
 				candidate_backing: TestSubsystem6(tx_6),
 				candidate_selection: DummySubsystem,
-				collator_protocol: DummySubsystem,
 				statement_distribution: DummySubsystem,
 				availability_distribution: DummySubsystem,
 				bitfield_signing: DummySubsystem,
@@ -1363,6 +1393,8 @@ mod tests {
 				availability_store: DummySubsystem,
 				network_bridge: DummySubsystem,
 				chain_api: DummySubsystem,
+				collation_generation: DummySubsystem,
+				collator_protocol: DummySubsystem,
 			};
 			let (overseer, mut handler) = Overseer::new(
 				vec![first_block],
@@ -1455,7 +1487,6 @@ mod tests {
 				candidate_validation: TestSubsystem5(tx_5),
 				candidate_backing: TestSubsystem6(tx_6),
 				candidate_selection: DummySubsystem,
-				collator_protocol: DummySubsystem,
 				statement_distribution: DummySubsystem,
 				availability_distribution: DummySubsystem,
 				bitfield_signing: DummySubsystem,
@@ -1466,6 +1497,8 @@ mod tests {
 				availability_store: DummySubsystem,
 				network_bridge: DummySubsystem,
 				chain_api: DummySubsystem,
+				collation_generation: DummySubsystem,
+				collator_protocol: DummySubsystem,
 			};
 			// start with two forks of different height.
 			let (overseer, mut handler) = Overseer::new(
diff --git a/polkadot/node/primitives/Cargo.toml b/polkadot/node/primitives/Cargo.toml
index b38e7e542ed24190f8360a5648d105dfde71d00f..81e2467b374fd0dbeac1715a6aea7a875e7969c4 100644
--- a/polkadot/node/primitives/Cargo.toml
+++ b/polkadot/node/primitives/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2018"
 description = "Primitives types for the Node-side"
 
 [dependencies]
+futures = "0.3.5"
 polkadot-primitives = { path = "../../primitives" }
 polkadot-statement-table = { path = "../../statement-table" }
 parity-scale-codec = { version = "1.3.4", default-features = false, features = ["derive"] }
diff --git a/polkadot/node/primitives/src/lib.rs b/polkadot/node/primitives/src/lib.rs
index 2bcc7a392647da9ccdd024180077a84c83a806d6..2ff704c2dd1628f0d372b338108267cd0554abbb 100644
--- a/polkadot/node/primitives/src/lib.rs
+++ b/polkadot/node/primitives/src/lib.rs
@@ -20,12 +20,13 @@
 //! not shared between the node and the runtime. This crate builds on top of the primitives defined
 //! there.
 
+use futures::Future;
 use parity_scale_codec::{Decode, Encode};
 use polkadot_primitives::v1::{
 	Hash, CommittedCandidateReceipt, CandidateReceipt, CompactStatement,
 	EncodeAs, Signed, SigningContext, ValidatorIndex, ValidatorId,
 	UpwardMessage, Balance, ValidationCode, GlobalValidationData, LocalValidationData,
-	HeadData,
+	HeadData, PoV, CollatorPair, Id as ParaId,
 };
 use polkadot_statement_table::{
 	generic::{
@@ -258,3 +259,39 @@ impl std::convert::TryFrom<FromTableMisbehavior> for MisbehaviorReport {
 		}
 	}
 }
+
+/// The output of a collator.
+///
+/// This differs from `CandidateCommitments` in two ways:
+///
+/// - does not contain the erasure root; that's computed at the Polkadot level, not at Cumulus
+/// - contains a proof of validity.
+#[derive(Clone, Encode, Decode)]
+pub struct Collation {
+	/// Fees paid from the chain to the relay chain validators.
+	pub fees: Balance,
+	/// Messages destined to be interpreted by the Relay chain itself.
+	pub upward_messages: Vec<UpwardMessage>,
+	/// New validation code.
+	pub new_validation_code: Option<ValidationCode>,
+	/// The head-data produced as a result of execution.
+	pub head_data: HeadData,
+	/// Proof that this block is valid.
+	pub proof_of_validity: PoV,
+}
+
+/// Configuration for the collation generator
+pub struct CollationGenerationConfig {
+	/// Collator's authentication key, so it can sign things.
+	pub key: CollatorPair,
+	/// Collation function.
+	pub collator: Box<dyn Fn(&GlobalValidationData, &LocalValidationData) -> Box<dyn Future<Output = Collation> + Unpin + Send> + Send + Sync>,
+	/// The parachain that this collator collates for
+	pub para_id: ParaId,
+}
+
+impl std::fmt::Debug for CollationGenerationConfig {
+	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+		write!(f, "CollationGenerationConfig {{ ... }}")
+	}
+}
diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs
index ca350454f025544f8e5fa1e52f812226d955e9c9..01b1a240906baaf3314a86fb05251417f12a1716 100644
--- a/polkadot/node/service/src/lib.rs
+++ b/polkadot/node/service/src/lib.rs
@@ -281,7 +281,6 @@ fn real_overseer<S: SpawnNamed>(
 		candidate_validation: DummySubsystem,
 		candidate_backing: DummySubsystem,
 		candidate_selection: DummySubsystem,
-		collator_protocol: DummySubsystem,
 		statement_distribution: DummySubsystem,
 		availability_distribution: DummySubsystem,
 		bitfield_signing: DummySubsystem,
@@ -292,6 +291,8 @@ fn real_overseer<S: SpawnNamed>(
 		availability_store: DummySubsystem,
 		network_bridge: DummySubsystem,
 		chain_api: DummySubsystem,
+		collation_generation: DummySubsystem,
+		collator_protocol: DummySubsystem,
 	};
 	Overseer::new(
 		leaves,
diff --git a/polkadot/node/subsystem-test-helpers/src/lib.rs b/polkadot/node/subsystem-test-helpers/src/lib.rs
index 26c55354fbe1230f7a203de2edb6eafaf20ce997..eee22dfc6f5fd548a93c51e0f3e3045d0c6d9174 100644
--- a/polkadot/node/subsystem-test-helpers/src/lib.rs
+++ b/polkadot/node/subsystem-test-helpers/src/lib.rs
@@ -16,19 +16,21 @@
 
 //! Utilities for testing subsystems.
 
-use polkadot_node_subsystem::{SubsystemContext, FromOverseer, SubsystemResult, SubsystemError};
 use polkadot_node_subsystem::messages::AllMessages;
+use polkadot_node_subsystem::{FromOverseer, SubsystemContext, SubsystemError, SubsystemResult};
 
-use futures::prelude::*;
 use futures::channel::mpsc;
 use futures::poll;
+use futures::prelude::*;
+use futures_timer::Delay;
 use parking_lot::Mutex;
-use sp_core::traits::SpawnNamed;
+use sp_core::{testing::TaskExecutor, traits::SpawnNamed};
 
 use std::convert::Infallible;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll, Waker};
+use std::time::Duration;
 
 enum SinkState<T> {
 	Empty {
@@ -50,24 +52,21 @@ pub struct SingleItemStream<T>(Arc<Mutex<SinkState<T>>>);
 impl<T> Sink<T> for SingleItemSink<T> {
 	type Error = Infallible;
 
-	fn poll_ready(
-		self: Pin<&mut Self>,
-		cx: &mut Context,
-	) -> Poll<Result<(), Infallible>> {
+	fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
 		let mut state = self.0.lock();
 		match *state {
 			SinkState::Empty { .. } => Poll::Ready(Ok(())),
-			SinkState::Item { ref mut ready_waker, .. } => {
+			SinkState::Item {
+				ref mut ready_waker,
+				..
+			} => {
 				*ready_waker = Some(cx.waker().clone());
 				Poll::Pending
 			}
 		}
 	}
 
-	fn start_send(
-		self: Pin<&mut Self>,
-		item: T,
-	) -> Result<(), Infallible> {
+	fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Infallible> {
 		let mut state = self.0.lock();
 
 		match *state {
@@ -88,24 +87,21 @@ impl<T> Sink<T> for SingleItemSink<T> {
 		Ok(())
 	}
 
-	fn poll_flush(
-		self: Pin<&mut Self>,
-		cx: &mut Context,
-	) -> Poll<Result<(), Infallible>> {
+	fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
 		let mut state = self.0.lock();
 		match *state {
 			SinkState::Empty { .. } => Poll::Ready(Ok(())),
-			SinkState::Item { ref mut flush_waker, .. } => {
+			SinkState::Item {
+				ref mut flush_waker,
+				..
+			} => {
 				*flush_waker = Some(cx.waker().clone());
 				Poll::Pending
 			}
 		}
 	}
 
-	fn poll_close(
-		self: Pin<&mut Self>,
-		cx: &mut Context,
-	) -> Poll<Result<(), Infallible>> {
+	fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
 		self.poll_flush(cx)
 	}
 }
@@ -120,7 +116,11 @@ impl<T> Stream for SingleItemStream<T> {
 
 		match std::mem::replace(&mut *state, SinkState::Empty { read_waker }) {
 			SinkState::Empty { .. } => Poll::Pending,
-			SinkState::Item { item, ready_waker, flush_waker } => {
+			SinkState::Item {
+				item,
+				ready_waker,
+				flush_waker,
+			} => {
 				if let Some(waker) = ready_waker {
 					waker.wake();
 				}
@@ -141,10 +141,7 @@ impl<T> Stream for SingleItemStream<T> {
 /// not when the item is buffered.
 pub fn single_item_sink<T>() -> (SingleItemSink<T>, SingleItemStream<T>) {
 	let inner = Arc::new(Mutex::new(SinkState::Empty { read_waker: None }));
-	(
-		SingleItemSink(inner.clone()),
-		SingleItemStream(inner),
-	)
+	(SingleItemSink(inner.clone()), SingleItemStream(inner))
 }
 
 /// A test subsystem context.
@@ -155,7 +152,9 @@ pub struct TestSubsystemContext<M, S> {
 }
 
 #[async_trait::async_trait]
-impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext for TestSubsystemContext<M, S> {
+impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext
+	for TestSubsystemContext<M, S>
+{
 	type Message = M;
 
 	async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
@@ -170,9 +169,11 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext for Tes
 		self.rx.next().await.ok_or(SubsystemError)
 	}
 
-	async fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
-		-> SubsystemResult<()>
-	{
+	async fn spawn(
+		&mut self,
+		name: &'static str,
+		s: Pin<Box<dyn Future<Output = ()> + Send>>,
+	) -> SubsystemResult<()> {
 		self.spawn.spawn(name, s);
 		Ok(())
 	}
@@ -185,15 +186,23 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext for Tes
 	}
 
 	async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
-		self.tx.send(msg).await.expect("test overseer no longer live");
+		self.tx
+			.send(msg)
+			.await
+			.expect("test overseer no longer live");
 		Ok(())
 	}
 
 	async fn send_messages<T>(&mut self, msgs: T) -> SubsystemResult<()>
-		where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
+	where
+		T: IntoIterator<Item = AllMessages> + Send,
+		T::IntoIter: Send,
 	{
 		let mut iter = stream::iter(msgs.into_iter().map(Ok));
-		self.tx.send_all(&mut iter).await.expect("test overseer no longer live");
+		self.tx
+			.send_all(&mut iter)
+			.await
+			.expect("test overseer no longer live");
 
 		Ok(())
 	}
@@ -209,19 +218,27 @@ impl<M> TestSubsystemContextHandle<M> {
 	/// Send a message or signal to the subsystem. This resolves at the point in time where the
 	/// subsystem has _read_ the message.
 	pub async fn send(&mut self, from_overseer: FromOverseer<M>) {
-		self.tx.send(from_overseer).await.expect("Test subsystem no longer live");
+		self.tx
+			.send(from_overseer)
+			.await
+			.expect("Test subsystem no longer live");
 	}
 
 	/// Receive the next message from the subsystem.
 	pub async fn recv(&mut self) -> AllMessages {
-		self.rx.next().await.expect("Test subsystem no longer live")
+		self.try_recv().await.expect("Test subsystem no longer live")
+	}
+
+	/// Receive the next message from the subsystem, or `None` if the channel has been closed.
+	pub async fn try_recv(&mut self) -> Option<AllMessages> {
+		self.rx.next().await
 	}
 }
 
 /// Make a test subsystem context.
-pub fn make_subsystem_context<M, S>(spawn: S)
-	-> (TestSubsystemContext<M, S>, TestSubsystemContextHandle<M>)
-{
+pub fn make_subsystem_context<M, S>(
+	spawn: S,
+) -> (TestSubsystemContext<M, S>, TestSubsystemContextHandle<M>) {
 	let (overseer_tx, overseer_rx) = single_item_sink();
 	let (all_messages_tx, all_messages_rx) = mpsc::unbounded();
 
@@ -233,7 +250,39 @@ pub fn make_subsystem_context<M, S>(spawn: S)
 		},
 		TestSubsystemContextHandle {
 			tx: overseer_tx,
-			rx: all_messages_rx
+			rx: all_messages_rx,
 		},
 	)
-}
\ No newline at end of file
+}
+
+/// Test a subsystem, mocking the overseer
+///
+/// Pass in two async closures: one mocks the overseer, the other runs the test from the perspective of a subsystem.
+///
+/// Times out in two seconds.
+pub fn subsystem_test_harness<M, OverseerFactory, Overseer, TestFactory, Test>(
+	overseer_factory: OverseerFactory,
+	test_factory: TestFactory,
+) where
+	OverseerFactory: FnOnce(TestSubsystemContextHandle<M>) -> Overseer,
+	Overseer: Future<Output = ()>,
+	TestFactory: FnOnce(TestSubsystemContext<M, TaskExecutor>) -> Test,
+	Test: Future<Output = ()>,
+{
+	let pool = TaskExecutor::new();
+	let (context, handle) = make_subsystem_context(pool);
+	let overseer = overseer_factory(handle);
+	let test = test_factory(context);
+
+	let timeout = Delay::new(Duration::from_secs(2));
+
+	futures::pin_mut!(overseer, test, timeout);
+
+	futures::executor::block_on(async move {
+		futures::select! {
+			_ = overseer.fuse() => (),
+			_ = test.fuse() => (),
+			_ = timeout.fuse() => panic!("test timed out instead of completing"),
+		}
+	});
+}
diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs
index cc8cbd239dd9e783afd16e9ff98bf5824a487a58..d6998b267fea7c883da320942ccaab16b8d84ed9 100644
--- a/polkadot/node/subsystem-util/src/lib.rs
+++ b/polkadot/node/subsystem-util/src/lib.rs
@@ -189,6 +189,89 @@ specialize_requests! {
 	fn request_candidate_events() -> Vec<CandidateEvent>; CandidateEvents;
 }
 
+/// Request some data from the `RuntimeApi` via a SubsystemContext.
+async fn request_from_runtime_ctx<RequestBuilder, Context, Response>(
+	parent: Hash,
+	ctx: &mut Context,
+	request_builder: RequestBuilder,
+) -> Result<RuntimeApiReceiver<Response>, Error>
+where
+	RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
+	Context: SubsystemContext,
+{
+	let (tx, rx) = oneshot::channel();
+
+	ctx
+		.send_message(
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx)))
+				.try_into()
+				.map_err(|err| Error::SenderConversion(format!("{:?}", err)))?,
+		)
+		.await?;
+
+	Ok(rx)
+}
+
+
+/// Construct specialized request functions for the runtime.
+///
+/// These would otherwise get pretty repetitive.
+macro_rules! specialize_requests_ctx {
+	// expand return type name for documentation purposes
+	(fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
+		specialize_requests_ctx!{
+			named stringify!($request_variant) ; fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant;
+		}
+	};
+
+	// create a single specialized request function
+	(named $doc_name:expr ; fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
+		#[doc = "Request `"]
+		#[doc = $doc_name]
+		#[doc = "` from the runtime via a `SubsystemContext`"]
+		pub async fn $func_name<Context: SubsystemContext>(
+			parent: Hash,
+			$(
+				$param_name: $param_ty,
+			)*
+			ctx: &mut Context,
+		) -> Result<RuntimeApiReceiver<$return_ty>, Error> {
+			request_from_runtime_ctx(parent, ctx, |tx| RuntimeApiRequest::$request_variant(
+				$( $param_name, )* tx
+			)).await
+		}
+	};
+
+	// recursive decompose
+	(
+		fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;
+		$(
+			fn $t_func_name:ident( $( $t_param_name:ident : $t_param_ty:ty ),* ) -> $t_return_ty:ty ; $t_request_variant:ident;
+		)+
+	) => {
+		specialize_requests_ctx!{
+			fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant ;
+		}
+		specialize_requests_ctx!{
+			$(
+				fn $t_func_name( $( $t_param_name : $t_param_ty ),* ) -> $t_return_ty ; $t_request_variant ;
+			)+
+		}
+	};
+}
+
+specialize_requests_ctx! {
+	fn request_validators_ctx() -> Vec<ValidatorId>; Validators;
+	fn request_validator_groups_ctx() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups;
+	fn request_availability_cores_ctx() -> Vec<CoreState>; AvailabilityCores;
+	fn request_global_validation_data_ctx() -> GlobalValidationData; GlobalValidationData;
+	fn request_local_validation_data_ctx(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<LocalValidationData>; LocalValidationData;
+	fn request_session_index_for_child_ctx() -> SessionIndex; SessionIndexForChild;
+	fn request_validation_code_ctx(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
+	fn request_candidate_pending_availability_ctx(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
+	fn request_candidate_events_ctx() -> Vec<CandidateEvent>; CandidateEvents;
+}
+
 /// From the given set of validators, find the first key we can sign with, if any.
 pub fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<ValidatorPair> {
 	let keystore = keystore.read();
diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs
index 60813124015ac937b32d367aa7290064f785ceed..ddea2639e1ebc9ee30ef908824235483f7ab684b 100644
--- a/polkadot/node/subsystem/src/messages.rs
+++ b/polkadot/node/subsystem/src/messages.rs
@@ -24,20 +24,19 @@
 
 use futures::channel::{mpsc, oneshot};
 
-use polkadot_primitives::v1::{
-	Hash, CommittedCandidateReceipt, CollatorId,
-	CandidateReceipt, PoV, ErasureChunk, BackedCandidate, Id as ParaId,
-	SignedAvailabilityBitfield, ValidatorId, ValidationCode, ValidatorIndex,
-	CoreAssignment, CoreOccupied, CandidateDescriptor,
-	ValidatorSignature, OmittedValidationData, AvailableData, GroupRotationInfo,
-	CoreState, LocalValidationData, GlobalValidationData, OccupiedCoreAssumption,
-	CandidateEvent, SessionIndex, BlockNumber,
+use polkadot_node_network_protocol::{
+	v1 as protocol_v1, NetworkBridgeEvent, ReputationChange, PeerId, PeerSet,
 };
 use polkadot_node_primitives::{
-	MisbehaviorReport, SignedFullStatement, ValidationResult,
+	CollationGenerationConfig, MisbehaviorReport, SignedFullStatement, ValidationResult,
 };
-use polkadot_node_network_protocol::{
-	v1 as protocol_v1, NetworkBridgeEvent, ReputationChange, PeerId, PeerSet,
+use polkadot_primitives::v1::{
+	AvailableData, BackedCandidate, BlockNumber, CandidateDescriptor, CandidateEvent,
+	CandidateReceipt, CollatorId, CommittedCandidateReceipt,
+	CoreAssignment, CoreOccupied, CoreState, ErasureChunk, GlobalValidationData, GroupRotationInfo,
+	Hash, Id as ParaId, LocalValidationData, OccupiedCoreAssumption, OmittedValidationData, PoV,
+	SessionIndex, SignedAvailabilityBitfield, ValidationCode, ValidatorId, ValidatorIndex,
+	ValidatorSignature,
 };
 use std::sync::Arc;
 
@@ -82,7 +81,6 @@ pub enum CandidateBackingMessage {
 	Statement(Hash, SignedFullStatement),
 }
 
-
 impl CandidateBackingMessage {
 	/// If the current variant contains the relay parent hash, return it.
 	pub fn relay_parent(&self) -> Option<Hash> {
@@ -505,6 +503,20 @@ impl PoVDistributionMessage {
 	}
 }
 
+/// Message to the Collation Generation Subsystem.
+#[derive(Debug)]
+pub enum CollationGenerationMessage {
+	/// Initialize the collation generation subsystem
+	Initialize(CollationGenerationConfig),
+}
+
+impl CollationGenerationMessage {
+	/// If the current variant contains the relay parent hash, return it.
+	pub fn relay_parent(&self) -> Option<Hash> {
+		None
+	}
+}
+
 /// A message type tying together all message types that are used across Subsystems.
 #[derive(Debug)]
 pub enum AllMessages {
@@ -536,4 +548,6 @@ pub enum AllMessages {
 	AvailabilityStore(AvailabilityStoreMessage),
 	/// Message for the network bridge subsystem.
 	NetworkBridge(NetworkBridgeMessage),
+	/// Message for the Collation Generation subsystem
+	CollationGeneration(CollationGenerationMessage),
 }
diff --git a/polkadot/roadmap/implementers-guide/src/node/collators/collation-generation.md b/polkadot/roadmap/implementers-guide/src/node/collators/collation-generation.md
index d27ba74085080d49cea436002970b0912ce2e574..ab3f80273d60dce522cd764554f4e05c2c076be1 100644
--- a/polkadot/roadmap/implementers-guide/src/node/collators/collation-generation.md
+++ b/polkadot/roadmap/implementers-guide/src/node/collators/collation-generation.md
@@ -4,33 +4,48 @@ The collation generation subsystem is executed on collator nodes and produces ca
 
 ## Protocol
 
-Input: None
+Input: `CollationGenerationMessage`
 
-Output: CollationDistributionMessage
+```rust
+enum CollationGenerationMessage {
+  Initialize(CollationGenerationConfig),
+}
+```
+
+No more than one initialization message should ever be sent to the collation generation subsystem.
+
+Output: `CollationDistributionMessage`
 
 ## Functionality
 
 The process of generating a collation for a parachain is very parachain-specific. As such, the details of how to do so are left beyond the scope of this description. The subsystem should be implemented as an abstract wrapper, which is aware of this configuration:
 
 ```rust
+pub struct Collation {
+  /// Hash of `CandidateCommitments` as understood by the collator.
+  pub commitments_hash: Hash,
+  pub proof_of_validity: PoV,
+}
+
 struct CollationGenerationConfig {
-	key: CollatorPair,
-	collation_producer: Fn(params) -> async (HeadData, Vec<UpwardMessage>, PoV),
+  key: CollatorPair,
+  collator: Box<dyn Fn(&GlobalValidationData, &LocalValidationData) -> Box<dyn Future<Output = Collation>>>
+  para_id: ParaId,
 }
 ```
 
 The configuration should be optional, to allow for the case where the node is not run with the capability to collate.
 
 On `ActiveLeavesUpdate`:
-  * If there is no collation generation config, ignore.
-  * Otherwise, for each `activated` head in the update:
-    * Determine if the para is scheduled or is next up on any occupied core by fetching the `availability_cores` Runtime API.
-    * Determine an occupied core assumption to make about the para. The simplest thing to do is to always assume that if the para occupies a core, that the candidate will become available. Further on, this might be determined based on bitfields seen or validator requests.
-    * Use the Runtime API subsystem to fetch the full validation data.
-	* Construct validation function params based on validation data.
-	* Invoke the `collation_producer`.
-	* Construct a `CommittedCandidateReceipt` using the outputs of the `collation_producer` and signing with the `key`.
-	* Dispatch a [`CollatorProtocolMessage`][CPM]`::DistributeCollation(receipt, pov)`.
+
+* If there is no collation generation config, ignore.
+* Otherwise, for each `activated` head in the update:
+  * Determine if the para is scheduled on any core by fetching the `availability_cores` Runtime API.
+    > TODO: figure out what to do in the case of occupied cores; see [this issue](https://github.com/paritytech/polkadot/issues/1573).
+  * Determine an occupied core assumption to make about the para. Scheduled cores can make `OccupiedCoreAssumption::Free`.
+  * Use the Runtime API subsystem to fetch the full validation data.
+  * Invoke the `collator`, and use its outputs to produce a `CandidateReceipt`, signed with the configuration's `key`.
+  * Dispatch a [`CollatorProtocolMessage`][CPM]`::DistributeCollation(receipt, pov)`.
 
 [CP]: collator-protocol.md
 [CPM]: ../../types/overseer-protocol.md#collatorprotocolmessage