From 4df29e71ab926addf2574333c0e09e2ab8e71f40 Mon Sep 17 00:00:00 2001
From: Andronik Ordian <write@reusable.software>
Date: Mon, 5 Apr 2021 00:25:40 +0200
Subject: [PATCH] bitfield-dist: fix state update on gossip (#2817)

* bitfield-dist: fix state update on gossip

* fixes

* doc fixes

* oops

* 2 lines of code change
---
 .../network/approval-distribution/src/lib.rs  |   4 +-
 .../network/bitfield-distribution/src/lib.rs  |  15 +-
 .../network/statement-distribution/src/lib.rs | 196 +++++++++++-------
 3 files changed, 131 insertions(+), 84 deletions(-)

diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs
index 51b0feda6e1..984c301e7da 100644
--- a/polkadot/node/network/approval-distribution/src/lib.rs
+++ b/polkadot/node/network/approval-distribution/src/lib.rs
@@ -513,7 +513,7 @@ impl State {
 			Some(entry) => entry,
 			None => {
 				if let Some(peer_id) = source.peer_id() {
-					tracing::debug!(
+					tracing::trace!(
 						target: LOG_TARGET,
 						?peer_id,
 						?block_hash,
@@ -1008,7 +1008,7 @@ impl State {
 		peer_id: PeerId,
 		blocks: Vec<(BlockDepth, Hash)>,
 	) {
-		// we will only propagate local assignment/approvals after a certain depth
+		// we will propagate only local assignment/approvals after a certain depth
 		const DEPTH_THRESHOLD: usize = 5;
 
 		let mut assignments = Vec::new();
diff --git a/polkadot/node/network/bitfield-distribution/src/lib.rs b/polkadot/node/network/bitfield-distribution/src/lib.rs
index 03409eb3c80..e53759b1eb2 100644
--- a/polkadot/node/network/bitfield-distribution/src/lib.rs
+++ b/polkadot/node/network/bitfield-distribution/src/lib.rs
@@ -345,12 +345,6 @@ where
 			// check interest in the peer in this message's relay parent
 			if view.contains(&message.relay_parent) {
 				let message_needed = job_data.message_from_validator_needed_by_peer(&peer, &validator);
-				// track the message as sent for this peer
-				job_data.message_sent_to_peer
-					.entry(peer.clone())
-					.or_default()
-					.insert(validator.clone());
-
 				if message_needed {
 					Some(peer.clone())
 				} else {
@@ -362,6 +356,15 @@ where
 		})
 		.collect::<Vec<PeerId>>();
 	let interested_peers = util::choose_random_sqrt_subset(interested_peers, MIN_GOSSIP_PEERS);
+	interested_peers.iter()
+		.for_each(|peer|{
+			// track the message as sent for this peer
+			job_data.message_sent_to_peer
+				.entry(peer.clone())
+				.or_default()
+				.insert(validator.clone());
+		});
+
 	drop(_span);
 
 	if interested_peers.is_empty() {
diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs
index c01342d1f63..4470ebdcbf0 100644
--- a/polkadot/node/network/statement-distribution/src/lib.rs
+++ b/polkadot/node/network/statement-distribution/src/lib.rs
@@ -33,7 +33,7 @@ use polkadot_subsystem::{
 };
 use polkadot_node_subsystem_util::{
 	metrics::{self, prometheus},
-	MIN_GOSSIP_PEERS,
+	self as util, MIN_GOSSIP_PEERS,
 };
 use polkadot_node_primitives::{SignedFullStatement};
 use polkadot_primitives::v1::{
@@ -158,25 +158,21 @@ struct PeerRelayParentKnowledge {
 }
 
 impl PeerRelayParentKnowledge {
-	/// Attempt to update our view of the peer's knowledge with this statement's fingerprint based
+	/// Updates our view of the peer's knowledge with this statement's fingerprint based
 	/// on something that we would like to send to the peer.
 	///
-	/// This returns `None` if the peer cannot accept this statement, without altering internal
-	/// state.
+	/// NOTE: assumes `self.can_send` returned true before this call.
 	///
-	/// If the peer can accept the statement, this returns `Some` and updates the internal state.
 	/// Once the knowledge has incorporated a statement, it cannot be incorporated again.
 	///
-	/// This returns `Some(true)` if this is the first time the peer has become aware of a
+	/// This returns `true` if this is the first time the peer has become aware of a
 	/// candidate with the given hash.
 	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
-	fn send(&mut self, fingerprint: &(CompactStatement, ValidatorIndex)) -> Option<bool> {
-		let already_known = self.sent_statements.contains(fingerprint)
-			|| self.received_statements.contains(fingerprint);
-
-		if already_known {
-			return None;
-		}
+	fn send(&mut self, fingerprint: &(CompactStatement, ValidatorIndex)) -> bool {
+		debug_assert!(
+			self.can_send(fingerprint),
+			"send is only called after `can_send` returns true; qed",
+		);
 
 		let new_known = match fingerprint.0 {
 			CompactStatement::Seconded(ref h) => {
@@ -186,20 +182,36 @@ impl PeerRelayParentKnowledge {
 
 				self.known_candidates.insert(h.clone())
 			},
-			CompactStatement::Valid(ref h) => {
-				// The peer can only accept Valid and Invalid statements for which it is aware
-				// of the corresponding candidate.
-				if !self.known_candidates.contains(h) {
-					return None;
-				}
-
+			CompactStatement::Valid(_) => {
 				false
 			}
 		};
 
 		self.sent_statements.insert(fingerprint.clone());
 
-		Some(new_known)
+		new_known
+	}
+
+	/// This returns `true` if the peer cannot accept this statement, without altering internal
+	/// state, `false` otherwise.
+	fn can_send(&self, fingerprint: &(CompactStatement, ValidatorIndex)) -> bool {
+		let already_known = self.sent_statements.contains(fingerprint)
+			|| self.received_statements.contains(fingerprint);
+
+		if already_known {
+			return false;
+		}
+
+		match fingerprint.0 {
+			CompactStatement::Valid(ref h) => {
+				// The peer can only accept Valid and Invalid statements for which it is aware
+				// of the corresponding candidate.
+				self.known_candidates.contains(h)
+			}
+			CompactStatement::Seconded(_) => {
+				true
+			},
+		}
 	}
 
 	/// Attempt to update our view of the peer's knowledge with this statement's fingerprint based on
@@ -274,24 +286,41 @@ struct PeerData {
 }
 
 impl PeerData {
-	/// Attempt to update our view of the peer's knowledge with this statement's fingerprint based
+	/// Updates our view of the peer's knowledge with this statement's fingerprint based
 	/// on something that we would like to send to the peer.
 	///
-	/// This returns `None` if the peer cannot accept this statement, without altering internal
-	/// state.
+	/// NOTE: assumes `self.can_send` returned true before this call.
 	///
-	/// If the peer can accept the statement, this returns `Some` and updates the internal state.
 	/// Once the knowledge has incorporated a statement, it cannot be incorporated again.
 	///
-	/// This returns `Some(true)` if this is the first time the peer has become aware of a
+	/// This returns `true` if this is the first time the peer has become aware of a
 	/// candidate with the given hash.
 	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
 	fn send(
 		&mut self,
 		relay_parent: &Hash,
 		fingerprint: &(CompactStatement, ValidatorIndex),
-	) -> Option<bool> {
-		self.view_knowledge.get_mut(relay_parent).map_or(None, |k| k.send(fingerprint))
+	) -> bool {
+		debug_assert!(
+			self.can_send(relay_parent, fingerprint),
+			"send is only called after `can_send` returns true; qed",
+		);
+		self.view_knowledge
+			.get_mut(relay_parent)
+			.expect("send is only called after `can_send` returns true; qed")
+			.send(fingerprint)
+	}
+
+	/// This returns `None` if the peer cannot accept this statement, without altering internal
+	/// state.
+	fn can_send(
+		&self,
+		relay_parent: &Hash,
+		fingerprint: &(CompactStatement, ValidatorIndex),
+	) -> bool {
+		self.view_knowledge
+			.get(relay_parent)
+			.map_or(false, |k| k.can_send(fingerprint))
 	}
 
 	/// Attempt to update our view of the peer's knowledge with this statement's fingerprint based on
@@ -623,12 +652,21 @@ async fn circulate_statement(
 ) -> Vec<PeerId> {
 	let fingerprint = stored.fingerprint();
 
-	let len_sqrt = (peers.len() as f64).sqrt() as usize;
-	let cap = std::cmp::max(MIN_GOSSIP_PEERS, len_sqrt);
-
-	let peers_to_send: HashMap<PeerId, bool> = peers.iter_mut().filter_map(|(peer, data)| {
-		data.send(&relay_parent, &fingerprint).map(|new| (peer.clone(), new))
-	}).take(cap).collect();
+	let peers_to_send: Vec<PeerId> = peers.iter().filter_map(|(peer, data)| {
+		if data.can_send(&relay_parent, &fingerprint) {
+			Some(peer.clone())
+		} else {
+			None
+		}
+	}).collect();
+	let peers_to_send = util::choose_random_sqrt_subset(peers_to_send, MIN_GOSSIP_PEERS);
+	let peers_to_send: Vec<(PeerId, bool)> = peers_to_send.into_iter()
+		.map(|peer_id| {
+			let new = peers.get_mut(&peer_id)
+				.expect("a subset is taken above, so it exists; qed")
+				.send(&relay_parent, &fingerprint);
+			(peer_id, new)
+		}).collect();
 
 	// Send all these peers the initial statement.
 	if !peers_to_send.is_empty() {
@@ -638,10 +676,10 @@ async fn circulate_statement(
 			?peers_to_send,
 			?relay_parent,
 			statement = ?stored.statement,
-			"Sending statement"
+			"Sending statement",
 		);
 		ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
-			peers_to_send.keys().cloned().collect(),
+			peers_to_send.iter().map(|(p, _)| p.clone()).collect(),
 			payload,
 		))).await;
 	}
@@ -665,26 +703,29 @@ async fn send_statements_about(
 	metrics: &Metrics,
 ) {
 	for statement in active_head.statements_about(candidate_hash) {
-		if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() {
-			let payload = statement_message(
-				relay_parent,
-				statement.statement.clone(),
-			);
+		let fingerprint = statement.fingerprint();
+		if !peer_data.can_send(&relay_parent, &fingerprint) {
+			continue;
+		}
+		peer_data.send(&relay_parent, &fingerprint);
+		let payload = statement_message(
+			relay_parent,
+			statement.statement.clone(),
+		);
 
-			tracing::trace!(
-				target: LOG_TARGET,
-				?peer,
-				?relay_parent,
-				?candidate_hash,
-				statement = ?statement.statement,
-				"Sending statement"
-			);
-			ctx.send_message(AllMessages::NetworkBridge(
-				NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
-			)).await;
+		tracing::trace!(
+			target: LOG_TARGET,
+			?peer,
+			?relay_parent,
+			?candidate_hash,
+			statement = ?statement.statement,
+			"Sending statement",
+		);
+		ctx.send_message(AllMessages::NetworkBridge(
+			NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
+		)).await;
 
-			metrics.on_statement_distributed();
-		}
+		metrics.on_statement_distributed();
 	}
 }
 
@@ -699,25 +740,28 @@ async fn send_statements(
 	metrics: &Metrics,
 ) {
 	for statement in active_head.statements() {
-		if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() {
-			let payload = statement_message(
-				relay_parent,
-				statement.statement.clone(),
-			);
+		let fingerprint = statement.fingerprint();
+		if !peer_data.can_send(&relay_parent, &fingerprint) {
+			continue;
+		}
+		peer_data.send(&relay_parent, &fingerprint);
+		let payload = statement_message(
+			relay_parent,
+			statement.statement.clone(),
+		);
 
-			tracing::trace!(
-				target: LOG_TARGET,
-				?peer,
-				?relay_parent,
-				statement = ?statement.statement,
-				"Sending statement"
-			);
-			ctx.send_message(AllMessages::NetworkBridge(
-				NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
-			)).await;
+		tracing::trace!(
+			target: LOG_TARGET,
+			?peer,
+			?relay_parent,
+			statement = ?statement.statement,
+			"Sending statement"
+		);
+		ctx.send_message(AllMessages::NetworkBridge(
+			NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
+		)).await;
 
-			metrics.on_statement_distributed();
-		}
+		metrics.on_statement_distributed();
 	}
 }
 
@@ -1331,7 +1375,7 @@ mod tests {
 		let hash_a = CandidateHash([1; 32].into());
 
 		// Sending an un-pinned statement should not work and should have no effect.
-		assert!(knowledge.send(&(CompactStatement::Valid(hash_a), ValidatorIndex(0))).is_none());
+		assert!(!knowledge.can_send(&(CompactStatement::Valid(hash_a), ValidatorIndex(0))));
 		assert!(!knowledge.known_candidates.contains(&hash_a));
 		assert!(knowledge.sent_statements.is_empty());
 		assert!(knowledge.received_statements.is_empty());
@@ -1339,8 +1383,8 @@ mod tests {
 		assert!(knowledge.received_message_count.is_empty());
 
 		// Make the peer aware of the candidate.
-		assert_eq!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0))), Some(true));
-		assert_eq!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(1))), Some(false));
+		assert_eq!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0))), true);
+		assert_eq!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(1))), false);
 		assert!(knowledge.known_candidates.contains(&hash_a));
 		assert_eq!(knowledge.sent_statements.len(), 2);
 		assert!(knowledge.received_statements.is_empty());
@@ -1348,7 +1392,7 @@ mod tests {
 		assert!(knowledge.received_message_count.get(&hash_a).is_none());
 
 		// And now it should accept the dependent message.
-		assert_eq!(knowledge.send(&(CompactStatement::Valid(hash_a), ValidatorIndex(0))), Some(false));
+		assert_eq!(knowledge.send(&(CompactStatement::Valid(hash_a), ValidatorIndex(0))), false);
 		assert!(knowledge.known_candidates.contains(&hash_a));
 		assert_eq!(knowledge.sent_statements.len(), 3);
 		assert!(knowledge.received_statements.is_empty());
@@ -1362,7 +1406,7 @@ mod tests {
 
 		let hash_a = CandidateHash([1; 32].into());
 		assert!(knowledge.receive(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0)), 3).unwrap());
-		assert!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0))).is_none());
+		assert!(!knowledge.can_send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0))));
 	}
 
 	#[test]
-- 
GitLab