From 0b39ae4390f6a12cf9f3b0e4aea27c35019b97ee Mon Sep 17 00:00:00 2001
From: Robert Habermeier <rphmeier@gmail.com>
Date: Fri, 18 Feb 2022 05:17:28 -0600
Subject: [PATCH] Revert "collator-protocol: short-term fixes for connectivity
 (#4640)" (#4914)

* Revert "collator-protocol: fix wrong warning (#4909)"

This reverts commit 128421b5dd11ad85c9d1b0fc9da8c77aa9e48454.

* Revert "collator-protocol: short-term fixes for connectivity (#4640)"

This reverts commit aff88a864af7bd09e0a642cc9f21ba3ff3d99578.

* make the slots great again

Co-authored-by: Andronik <write@reusable.software>
---
 .../src/collator_side/mod.rs                  | 130 +++++---------
 .../src/collator_side/tests.rs                | 159 +++++++++---------
 .../node/network/collator-protocol/src/lib.rs |   2 +-
 3 files changed, 126 insertions(+), 165 deletions(-)

diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs
index b1f954b4ccb..6834d6ffa4b 100644
--- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs
+++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs
@@ -321,14 +321,15 @@ impl State {
 
 /// Distribute a collation.
 ///
-/// If the para is not scheduled on any core, at the relay parent,
-/// or the relay parent isn't in our view or we already collated on the relay parent,
-/// we ignore the message as it must be invalid in that case -
-/// although this indicates a logic error elsewhere in the node.
-///
-/// Otherwise, start advertising the collation to interested peers.
+/// Figure out the core our para is assigned to and the relevant validators,
+/// and issue a connection request to these validators.
+/// If the para is not scheduled or next up on any core at the relay-parent,
+/// or the relay-parent isn't in the active-leaves set, we ignore the message
+/// as it must be invalid in that case - although this indicates a logic error
+/// elsewhere in the node.
 async fn distribute_collation<Context>(
 	ctx: &mut Context,
+	runtime: &mut RuntimeInfo,
 	state: &mut State,
 	id: ParaId,
 	receipt: CandidateReceipt,
@@ -357,8 +358,32 @@ where
 		return Ok(())
 	}
 
-	if !state.our_validators_groups.contains_key(&relay_parent) {
-		tracing::warn!(target: LOG_TARGET, "Could not determine validators assigned to the core.");
+	// Determine which core the para we are collating on is assigned to.
+	// If it is not scheduled then ignore the message.
+	let (our_core, num_cores) = match determine_core(ctx, id, relay_parent).await? {
+		Some(core) => core,
+		None => {
+			tracing::warn!(
+				target: LOG_TARGET,
+				para_id = %id,
+				?relay_parent,
+				"looks like no core is assigned to {} at {}", id, relay_parent,
+			);
+
+			return Ok(())
+		},
+	};
+
+	// Determine the group on that core.
+	let current_validators =
+		determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?;
+
+	if current_validators.validators.is_empty() {
+		tracing::warn!(
+			target: LOG_TARGET,
+			core = ?our_core,
+			"there are no validators assigned to core",
+		);
 
 		return Ok(())
 	}
@@ -369,9 +394,16 @@ where
 		relay_parent = %relay_parent,
 		candidate_hash = ?receipt.hash(),
 		pov_hash = ?pov.hash(),
-		"Accepted collation",
+		core = ?our_core,
+		?current_validators,
+		"Accepted collation, connecting to validators."
 	);
 
+	// Issue a discovery request for the validators of the current group:
+	connect_to_validators(ctx, current_validators.validators.into_iter().collect()).await;
+
+	state.our_validators_groups.insert(relay_parent, ValidatorGroup::new());
+
 	if let Some(result_sender) = result_sender {
 		state.collation_result_senders.insert(receipt.hash(), result_sender);
 	}
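
The hunks above restore per-collation validator discovery: every incoming
`DistributeCollation` message first resolves the core our para is assigned
to, then the validator group on that core, and only then issues a connection
request. A condensed sketch of that sequence, with the helper signatures
simplified (the real ones also take `ctx` and `runtime` handles):

	// Sketch only: simplified stand-ins for the module's real helpers.
	async fn on_distribute_collation(para_id: ParaId, relay_parent: Hash) -> Result<()> {
		// 1. Which core is our para scheduled (or next up) on at this relay-parent?
		let (core, num_cores) = match determine_core(para_id, relay_parent).await? {
			Some(core) => core,
			None => return Ok(()), // not scheduled: drop the collation
		};
		// 2. Which validator group is assigned to that core?
		let group = determine_our_validators(core, num_cores, relay_parent).await?;
		if group.validators.is_empty() {
			return Ok(()) // nobody to advertise to
		}
		// 3. Ask the network bridge to connect to exactly that group.
		connect_to_validators(group.validators.into_iter().collect()).await;
		Ok(())
	}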
@@ -490,7 +522,7 @@ where
 	Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
 {
 	// ignore address resolution failure
-	// will reissue a new request on new relay parent
+	// will reissue a new request on new collation
 	let (failed, _) = oneshot::channel();
 	ctx.send_message(NetworkBridgeMessage::ConnectToValidators {
 		validator_ids,
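
The `(failed, _)` pattern above intentionally drops the receiving end of the
oneshot: `ConnectToValidators` reports address-resolution failures through
that channel, and this subsystem ignores them because the next collation
triggers a fresh request anyway. A minimal sketch of the idiom (field list
abbreviated to what the hunk shows):

	// Create the channel and immediately drop the receiver; resolution
	// failures are ignored, as a new request follows the next collation.
	let (failed, _) = oneshot::channel();
	ctx.send_message(NetworkBridgeMessage::ConnectToValidators {
		validator_ids,
		// ...remaining fields as in the module...
		failed,
	})
	.await;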
@@ -601,7 +633,8 @@ where
 					);
 				},
 				Some(id) => {
-					distribute_collation(ctx, state, id, receipt, pov, result_sender).await?;
+					distribute_collation(ctx, runtime, state, id, receipt, pov, result_sender)
+						.await?;
 				},
 				None => {
 					tracing::warn!(
@@ -886,7 +919,7 @@ where
 		},
 		OurViewChange(view) => {
 			tracing::trace!(target: LOG_TARGET, ?view, "Own view change");
-			handle_our_view_change(ctx, runtime, state, view).await?;
+			handle_our_view_change(state, view).await?;
 		},
 		PeerMessage(remote, msg) => {
 			handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?;
@@ -900,16 +933,7 @@ where
 }
 
 /// Handles our view changes.
-async fn handle_our_view_change<Context>(
-	ctx: &mut Context,
-	runtime: &mut RuntimeInfo,
-	state: &mut State,
-	view: OurView,
-) -> Result<()>
-where
-	Context: SubsystemContext<Message = CollatorProtocolMessage>,
-	Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
-{
+async fn handle_our_view_change(state: &mut State, view: OurView) -> Result<()> {
 	for removed in state.view.difference(&view) {
 		tracing::debug!(target: LOG_TARGET, relay_parent = ?removed, "Removing relay parent because our view changed.");
 
@@ -943,68 +967,6 @@ where
 	}
 
 	state.view = view;
-	if state.view.is_empty() {
-		return Ok(())
-	}
-
-	let id = match state.collating_on {
-		Some(id) => id,
-		None => return Ok(()),
-	};
-
-	// all validators assigned to the core
-	// across all active leaves
-	// this is typically our current group
-	// but can also include the previous group at
-	// rotation boundaries and considering forks
-	let mut group_validators = HashSet::new();
-	let mut maybe_core = None;
-
-	for relay_parent in state.view.iter().cloned() {
-		tracing::debug!(
-			target: LOG_TARGET,
-			?relay_parent,
-			para_id = ?id,
-			"Processing relay parent.",
-		);
-
-		// Determine our assigned core.
-		// If it is not scheduled then ignore the relay parent.
-		let (our_core, num_cores) = match determine_core(ctx, id, relay_parent).await? {
-			Some(core) => {
-				maybe_core = Some(core);
-				core
-			},
-			None => continue,
-		};
-
-		// Determine the group on that core.
-		let current_validators =
-			determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?;
-
-		let validators = current_validators.validators;
-		group_validators.extend(validators);
-
-		state.our_validators_groups.entry(relay_parent).or_insert(ValidatorGroup::new());
-	}
-
-	let validators: Vec<_> = group_validators.into_iter().collect();
-	let no_one_is_assigned = validators.is_empty();
-	if no_one_is_assigned {
-		if let Some(core) = maybe_core {
-			tracing::warn!(target: LOG_TARGET, ?core, "No validators assigned to our core.");
-		} else {
-			tracing::debug!(target: LOG_TARGET, "Core is occupied for all active leaves.");
-		}
-		return Ok(())
-	}
-	tracing::debug!(
-		target: LOG_TARGET,
-		?validators,
-		para_id = ?id,
-		"Connecting to validators.",
-	);
-	connect_to_validators(ctx, validators).await;
 
 	Ok(())
 }
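
The deleted block above is the short-term fix being reverted: on every view
change it walked all active leaves, unioned the validator groups across
them, and connected to that union. After the revert, the view-change handler
only prunes per-relay-parent state, roughly:

	// Post-revert shape (sketch, loop body abbreviated): prune state for
	// relay-parents that left our view; connections now happen per collation.
	async fn handle_our_view_change(state: &mut State, view: OurView) -> Result<()> {
		for removed in state.view.difference(&view) {
			// ...drop collations, advertisements and result senders for `removed`...
		}
		state.view = view;
		Ok(())
	}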
diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests.rs
index 86d5639ad61..526cfab04e1 100644
--- a/polkadot/node/network/collator-protocol/src/collator_side/tests.rs
+++ b/polkadot/node/network/collator-protocol/src/collator_side/tests.rs
@@ -29,7 +29,7 @@ use sp_core::crypto::Pair;
 use sp_keyring::Sr25519Keyring;
 use sp_runtime::traits::AppVerify;
 
-use polkadot_node_network_protocol::{our_view, request_response::IncomingRequest, view, OurView};
+use polkadot_node_network_protocol::{our_view, request_response::IncomingRequest, view};
 use polkadot_node_primitives::BlockData;
 use polkadot_node_subsystem_util::TimeoutExt;
 use polkadot_primitives::{
@@ -172,7 +172,13 @@ impl TestState {
 			our_view![self.relay_parent]
 		};
 
-		set_our_view(virtual_overseer, &self, our_view).await;
+		overseer_send(
+			virtual_overseer,
+			CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange(
+				our_view,
+			)),
+		)
+		.await;
 	}
 }
 
@@ -272,83 +278,13 @@ async fn setup_system(virtual_overseer: &mut VirtualOverseer, test_state: &TestS
 	)
 	.await;
 
-	set_our_view(virtual_overseer, test_state, our_view![test_state.relay_parent]).await;
-}
-
-/// Check our view change triggers the right messages
-async fn set_our_view(
-	virtual_overseer: &mut VirtualOverseer,
-	test_state: &TestState,
-	our_view: OurView,
-) {
 	overseer_send(
 		virtual_overseer,
 		CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange(
-			our_view.clone(),
+			our_view![test_state.relay_parent],
 		)),
 	)
 	.await;
-
-	for parent in our_view.iter().cloned() {
-		// obtain the availability cores.
-		assert_matches!(
-			overseer_recv(virtual_overseer).await,
-			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-				relay_parent,
-				RuntimeApiRequest::AvailabilityCores(tx)
-			)) => {
-				assert_eq!(relay_parent, parent);
-				tx.send(Ok(vec![test_state.availability_core.clone()])).unwrap();
-			}
-		);
-
-		// We don't know precisely what is going to come as session info might be cached:
-		loop {
-			match overseer_recv(virtual_overseer).await {
-				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					relay_parent,
-					RuntimeApiRequest::SessionIndexForChild(tx),
-				)) => {
-					assert_eq!(relay_parent, relay_parent);
-					tx.send(Ok(test_state.current_session_index())).unwrap();
-				},
-
-				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					relay_parent,
-					RuntimeApiRequest::SessionInfo(index, tx),
-				)) => {
-					assert_eq!(relay_parent, parent);
-					assert_eq!(index, test_state.current_session_index());
-
-					tx.send(Ok(Some(test_state.session_info.clone()))).unwrap();
-				},
-
-				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					relay_parent,
-					RuntimeApiRequest::ValidatorGroups(tx),
-				)) => {
-					assert_eq!(relay_parent, parent);
-					tx.send(Ok((
-						test_state.session_info.validator_groups.clone(),
-						test_state.group_rotation_info.clone(),
-					)))
-					.unwrap();
-					// This call is mandatory - we are done:
-					break
-				},
-				other => panic!("Unexpected message received: {:?}", other),
-			}
-		}
-	}
-
-	assert_matches!(
-		overseer_recv(virtual_overseer).await,
-		AllMessages::NetworkBridge(
-			NetworkBridgeMessage::ConnectToValidators {
-				..
-			}
-		) => {}
-	);
 }
 
 /// Result of [`distribute_collation`]
@@ -361,6 +297,8 @@ struct DistributeCollation {
 async fn distribute_collation(
 	virtual_overseer: &mut VirtualOverseer,
 	test_state: &TestState,
+	// Whether or not we expect a connection request.
+	should_connect: bool,
 ) -> DistributeCollation {
 	// Now we want to distribute a `PoVBlock`
 	let pov_block = PoV { block_data: BlockData(vec![42, 43, 44]) };
@@ -381,6 +319,67 @@ async fn distribute_collation(
 	)
 	.await;
 
+	// obtain the availability cores.
+	assert_matches!(
+		overseer_recv(virtual_overseer).await,
+		AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+			relay_parent,
+			RuntimeApiRequest::AvailabilityCores(tx)
+		)) => {
+			assert_eq!(relay_parent, test_state.relay_parent);
+			tx.send(Ok(vec![test_state.availability_core.clone()])).unwrap();
+		}
+	);
+
+	// We don't know precisely what is going to come, as session info might be cached:
+	loop {
+		match overseer_recv(virtual_overseer).await {
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				relay_parent,
+				RuntimeApiRequest::SessionIndexForChild(tx),
+			)) => {
+				assert_eq!(relay_parent, test_state.relay_parent);
+				tx.send(Ok(test_state.current_session_index())).unwrap();
+			},
+
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				relay_parent,
+				RuntimeApiRequest::SessionInfo(index, tx),
+			)) => {
+				assert_eq!(relay_parent, test_state.relay_parent);
+				assert_eq!(index, test_state.current_session_index());
+
+				tx.send(Ok(Some(test_state.session_info.clone()))).unwrap();
+			},
+
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				relay_parent,
+				RuntimeApiRequest::ValidatorGroups(tx),
+			)) => {
+				assert_eq!(relay_parent, test_state.relay_parent);
+				tx.send(Ok((
+					test_state.session_info.validator_groups.clone(),
+					test_state.group_rotation_info.clone(),
+				)))
+				.unwrap();
+				// This call is mandatory - we are done:
+				break
+			},
+			other => panic!("Unexpected message received: {:?}", other),
+		}
+	}
+
+	if should_connect {
+		assert_matches!(
+			overseer_recv(virtual_overseer).await,
+			AllMessages::NetworkBridge(
+				NetworkBridgeMessage::ConnectToValidators {
+					..
+				}
+			) => {}
+		);
+	}
+
 	DistributeCollation { candidate, pov_block }
 }
 
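A usage sketch for the extended helper: the new `should_connect` flag lets a
test assert (or skip) the `ConnectToValidators` request that now accompanies
a distribution, mirroring the call sites updated below:

	// First collation after setup: a connection request is expected.
	let DistributeCollation { candidate, pov_block } =
		distribute_collation(&mut virtual_overseer, &test_state, true).await;
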
@@ -509,7 +508,7 @@ fn advertise_and_send_collation() {
 		setup_system(&mut virtual_overseer, &test_state).await;
 
 		let DistributeCollation { candidate, pov_block } =
-			distribute_collation(&mut virtual_overseer, &test_state).await;
+			distribute_collation(&mut virtual_overseer, &test_state, true).await;
 
 		for (val, peer) in test_state
 			.current_group_validator_authority_ids()
@@ -626,7 +625,7 @@ fn advertise_and_send_collation() {
 
 		assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none());
 
-		distribute_collation(&mut virtual_overseer, &test_state).await;
+		distribute_collation(&mut virtual_overseer, &test_state, true).await;
 
 		// Send info about peer's view.
 		overseer_send(
@@ -714,7 +713,7 @@ fn collations_are_only_advertised_to_validators_with_correct_view() {
 		// And let it tell us that it has the same view.
 		send_peer_view_change(virtual_overseer, &peer2, vec![test_state.relay_parent]).await;
 
-		distribute_collation(virtual_overseer, &test_state).await;
+		distribute_collation(virtual_overseer, &test_state, true).await;
 
 		expect_advertise_collation_msg(virtual_overseer, &peer2, test_state.relay_parent).await;
 
@@ -753,14 +752,14 @@ fn collate_on_two_different_relay_chain_blocks() {
 		expect_declare_msg(virtual_overseer, &test_state, &peer).await;
 		expect_declare_msg(virtual_overseer, &test_state, &peer2).await;
 
-		distribute_collation(virtual_overseer, &test_state).await;
+		distribute_collation(virtual_overseer, &test_state, true).await;
 
 		let old_relay_parent = test_state.relay_parent;
 
 		// Advance to a new round, while informing the subsystem that both the old and the new relay parent are active.
 		test_state.advance_to_new_round(virtual_overseer, true).await;
 
-		distribute_collation(virtual_overseer, &test_state).await;
+		distribute_collation(virtual_overseer, &test_state, true).await;
 
 		send_peer_view_change(virtual_overseer, &peer, vec![old_relay_parent]).await;
 		expect_advertise_collation_msg(virtual_overseer, &peer, old_relay_parent).await;
@@ -790,7 +789,7 @@ fn validator_reconnect_does_not_advertise_a_second_time() {
 		connect_peer(virtual_overseer, peer.clone(), Some(validator_id.clone())).await;
 		expect_declare_msg(virtual_overseer, &test_state, &peer).await;
 
-		distribute_collation(virtual_overseer, &test_state).await;
+		distribute_collation(virtual_overseer, &test_state, true).await;
 
 		send_peer_view_change(virtual_overseer, &peer, vec![test_state.relay_parent]).await;
 		expect_advertise_collation_msg(virtual_overseer, &peer, test_state.relay_parent).await;
@@ -875,7 +874,7 @@ where
 		setup_system(virtual_overseer, &test_state).await;
 
 		let DistributeCollation { candidate, pov_block } =
-			distribute_collation(virtual_overseer, &test_state).await;
+			distribute_collation(virtual_overseer, &test_state, true).await;
 
 		for (val, peer) in test_state
 			.current_group_validator_authority_ids()
diff --git a/polkadot/node/network/collator-protocol/src/lib.rs b/polkadot/node/network/collator-protocol/src/lib.rs
index 769b1448690..0aa53156e75 100644
--- a/polkadot/node/network/collator-protocol/src/lib.rs
+++ b/polkadot/node/network/collator-protocol/src/lib.rs
@@ -58,7 +58,7 @@ pub struct CollatorEvictionPolicy {
 impl Default for CollatorEvictionPolicy {
 	fn default() -> Self {
 		CollatorEvictionPolicy {
-			inactive_collator: Duration::from_secs(5),
+			inactive_collator: Duration::from_secs(24),
 			undeclared: Duration::from_secs(1),
 		}
 	}
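
The revert also bumps the inactive-collator eviction timeout from 5 to 24
seconds ("make the slots great again"). Assuming 6-second relay-chain slots
(an inference, not stated in the patch), 5 seconds was shorter than even a
single slot gap, so a well-behaved collator could be evicted between two
consecutive collations; 24 seconds spans four full slots:

	use std::time::Duration;

	// Assumption: 6-second relay-chain slot time.
	const RELAY_SLOT: Duration = Duration::from_secs(6);

	fn main() {
		// The new default tolerates four quiet slots before eviction.
		assert_eq!(RELAY_SLOT * 4, Duration::from_secs(24));
	}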
-- 
GitLab