From 54bec3bfc09f7a370797af3d8617e173ca84526c Mon Sep 17 00:00:00 2001
From: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>
Date: Mon, 17 Aug 2020 14:27:37 +0200
Subject: [PATCH] implement collation generation subsystem (#1557)

* start sketching out a collation generation subsystem

* invent a basic strategy for double initialization

* clean up warnings

* impl util requests from runtime assuming a context instead of a FromJob sender

* implement collation generation algorithm from guide

* update AllMessages in tests

* fix trivial review comments

* remove another redundant declaration from merge

* filter availability cores by para_id

* handle new activations each in their own async task

* update guide according to the actual current implementation

* add initialization to guide

* add general-purpose subsystem_test_harness helper

* write first handle_new_activations test

* add test that handle_new_activations filters local_validation_data requests

* add (failing) test of collation distribution message sending

* rustfmt

* broken: work on fixing sender test

Unfortunately, for reasons that are not yet clear, the signer is not
producing an identical signature despite the public key and checked data
being identical. This commit produces this output (among other output):

signing with  Public(c4733ab0bbe3ba4c096685d1737a7f498cdbdd167a767d04a21dc7df12b8c858 (5GWHUNm5...))
checking with Public(c4733ab0bbe3ba4c096685d1737a7f498cdbdd167a767d04a21dc7df12b8c858 (5GWHUNm5...))
signed payload:  [4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 10, 0, 0, 0, c7, e5, c0, 64, 7a, db, fe, 44, 81, e5, 51, 11, 79, 9f, a5, 63, 93, 94, 3c, c4, 36, c6, 30, 36, c2, c5, 44, a2, 1b, db, b7, 82, 3, 17, a, 2e, 75, 97, b7, b7, e3, d8, 4c, 5, 39, 1d, 13, 9a, 62, b1, 57, e7, 87, 86, d8, c0, 82, f2, 9d, cf, 4c, 11, 13, 14]
checked payload: [4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 10, 0, 0, 0, c7, e5, c0, 64, 7a, db, fe, 44, 81, e5, 51, 11, 79, 9f, a5, 63, 93, 94, 3c, c4, 36, c6, 30, 36, c2, c5, 44, a2, 1b, db, b7, 82, 3, 17, a, 2e, 75, 97, b7, b7, e3, d8, 4c, 5, 39, 1d, 13, 9a, 62, b1, 57, e7, 87, 86, d8, c0, 82, f2, 9d, cf, 4c, 11, 13, 14]

* fix broken test

* collation function returns commitments hash

It doesn't look like we use the actual commitments data anywhere, and
it's not obvious whether any fields of `CandidateCommitments` are
unavailable to the collator, so this commit assigns the collator the
entire responsibility of generating the hash.

* add missing overseer impls

* calculating erasure coding is polkadot's responsibility, not cumulus's

* concurrentize per-relay_parent requests
---
 polkadot/Cargo.lock                           |  17 +
 polkadot/Cargo.toml                           |   1 +
 polkadot/node/collation-generation/Cargo.toml |  19 +
 polkadot/node/collation-generation/src/lib.rs | 652 ++++++++++++++++++
 .../node/overseer/examples/minimal-example.rs |   3 +-
 polkadot/node/overseer/src/lib.rs             | 111 +--
 polkadot/node/primitives/Cargo.toml           |   1 +
 polkadot/node/primitives/src/lib.rs           |  39 +-
 polkadot/node/service/src/lib.rs              |   3 +-
 .../node/subsystem-test-helpers/src/lib.rs    | 129 ++--
 polkadot/node/subsystem-util/src/lib.rs       |  83 +++
 polkadot/node/subsystem/src/messages.rs       |  38 +-
 .../node/collators/collation-generation.md    |  41 +-
 13 files changed, 1030 insertions(+), 107 deletions(-)
 create mode 100644 polkadot/node/collation-generation/Cargo.toml
 create mode 100644 polkadot/node/collation-generation/src/lib.rs

diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index 105b143acfb..e0ddadbe1ec 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -4653,6 +4653,22 @@ dependencies = [
  "sp-runtime",
 ]
 
+[[package]]
+name = "polkadot-node-collation-generation"
+version = "0.1.0"
+dependencies = [
+ "derive_more 0.99.9",
+ "futures 0.3.5",
+ "log 0.4.11",
+ "polkadot-erasure-coding",
+ "polkadot-node-primitives",
+ "polkadot-node-subsystem",
+ "polkadot-node-subsystem-test-helpers",
+ "polkadot-node-subsystem-util",
+ "polkadot-primitives",
+ "sp-core",
+]
+
 [[package]]
 name = "polkadot-node-core-av-store"
 version = "0.1.0"
@@ -4816,6 +4832,7 @@ dependencies = [
 name = "polkadot-node-primitives"
 version = "0.1.0"
 dependencies = [
+ "futures 0.3.5",
  "parity-scale-codec",
  "polkadot-primitives",
  "polkadot-statement-table",
diff --git a/polkadot/Cargo.toml b/polkadot/Cargo.toml
index c36511f813c..1c98ca8d0b8 100644
--- a/polkadot/Cargo.toml
+++ b/polkadot/Cargo.toml
@@ -44,6 +44,7 @@ members = [
 	"service",
 	"validation",
 
+	"node/collation-generation",
 	"node/core/av-store",
 	"node/core/backing",
 	"node/core/bitfield-signing",
diff --git a/polkadot/node/collation-generation/Cargo.toml b/polkadot/node/collation-generation/Cargo.toml
new file mode 100644
index 00000000000..f7d5e7f162e
--- /dev/null
+++ b/polkadot/node/collation-generation/Cargo.toml
@@ -0,0 +1,19 @@
+[package]
+name = "polkadot-node-collation-generation"
+version = "0.1.0"
+authors = ["Parity Technologies <admin@parity.io>"]
+edition = "2018"
+
+[dependencies]
+derive_more = "0.99.9"
+futures = "0.3.5"
+log = "0.4.8"
+polkadot-erasure-coding = { path = "../../erasure-coding" }
+polkadot-node-primitives = { path = "../primitives" }
+polkadot-node-subsystem = { path = "../subsystem" }
+polkadot-node-subsystem-util = { path = "../subsystem-util" }
+polkadot-primitives = { path = "../../primitives" }
+sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
+
+[dev-dependencies]
+polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" }
diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs
new file mode 100644
index 00000000000..3ad76ff7f76
--- /dev/null
+++ b/polkadot/node/collation-generation/src/lib.rs
@@ -0,0 +1,652 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! The collation generation subsystem is the interface between Polkadot and the collators.
+
+#![deny(missing_docs)]
+
+use futures::{
+	channel::{mpsc, oneshot},
+	future::FutureExt,
+	join,
+	select,
+	sink::SinkExt,
+	stream::StreamExt,
+};
+use polkadot_node_primitives::CollationGenerationConfig;
+use polkadot_node_subsystem::{
+	errors::RuntimeApiError,
+	messages::{AllMessages, CollationGenerationMessage, CollatorProtocolMessage},
+	FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
+};
+use polkadot_node_subsystem_util::{
+	self as util, request_availability_cores_ctx, request_global_validation_data_ctx,
+	request_local_validation_data_ctx, request_validators_ctx,
+};
+use polkadot_primitives::v1::{
+	collator_signature_payload, validation_data_hash, AvailableData, CandidateCommitments,
+	CandidateDescriptor, CandidateReceipt, CoreState, GlobalValidationData, Hash,
+	LocalValidationData, OccupiedCoreAssumption, PoV,
+};
+use sp_core::crypto::Pair;
+use std::sync::Arc;
+
+/// Collation Generation Subsystem
+pub struct CollationGenerationSubsystem {
+	config: Option<Arc<CollationGenerationConfig>>,
+}
+
+impl CollationGenerationSubsystem {
+	/// Run this subsystem
+	///
+	/// Conceptually, this is very simple: it just loops forever.
+	///
+	/// - On incoming overseer messages, it initializes its configuration or
+	///   handles new leaf activations, as appropriate.
+	/// - On outgoing messages from the per-activation tasks it spawns, it
+	///   forwards them to the overseer.
+	///
+	/// Errors are logged; those which indicate the subsystem can no longer make
+	/// progress cause the loop to terminate.
+	async fn run<Context>(mut self, mut ctx: Context)
+	where
+		Context: SubsystemContext<Message = CollationGenerationMessage>,
+	{
+		// when we activate new leaves, we spawn a bunch of sub-tasks, each of which is
+		// expected to generate precisely one message. We don't want to block the main loop
+		// at any point waiting for them all, so instead, we create a channel on which they can
+		// send those messages. We can then just monitor the channel and forward messages on it
+		// to the overseer here, via the context.
+		let (sender, mut receiver) = mpsc::channel(0);
+
+		loop {
+			select! {
+				incoming = ctx.recv().fuse() => {
+					if self.handle_incoming::<Context>(incoming, &mut ctx, &sender).await {
+						break;
+					}
+				},
+				msg = receiver.next().fuse() => {
+					if let Some(msg) = msg {
+						if let Err(err) = ctx.send_message(msg).await {
+							log::warn!(target: "collation_generation", "failed to forward message to overseer: {:?}", err);
+							break;
+						}
+					}
+				},
+			}
+		}
+	}
+
+	// Handle an incoming message; returns true if we should break afterwards.
+	// Note: this doesn't strictly need to be a separate function; it's split out
+	// so that we don't clutter the run loop, and could in principle be inlined
+	// directly into it. That's also why it's fine for it to be an async function
+	// mutably borrowing self.
+	async fn handle_incoming<Context>(
+		&mut self,
+		incoming: SubsystemResult<FromOverseer<Context::Message>>,
+		ctx: &mut Context,
+		sender: &mpsc::Sender<AllMessages>,
+	) -> bool
+	where
+		Context: SubsystemContext<Message = CollationGenerationMessage>,
+	{
+		use polkadot_node_subsystem::ActiveLeavesUpdate;
+		use polkadot_node_subsystem::FromOverseer::{Communication, Signal};
+		use polkadot_node_subsystem::OverseerSignal::{ActiveLeaves, BlockFinalized, Conclude};
+
+		match incoming {
+			Ok(Signal(ActiveLeaves(ActiveLeavesUpdate { activated, .. }))) => {
+				// follow the procedure from the guide
+				if let Some(config) = &self.config {
+					if let Err(err) =
+						handle_new_activations(config.clone(), &activated, ctx, sender).await
+					{
+						log::warn!(target: "collation_generation", "failed to handle new activations: {:?}", err);
+						return true;
+					};
+				}
+				false
+			}
+			Ok(Signal(Conclude)) => true,
+			Ok(Communication {
+				msg: CollationGenerationMessage::Initialize(config),
+			}) => {
+				if self.config.is_some() {
+					log::warn!(target: "collation_generation", "double initialization");
+					true
+				} else {
+					self.config = Some(Arc::new(config));
+					false
+				}
+			}
+			Ok(Signal(BlockFinalized(_))) => false,
+			Err(err) => {
+				log::error!(target: "collation_generation", "error receiving message from subsystem context: {:?}", err);
+				true
+			}
+		}
+	}
+}
+
+impl<Context> Subsystem<Context> for CollationGenerationSubsystem
+where
+	Context: SubsystemContext<Message = CollationGenerationMessage>,
+{
+	fn start(self, ctx: Context) -> SpawnedSubsystem {
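+		// the subsystem starts unconfigured: its config arrives later via a
+		// `CollationGenerationMessage::Initialize` message.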
+		let subsystem = CollationGenerationSubsystem { config: None };
+
+		let future = Box::pin(subsystem.run(ctx));
+
+		SpawnedSubsystem {
+			name: "CollationGenerationSubsystem",
+			future,
+		}
+	}
+}
+
+#[derive(Debug, derive_more::From)]
+enum Error {
+	#[from]
+	Subsystem(SubsystemError),
+	#[from]
+	OneshotRecv(oneshot::Canceled),
+	#[from]
+	Runtime(RuntimeApiError),
+	#[from]
+	Util(util::Error),
+	#[from]
+	Erasure(polkadot_erasure_coding::Error),
+}
+
+type Result<T> = std::result::Result<T, Error>;
+
+async fn handle_new_activations<Context: SubsystemContext>(
+	config: Arc<CollationGenerationConfig>,
+	activated: &[Hash],
+	ctx: &mut Context,
+	sender: &mpsc::Sender<AllMessages>,
+) -> Result<()> {
+	// follow the procedure from the guide:
+	// https://w3f.github.io/parachain-implementers-guide/node/collators/collation-generation.html
+
+	for relay_parent in activated.iter().copied() {
+		// double-future magic happens here: the first layer of requests takes a mutable borrow of the context, and
+		// returns a receiver. The second layer of requests actually polls those receivers to completion.
+		let (global_validation_data, availability_cores, validators) = join!(
+			request_global_validation_data_ctx(relay_parent, ctx).await?,
+			request_availability_cores_ctx(relay_parent, ctx).await?,
+			request_validators_ctx(relay_parent, ctx).await?,
+		);
+
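+		// each response is doubly wrapped: the outer `Result` covers the oneshot
+		// receive, the inner one is the runtime API's own result, hence `??`.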
+		let global_validation_data = global_validation_data??;
+		let availability_cores = availability_cores??;
+		let n_validators = validators??.len();
+
+		for core in availability_cores {
+			let (scheduled_core, assumption) = match core {
+				CoreState::Scheduled(scheduled_core) => {
+					(scheduled_core, OccupiedCoreAssumption::Free)
+				}
+				CoreState::Occupied(_occupied_core) => {
+					// TODO: https://github.com/paritytech/polkadot/issues/1573
+					continue;
+				}
+				_ => continue,
+			};
+
+			if scheduled_core.para_id != config.para_id {
+				continue;
+			}
+
+			// we get local validation data synchronously for each core instead of within the subtask loop,
+			// because we have only a single mutable handle to the context, so the work can't really be distributed
+			let local_validation_data = match request_local_validation_data_ctx(
+				relay_parent,
+				scheduled_core.para_id,
+				assumption,
+				ctx,
+			)
+			.await?
+			.await??
+			{
+				Some(local_validation_data) => local_validation_data,
+				None => continue,
+			};
+
+			let task_global_validation_data = global_validation_data.clone();
+			let task_config = config.clone();
+			let mut task_sender = sender.clone();
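+			// build the collation in its own task so this loop can move on to the
+			// next core; results flow back to the overseer via `task_sender` (see `run`).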
+			ctx.spawn("collation generation collation builder", Box::pin(async move {
+				let validation_data_hash =
+					validation_data_hash(&task_global_validation_data, &local_validation_data);
+
+				let collation = (task_config.collator)(&task_global_validation_data, &local_validation_data).await;
+
+				let pov_hash = collation.proof_of_validity.hash();
+
+				let signature_payload = collator_signature_payload(
+					&relay_parent,
+					&scheduled_core.para_id,
+					&validation_data_hash,
+					&pov_hash,
+				);
+
+				let erasure_root = match erasure_root(n_validators, local_validation_data, task_global_validation_data, collation.proof_of_validity.clone()) {
+					Ok(erasure_root) => erasure_root,
+					Err(err) => {
+						log::error!(target: "collation_generation", "failed to calculate erasure root for para_id {}: {:?}", scheduled_core.para_id, err);
+						return
+					}
+				};
+
+				let commitments = CandidateCommitments {
+					fees: collation.fees,
+					upward_messages: collation.upward_messages,
+					new_validation_code: collation.new_validation_code,
+					head_data: collation.head_data,
+					erasure_root,
+				};
+
+				let ccr = CandidateReceipt {
+					commitments_hash: commitments.hash(),
+					descriptor: CandidateDescriptor {
+						signature: task_config.key.sign(&signature_payload),
+						para_id: scheduled_core.para_id,
+						relay_parent,
+						collator: task_config.key.public(),
+						validation_data_hash,
+						pov_hash,
+					},
+				};
+
+				if let Err(err) = task_sender.send(AllMessages::CollatorProtocol(
+					CollatorProtocolMessage::DistributeCollation(ccr, collation.proof_of_validity)
+				)).await {
+					log::warn!(target: "collation_generation", "failed to send collation result for para_id {}: {:?}", scheduled_core.para_id, err);
+				}
+			})).await?;
+		}
+	}
+
+	Ok(())
+}
+
+fn erasure_root(
+	n_validators: usize,
+	local_validation_data: LocalValidationData,
+	global_validation_data: GlobalValidationData,
+	pov: PoV,
+) -> Result<Hash> {
+	let omitted_validation = polkadot_primitives::v1::OmittedValidationData {
+		global_validation: global_validation_data,
+		local_validation: local_validation_data,
+	};
+
+	let available_data = AvailableData {
+		omitted_validation,
+		pov,
+	};
+
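+	// erasure-code the full `AvailableData` into one chunk per validator, then
+	// take the merkle root of those chunks.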
+	let chunks = polkadot_erasure_coding::obtain_chunks_v1(n_validators, &available_data)?;
+	Ok(polkadot_erasure_coding::branches(&chunks).root())
+}
+
+#[cfg(test)]
+mod tests {
+	mod handle_new_activations {
+		use super::super::*;
+		use futures::{
+			lock::Mutex,
+			task::{Context as FuturesContext, Poll},
+			Future,
+		};
+		use polkadot_node_primitives::Collation;
+		use polkadot_node_subsystem::messages::{
+			AllMessages, RuntimeApiMessage, RuntimeApiRequest,
+		};
+		use polkadot_node_subsystem_test_helpers::{
+			subsystem_test_harness, TestSubsystemContextHandle,
+		};
+		use polkadot_primitives::v1::{
+			BlockData, BlockNumber, CollatorPair, GlobalValidationData, Id as ParaId,
+			LocalValidationData, PoV, ScheduledCore,
+		};
+		use std::pin::Pin;
+
+		fn test_collation() -> Collation {
+			Collation {
+				fees: Default::default(),
+				upward_messages: Default::default(),
+				new_validation_code: Default::default(),
+				head_data: Default::default(),
+				proof_of_validity: PoV {
+					block_data: BlockData(Vec::new()),
+				},
+			}
+		}
+
+		// Stands in for the boxed future the collator fn returns:
+		// `Box<dyn Future<Output = Collation> + Unpin + Send>`.
+		struct TestCollator;
+
+		impl Future for TestCollator {
+			type Output = Collation;
+
+			fn poll(self: Pin<&mut Self>, _cx: &mut FuturesContext) -> Poll<Self::Output> {
+				Poll::Ready(test_collation())
+			}
+		}
+
+		impl Unpin for TestCollator {}
+
+		fn test_config<Id: Into<ParaId>>(para_id: Id) -> Arc<CollationGenerationConfig> {
+			Arc::new(CollationGenerationConfig {
+				key: CollatorPair::generate().0,
+				collator: Box::new(|_gvd: &GlobalValidationData, _lvd: &LocalValidationData| {
+					Box::new(TestCollator)
+				}),
+				para_id: para_id.into(),
+			})
+		}
+
+		fn scheduled_core_for<Id: Into<ParaId>>(para_id: Id) -> ScheduledCore {
+			ScheduledCore {
+				para_id: para_id.into(),
+				collator: None,
+			}
+		}
+
+		#[test]
+		fn requests_validation_and_availability_per_relay_parent() {
+			let activated_hashes: Vec<Hash> = vec![
+				[1; 32].into(),
+				[4; 32].into(),
+				[9; 32].into(),
+				[16; 32].into(),
+			];
+
+			let requested_validation_data = Arc::new(Mutex::new(Vec::new()));
+			let requested_availability_cores = Arc::new(Mutex::new(Vec::new()));
+
+			let overseer_requested_validation_data = requested_validation_data.clone();
+			let overseer_requested_availability_cores = requested_availability_cores.clone();
+			let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
+				loop {
+					match handle.try_recv().await {
+						None => break,
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::GlobalValidationData(tx)))) => {
+							overseer_requested_validation_data.lock().await.push(hash);
+							tx.send(Ok(Default::default())).unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::AvailabilityCores(tx)))) => {
+							overseer_requested_availability_cores.lock().await.push(hash);
+							tx.send(Ok(vec![])).unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(_hash, RuntimeApiRequest::Validators(tx)))) => {
+							tx.send(Ok(vec![Default::default(); 3])).unwrap();
+						}
+						Some(msg) => panic!("didn't expect any other overseer requests given no availability cores; got {:?}", msg),
+					}
+				}
+			};
+
+			let (tx, _rx) = mpsc::channel(0);
+
+			let subsystem_activated_hashes = activated_hashes.clone();
+			subsystem_test_harness(overseer, |mut ctx| async move {
+				handle_new_activations(
+					test_config(123),
+					&subsystem_activated_hashes,
+					&mut ctx,
+					&tx,
+				)
+				.await
+				.unwrap();
+			});
+
+			let mut requested_validation_data = Arc::try_unwrap(requested_validation_data)
+				.expect("overseer should have shut down by now")
+				.into_inner();
+			requested_validation_data.sort();
+			let mut requested_availability_cores = Arc::try_unwrap(requested_availability_cores)
+				.expect("overseer should have shut down by now")
+				.into_inner();
+			requested_availability_cores.sort();
+
+			assert_eq!(requested_validation_data, activated_hashes);
+			assert_eq!(requested_availability_cores, activated_hashes);
+		}
+
+		#[test]
+		fn requests_local_validation_for_scheduled_matches() {
+			let activated_hashes: Vec<Hash> = vec![
+				Hash::repeat_byte(1),
+				Hash::repeat_byte(4),
+				Hash::repeat_byte(9),
+				Hash::repeat_byte(16),
+			];
+
+			let requested_local_validation_data = Arc::new(Mutex::new(Vec::new()));
+
+			let overseer_requested_local_validation_data = requested_local_validation_data.clone();
+			let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
+				loop {
+					match handle.try_recv().await {
+						None => break,
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							_hash,
+							RuntimeApiRequest::GlobalValidationData(tx),
+						))) => {
+							tx.send(Ok(Default::default())).unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							hash,
+							RuntimeApiRequest::AvailabilityCores(tx),
+						))) => {
+							tx.send(Ok(vec![
+								CoreState::Free,
+								// this is weird, see explanation below
+								CoreState::Scheduled(scheduled_core_for(
+									(hash.as_fixed_bytes()[0] * 4) as u32,
+								)),
+								CoreState::Scheduled(scheduled_core_for(
+									(hash.as_fixed_bytes()[0] * 5) as u32,
+								)),
+							]))
+							.unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							hash,
+							RuntimeApiRequest::LocalValidationData(
+								_para_id,
+								_occupied_core_assumption,
+								tx,
+							),
+						))) => {
+							overseer_requested_local_validation_data
+								.lock()
+								.await
+								.push(hash);
+							tx.send(Ok(Default::default())).unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							_hash,
+							RuntimeApiRequest::Validators(tx),
+						))) => {
+							tx.send(Ok(vec![Default::default(); 3])).unwrap();
+						}
+						Some(msg) => {
+							panic!("didn't expect any other overseer requests; got {:?}", msg)
+						}
+					}
+				}
+			};
+
+			let (tx, _rx) = mpsc::channel(0);
+
+			subsystem_test_harness(overseer, |mut ctx| async move {
+				handle_new_activations(test_config(16), &activated_hashes, &mut ctx, &tx)
+					.await
+					.unwrap();
+			});
+
+			let requested_local_validation_data = Arc::try_unwrap(requested_local_validation_data)
+				.expect("overseer should have shut down by now")
+				.into_inner();
+
+			// the only requested hash should be the all-4s hash:
+			// each activated hash generates two scheduled cores, one with its first
+			// byte * 4 and one with its first byte * 5. Given that the test config
+			// uses para_id 16, only the all-4s hash can produce it (4 * 4 == 16).
+			assert_eq!(requested_local_validation_data, vec![[4; 32].into()]);
+		}
+
+		#[test]
+		fn sends_distribute_collation_message() {
+			let activated_hashes: Vec<Hash> = vec![
+				Hash::repeat_byte(1),
+				Hash::repeat_byte(4),
+				Hash::repeat_byte(9),
+				Hash::repeat_byte(16),
+			];
+
+			let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
+				loop {
+					match handle.try_recv().await {
+						None => break,
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							_hash,
+							RuntimeApiRequest::GlobalValidationData(tx),
+						))) => {
+							tx.send(Ok(Default::default())).unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							hash,
+							RuntimeApiRequest::AvailabilityCores(tx),
+						))) => {
+							tx.send(Ok(vec![
+								CoreState::Free,
+								// this is weird; see the explanation at the end of
+								// requests_local_validation_for_scheduled_matches above
+								CoreState::Scheduled(scheduled_core_for(
+									(hash.as_fixed_bytes()[0] * 4) as u32,
+								)),
+								CoreState::Scheduled(scheduled_core_for(
+									(hash.as_fixed_bytes()[0] * 5) as u32,
+								)),
+							]))
+							.unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							_hash,
+							RuntimeApiRequest::LocalValidationData(
+								_para_id,
+								_occupied_core_assumption,
+								tx,
+							),
+						))) => {
+							tx.send(Ok(Some(Default::default()))).unwrap();
+						}
+						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+							_hash,
+							RuntimeApiRequest::Validators(tx),
+						))) => {
+							tx.send(Ok(vec![Default::default(); 3])).unwrap();
+						}
+						Some(msg) => {
+							panic!("didn't expect any other overseer requests; got {:?}", msg)
+						}
+					}
+				}
+			};
+
+			let config = test_config(16);
+			let subsystem_config = config.clone();
+
+			let (tx, rx) = mpsc::channel(0);
+
+			// the initial empty Vec never allocates on the heap, so it's cheap to
+			// discard when we overwrite it with the collected messages below
+			let sent_messages = Arc::new(Mutex::new(Vec::new()));
+			let subsystem_sent_messages = sent_messages.clone();
+			subsystem_test_harness(overseer, |mut ctx| async move {
+				handle_new_activations(subsystem_config, &activated_hashes, &mut ctx, &tx)
+					.await
+					.unwrap();
+
+				std::mem::drop(tx);
+
+				// collect all sent messages
+				*subsystem_sent_messages.lock().await = rx.collect().await;
+			});
+
+			let sent_messages = Arc::try_unwrap(sent_messages)
+				.expect("subsystem should have shut down by now")
+				.into_inner();
+
+			// we expect a single message to be sent, containing a candidate receipt.
+			// we don't care too much about the commitments_hash right now, but let's ensure that we've calculated the
+			// correct descriptor
+			let expect_pov_hash = test_collation().proof_of_validity.hash();
+			let expect_validation_data_hash =
+				validation_data_hash::<BlockNumber>(&Default::default(), &Default::default());
+			let expect_relay_parent = Hash::repeat_byte(4);
+			let expect_payload = collator_signature_payload(
+				&expect_relay_parent,
+				&config.para_id,
+				&expect_validation_data_hash,
+				&expect_pov_hash,
+			);
+			let expect_descriptor = CandidateDescriptor {
+				signature: config.key.sign(&expect_payload),
+				para_id: config.para_id,
+				relay_parent: expect_relay_parent,
+				collator: config.key.public(),
+				validation_data_hash: expect_validation_data_hash,
+				pov_hash: expect_pov_hash,
+			};
+
+			assert_eq!(sent_messages.len(), 1);
+			match &sent_messages[0] {
+				AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation(
+					CandidateReceipt { descriptor, .. },
+					_pov,
+				)) => {
+					// signature generation is non-deterministic, so we can't just assert that the
+					// expected descriptor is correct. What we can do is validate that the produced
+					// descriptor has a valid signature, then just copy in the generated signature
+					// and check the rest of the fields for equality.
+					assert!(CollatorPair::verify(
+						&descriptor.signature,
+						&collator_signature_payload(
+							&descriptor.relay_parent,
+							&descriptor.para_id,
+							&descriptor.validation_data_hash,
+							&descriptor.pov_hash,
+						)
+						.as_ref(),
+						&descriptor.collator,
+					));
+					let expect_descriptor = {
+						let mut expect_descriptor = expect_descriptor;
+						expect_descriptor.signature = descriptor.signature.clone();
+						expect_descriptor
+					};
+					assert_eq!(descriptor, &expect_descriptor);
+				}
+				_ => panic!("received wrong message type"),
+			}
+		}
+	}
+}
diff --git a/polkadot/node/overseer/examples/minimal-example.rs b/polkadot/node/overseer/examples/minimal-example.rs
index 3b12da323ba..3ebcb348255 100644
--- a/polkadot/node/overseer/examples/minimal-example.rs
+++ b/polkadot/node/overseer/examples/minimal-example.rs
@@ -145,7 +145,6 @@ fn main() {
 			candidate_validation: Subsystem2,
 			candidate_backing: Subsystem1,
 			candidate_selection: DummySubsystem,
-			collator_protocol: DummySubsystem,
 			statement_distribution: DummySubsystem,
 			availability_distribution: DummySubsystem,
 			bitfield_signing: DummySubsystem,
@@ -156,6 +155,8 @@ fn main() {
 			availability_store: DummySubsystem,
 			network_bridge: DummySubsystem,
 			chain_api: DummySubsystem,
+			collation_generation: DummySubsystem,
+			collator_protocol: DummySubsystem,
 		};
 		let (overseer, _handler) = Overseer::new(
 			vec![],
diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs
index 4db8209b8b6..f37a398df47 100644
--- a/polkadot/node/overseer/src/lib.rs
+++ b/polkadot/node/overseer/src/lib.rs
@@ -78,8 +78,8 @@ use polkadot_subsystem::messages::{
 	CandidateValidationMessage, CandidateBackingMessage,
 	CandidateSelectionMessage, ChainApiMessage, StatementDistributionMessage,
 	AvailabilityDistributionMessage, BitfieldSigningMessage, BitfieldDistributionMessage,
-	ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage, CollatorProtocolMessage,
-	AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages,
+	ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage,
+	AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages, CollationGenerationMessage, CollatorProtocolMessage,
 };
 pub use polkadot_subsystem::{
 	Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult,
@@ -352,9 +352,6 @@ pub struct Overseer<S: SpawnNamed> {
 	/// A candidate selection subsystem.
 	candidate_selection_subsystem: OverseenSubsystem<CandidateSelectionMessage>,
 
-	/// A collator protocol subsystem
-	collator_protocol_subsystem: OverseenSubsystem<CollatorProtocolMessage>,
-
 	/// A statement distribution subsystem.
 	statement_distribution_subsystem: OverseenSubsystem<StatementDistributionMessage>,
 
@@ -382,9 +379,15 @@ pub struct Overseer<S: SpawnNamed> {
 	/// A network bridge subsystem.
 	network_bridge_subsystem: OverseenSubsystem<NetworkBridgeMessage>,
 
-	/// A Chain API subsystem
+	/// A Chain API subsystem.
 	chain_api_subsystem: OverseenSubsystem<ChainApiMessage>,
 
+	/// A Collation Generation subsystem.
+	collation_generation_subsystem: OverseenSubsystem<CollationGenerationMessage>,
+
+	/// A Collator Protocol subsystem.
+	collator_protocol_subsystem: OverseenSubsystem<CollatorProtocolMessage>,
+
 	/// Spawner to spawn tasks to.
 	s: S,
 
@@ -417,15 +420,13 @@ pub struct Overseer<S: SpawnNamed> {
 ///
 /// [`Subsystem`]: trait.Subsystem.html
 /// [`DummySubsystem`]: struct.DummySubsystem.html
-pub struct AllSubsystems<CV, CB, CS, CP, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA> {
+pub struct AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
 	/// A candidate validation subsystem.
 	pub candidate_validation: CV,
 	/// A candidate backing subsystem.
 	pub candidate_backing: CB,
 	/// A candidate selection subsystem.
 	pub candidate_selection: CS,
-	/// A collator protocol subsystem.
-	pub collator_protocol: CP,
 	/// A statement distribution subsystem.
 	pub statement_distribution: SD,
 	/// An availability distribution subsystem.
@@ -446,6 +447,10 @@ pub struct AllSubsystems<CV, CB, CS, CP, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA
 	pub network_bridge: NB,
 	/// A Chain API subsystem.
 	pub chain_api: CA,
+	/// A Collation Generation subsystem.
+	pub collation_generation: CG,
+	/// A Collator Protocol subsystem.
+	pub collator_protocol: CP,
 }
 
 impl<S> Overseer<S>
@@ -518,7 +523,6 @@ where
 	///     candidate_validation: ValidationSubsystem,
 	///     candidate_backing: DummySubsystem,
 	///     candidate_selection: DummySubsystem,
-	///	    collator_protocol: DummySubsystem,
 	///     statement_distribution: DummySubsystem,
 	///     availability_distribution: DummySubsystem,
 	///     bitfield_signing: DummySubsystem,
@@ -529,6 +533,8 @@ where
 	///     availability_store: DummySubsystem,
 	///     network_bridge: DummySubsystem,
 	///     chain_api: DummySubsystem,
+	///     collation_generation: DummySubsystem,
+	///     collator_protocol: DummySubsystem,
 	/// };
 	/// let (overseer, _handler) = Overseer::new(
 	///     vec![],
@@ -549,16 +555,15 @@ where
 	/// #
 	/// # }); }
 	/// ```
-	pub fn new<CV, CB, CS, CP, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA>(
+	pub fn new<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>(
 		leaves: impl IntoIterator<Item = BlockInfo>,
-		all_subsystems: AllSubsystems<CV, CB, CS, CP, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA>,
+		all_subsystems: AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>,
 		mut s: S,
 	) -> SubsystemResult<(Self, OverseerHandler)>
 	where
 		CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>> + Send,
 		CB: Subsystem<OverseerSubsystemContext<CandidateBackingMessage>> + Send,
 		CS: Subsystem<OverseerSubsystemContext<CandidateSelectionMessage>> + Send,
-		CP: Subsystem<OverseerSubsystemContext<CollatorProtocolMessage>> + Send,
 		SD: Subsystem<OverseerSubsystemContext<StatementDistributionMessage>> + Send,
 		AD: Subsystem<OverseerSubsystemContext<AvailabilityDistributionMessage>> + Send,
 		BS: Subsystem<OverseerSubsystemContext<BitfieldSigningMessage>> + Send,
@@ -569,6 +574,8 @@ where
 		AS: Subsystem<OverseerSubsystemContext<AvailabilityStoreMessage>> + Send,
 		NB: Subsystem<OverseerSubsystemContext<NetworkBridgeMessage>> + Send,
 		CA: Subsystem<OverseerSubsystemContext<ChainApiMessage>> + Send,
+		CG: Subsystem<OverseerSubsystemContext<CollationGenerationMessage>> + Send,
+		CP: Subsystem<OverseerSubsystemContext<CollatorProtocolMessage>> + Send,
 	{
 		let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY);
 
@@ -600,13 +607,6 @@ where
 			all_subsystems.candidate_selection,
 		)?;
 
-		let collator_protocol_subsystem = spawn(
-			&mut s,
-			&mut running_subsystems,
-			&mut running_subsystems_rx,
-			all_subsystems.collator_protocol,
-		)?;
-
 		let statement_distribution_subsystem = spawn(
 			&mut s,
 			&mut running_subsystems,
@@ -677,6 +677,21 @@ where
 			all_subsystems.chain_api,
 		)?;
 
+		let collation_generation_subsystem = spawn(
+			&mut s,
+			&mut running_subsystems,
+			&mut running_subsystems_rx,
+			all_subsystems.collation_generation,
+		)?;
+
+		let collator_protocol_subsystem = spawn(
+			&mut s,
+			&mut running_subsystems,
+			&mut running_subsystems_rx,
+			all_subsystems.collator_protocol,
+		)?;
+
 		let active_leaves = HashSet::new();
 
 		let leaves = leaves
@@ -688,7 +703,6 @@ where
 			candidate_validation_subsystem,
 			candidate_backing_subsystem,
 			candidate_selection_subsystem,
-			collator_protocol_subsystem,
 			statement_distribution_subsystem,
 			availability_distribution_subsystem,
 			bitfield_signing_subsystem,
@@ -699,6 +713,8 @@ where
 			availability_store_subsystem,
 			network_bridge_subsystem,
 			chain_api_subsystem,
+			collation_generation_subsystem,
+			collator_protocol_subsystem,
 			s,
 			running_subsystems,
 			running_subsystems_rx,
@@ -724,10 +740,6 @@ where
 			let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
 		}
 
-		if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
-			let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
-		}
-
 		if let Some(ref mut s) = self.statement_distribution_subsystem.instance {
 			let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
 		}
@@ -768,6 +780,14 @@ where
 			let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
 		}
 
+		if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
+			let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
+		}
+
+		if let Some(ref mut s) = self.collation_generation_subsystem.instance {
+			let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
+		}
+
 		let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse();
 
 		loop {
@@ -889,10 +909,6 @@ where
 			s.tx.send(FromOverseer::Signal(signal.clone())).await?;
 		}
 
-		if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
-			s.tx.send(FromOverseer::Signal(signal.clone())).await?;
-		}
-
 		if let Some(ref mut s) = self.statement_distribution_subsystem.instance {
 			s.tx.send(FromOverseer::Signal(signal.clone())).await?;
 		}
@@ -930,7 +946,15 @@ where
 		}
 
 		if let Some(ref mut s) = self.chain_api_subsystem.instance {
-			s.tx.send(FromOverseer::Signal(signal)).await?;
+			s.tx.send(FromOverseer::Signal(signal.clone())).await?;
+		}
+
+		if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
+			s.tx.send(FromOverseer::Signal(signal.clone())).await?;
+		}
+
+		if let Some(ref mut s) = self.collation_generation_subsystem.instance {
+			s.tx.send(FromOverseer::Signal(signal.clone())).await?;
 		}
 
 		Ok(())
@@ -953,11 +977,6 @@ where
 					let _ = s.tx.send(FromOverseer::Communication { msg }).await;
 				}
 			}
-			AllMessages::CollatorProtocol(msg) => {
-				if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
-					let _ = s.tx.send(FromOverseer::Communication { msg }).await;
-				}
-			}
 			AllMessages::StatementDistribution(msg) => {
 				if let Some(ref mut s) = self.statement_distribution_subsystem.instance {
 					let _ = s.tx.send(FromOverseer::Communication { msg }).await;
@@ -1008,6 +1027,16 @@ where
 					let _ = s.tx.send(FromOverseer::Communication { msg }).await;
 				}
 			}
+			AllMessages::CollationGeneration(msg) => {
+				if let Some(ref mut s) = self.collation_generation_subsystem.instance {
+					let _ = s.tx.send(FromOverseer::Communication { msg }).await;
+				}
+			}
+			AllMessages::CollatorProtocol(msg) => {
+				if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
+					let _ = s.tx.send(FromOverseer::Communication { msg }).await;
+				}
+			}
 		}
 	}
 
@@ -1169,7 +1198,6 @@ mod tests {
 				candidate_validation: TestSubsystem1(s1_tx),
 				candidate_backing: TestSubsystem2(s2_tx),
 				candidate_selection: DummySubsystem,
-				collator_protocol: DummySubsystem,
 				statement_distribution: DummySubsystem,
 				availability_distribution: DummySubsystem,
 				bitfield_signing: DummySubsystem,
@@ -1180,6 +1208,8 @@ mod tests {
 				availability_store: DummySubsystem,
 				network_bridge: DummySubsystem,
 				chain_api: DummySubsystem,
+				collation_generation: DummySubsystem,
+				collator_protocol: DummySubsystem,
 			};
 			let (overseer, mut handler) = Overseer::new(
 				vec![],
@@ -1234,7 +1264,6 @@ mod tests {
 				candidate_validation: TestSubsystem1(s1_tx),
 				candidate_backing: TestSubsystem4,
 				candidate_selection: DummySubsystem,
-				collator_protocol: DummySubsystem,
 				statement_distribution: DummySubsystem,
 				availability_distribution: DummySubsystem,
 				bitfield_signing: DummySubsystem,
@@ -1245,6 +1274,8 @@ mod tests {
 				availability_store: DummySubsystem,
 				network_bridge: DummySubsystem,
 				chain_api: DummySubsystem,
+				collation_generation: DummySubsystem,
+				collator_protocol: DummySubsystem,
 			};
 			let (overseer, _handle) = Overseer::new(
 				vec![],
@@ -1352,7 +1383,6 @@ mod tests {
 				candidate_validation: TestSubsystem5(tx_5),
 				candidate_backing: TestSubsystem6(tx_6),
 				candidate_selection: DummySubsystem,
-				collator_protocol: DummySubsystem,
 				statement_distribution: DummySubsystem,
 				availability_distribution: DummySubsystem,
 				bitfield_signing: DummySubsystem,
@@ -1363,6 +1393,8 @@ mod tests {
 				availability_store: DummySubsystem,
 				network_bridge: DummySubsystem,
 				chain_api: DummySubsystem,
+				collation_generation: DummySubsystem,
+				collator_protocol: DummySubsystem,
 			};
 			let (overseer, mut handler) = Overseer::new(
 				vec![first_block],
@@ -1455,7 +1487,6 @@ mod tests {
 				candidate_validation: TestSubsystem5(tx_5),
 				candidate_backing: TestSubsystem6(tx_6),
 				candidate_selection: DummySubsystem,
-				collator_protocol: DummySubsystem,
 				statement_distribution: DummySubsystem,
 				availability_distribution: DummySubsystem,
 				bitfield_signing: DummySubsystem,
@@ -1466,6 +1497,8 @@ mod tests {
 				availability_store: DummySubsystem,
 				network_bridge: DummySubsystem,
 				chain_api: DummySubsystem,
+				collation_generation: DummySubsystem,
+				collator_protocol: DummySubsystem,
 			};
 			// start with two forks of different height.
 			let (overseer, mut handler) = Overseer::new(
diff --git a/polkadot/node/primitives/Cargo.toml b/polkadot/node/primitives/Cargo.toml
index b38e7e542ed..81e2467b374 100644
--- a/polkadot/node/primitives/Cargo.toml
+++ b/polkadot/node/primitives/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2018"
 description = "Primitive types for the Node-side"
 
 [dependencies]
+futures = "0.3.5"
 polkadot-primitives = { path = "../../primitives" }
 polkadot-statement-table = { path = "../../statement-table" }
 parity-scale-codec = { version = "1.3.4", default-features = false, features = ["derive"] }
diff --git a/polkadot/node/primitives/src/lib.rs b/polkadot/node/primitives/src/lib.rs
index 2bcc7a39264..2ff704c2dd1 100644
--- a/polkadot/node/primitives/src/lib.rs
+++ b/polkadot/node/primitives/src/lib.rs
@@ -20,12 +20,13 @@
 //! not shared between the node and the runtime. This crate builds on top of the primitives defined
 //! there.
 
+use futures::Future;
 use parity_scale_codec::{Decode, Encode};
 use polkadot_primitives::v1::{
 	Hash, CommittedCandidateReceipt, CandidateReceipt, CompactStatement,
 	EncodeAs, Signed, SigningContext, ValidatorIndex, ValidatorId,
 	UpwardMessage, Balance, ValidationCode, GlobalValidationData, LocalValidationData,
-	HeadData,
+	HeadData, PoV, CollatorPair, Id as ParaId,
 };
 use polkadot_statement_table::{
 	generic::{
@@ -258,3 +259,39 @@ impl std::convert::TryFrom<FromTableMisbehavior> for MisbehaviorReport {
 		}
 	}
 }
+
+/// The output of a collator.
+///
+/// This differs from `CandidateCommitments` in two ways:
+///
+/// - does not contain the erasure root; that's computed on the Polkadot side, not by Cumulus
+/// - contains a proof of validity.
+#[derive(Clone, Encode, Decode)]
+pub struct Collation {
+	/// Fees paid from the chain to the relay chain validators.
+	pub fees: Balance,
+	/// Messages destined to be interpreted by the Relay chain itself.
+	pub upward_messages: Vec<UpwardMessage>,
+	/// New validation code.
+	pub new_validation_code: Option<ValidationCode>,
+	/// The head-data produced as a result of execution.
+	pub head_data: HeadData,
+	/// Proof that this block is valid.
+	pub proof_of_validity: PoV,
+}
+
+/// Configuration for the collation generator.
+pub struct CollationGenerationConfig {
+	/// Collator's authentication key, so it can sign things.
+	pub key: CollatorPair,
+	/// Collation function.
+	pub collator: Box<dyn Fn(&GlobalValidationData, &LocalValidationData) -> Box<dyn Future<Output = Collation> + Unpin + Send> + Send + Sync>,
+	/// The parachain that this collator collates for.
+	pub para_id: ParaId,
+}
+
+impl std::fmt::Debug for CollationGenerationConfig {
+	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+		write!(f, "CollationGenerationConfig {{ ... }}")
+	}
+}
diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs
index ca350454f02..01b1a240906 100644
--- a/polkadot/node/service/src/lib.rs
+++ b/polkadot/node/service/src/lib.rs
@@ -281,7 +281,6 @@ fn real_overseer<S: SpawnNamed>(
 		candidate_validation: DummySubsystem,
 		candidate_backing: DummySubsystem,
 		candidate_selection: DummySubsystem,
-		collator_protocol: DummySubsystem,
 		statement_distribution: DummySubsystem,
 		availability_distribution: DummySubsystem,
 		bitfield_signing: DummySubsystem,
@@ -292,6 +291,8 @@ fn real_overseer<S: SpawnNamed>(
 		availability_store: DummySubsystem,
 		network_bridge: DummySubsystem,
 		chain_api: DummySubsystem,
+		collation_generation: DummySubsystem,
+		collator_protocol: DummySubsystem,
 	};
 	Overseer::new(
 		leaves,
diff --git a/polkadot/node/subsystem-test-helpers/src/lib.rs b/polkadot/node/subsystem-test-helpers/src/lib.rs
index 26c55354fbe..eee22dfc6f5 100644
--- a/polkadot/node/subsystem-test-helpers/src/lib.rs
+++ b/polkadot/node/subsystem-test-helpers/src/lib.rs
@@ -16,19 +16,21 @@
 
 //! Utilities for testing subsystems.
 
-use polkadot_node_subsystem::{SubsystemContext, FromOverseer, SubsystemResult, SubsystemError};
 use polkadot_node_subsystem::messages::AllMessages;
+use polkadot_node_subsystem::{FromOverseer, SubsystemContext, SubsystemError, SubsystemResult};
 
-use futures::prelude::*;
 use futures::channel::mpsc;
 use futures::poll;
+use futures::prelude::*;
+use futures_timer::Delay;
 use parking_lot::Mutex;
-use sp_core::traits::SpawnNamed;
+use sp_core::{testing::TaskExecutor, traits::SpawnNamed};
 
 use std::convert::Infallible;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll, Waker};
+use std::time::Duration;
 
 enum SinkState<T> {
 	Empty {
@@ -50,24 +52,21 @@ pub struct SingleItemStream<T>(Arc<Mutex<SinkState<T>>>);
 impl<T> Sink<T> for SingleItemSink<T> {
 	type Error = Infallible;
 
-	fn poll_ready(
-		self: Pin<&mut Self>,
-		cx: &mut Context,
-	) -> Poll<Result<(), Infallible>> {
+	fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
 		let mut state = self.0.lock();
 		match *state {
 			SinkState::Empty { .. } => Poll::Ready(Ok(())),
-			SinkState::Item { ref mut ready_waker, .. } => {
+			SinkState::Item {
+				ref mut ready_waker,
+				..
+			} => {
 				*ready_waker = Some(cx.waker().clone());
 				Poll::Pending
 			}
 		}
 	}
 
-	fn start_send(
-		self: Pin<&mut Self>,
-		item: T,
-	) -> Result<(), Infallible> {
+	fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Infallible> {
 		let mut state = self.0.lock();
 
 		match *state {
@@ -88,24 +87,21 @@ impl<T> Sink<T> for SingleItemSink<T> {
 		Ok(())
 	}
 
-	fn poll_flush(
-		self: Pin<&mut Self>,
-		cx: &mut Context,
-	) -> Poll<Result<(), Infallible>> {
+	fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
 		let mut state = self.0.lock();
 		match *state {
 			SinkState::Empty { .. } => Poll::Ready(Ok(())),
-			SinkState::Item { ref mut flush_waker, .. } => {
+			SinkState::Item {
+				ref mut flush_waker,
+				..
+			} => {
 				*flush_waker = Some(cx.waker().clone());
 				Poll::Pending
 			}
 		}
 	}
 
-	fn poll_close(
-		self: Pin<&mut Self>,
-		cx: &mut Context,
-	) -> Poll<Result<(), Infallible>> {
+	fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
 		self.poll_flush(cx)
 	}
 }
@@ -120,7 +116,11 @@ impl<T> Stream for SingleItemStream<T> {
 
 		match std::mem::replace(&mut *state, SinkState::Empty { read_waker }) {
 			SinkState::Empty { .. } => Poll::Pending,
-			SinkState::Item { item, ready_waker, flush_waker } => {
+			SinkState::Item {
+				item,
+				ready_waker,
+				flush_waker,
+			} => {
 				if let Some(waker) = ready_waker {
 					waker.wake();
 				}
@@ -141,10 +141,7 @@ impl<T> Stream for SingleItemStream<T> {
 /// not when the item is buffered.
 pub fn single_item_sink<T>() -> (SingleItemSink<T>, SingleItemStream<T>) {
 	let inner = Arc::new(Mutex::new(SinkState::Empty { read_waker: None }));
-	(
-		SingleItemSink(inner.clone()),
-		SingleItemStream(inner),
-	)
+	(SingleItemSink(inner.clone()), SingleItemStream(inner))
 }
 
 /// A test subsystem context.
@@ -155,7 +152,9 @@ pub struct TestSubsystemContext<M, S> {
 }
 
 #[async_trait::async_trait]
-impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext for TestSubsystemContext<M, S> {
+impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext
+	for TestSubsystemContext<M, S>
+{
 	type Message = M;
 
 	async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
@@ -170,9 +169,11 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext for Tes
 		self.rx.next().await.ok_or(SubsystemError)
 	}
 
-	async fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
-		-> SubsystemResult<()>
-	{
+	async fn spawn(
+		&mut self,
+		name: &'static str,
+		s: Pin<Box<dyn Future<Output = ()> + Send>>,
+	) -> SubsystemResult<()> {
 		self.spawn.spawn(name, s);
 		Ok(())
 	}
@@ -185,15 +186,23 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext for Tes
 	}
 
 	async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
-		self.tx.send(msg).await.expect("test overseer no longer live");
+		self.tx
+			.send(msg)
+			.await
+			.expect("test overseer no longer live");
 		Ok(())
 	}
 
 	async fn send_messages<T>(&mut self, msgs: T) -> SubsystemResult<()>
-		where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
+	where
+		T: IntoIterator<Item = AllMessages> + Send,
+		T::IntoIter: Send,
 	{
 		let mut iter = stream::iter(msgs.into_iter().map(Ok));
-		self.tx.send_all(&mut iter).await.expect("test overseer no longer live");
+		self.tx
+			.send_all(&mut iter)
+			.await
+			.expect("test overseer no longer live");
 
 		Ok(())
 	}
@@ -209,19 +218,27 @@ impl<M> TestSubsystemContextHandle<M> {
 	/// Send a message or signal to the subsystem. This resolves at the point in time where the
 	/// subsystem has _read_ the message.
 	pub async fn send(&mut self, from_overseer: FromOverseer<M>) {
-		self.tx.send(from_overseer).await.expect("Test subsystem no longer live");
+		self.tx
+			.send(from_overseer)
+			.await
+			.expect("Test subsystem no longer live");
 	}
 
 	/// Receive the next message from the subsystem.
 	pub async fn recv(&mut self) -> AllMessages {
-		self.rx.next().await.expect("Test subsystem no longer live")
+		self.try_recv().await.expect("Test subsystem no longer live")
+	}
+
+	/// Receive the next message from the subsystem, or `None` if the channel has been closed.
+	pub async fn try_recv(&mut self) -> Option<AllMessages> {
+		self.rx.next().await
 	}
 }
 
 /// Make a test subsystem context.
-pub fn make_subsystem_context<M, S>(spawn: S)
-	-> (TestSubsystemContext<M, S>, TestSubsystemContextHandle<M>)
-{
+pub fn make_subsystem_context<M, S>(
+	spawn: S,
+) -> (TestSubsystemContext<M, S>, TestSubsystemContextHandle<M>) {
 	let (overseer_tx, overseer_rx) = single_item_sink();
 	let (all_messages_tx, all_messages_rx) = mpsc::unbounded();
 
@@ -233,7 +250,39 @@ pub fn make_subsystem_context<M, S>(spawn: S)
 		},
 		TestSubsystemContextHandle {
 			tx: overseer_tx,
-			rx: all_messages_rx
+			rx: all_messages_rx,
 		},
 	)
-}
\ No newline at end of file
+}
+
+/// Test a subsystem, mocking the overseer
+///
+/// Pass in two async closures: one mocks the overseer, the other runs the test from the perspective of a subsystem.
+///
+/// Times out after two seconds.
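+///
+/// A sketch of typical usage (illustrative only; `MyMessage` stands in for the
+/// subsystem's message type):
+///
+/// ```ignore
+/// subsystem_test_harness::<MyMessage, _, _, _, _>(
+///     |mut handle| async move {
+///         // mock overseer: answer or drain the subsystem's messages here
+///         while let Some(_msg) = handle.try_recv().await {}
+///     },
+///     |_ctx| async move {
+///         // drive the subsystem logic under test via the context
+///     },
+/// );
+/// ```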
+pub fn subsystem_test_harness<M, OverseerFactory, Overseer, TestFactory, Test>(
+	overseer_factory: OverseerFactory,
+	test_factory: TestFactory,
+) where
+	OverseerFactory: FnOnce(TestSubsystemContextHandle<M>) -> Overseer,
+	Overseer: Future<Output = ()>,
+	TestFactory: FnOnce(TestSubsystemContext<M, TaskExecutor>) -> Test,
+	Test: Future<Output = ()>,
+{
+	let pool = TaskExecutor::new();
+	let (context, handle) = make_subsystem_context(pool);
+	let overseer = overseer_factory(handle);
+	let test = test_factory(context);
+
+	let timeout = Delay::new(Duration::from_secs(2));
+
+	futures::pin_mut!(overseer, test, timeout);
+
+	futures::executor::block_on(async move {
+		futures::select! {
+			_ = overseer.fuse() => (),
+			_ = test.fuse() => (),
+			_ = timeout.fuse() => panic!("test timed out instead of completing"),
+		}
+	});
+}
diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs
index cc8cbd239dd..d6998b267fe 100644
--- a/polkadot/node/subsystem-util/src/lib.rs
+++ b/polkadot/node/subsystem-util/src/lib.rs
@@ -189,6 +189,89 @@ specialize_requests! {
 	fn request_candidate_events() -> Vec<CandidateEvent>; CandidateEvents;
 }
 
+/// Request some data from the `RuntimeApi` via a SubsystemContext.
+async fn request_from_runtime_ctx<RequestBuilder, Context, Response>(
+	parent: Hash,
+	ctx: &mut Context,
+	request_builder: RequestBuilder,
+) -> Result<RuntimeApiReceiver<Response>, Error>
+where
+	RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
+	Context: SubsystemContext,
+{
+	let (tx, rx) = oneshot::channel();
+
+	ctx
+		.send_message(
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx)))
+				.try_into()
+				.map_err(|err| Error::SenderConversion(format!("{:?}", err)))?,
+		)
+		.await?;
+
+	Ok(rx)
+}
+
+/// Construct specialized request functions for the runtime.
+///
+/// These would otherwise get pretty repetitive.
+macro_rules! specialize_requests_ctx {
+	// expand return type name for documentation purposes
+	(fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
+		specialize_requests_ctx!{
+			named stringify!($request_variant) ; fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant;
+		}
+	};
+
+	// create a single specialized request function
+	(named $doc_name:expr ; fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
+		#[doc = "Request `"]
+		#[doc = $doc_name]
+		#[doc = "` from the runtime via a `SubsystemContext`"]
+		pub async fn $func_name<Context: SubsystemContext>(
+			parent: Hash,
+			$(
+				$param_name: $param_ty,
+			)*
+			ctx: &mut Context,
+		) -> Result<RuntimeApiReceiver<$return_ty>, Error> {
+			request_from_runtime_ctx(parent, ctx, |tx| RuntimeApiRequest::$request_variant(
+				$( $param_name, )* tx
+			)).await
+		}
+	};
+
+	// recursive decompose
+	(
+		fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;
+		$(
+			fn $t_func_name:ident( $( $t_param_name:ident : $t_param_ty:ty ),* ) -> $t_return_ty:ty ; $t_request_variant:ident;
+		)+
+	) => {
+		specialize_requests_ctx!{
+			fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant ;
+		}
+		specialize_requests_ctx!{
+			$(
+				fn $t_func_name( $( $t_param_name : $t_param_ty ),* ) -> $t_return_ty ; $t_request_variant ;
+			)+
+		}
+	};
+}
+
+specialize_requests_ctx! {
+	fn request_validators_ctx() -> Vec<ValidatorId>; Validators;
+	fn request_validator_groups_ctx() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups;
+	fn request_availability_cores_ctx() -> Vec<CoreState>; AvailabilityCores;
+	fn request_global_validation_data_ctx() -> GlobalValidationData; GlobalValidationData;
+	fn request_local_validation_data_ctx(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<LocalValidationData>; LocalValidationData;
+	fn request_session_index_for_child_ctx() -> SessionIndex; SessionIndexForChild;
+	fn request_validation_code_ctx(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
+	fn request_candidate_pending_availability_ctx(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
+	fn request_candidate_events_ctx() -> Vec<CandidateEvent>; CandidateEvents;
+}
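+
+// The generated functions are used in two stages: the first `.await` issues the
+// request by sending a message via the context; awaiting the returned receiver
+// then yields the response. For example (assuming `relay_parent` and a mutable
+// subsystem context `ctx` in scope):
+//
+//     let validators = request_validators_ctx(relay_parent, &mut ctx).await?.await??;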
+
 /// From the given set of validators, find the first key we can sign with, if any.
 pub fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<ValidatorPair> {
 	let keystore = keystore.read();
diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs
index 60813124015..ddea2639e1e 100644
--- a/polkadot/node/subsystem/src/messages.rs
+++ b/polkadot/node/subsystem/src/messages.rs
@@ -24,20 +24,19 @@
 
 use futures::channel::{mpsc, oneshot};
 
-use polkadot_primitives::v1::{
-	Hash, CommittedCandidateReceipt, CollatorId,
-	CandidateReceipt, PoV, ErasureChunk, BackedCandidate, Id as ParaId,
-	SignedAvailabilityBitfield, ValidatorId, ValidationCode, ValidatorIndex,
-	CoreAssignment, CoreOccupied, CandidateDescriptor,
-	ValidatorSignature, OmittedValidationData, AvailableData, GroupRotationInfo,
-	CoreState, LocalValidationData, GlobalValidationData, OccupiedCoreAssumption,
-	CandidateEvent, SessionIndex, BlockNumber,
+use polkadot_node_network_protocol::{
+	v1 as protocol_v1, NetworkBridgeEvent, ReputationChange, PeerId, PeerSet,
 };
 use polkadot_node_primitives::{
-	MisbehaviorReport, SignedFullStatement, ValidationResult,
+	CollationGenerationConfig, MisbehaviorReport, SignedFullStatement, ValidationResult,
 };
-use polkadot_node_network_protocol::{
-	v1 as protocol_v1, NetworkBridgeEvent, ReputationChange, PeerId, PeerSet,
+use polkadot_primitives::v1::{
+	AvailableData, BackedCandidate, BlockNumber, CandidateDescriptor, CandidateEvent,
+	CandidateReceipt, CollatorId, CommittedCandidateReceipt,
+	CoreAssignment, CoreOccupied, CoreState, ErasureChunk, GlobalValidationData, GroupRotationInfo,
+	Hash, Id as ParaId, LocalValidationData, OccupiedCoreAssumption, OmittedValidationData, PoV,
+	SessionIndex, SignedAvailabilityBitfield, ValidationCode, ValidatorId, ValidatorIndex,
+	ValidatorSignature,
 };
 use std::sync::Arc;
 
@@ -82,7 +81,6 @@ pub enum CandidateBackingMessage {
 	Statement(Hash, SignedFullStatement),
 }
 
-
 impl CandidateBackingMessage {
 	/// If the current variant contains the relay parent hash, return it.
 	pub fn relay_parent(&self) -> Option<Hash> {
@@ -505,6 +503,20 @@ impl PoVDistributionMessage {
 	}
 }
 
+/// Message to the Collation Generation Subsystem.
+#[derive(Debug)]
+pub enum CollationGenerationMessage {
+	/// Initialize the collation generation subsystem
+	Initialize(CollationGenerationConfig),
+}
+
+impl CollationGenerationMessage {
+	/// If the current variant contains the relay parent hash, return it.
+	pub fn relay_parent(&self) -> Option<Hash> {
+		None
+	}
+}
+
 /// A message type tying together all message types that are used across Subsystems.
 #[derive(Debug)]
 pub enum AllMessages {
@@ -536,4 +548,6 @@ pub enum AllMessages {
 	AvailabilityStore(AvailabilityStoreMessage),
 	/// Message for the network bridge subsystem.
 	NetworkBridge(NetworkBridgeMessage),
+	/// Message for the Collation Generation subsystem.
+	CollationGeneration(CollationGenerationMessage),
 }
diff --git a/polkadot/roadmap/implementers-guide/src/node/collators/collation-generation.md b/polkadot/roadmap/implementers-guide/src/node/collators/collation-generation.md
index d27ba740850..ab3f80273d6 100644
--- a/polkadot/roadmap/implementers-guide/src/node/collators/collation-generation.md
+++ b/polkadot/roadmap/implementers-guide/src/node/collators/collation-generation.md
@@ -4,33 +4,48 @@ The collation generation subsystem is executed on collator nodes and produces ca
 
 ## Protocol
 
-Input: None
+Input: `CollationGenerationMessage`
 
-Output: CollationDistributionMessage
+```rust
+enum CollationGenerationMessage {
+  Initialize(CollationGenerationConfig),
+}
+```
+
+No more than one initialization message should ever be sent to the collation generation subsystem; on a second initialization it logs a warning and shuts down.
+
+Output: `CollatorProtocolMessage`
 
 ## Functionality
 
 The process of generating a collation for a parachain is very parachain-specific; the details are beyond the scope of this description. The subsystem should be implemented as an abstract wrapper, which is aware of this configuration:
 
 ```rust
+pub struct Collation {
+  /// Fees paid from the chain to the relay chain validators.
+  pub fees: Balance,
+  /// Messages destined to be interpreted by the Relay chain itself.
+  pub upward_messages: Vec<UpwardMessage>,
+  /// New validation code.
+  pub new_validation_code: Option<ValidationCode>,
+  /// The head-data produced as a result of execution.
+  pub head_data: HeadData,
+  /// Proof that this block is valid.
+  pub proof_of_validity: PoV,
+}
+
 struct CollationGenerationConfig {
-	key: CollatorPair,
-	collation_producer: Fn(params) -> async (HeadData, Vec<UpwardMessage>, PoV),
+  key: CollatorPair,
+  collator: Box<dyn Fn(&GlobalValidationData, &LocalValidationData) -> Box<dyn Future<Output = Collation>>>,
+  para_id: ParaId,
 }
 ```
 
 The configuration should be optional, to allow for the case where the node is not run with the capability to collate.
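+
+A non-normative sketch of building and submitting this configuration follows; `build_collation` stands in for the parachain-specific async collation logic, and `overseer_handler` is assumed to be whatever handle the embedder uses to send overseer messages:
+
+```rust
+use futures::FutureExt;
+
+let config = CollationGenerationConfig {
+  key: CollatorPair::generate().0,
+  collator: Box::new(|gvd: &GlobalValidationData, lvd: &LocalValidationData| {
+    // `.boxed()` pins the future so the returned object is `Unpin`
+    Box::new(build_collation(gvd.clone(), lvd.clone()).boxed())
+  }),
+  para_id: 100.into(),
+};
+overseer_handler
+  .send_msg(AllMessages::CollationGeneration(
+    CollationGenerationMessage::Initialize(config),
+  ))
+  .await?;
+```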
 
 On `ActiveLeavesUpdate`:
-  * If there is no collation generation config, ignore.
-  * Otherwise, for each `activated` head in the update:
-    * Determine if the para is scheduled or is next up on any occupied core by fetching the `availability_cores` Runtime API.
-    * Determine an occupied core assumption to make about the para. The simplest thing to do is to always assume that if the para occupies a core, that the candidate will become available. Further on, this might be determined based on bitfields seen or validator requests.
-    * Use the Runtime API subsystem to fetch the full validation data.
-	* Construct validation function params based on validation data.
-	* Invoke the `collation_producer`.
-	* Construct a `CommittedCandidateReceipt` using the outputs of the `collation_producer` and signing with the `key`.
-	* Dispatch a [`CollatorProtocolMessage`][CPM]`::DistributeCollation(receipt, pov)`.
+
+* If there is no collation generation config, ignore.
+* Otherwise, for each `activated` head in the update (a condensed sketch of this flow follows the list):
+  * Determine if the para is scheduled on any core by fetching the `availability_cores` Runtime API.
+    > TODO: figure out what to do in the case of occupied cores; see [this issue](https://github.com/paritytech/polkadot/issues/1573).
+  * Determine an occupied core assumption to make about the para. For scheduled cores, `OccupiedCoreAssumption::Free` is appropriate.
+  * Use the Runtime API subsystem to fetch the full validation data.
+  * Invoke the `collator`, and use its outputs to produce a `CandidateReceipt`, signed with the configuration's `key`.
+  * Dispatch a [`CollatorProtocolMessage`][CPM]`::DistributeCollation(receipt, pov)`.
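+
+A condensed, non-normative sketch of the per-activation flow (mirroring the node-side implementation; the `request_*` helpers stand in for Runtime API subsystem calls, `sign_and_build_receipt` and `dispatch` are hypothetical stand-ins, and error handling is elided):
+
+```rust
+async fn handle_activation(config: &CollationGenerationConfig, relay_parent: Hash) {
+  let global_validation_data = request_global_validation_data(relay_parent).await;
+  for core in request_availability_cores(relay_parent).await {
+    let scheduled = match core {
+      CoreState::Scheduled(scheduled) if scheduled.para_id == config.para_id => scheduled,
+      _ => continue, // occupied cores: see the TODO above
+    };
+    // scheduled cores can use the `Free` assumption
+    let local_validation_data = match request_local_validation_data(
+      relay_parent, scheduled.para_id, OccupiedCoreAssumption::Free,
+    ).await {
+      Some(lvd) => lvd,
+      None => continue,
+    };
+    let collation = (config.collator)(&global_validation_data, &local_validation_data).await;
+    let receipt = sign_and_build_receipt(config, relay_parent, &collation);
+    dispatch(CollatorProtocolMessage::DistributeCollation(receipt, collation.proof_of_validity));
+  }
+}
+```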
 
 [CP]: collator-protocol.md
 [CPM]: ../../types/overseer-protocol.md#collatorprotocolmessage
-- 
GitLab