From fb2e0c0ca96a04b78f04589791ff2de738510ad7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bastian=20K=C3=B6cher?= <bkchr@users.noreply.github.com>
Date: Mon, 20 Apr 2020 21:58:36 +0200
Subject: [PATCH] Make sure collators join the validation network (#1010)

Collators need to join the validation network to tell its connected
relay chain peers the leaf they listen on. This is required to make the
Parachain validator send the signed statement to the collators as well.
---
 polkadot/collator/src/lib.rs         | 20 +++++++++++---------
 polkadot/network/src/protocol/mod.rs |  3 +--
 polkadot/service/src/lib.rs          | 19 +++++++++++++++++--
 3 files changed, 29 insertions(+), 13 deletions(-)

diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs
index 0510a7f8106..d2fcf0b861a 100644
--- a/polkadot/collator/src/lib.rs
+++ b/polkadot/collator/src/lib.rs
@@ -83,12 +83,12 @@ pub trait Network: Send + Sync {
 	/// The returned stream will not terminate, so it is required to make sure that the stream is
 	/// dropped when it is not required anymore. Otherwise, it will stick around in memory
 	/// infinitely.
-	fn checked_statements(&self, relay_parent: Hash) -> Pin<Box<dyn Stream<Item=SignedStatement>>>;
+	fn checked_statements(&self, relay_parent: Hash) -> Pin<Box<dyn Stream<Item=SignedStatement> + Send>>;
 }
 
 impl Network for polkadot_network::protocol::Service {
-	fn checked_statements(&self, relay_parent: Hash) -> Pin<Box<dyn Stream<Item=SignedStatement>>> {
-		polkadot_network::protocol::Service::checked_statements(self, relay_parent)
+	fn checked_statements(&self, relay_parent: Hash) -> Pin<Box<dyn Stream<Item=SignedStatement> + Send>> {
+		polkadot_network::protocol::Service::checked_statements(self, relay_parent).boxed()
 	}
 }
 
@@ -241,12 +241,14 @@ fn build_collator_service<S, P, Extrinsic>(
 	let (service, handles) = service;
 	let spawner = service.spawn_task_handle();
 
-	let polkadot_network = match handles.polkadot_network {
-		None => return Err(
-			"Collator cannot run when Polkadot-specific networking has not been started".into()
-		),
-		Some(n) => n,
-	};
+	let polkadot_network = handles.polkadot_network
+		.ok_or_else(|| "Collator cannot run when Polkadot-specific networking has not been started")?;
+
+	// We don't require this here, but we need to make sure that the validation service is started.
+	// This service makes sure the collator is joining the correct gossip topics and receives the appropiate
+	// messages.
+	handles.validation_service_handle
+		.ok_or_else(|| "Collator cannot run when validation networking has not been started")?;
 
 	let client = service.client();
 
diff --git a/polkadot/network/src/protocol/mod.rs b/polkadot/network/src/protocol/mod.rs
index 7368dc338aa..d9257650eb8 100644
--- a/polkadot/network/src/protocol/mod.rs
+++ b/polkadot/network/src/protocol/mod.rs
@@ -1375,7 +1375,7 @@ impl<N: NetworkServiceOps> Service<N> {
 	/// Take care to drop the stream, as the sending side will not be cleaned
 	/// up until it is.
 	pub fn checked_statements(&self, relay_parent: Hash)
-		-> Pin<Box<dyn Stream<Item = SignedStatement>>> {
+		-> impl Stream<Item = SignedStatement> + Send {
 		let (tx, rx) = oneshot::channel();
 		let mut sender = self.sender.clone();
 
@@ -1400,7 +1400,6 @@ impl<N: NetworkServiceOps> Service<N> {
 				}
 			})
 			.flatten_stream()
-			.boxed()
 	}
 }
 
diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs
index ca9af8e329d..f27f19e1c72 100644
--- a/polkadot/service/src/lib.rs
+++ b/polkadot/service/src/lib.rs
@@ -320,9 +320,12 @@ pub fn westend_new_full(
 /// Handles to other sub-services that full nodes instantiate, which consumers
 /// of the node may use.
 #[cfg(feature = "full-node")]
+#[derive(Default)]
 pub struct FullNodeHandles {
 	/// A handle to the Polkadot networking protocol.
 	pub polkadot_network: Option<network_protocol::Service>,
+	/// A handle to the validation service.
+	pub validation_service_handle: Option<consensus::ServiceHandle>,
 }
 
 /// Builds a new service for a full client.
@@ -389,7 +392,7 @@ pub fn new_full<Runtime, Dispatch, Extrinsic>(
 	let client = service.client();
 	let known_oracle = client.clone();
 
-	let mut handles = FullNodeHandles { polkadot_network: None };
+	let mut handles = FullNodeHandles::default();
 	let select_chain = if let Some(select_chain) = service.select_chain() {
 		select_chain
 	} else {
@@ -427,7 +430,7 @@ pub fn new_full<Runtime, Dispatch, Extrinsic>(
 		service.spawn_task_handle(),
 	).map_err(|e| format!("Could not spawn network worker: {:?}", e))?;
 
-	if let Role::Authority { .. } = &role {
+	let authority_handles = if is_collator || role.is_authority() {
 		let availability_store = {
 			use std::path::PathBuf;
 
@@ -464,6 +467,18 @@ pub fn new_full<Runtime, Dispatch, Extrinsic>(
 
 		service.spawn_essential_task("validation-service", Box::pin(validation_service));
 
+		handles.validation_service_handle = Some(validation_service_handle.clone());
+
+		Some((validation_service_handle, availability_store))
+	} else {
+		None
+	};
+
+	if role.is_authority() {
+		let (validation_service_handle, availability_store) = authority_handles
+			.clone()
+			.expect("Authority handles are set for authority nodes; qed");
+
 		let proposer = consensus::ProposerFactory::new(
 			client.clone(),
 			service.transaction_pool(),
-- 
GitLab