From bfc8f4fcf3d0da9f3ef7129ce33c27e8bc63905a Mon Sep 17 00:00:00 2001
From: Robert Habermeier <rphmeier@gmail.com>
Date: Sun, 4 Apr 2021 18:59:00 +0200
Subject: [PATCH] Collators: Declare to all peers (#2816)

* fix tests

* add test for rejecting declares on collators

* fix bad test
---
 .../collator-protocol/src/collator_side.rs    | 137 ++++++++++++------
 1 file changed, 89 insertions(+), 48 deletions(-)

diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs
index 88dbe93d4f0..b661c6f9562 100644
--- a/polkadot/node/network/collator-protocol/src/collator_side.rs
+++ b/polkadot/node/network/collator-protocol/src/collator_side.rs
@@ -33,7 +33,7 @@ use polkadot_subsystem::{
 use polkadot_node_network_protocol::{
 	OurView, PeerId, View, peer_set::PeerSet,
 	request_response::{IncomingRequest, v1::{CollationFetchingRequest, CollationFetchingResponse}},
-	v1 as protocol_v1
+	v1 as protocol_v1, UnifiedReputationChange as Rep,
 };
 use polkadot_node_subsystem_util::{
 	validator_discovery,
@@ -44,6 +44,8 @@ use polkadot_node_subsystem_util::{
 };
 use polkadot_node_primitives::{SignedFullStatement, Statement, PoV, CompressedPoV};
 
+const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("An unexpected message");
+
 #[derive(Clone, Default)]
 pub struct Metrics(Option<MetricsInner>);
 
@@ -241,9 +243,6 @@ struct State {
 	/// Our validator groups per active leaf.
 	our_validators_groups: HashMap<Hash, ValidatorGroup>,
 
-	/// List of peers where we declared ourself as a collator.
-	declared_at: HashSet<PeerId>,
-
 	/// The connection requests to validators per relay parent.
 	connection_requests: validator_discovery::ConnectionRequests,
 
@@ -266,7 +265,6 @@ impl State {
 			collations: Default::default(),
 			collation_result_senders: Default::default(),
 			our_validators_groups: Default::default(),
-			declared_at: Default::default(),
 			connection_requests: Default::default(),
 		}
 	}
@@ -486,7 +484,7 @@ async fn advertise_collation(
 
 	match (state.collations.get_mut(&relay_parent), should_advertise) {
 		(None, _) => {
-			tracing::debug!(
+			tracing::trace!(
 				target: LOG_TARGET,
 				?relay_parent,
 				peer_id = %peer,
@@ -693,8 +691,9 @@ async fn send_collation(
 }
 
 /// A networking messages switch.
-#[tracing::instrument(level = "trace", skip(state), fields(subsystem = LOG_TARGET))]
+#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
 async fn handle_incoming_peer_message(
+	ctx: &mut impl SubsystemContext,
 	state: &mut State,
 	origin: PeerId,
 	msg: protocol_v1::CollatorProtocolMessage,
@@ -703,18 +702,32 @@ async fn handle_incoming_peer_message(
 
 	match msg {
 		Declare(_, _, _) => {
-			tracing::warn!(
+			tracing::trace!(
 				target: LOG_TARGET,
 				?origin,
 				"Declare message is not expected on the collator side of the protocol",
 			);
+
+			// If we are declared to, this is another collator, and we should disconnect.
+			ctx.send_message(
+				NetworkBridgeMessage::DisconnectPeer(origin, PeerSet::Collation).into()
+			).await;
 		}
 		AdvertiseCollation(_) => {
-			tracing::warn!(
+			tracing::trace!(
 				target: LOG_TARGET,
 				?origin,
 				"AdvertiseCollation message is not expected on the collator side of the protocol",
 			);
+
+			ctx.send_message(
+				NetworkBridgeMessage::ReportPeer(origin.clone(), COST_UNEXPECTED_MESSAGE).into()
+			).await;
+
+			// If we are advertised to, this is another collator, and we should disconnect.
+			ctx.send_message(
+				NetworkBridgeMessage::DisconnectPeer(origin, PeerSet::Collation).into()
+			).await;
 		}
 		CollationSeconded(statement) => {
 			if !matches!(statement.payload(), Statement::Seconded(_)) {
@@ -775,12 +788,6 @@ async fn handle_validator_connected(
 		"Connected to requested validator"
 	);
 
-	let not_declared = state.declared_at.insert(peer_id.clone());
-
-	if not_declared {
-		declare(ctx, state, peer_id.clone()).await;
-	}
-
 	// Store the PeerId and find out if we should advertise to this peer.
 	//
 	// If this peer does not belong to the para validators, we also don't need to try to advertise our collation.
@@ -814,6 +821,9 @@ async fn handle_network_msg(
 				?observed_role,
 				"Peer connected",
 			);
+
+			// Always declare to every peer. We should be connecting only to validators.
+			declare(ctx, state, peer_id.clone()).await;
 		}
 		PeerViewChange(peer_id, view) => {
 			tracing::trace!(
@@ -831,7 +841,6 @@ async fn handle_network_msg(
 				"Peer disconnected",
 			);
 			state.peer_views.remove(&peer_id);
-			state.declared_at.remove(&peer_id);
 		}
 		OurViewChange(view) => {
 			tracing::trace!(
@@ -842,7 +851,7 @@ async fn handle_network_msg(
 			handle_our_view_change(state, view).await?;
 		}
 		PeerMessage(remote, msg) => {
-			handle_incoming_peer_message(state, remote, msg).await?;
+			handle_incoming_peer_message(ctx, state, remote, msg).await?;
 		}
 	}
 
@@ -1135,18 +1144,6 @@ mod tests {
 		collator_pair: CollatorPair,
 		test: impl FnOnce(TestHarness) -> T,
 	) {
-		let _ = env_logger::builder()
-			.is_test(true)
-			.filter(
-				Some("polkadot_collator_protocol"),
-				log::LevelFilter::Trace,
-			)
-			.filter(
-				Some(LOG_TARGET),
-				log::LevelFilter::Trace,
-			)
-			.try_init();
-
 		let pool = sp_core::testing::TaskExecutor::new();
 
 		let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
@@ -1468,10 +1465,14 @@ mod tests {
 
 			let DistributeCollation { mut connected, candidate, pov_block } =
 				distribute_collation(&mut virtual_overseer, &test_state).await;
-			test_state.current_group_validator_authority_ids()
+
+			for (val, peer) in test_state.current_group_validator_authority_ids()
 				.into_iter()
 				.zip(test_state.current_group_validator_peer_ids())
-				.for_each(|r| connected.try_send(r).unwrap());
+			{
+				connect_peer(&mut virtual_overseer, peer.clone()).await;
+				connected.try_send((val, peer)).unwrap();
+			}
 
 			// We declare to the connected validators that we are a collator.
 			// We need to catch all `Declare` messages to the validators we've
@@ -1549,10 +1550,13 @@ mod tests {
 
 			let DistributeCollation { mut connected, .. } =
 				distribute_collation(&mut virtual_overseer, &test_state).await;
-			test_state.current_group_validator_authority_ids()
+
+			for (val, peer) in test_state.current_group_validator_authority_ids()
 				.into_iter()
 				.zip(test_state.current_group_validator_peer_ids())
-				.for_each(|r| connected.try_send(r).unwrap());
+			{
+				connected.try_send((val, peer)).unwrap();
+			}
 
 			// Send info about peer's view.
 			overseer_send(
@@ -1569,10 +1573,8 @@ mod tests {
 		});
 	}
 
-	/// This test ensures that we declare a collator at a validator by sending the `Declare` message as soon as the
-	/// collator is aware of the validator being connected.
 	#[test]
-	fn collators_are_registered_correctly_at_validators() {
+	fn collators_declare_to_connected_peers() {
 		let test_state = TestState::default();
 		let local_peer_id = test_state.local_peer_id.clone();
 		let collator_pair = test_state.collator_pair.clone();
@@ -1581,16 +1583,11 @@ mod tests {
 			let mut virtual_overseer = test_harness.virtual_overseer;
 
 			let peer = test_state.validator_peer_id[0].clone();
-			let validator_id = test_state.validator_authority_id[0].clone();
 
 			setup_system(&mut virtual_overseer, &test_state).await;
 
 			// A validator connected to us
 			connect_peer(&mut virtual_overseer, peer.clone()).await;
-
-			let mut connected = distribute_collation(&mut virtual_overseer, &test_state).await.connected;
-			connected.try_send((validator_id, peer.clone())).unwrap();
-
 			expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
 		})
 	}
@@ -1618,6 +1615,9 @@ mod tests {
 			// Connect the second validator
 			connect_peer(&mut virtual_overseer, peer2.clone()).await;
 
+			expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
+			expect_declare_msg(&mut virtual_overseer, &test_state, &peer2).await;
+
 			// And let it tell us that it is has the same view.
 			send_peer_view_change(&mut virtual_overseer, &peer2, vec![test_state.relay_parent]).await;
 
@@ -1625,9 +1625,6 @@ mod tests {
 			connected.try_send((validator_id, peer.clone())).unwrap();
 			connected.try_send((validator_id2, peer2.clone())).unwrap();
 
-			expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
-			expect_declare_msg(&mut virtual_overseer, &test_state, &peer2).await;
-
 			expect_advertise_collation_msg(&mut virtual_overseer, &peer2, test_state.relay_parent).await;
 
 			// The other validator announces that it changed its view.
@@ -1661,13 +1658,13 @@ mod tests {
 			// Connect the second validator
 			connect_peer(&mut virtual_overseer, peer2.clone()).await;
 
+			expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
+			expect_declare_msg(&mut virtual_overseer, &test_state, &peer2).await;
+
 			let mut connected = distribute_collation(&mut virtual_overseer, &test_state).await.connected;
 			connected.try_send((validator_id.clone(), peer.clone())).unwrap();
 			connected.try_send((validator_id2.clone(), peer2.clone())).unwrap();
 
-			expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
-			expect_declare_msg(&mut virtual_overseer, &test_state, &peer2).await;
-
 			let old_relay_parent = test_state.relay_parent;
 
 			// Advance to a new round, while informing the subsystem that the old and the new relay parent are active.
@@ -1701,20 +1698,64 @@ mod tests {
 
 			// A validator connected to us
 			connect_peer(&mut virtual_overseer, peer.clone()).await;
+			expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
 
 			let mut connected = distribute_collation(&mut virtual_overseer, &test_state).await.connected;
 			connected.try_send((validator_id.clone(), peer.clone())).unwrap();
 
-			expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
 			send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await;
 			expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await;
 
 			// Disconnect and reconnect directly
 			disconnect_peer(&mut virtual_overseer, peer.clone()).await;
 			connect_peer(&mut virtual_overseer, peer.clone()).await;
+			expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
+
 			send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await;
 
 			assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none());
 		})
 	}
+
+	#[test]
+	fn collators_reject_declare_messages() {
+		let test_state = TestState::default();
+		let local_peer_id = test_state.local_peer_id.clone();
+		let collator_pair = test_state.collator_pair.clone();
+		let collator_pair2 = CollatorPair::generate().0;
+
+		test_harness(local_peer_id, collator_pair, |test_harness| async move {
+			let mut virtual_overseer = test_harness.virtual_overseer;
+
+			let peer = test_state.current_group_validator_peer_ids()[0].clone();
+
+			setup_system(&mut virtual_overseer, &test_state).await;
+
+			// A validator connected to us
+			connect_peer(&mut virtual_overseer, peer.clone()).await;
+			expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
+
+			overseer_send(
+				&mut virtual_overseer,
+				CollatorProtocolMessage::NetworkBridgeUpdateV1(
+					NetworkBridgeEvent::PeerMessage(
+						peer.clone(),
+						protocol_v1::CollatorProtocolMessage::Declare(
+							collator_pair2.public(),
+							ParaId::from(5),
+							collator_pair2.sign(b"garbage"),
+						),
+					)
+				)
+			).await;
+
+			assert_matches!(
+				overseer_recv(&mut virtual_overseer).await,
+				AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer(
+					p,
+					PeerSet::Collation,
+				)) if p == peer
+			);
+		})
+	}
 }
-- 
GitLab