From d1b1c172857b3ad0b4967da6f5ec4c4a877eb804 Mon Sep 17 00:00:00 2001
From: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>
Date: Tue, 8 Sep 2020 11:48:48 +0200
Subject: [PATCH] implement candidate selection subsystem (#1645)

* choose the straightforward candidate selection algorithm for now

* add draft implementation of candidate selection

* fix typo in summary

* more properly report misbehaving collators

* describe how CandidateSelection subsystem becomes aware of candidates

* revise candidate selection / collator protocol interaction pattern

* implement rest of candidate selection per the guide

* review: resolve nits

* start writing test suite, harness

* implement first test

* add second test

* implement third test

Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>
---
 polkadot/Cargo.lock                           |  14 +
 polkadot/Cargo.toml                           |   1 +
 .../node/core/candidate-selection/Cargo.toml  |  17 +
 .../node/core/candidate-selection/src/lib.rs  | 669 ++++++++++++++++++
 polkadot/node/subsystem/src/messages.rs       |   3 +
 .../src/node/backing/candidate-selection.md   |  18 +-
 .../src/node/collators/collator-protocol.md   |  32 +-
 .../src/types/overseer-protocol.md            |   2 +
 8 files changed, 739 insertions(+), 17 deletions(-)
 create mode 100644 polkadot/node/core/candidate-selection/Cargo.toml
 create mode 100644 polkadot/node/core/candidate-selection/src/lib.rs

diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index 11cfc862a31..a15f1f36507 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -4928,6 +4928,20 @@ dependencies = [
  "wasm-timer",
 ]
 
+[[package]]
+name = "polkadot-node-core-candidate-selection"
+version = "0.1.0"
+dependencies = [
+ "derive_more 0.99.9",
+ "futures 0.3.5",
+ "log 0.4.11",
+ "polkadot-node-primitives",
+ "polkadot-node-subsystem",
+ "polkadot-node-subsystem-util",
+ "polkadot-primitives",
+ "sp-core",
+]
+
 [[package]]
 name = "polkadot-node-core-candidate-validation"
 version = "0.1.0"
diff --git a/polkadot/Cargo.toml b/polkadot/Cargo.toml
index 27762f984f0..4e751390102 100644
--- a/polkadot/Cargo.toml
+++ b/polkadot/Cargo.toml
@@ -42,6 +42,7 @@ members = [
 	"node/core/av-store",
 	"node/core/backing",
 	"node/core/bitfield-signing",
+	"node/core/candidate-selection",
 	"node/core/candidate-validation",
 	"node/core/chain-api",
 	"node/core/proposer",
diff --git a/polkadot/node/core/candidate-selection/Cargo.toml b/polkadot/node/core/candidate-selection/Cargo.toml
new file mode 100644
index 00000000000..171e84723eb
--- /dev/null
+++ b/polkadot/node/core/candidate-selection/Cargo.toml
@@ -0,0 +1,17 @@
+[package]
+name = "polkadot-node-core-candidate-selection"
+version = "0.1.0"
+authors = ["Parity Technologies <admin@parity.io>"]
+edition = "2018"
+
+[dependencies]
+derive_more = "0.99.9"
+futures = "0.3.5"
+log = "0.4.8"
+polkadot-primitives = { path = "../../../primitives" }
+polkadot-node-primitives = { path = "../../primitives" }
+polkadot-node-subsystem = { path = "../../subsystem" }
+polkadot-node-subsystem-util = { path = "../../subsystem-util" }
+
+[dev-dependencies]
+sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
diff --git a/polkadot/node/core/candidate-selection/src/lib.rs b/polkadot/node/core/candidate-selection/src/lib.rs
new file mode 100644
index 00000000000..565cc3e9c83
--- /dev/null
+++ b/polkadot/node/core/candidate-selection/src/lib.rs
@@ -0,0 +1,669 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! The candidate selection subsystem picks, for each relay parent, at most one
+//! collation fetched via the collator protocol and recommends it for seconding.
+
+#![deny(missing_docs)]
+
+use futures::{
+	channel::{mpsc, oneshot},
+	prelude::*,
+};
+use polkadot_node_primitives::ValidationResult;
+use polkadot_node_subsystem::{
+	errors::{ChainApiError, RuntimeApiError},
+	messages::{
+		AllMessages, CandidateBackingMessage, CandidateSelectionMessage,
+		CandidateValidationMessage, CollatorProtocolMessage,
+	},
+	metrics::{self, prometheus},
+};
+use polkadot_node_subsystem_util::{self as util, delegated_subsystem, JobTrait, ToJobTrait};
+use polkadot_primitives::v1::{
+	CandidateDescriptor, CandidateReceipt, CollatorId, Hash, Id as ParaId, PoV,
+};
+use std::{convert::TryFrom, pin::Pin, sync::Arc};
+
+const TARGET: &'static str = "candidate_selection";
+
+struct CandidateSelectionJob {
+	sender: mpsc::Sender<FromJob>,
+	receiver: mpsc::Receiver<ToJob>,
+	metrics: Metrics,
+	seconded_candidate: Option<CollatorId>,
+}
+
+/// This enum defines the messages that the candidate selection job is prepared to receive.
+#[derive(Debug)]
+pub enum ToJob {
+	/// The candidate selection message is the main input to the job.
+	CandidateSelection(CandidateSelectionMessage),
+	/// This message indicates that the job should shut itself down.
+	Stop,
+}
+
+impl ToJobTrait for ToJob {
+	const STOP: Self = Self::Stop;
+
+	fn relay_parent(&self) -> Option<Hash> {
+		match self {
+			Self::CandidateSelection(csm) => csm.relay_parent(),
+			Self::Stop => None,
+		}
+	}
+}
+
+impl TryFrom<AllMessages> for ToJob {
+	type Error = ();
+
+	fn try_from(msg: AllMessages) -> Result<Self, Self::Error> {
+		match msg {
+			AllMessages::CandidateSelection(csm) => Ok(Self::CandidateSelection(csm)),
+			_ => Err(()),
+		}
+	}
+}
+
+impl From<CandidateSelectionMessage> for ToJob {
+	fn from(csm: CandidateSelectionMessage) -> Self {
+		Self::CandidateSelection(csm)
+	}
+}
+
+#[derive(Debug)]
+enum FromJob {
+	Validation(CandidateValidationMessage),
+	Backing(CandidateBackingMessage),
+	Collator(CollatorProtocolMessage),
+}
+
+impl From<FromJob> for AllMessages {
+	fn from(from_job: FromJob) -> AllMessages {
+		match from_job {
+			FromJob::Validation(msg) => AllMessages::CandidateValidation(msg),
+			FromJob::Backing(msg) => AllMessages::CandidateBacking(msg),
+			FromJob::Collator(msg) => AllMessages::CollatorProtocol(msg),
+		}
+	}
+}
+
+impl TryFrom<AllMessages> for FromJob {
+	type Error = ();
+
+	fn try_from(msg: AllMessages) -> Result<Self, Self::Error> {
+		match msg {
+			AllMessages::CandidateValidation(msg) => Ok(FromJob::Validation(msg)),
+			AllMessages::CandidateBacking(msg) => Ok(FromJob::Backing(msg)),
+			AllMessages::CollatorProtocol(msg) => Ok(FromJob::Collator(msg)),
+			_ => Err(()),
+		}
+	}
+}
+
+#[derive(Debug, derive_more::From)]
+enum Error {
+	#[from]
+	Sending(mpsc::SendError),
+	#[from]
+	Util(util::Error),
+	#[from]
+	OneshotRecv(oneshot::Canceled),
+	#[from]
+	ChainApi(ChainApiError),
+	#[from]
+	Runtime(RuntimeApiError),
+}
+
+impl JobTrait for CandidateSelectionJob {
+	type ToJob = ToJob;
+	type FromJob = FromJob;
+	type Error = Error;
+	type RunArgs = ();
+	type Metrics = Metrics;
+
+	const NAME: &'static str = "CandidateSelectionJob";
+
+	/// Run a job for the parent block indicated
+	//
+	// this function is in charge of creating and executing the job's main loop
+	fn run(
+		_relay_parent: Hash,
+		_run_args: Self::RunArgs,
+		metrics: Self::Metrics,
+		receiver: mpsc::Receiver<ToJob>,
+		sender: mpsc::Sender<FromJob>,
+	) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
+		async move {
+			let job = CandidateSelectionJob::new(metrics, sender, receiver);
+
+			// it isn't necessary to break run_loop into its own function,
+			// but it's convenient to separate the concerns in this way
+			job.run_loop().await
+		}
+		.boxed()
+	}
+}
+
+impl CandidateSelectionJob {
+	pub fn new(
+		metrics: Metrics,
+		sender: mpsc::Sender<FromJob>,
+		receiver: mpsc::Receiver<ToJob>,
+	) -> Self {
+		Self {
+			sender,
+			receiver,
+			metrics,
+			seconded_candidate: None,
+		}
+	}
+
+	async fn run_loop(mut self) -> Result<(), Error> {
+		self.run_loop_borrowed().await
+	}
+
+	/// this function exists for testing and should not generally be used; use `run_loop` instead.
+	async fn run_loop_borrowed(&mut self) -> Result<(), Error> {
+		while let Some(msg) = self.receiver.next().await {
+			match msg {
+				ToJob::CandidateSelection(CandidateSelectionMessage::Collation(
+					relay_parent,
+					para_id,
+					collator_id,
+				)) => {
+					self.handle_collation(relay_parent, para_id, collator_id)
+						.await;
+				}
+				ToJob::CandidateSelection(CandidateSelectionMessage::Invalid(
+					_,
+					candidate_receipt,
+				)) => {
+					self.handle_invalid(candidate_receipt).await;
+				}
+				ToJob::Stop => break,
+			}
+		}
+
+		// closing the sender here means that we don't deadlock in tests
+		self.sender.close_channel();
+
+		Ok(())
+	}
+
+	async fn handle_collation(
+		&mut self,
+		relay_parent: Hash,
+		para_id: ParaId,
+		collator_id: CollatorId,
+	) {
+		if self.seconded_candidate.is_none() {
+			let (candidate_receipt, pov) =
+				match get_collation(relay_parent, para_id, self.sender.clone()).await {
+					Ok(response) => response,
+					Err(err) => {
+						log::warn!(
+							target: TARGET,
+							"failed to get collation from collator protocol subsystem: {:?}",
+							err
+						);
+						return;
+					}
+				};
+
+			let pov = Arc::new(pov);
+
+			if !candidate_is_valid(
+				candidate_receipt.descriptor.clone(),
+				pov.clone(),
+				self.sender.clone(),
+			)
+			.await
+			{
+				return;
+			}
+
+			let pov = if let Ok(pov) = Arc::try_unwrap(pov) {
+				pov
+			} else {
+				log::warn!(target: TARGET, "Arc unwrapping is expected to succeed, the other fns should have already run to completion by now.");
+				return;
+			};
+
+			match second_candidate(
+				relay_parent,
+				candidate_receipt,
+				pov,
+				&mut self.sender,
+				&self.metrics,
+			)
+			.await
+			{
+				Err(err) => log::warn!(target: TARGET, "failed to second a candidate: {:?}", err),
+				Ok(()) => self.seconded_candidate = Some(collator_id),
+			}
+		}
+	}
+
+	async fn handle_invalid(&mut self, candidate_receipt: CandidateReceipt) {
+		let received_from = match &self.seconded_candidate {
+			Some(peer) => peer,
+			None => {
+				log::warn!(
+					target: TARGET,
+					"received invalidity notice for a candidate we don't remember seconding"
+				);
+				return;
+			}
+		};
+		log::info!(
+			target: TARGET,
+			"received invalidity note for candidate {:?}",
+			candidate_receipt
+		);
+
+		let succeeded =
+			if let Err(err) = forward_invalidity_note(received_from, &mut self.sender).await {
+				log::warn!(
+					target: TARGET,
+					"failed to forward invalidity note: {:?}",
+					err
+				);
+				false
+			} else {
+				true
+			};
+		self.metrics.on_invalid_selection(succeeded);
+	}
+}
+
+// request a collation from the Collator Protocol subsystem and wait for its reply on a oneshot channel
+//
+// note that this gets an owned clone of the sender; that's because unlike `forward_invalidity_note`, it's expected to take a while longer
+async fn get_collation(
+	relay_parent: Hash,
+	para_id: ParaId,
+	mut sender: mpsc::Sender<FromJob>,
+) -> Result<(CandidateReceipt, PoV), Error> {
+	let (tx, rx) = oneshot::channel();
+	sender
+		.send(FromJob::Collator(CollatorProtocolMessage::FetchCollation(
+			relay_parent,
+			para_id,
+			tx,
+		)))
+		.await?;
+	rx.await.map_err(Into::into)
+}
+
+// find out whether a candidate is valid or not
+async fn candidate_is_valid(
+	candidate_descriptor: CandidateDescriptor,
+	pov: Arc<PoV>,
+	sender: mpsc::Sender<FromJob>,
+) -> bool {
+	std::matches!(
+		candidate_is_valid_inner(candidate_descriptor, pov, sender).await,
+		Ok(true)
+	)
+}
+
+// find out whether a candidate is valid or not, with a worse interface
+// the external interface is worse, but the internal implementation is easier
+async fn candidate_is_valid_inner(
+	candidate_descriptor: CandidateDescriptor,
+	pov: Arc<PoV>,
+	mut sender: mpsc::Sender<FromJob>,
+) -> Result<bool, Error> {
+	let (tx, rx) = oneshot::channel();
+	sender
+		.send(FromJob::Validation(
+			CandidateValidationMessage::ValidateFromChainState(candidate_descriptor, pov, tx),
+		))
+		.await?;
+	Ok(std::matches!(rx.await, Ok(Ok(ValidationResult::Valid(_)))))
+}
+
+// ask the Candidate Backing subsystem to second the given candidate
+//
+// the outcome of the send is reported through `Metrics::on_second` either way;
+// a failed send is additionally logged here and handed back to the caller,
+// which is what decides whether the collator gets remembered as seconded
+async fn second_candidate(
+	relay_parent: Hash,
+	candidate_receipt: CandidateReceipt,
+	pov: PoV,
+	sender: &mut mpsc::Sender<FromJob>,
+	metrics: &Metrics,
+) -> Result<(), Error> {
+	let message = CandidateBackingMessage::Second(relay_parent, candidate_receipt, pov);
+	let outcome = sender.send(FromJob::Backing(message)).await;
+
+	// bail out early on failure so that the happy path reads straight through
+	if let Err(err) = outcome {
+		log::warn!(target: TARGET, "failed to send a seconding message");
+		metrics.on_second(false);
+		return Err(err.into());
+	}
+
+	// the send went through, so count this as a successful seconding
+	metrics.on_second(true);
+	Ok(())
+}
+
+async fn forward_invalidity_note(
+	received_from: &CollatorId,
+	sender: &mut mpsc::Sender<FromJob>,
+) -> Result<(), Error> {
+	sender
+		.send(FromJob::Collator(CollatorProtocolMessage::ReportCollator(
+			received_from.clone(),
+		)))
+		.await
+		.map_err(Into::into)
+}
+
+#[derive(Clone)]
+struct MetricsInner {
+	seconds: prometheus::CounterVec<prometheus::U64>,
+	invalid_selections: prometheus::CounterVec<prometheus::U64>,
+}
+
+/// Candidate selection metrics.
+#[derive(Default, Clone)]
+pub struct Metrics(Option<MetricsInner>);
+
+impl Metrics {
+	fn on_second(&self, succeeded: bool) {
+		if let Some(metrics) = &self.0 {
+			let label = if succeeded { "succeeded" } else { "failed" };
+			metrics.seconds.with_label_values(&[label]).inc();
+		}
+	}
+
+	fn on_invalid_selection(&self, succeeded: bool) {
+		if let Some(metrics) = &self.0 {
+			let label = if succeeded { "succeeded" } else { "failed" };
+			metrics.invalid_selections.with_label_values(&[label]).inc();
+		}
+	}
+}
+
+impl metrics::Metrics for Metrics {
+	/// Registers both counters, each with one `success` label whose value is `succeeded` or `failed`.
+	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
+		Ok(Metrics(Some(MetricsInner {
+			seconds: prometheus::register(
+				prometheus::CounterVec::new(
+					prometheus::Opts::new(
+						"candidate_selection_seconds_total",
+						"Number of Candidate Selection subsystem seconding events.",
+					),
+					&["success"],
+				)?,
+				registry,
+			)?,
+			invalid_selections: prometheus::register(
+				prometheus::CounterVec::new(
+					prometheus::Opts::new(
+						"candidate_selection_invalid_selections_total",
+						"Number of Candidate Selection subsystem seconding selections which proved to be invalid.",
+					),
+					&["success"],
+				)?,
+				registry,
+			)?,
+		})))
+	}
+}
+
+delegated_subsystem!(CandidateSelectionJob((), Metrics) <- ToJob as CandidateSelectionSubsystem);
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use futures::lock::Mutex;
+	use polkadot_node_primitives::ValidationOutputs;
+	use polkadot_primitives::v1::{BlockData, HeadData, PersistedValidationData};
+	use sp_core::crypto::Public;
+
+	fn test_harness<Preconditions, TestBuilder, Test, Postconditions>(
+		preconditions: Preconditions,
+		test: TestBuilder,
+		postconditions: Postconditions,
+	) where
+		Preconditions: FnOnce(&mut CandidateSelectionJob),
+		TestBuilder: FnOnce(mpsc::Sender<ToJob>, mpsc::Receiver<FromJob>) -> Test,
+		Test: Future<Output = ()>,
+		Postconditions: FnOnce(CandidateSelectionJob, Result<(), Error>),
+	{
+		let (to_job_tx, to_job_rx) = mpsc::channel(0);
+		let (from_job_tx, from_job_rx) = mpsc::channel(0);
+		let mut job = CandidateSelectionJob {
+			sender: from_job_tx,
+			receiver: to_job_rx,
+			metrics: Default::default(),
+			seconded_candidate: None,
+		};
+
+		preconditions(&mut job);
+
+		let (_, job_result) = futures::executor::block_on(future::join(
+			test(to_job_tx, from_job_rx),
+			job.run_loop_borrowed(),
+		));
+
+		postconditions(job, job_result);
+	}
+
+	fn default_validation_outputs() -> ValidationOutputs {
+		let head_data: Vec<u8> = (0..32).rev().cycle().take(256).collect();
+		let parent_head_data = head_data
+			.iter()
+			.copied()
+			.map(|x| x.saturating_sub(1))
+			.collect();
+
+		ValidationOutputs {
+			head_data: HeadData(head_data),
+			validation_data: PersistedValidationData {
+				parent_head: HeadData(parent_head_data),
+				block_number: 123,
+				hrmp_mqc_heads: Vec::new(),
+			},
+			upward_messages: Vec::new(),
+			fees: 0,
+			new_validation_code: None,
+		}
+	}
+
+	/// when nothing is seconded so far, the collation is fetched and seconded
+	#[test]
+	fn fetches_and_seconds_a_collation() {
+		let relay_parent = Hash::random();
+		let para_id: ParaId = 123.into();
+		let collator_id = CollatorId::from_slice(&(0..32).collect::<Vec<u8>>());
+		let collator_id_clone = collator_id.clone();
+
+		let candidate_receipt = CandidateReceipt::default();
+		let pov = PoV {
+			block_data: BlockData((0..32).cycle().take(256).collect()),
+		};
+
+		let was_seconded = Arc::new(Mutex::new(false));
+		let was_seconded_clone = was_seconded.clone();
+
+		test_harness(
+			|_job| {},
+			|mut to_job, mut from_job| async move {
+				to_job
+					.send(ToJob::CandidateSelection(
+						CandidateSelectionMessage::Collation(
+							relay_parent,
+							para_id,
+							collator_id_clone,
+						),
+					))
+					.await
+					.unwrap();
+				std::mem::drop(to_job);
+
+				while let Some(msg) = from_job.next().await {
+					match msg {
+						FromJob::Collator(CollatorProtocolMessage::FetchCollation(
+							got_relay_parent,
+							got_para_id,
+							return_sender,
+						)) => {
+							assert_eq!(got_relay_parent, relay_parent);
+							assert_eq!(got_para_id, para_id);
+
+							return_sender
+								.send((candidate_receipt.clone(), pov.clone()))
+								.unwrap();
+						}
+						FromJob::Validation(
+							CandidateValidationMessage::ValidateFromChainState(
+								got_candidate_descriptor,
+								got_pov,
+								return_sender,
+							),
+						) => {
+							assert_eq!(got_candidate_descriptor, candidate_receipt.descriptor);
+							assert_eq!(got_pov.as_ref(), &pov);
+
+							return_sender
+								.send(Ok(ValidationResult::Valid(default_validation_outputs())))
+								.unwrap();
+						}
+						FromJob::Backing(CandidateBackingMessage::Second(
+							got_relay_parent,
+							got_candidate_receipt,
+							got_pov,
+						)) => {
+							assert_eq!(got_relay_parent, relay_parent);
+							assert_eq!(got_candidate_receipt, candidate_receipt);
+							assert_eq!(got_pov, pov);
+
+							*was_seconded_clone.lock().await = true;
+						}
+						other => panic!("unexpected message from job: {:?}", other),
+					}
+				}
+			},
+			|job, job_result| {
+				assert!(job_result.is_ok());
+				assert_eq!(job.seconded_candidate.unwrap(), collator_id);
+			},
+		);
+
+		assert!(Arc::try_unwrap(was_seconded).unwrap().into_inner());
+	}
+
+	/// when something has been seconded, further collation notifications are ignored
+	#[test]
+	fn ignores_collation_notifications_after_the_first() {
+		let relay_parent = Hash::random();
+		let para_id: ParaId = 123.into();
+		let prev_collator_id = CollatorId::from_slice(&(0..32).rev().collect::<Vec<u8>>());
+		let collator_id = CollatorId::from_slice(&(0..32).collect::<Vec<u8>>());
+		let collator_id_clone = collator_id.clone();
+
+		let was_seconded = Arc::new(Mutex::new(false));
+		let was_seconded_clone = was_seconded.clone();
+
+		test_harness(
+			|job| job.seconded_candidate = Some(prev_collator_id.clone()),
+			|mut to_job, mut from_job| async move {
+				to_job
+					.send(ToJob::CandidateSelection(
+						CandidateSelectionMessage::Collation(
+							relay_parent,
+							para_id,
+							collator_id_clone,
+						),
+					))
+					.await
+					.unwrap();
+				std::mem::drop(to_job);
+
+				while let Some(msg) = from_job.next().await {
+					match msg {
+						FromJob::Backing(CandidateBackingMessage::Second(
+							_got_relay_parent,
+							_got_candidate_receipt,
+							_got_pov,
+						)) => {
+							*was_seconded_clone.lock().await = true;
+						}
+						other => panic!("unexpected message from job: {:?}", other),
+					}
+				}
+			},
+			|job, job_result| {
+				assert!(job_result.is_ok());
+				assert_eq!(job.seconded_candidate.unwrap(), prev_collator_id);
+			},
+		);
+
+		assert!(!Arc::try_unwrap(was_seconded).unwrap().into_inner());
+	}
+
+	/// reports of invalidity from candidate backing are propagated
+	#[test]
+	fn propagates_invalidity_reports() {
+		let relay_parent = Hash::random();
+		let collator_id = CollatorId::from_slice(&(0..32).collect::<Vec<u8>>());
+		let collator_id_clone = collator_id.clone();
+
+		let candidate_receipt = CandidateReceipt::default();
+
+		let sent_report = Arc::new(Mutex::new(false));
+		let sent_report_clone = sent_report.clone();
+
+		test_harness(
+			|job| job.seconded_candidate = Some(collator_id.clone()),
+			|mut to_job, mut from_job| async move {
+				to_job
+					.send(ToJob::CandidateSelection(
+						CandidateSelectionMessage::Invalid(relay_parent, candidate_receipt),
+					))
+					.await
+					.unwrap();
+				std::mem::drop(to_job);
+
+				while let Some(msg) = from_job.next().await {
+					match msg {
+						FromJob::Collator(CollatorProtocolMessage::ReportCollator(
+							got_collator_id,
+						)) => {
+							assert_eq!(got_collator_id, collator_id_clone);
+
+							*sent_report_clone.lock().await = true;
+						}
+						other => panic!("unexpected message from job: {:?}", other),
+					}
+				}
+			},
+			|job, job_result| {
+				assert!(job_result.is_ok());
+				assert_eq!(job.seconded_candidate.unwrap(), collator_id);
+			},
+		);
+
+		assert!(Arc::try_unwrap(sent_report).unwrap().into_inner());
+	}
+}
diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs
index 39f3dc923f7..afc16678676 100644
--- a/polkadot/node/subsystem/src/messages.rs
+++ b/polkadot/node/subsystem/src/messages.rs
@@ -47,6 +47,8 @@ pub struct NewBackedCandidate(pub BackedCandidate);
 /// Messages received by the Candidate Selection subsystem.
 #[derive(Debug)]
 pub enum CandidateSelectionMessage {
+	/// A candidate collation can be fetched from a collator and should be considered for seconding.
+	Collation(Hash, ParaId, CollatorId),
 	/// We recommended a particular candidate to be seconded, but it was invalid; penalize the collator.
 	/// The hash is the relay parent.
 	Invalid(Hash, CandidateReceipt),
@@ -56,6 +58,7 @@ impl CandidateSelectionMessage {
 	/// If the current variant contains the relay parent hash, return it.
 	pub fn relay_parent(&self) -> Option<Hash> {
 		match self {
+			Self::Collation(hash, ..) => Some(*hash),
 			Self::Invalid(hash, _) => Some(*hash),
 		}
 	}
diff --git a/polkadot/roadmap/implementers-guide/src/node/backing/candidate-selection.md b/polkadot/roadmap/implementers-guide/src/node/backing/candidate-selection.md
index 103146d1618..db441b7f7ed 100644
--- a/polkadot/roadmap/implementers-guide/src/node/backing/candidate-selection.md
+++ b/polkadot/roadmap/implementers-guide/src/node/backing/candidate-selection.md
@@ -24,16 +24,18 @@ Output:
 
 Overarching network protocol + job for every relay-parent
 
-> TODO The Candidate Selection network protocol is currently intentionally unspecified pending further discussion.
-
-Several approaches have been selected, but all have some issues:
-
-- The most straightforward approach is for this subsystem to simply second the first valid parablock candidate which it sees per relay head. However, that protocol is vulnerable to a single collator which, as an attack or simply through chance, gets its block candidate to the node more often than its fair share of the time.
-- It may be possible to do some BABE-like selection algorithm to choose an "Official" collator for the round, but that is tricky because the collator which produces the PoV does not necessarily actually produce the block.
-- We could use relay-chain BABE randomness to generate some delay `D` on the order of 1 second, +- 1 second. The collator would then second the first valid parablock which arrives after `D`, or in case none has arrived by `2*D`, the last valid parablock which has arrived. This makes it very hard for a collator to game the system to always get its block nominated, but it reduces the maximum throughput of the system by introducing delay into an already tight schedule.
-- A variation of that scheme would be to randomly choose a number `I`, and have a fixed acceptance window `D` for parablock candidates. At the end of the period `D`, count `C`: the number of parablock candidates received. Second the one with index `I % C`. Its drawback is the same: it must wait the full `D` period before seconding any of its received candidates, reducing throughput.
+For the moment, the candidate selection algorithm is simply to second the first valid parablock candidate per relay head. See [Future Work](#future-work).
 
 ## Candidate Selection Job
 
 - Aware of validator key and assignment
 - One job for each relay-parent, which selects up to one collation for the Candidate Backing Subsystem
+
+## Future Work
+
+Several approaches have been discussed, but all have some issues:
+
+- The current approach is very straightforward. However, that protocol is vulnerable to a single collator which, as an attack or simply through chance, gets its block candidate to the node more often than its fair share of the time.
+- It may be possible to do some BABE-like selection algorithm to choose an "Official" collator for the round, but that is tricky because the collator which produces the PoV does not necessarily actually produce the block.
+- We could use relay-chain BABE randomness to generate some delay `D` on the order of 1 second, +- 1 second. The collator would then second the first valid parablock which arrives after `D`, or in case none has arrived by `2*D`, the last valid parablock which has arrived. This makes it very hard for a collator to game the system to always get its block nominated, but it reduces the maximum throughput of the system by introducing delay into an already tight schedule.
+- A variation of that scheme would be to randomly choose a number `I`, and have a fixed acceptance window `D` for parablock candidates. At the end of the period `D`, count `C`: the number of parablock candidates received. Second the one with index `I % C`. Its drawback is the same: it must wait the full `D` period before seconding any of its received candidates, reducing throughput.
diff --git a/polkadot/roadmap/implementers-guide/src/node/collators/collator-protocol.md b/polkadot/roadmap/implementers-guide/src/node/collators/collator-protocol.md
index 8863c3e23d7..1a8392aaa75 100644
--- a/polkadot/roadmap/implementers-guide/src/node/collators/collator-protocol.md
+++ b/polkadot/roadmap/implementers-guide/src/node/collators/collator-protocol.md
@@ -10,15 +10,17 @@ Validation of candidates is a heavy task, and furthermore, the [`PoV`][PoV] itse
 
 > TODO: note the incremental validation function Ximin proposes at https://github.com/paritytech/polkadot/issues/1348
 
-As this network protocol serves as a bridge between collators and validators, it communicates primarily with one subsystem on behalf of each. As a collator, this will receive messages from the [`CollationGeneration`][CG] subsystem. As a validator, this will communicate with the [`CandidateBacking`][CB] subsystem.
+As this network protocol serves as a bridge between collators and validators, it communicates primarily with one subsystem on behalf of each. As a collator, this will receive messages from the [`CollationGeneration`][CG] subsystem. As a validator, this will communicate with the [`CandidateBacking`][CB] and [`CandidateSelection`][CS] subsystems.
 
 ## Protocol
 
 Input: [`CollatorProtocolMessage`][CPM]
 
 Output:
-  - [`RuntimeApiMessage`][RAM]
-  - [`NetworkBridgeMessage`][NBM]
+
+- [`RuntimeApiMessage`][RAM]
+- [`NetworkBridgeMessage`][NBM]
+- [`CandidateSelectionMessage`][CSM]
 
 ## Functionality
 
@@ -102,18 +104,30 @@ When peers connect to us, they can `Declare` that they represent a collator with
 
 The protocol tracks advertisements received and the source of the advertisement. The advertisement source is the `PeerId` of the peer who sent the message. We accept one advertisement per collator per source per relay-parent.
 
+
 As a validator, we will handle requests from other subsystems to fetch a collation on a specific `ParaId` and relay-parent. These requests are made with the [`CollatorProtocolMessage`][CPM]`::FetchCollation`. To do so, we need to first check if we have already gathered a collation on that `ParaId` and relay-parent. If not, we need to select one of the advertisements and issue a request for it. If we've already issued a request, we shouldn't issue another one until the first has returned.
 
 When acting on an advertisement, we issue a `WireMessage::RequestCollation`. If the request times out, we need to note the collator as being unreliable and reduce its priority relative to other collators. And then make another request - repeat until we get a response or the chain has moved on.
 
 As a validator, once the collation has been fetched some other subsystem will inspect and do deeper validation of the collation. The subsystem will report to this subsystem with a [`CollatorProtocolMessage`][CPM]`::ReportCollator` or `NoteGoodCollation` message. In that case, if we are connected directly to the collator, we apply a cost to the `PeerId` associated with the collator and potentially disconnect or blacklist it.
 
-[PoV]: ../../types/availability.md#proofofvalidity
-[CPM]: ../../types/overseer-protocol.md#collatorprotocolmessage
-[CG]: collation-generation.md
+### Interaction with [Candidate Selection][CS]
+
+As collators advertise the availability of a collation, we notify the Candidate Selection subsystem with a [`CandidateSelectionMessage`][CSM]`::Collation` message. Note that this message is lightweight: it only contains the relay parent, para id, and collator id.
+
+At that point, the Candidate Selection subsystem is free to use an arbitrary algorithm to determine which, if any, of these messages to follow up on. It is expected to use the [`CollatorProtocolMessage`][CPM]`::FetchCollation` message to follow up.
+
+The intent behind this design is to minimize the total number of (large) collations which must be transmitted.
+
+
 [CB]: ../backing/candidate-backing.md
+[CBM]: ../../types/overseer-protocol.md#candidate-backing-message
+[CG]: collation-generation.md
+[CPM]: ../../types/overseer-protocol.md#collator-protocol-message
+[CS]: ../backing/candidate-selection.md
+[CSM]: ../../types/overseer-protocol.md#candidate-selection-message
 [NB]: ../utility/network-bridge.md
-[CBM]: ../../types/overseer-protocol.md#candidatebackingmesage
-[RAM]: ../../types/overseer-protocol.md#runtimeapimessage
-[NBM]: ../../types/overseer-protocol.md#networkbridgemessage
+[NBM]: ../../types/overseer-protocol.md#network-bridge-message
+[PoV]: ../../types/availability.md#proofofvalidity
+[RAM]: ../../types/overseer-protocol.md#runtime-api-message
 [SCH]: ../../runtime/scheduler.md
diff --git a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md
index 0fdd871e274..3af9d0cac4f 100644
--- a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md
+++ b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md
@@ -124,6 +124,8 @@ These messages are sent to the [Candidate Selection subsystem](../node/backing/c
 
 ```rust
 enum CandidateSelectionMessage {
+  /// A candidate collation can be fetched from a collator and should be considered for seconding.
+  Collation(RelayParent, ParaId, CollatorId),
   /// We recommended a particular candidate to be seconded, but it was invalid; penalize the collator.
   Invalid(CandidateReceipt),
 }
-- 
GitLab