From 189dfdc00679321c22e35597c313babda1a0bb5c Mon Sep 17 00:00:00 2001
From: Bernhard Schuster <bernhard@ahoi.io>
Date: Mon, 21 Mar 2022 13:17:54 +0100
Subject: [PATCH] approval dist imrpo (#5161)

* minor approval voting refactor

* ignore recently outdated ones

* check in another branch

* don't be a doofus

* add newline

* remove a superflous check

* fix missing ,

* consistency

* Update node/network/approval-distribution/src/lib.rs

Co-authored-by: Andronik <write@reusable.software>

* fixup

Co-authored-by: Andronik <write@reusable.software>
---
 .../network/approval-distribution/src/lib.rs  | 278 +++++++-----------
 .../approval-distribution/src/metrics.rs      | 122 ++++++++
 2 files changed, 228 insertions(+), 172 deletions(-)
 create mode 100644 polkadot/node/network/approval-distribution/src/metrics.rs

diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs
index bf5835016dd..ca6f5dccc5a 100644
--- a/polkadot/node/network/approval-distribution/src/lib.rs
+++ b/polkadot/node/network/approval-distribution/src/lib.rs
@@ -35,15 +35,15 @@ use polkadot_node_subsystem::{
 	overseer, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemContext,
 	SubsystemError,
 };
-use polkadot_node_subsystem_util::{
-	self as util,
-	metrics::{self, prometheus},
-	MIN_GOSSIP_PEERS,
-};
+use polkadot_node_subsystem_util::{self as util, MIN_GOSSIP_PEERS};
 use polkadot_primitives::v2::{
 	BlockNumber, CandidateIndex, Hash, ValidatorIndex, ValidatorSignature,
 };
-use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
+use std::collections::{hash_map, BTreeMap, HashMap, HashSet, VecDeque};
+
+use self::metrics::Metrics;
+
+mod metrics;
 
 #[cfg(test)]
 mod tests;
@@ -66,6 +66,29 @@ pub struct ApprovalDistribution {
 	metrics: Metrics,
 }
 
+/// Contains recently finalized
+/// or those pruned due to finalization.
+#[derive(Default)]
+struct RecentlyOutdated {
+	buf: VecDeque<Hash>,
+}
+
+impl RecentlyOutdated {
+	fn note_outdated(&mut self, hash: Hash) {
+		const MAX_BUF_LEN: usize = 20;
+
+		self.buf.push_back(hash);
+
+		while self.buf.len() > MAX_BUF_LEN {
+			let _ = self.buf.pop_front();
+		}
+	}
+
+	fn is_recent_outdated(&self, hash: &Hash) -> bool {
+		self.buf.contains(hash)
+	}
+}
+
 /// The [`State`] struct is responsible for tracking the overall state of the subsystem.
 ///
 /// It tracks metadata about our view of the unfinalized chain,
@@ -90,6 +113,9 @@ struct State {
 	/// Track all our neighbors in the current gossip topology.
 	/// We're not necessarily connected to all of them.
 	gossip_peers: HashSet<PeerId>,
+
+	/// Tracks recently finalized blocks.
+	recent_outdated_blocks: RecentlyOutdated,
 }
 
 /// A short description of a validator's assignment or approval.
@@ -500,12 +526,14 @@ impl State {
 		// split_off returns everything after the given key, including the key
 		let split_point = finalized_number.saturating_add(1);
 		let mut old_blocks = self.blocks_by_number.split_off(&split_point);
+
 		// after split_off old_blocks actually contains new blocks, we need to swap
 		std::mem::swap(&mut self.blocks_by_number, &mut old_blocks);
 
 		// now that we pruned `self.blocks_by_number`, let's clean up `self.blocks` too
-		old_blocks.values().flatten().for_each(|h| {
-			self.blocks.remove(h);
+		old_blocks.values().flatten().for_each(|relay_block| {
+			self.recent_outdated_blocks.note_outdated(*relay_block);
+			self.blocks.remove(relay_block);
 		});
 	}
 
@@ -528,11 +556,13 @@ impl State {
 					gum::trace!(
 						target: LOG_TARGET,
 						?peer_id,
-						?block_hash,
+						hash = ?block_hash,
 						?validator_index,
 						"Unexpected assignment",
 					);
-					modify_reputation(ctx, peer_id, COST_UNEXPECTED_MESSAGE).await;
+					if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
+						modify_reputation(ctx, peer_id, COST_UNEXPECTED_MESSAGE).await;
+					}
 				}
 				return
 			},
@@ -548,16 +578,17 @@ impl State {
 				hash_map::Entry::Occupied(mut peer_knowledge) => {
 					let peer_knowledge = peer_knowledge.get_mut();
 					if peer_knowledge.contains(&fingerprint) {
-						if peer_knowledge.received.contains(&fingerprint) {
+						// wasn't included before
+						if !peer_knowledge.received.insert(fingerprint.clone()) {
 							gum::debug!(
 								target: LOG_TARGET,
 								?peer_id,
+								hash = ?block_hash,
 								?fingerprint,
 								"Duplicate assignment",
 							);
 							modify_reputation(ctx, peer_id, COST_DUPLICATE_MESSAGE).await;
 						}
-						peer_knowledge.received.insert(fingerprint);
 						return
 					}
 				},
@@ -565,6 +596,7 @@ impl State {
 					gum::debug!(
 						target: LOG_TARGET,
 						?peer_id,
+						hash = ?block_hash,
 						?fingerprint,
 						"Assignment from a peer is out of view",
 					);
@@ -601,7 +633,7 @@ impl State {
 			};
 			drop(timer);
 
-			gum::trace!(target: LOG_TARGET, ?source, ?fingerprint, ?result, "Checked assignment",);
+			gum::trace!(target: LOG_TARGET, hash = ?block_hash, ?source, ?fingerprint, ?result, "Checked assignment",);
 			match result {
 				AssignmentCheckResult::Accepted => {
 					modify_reputation(ctx, peer_id.clone(), BENEFIT_VALID_MESSAGE_FIRST).await;
@@ -619,6 +651,7 @@ impl State {
 					}
 					gum::debug!(
 						target: LOG_TARGET,
+						hash = ?block_hash,
 						?peer_id,
 						"Got an `AcceptedDuplicate` assignment",
 					);
@@ -627,6 +660,7 @@ impl State {
 				AssignmentCheckResult::TooFarInFuture => {
 					gum::debug!(
 						target: LOG_TARGET,
+						hash = ?block_hash,
 						?peer_id,
 						"Got an assignment too far in the future",
 					);
@@ -636,6 +670,7 @@ impl State {
 				AssignmentCheckResult::Bad(error) => {
 					gum::info!(
 						target: LOG_TARGET,
+						hash = ?block_hash,
 						?peer_id,
 						%error,
 						"Got a bad assignment from peer",
@@ -741,7 +776,9 @@ impl State {
 			Some(entry) if entry.candidates.get(candidate_index as usize).is_some() => entry,
 			_ => {
 				if let Some(peer_id) = source.peer_id() {
-					modify_reputation(ctx, peer_id, COST_UNEXPECTED_MESSAGE).await;
+					if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
+						modify_reputation(ctx, peer_id, COST_UNEXPECTED_MESSAGE).await;
+					}
 				}
 				return
 			},
@@ -774,7 +811,7 @@ impl State {
 				hash_map::Entry::Occupied(mut knowledge) => {
 					let peer_knowledge = knowledge.get_mut();
 					if peer_knowledge.contains(&fingerprint) {
-						if peer_knowledge.received.contains(&fingerprint) {
+						if !peer_knowledge.received.insert(fingerprint.clone()) {
 							gum::debug!(
 								target: LOG_TARGET,
 								?peer_id,
@@ -784,7 +821,6 @@ impl State {
 
 							modify_reputation(ctx, peer_id, COST_DUPLICATE_MESSAGE).await;
 						}
-						peer_knowledge.received.insert(fingerprint);
 						return
 					}
 				},
@@ -1221,61 +1257,15 @@ impl ApprovalDistribution {
 				},
 			};
 			match message {
-				FromOverseer::Communication {
-					msg: ApprovalDistributionMessage::NetworkBridgeUpdateV1(event),
-				} => {
-					state.handle_network_msg(&mut ctx, &self.metrics, event).await;
-				},
-				FromOverseer::Communication {
-					msg: ApprovalDistributionMessage::NewBlocks(metas),
-				} => {
-					gum::debug!(target: LOG_TARGET, "Processing NewBlocks");
-					state.handle_new_blocks(&mut ctx, &self.metrics, metas).await;
-				},
-				FromOverseer::Communication {
-					msg: ApprovalDistributionMessage::DistributeAssignment(cert, candidate_index),
-				} => {
-					gum::debug!(
-						target: LOG_TARGET,
-						"Distributing our assignment on candidate (block={}, index={})",
-						cert.block_hash,
-						candidate_index,
-					);
-
-					state
-						.import_and_circulate_assignment(
-							&mut ctx,
-							&self.metrics,
-							MessageSource::Local,
-							cert,
-							candidate_index,
-						)
-						.await;
-				},
-				FromOverseer::Communication {
-					msg: ApprovalDistributionMessage::DistributeApproval(vote),
-				} => {
-					gum::debug!(
-						target: LOG_TARGET,
-						"Distributing our approval vote on candidate (block={}, index={})",
-						vote.block_hash,
-						vote.candidate_index,
-					);
-
-					state
-						.import_and_circulate_approval(
-							&mut ctx,
-							&self.metrics,
-							MessageSource::Local,
-							vote,
-						)
-						.await;
-				},
+				FromOverseer::Communication { msg } =>
+					Self::handle_incoming(&mut ctx, state, msg, &self.metrics).await,
 				FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
 					..
 				})) => {
 					gum::trace!(target: LOG_TARGET, "active leaves signal (ignored)");
-					// handled by NewBlocks
+					// the relay chain blocks relevant to the approval subsystems
+					// are those that are available, but not finalized yet
+					// actived and deactivated heads hence are irrelevant to this subsystem
 				},
 				FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
 					gum::trace!(target: LOG_TARGET, number = %number, "finalized signal");
@@ -1285,6 +1275,55 @@ impl ApprovalDistribution {
 			}
 		}
 	}
+
+	async fn handle_incoming<Context>(
+		ctx: &mut Context,
+		state: &mut State,
+		msg: ApprovalDistributionMessage,
+		metrics: &Metrics,
+	) where
+		Context: SubsystemContext<Message = ApprovalDistributionMessage>,
+		Context: overseer::SubsystemContext<Message = ApprovalDistributionMessage>,
+	{
+		match msg {
+			ApprovalDistributionMessage::NetworkBridgeUpdateV1(event) => {
+				state.handle_network_msg(ctx, metrics, event).await;
+			},
+			ApprovalDistributionMessage::NewBlocks(metas) => {
+				state.handle_new_blocks(ctx, metrics, metas).await;
+			},
+			ApprovalDistributionMessage::DistributeAssignment(cert, candidate_index) => {
+				gum::debug!(
+					target: LOG_TARGET,
+					"Distributing our assignment on candidate (block={}, index={})",
+					cert.block_hash,
+					candidate_index,
+				);
+
+				state
+					.import_and_circulate_assignment(
+						ctx,
+						&metrics,
+						MessageSource::Local,
+						cert,
+						candidate_index,
+					)
+					.await;
+			},
+			ApprovalDistributionMessage::DistributeApproval(vote) => {
+				gum::debug!(
+					target: LOG_TARGET,
+					"Distributing our approval vote on candidate (block={}, index={})",
+					vote.block_hash,
+					vote.candidate_index,
+				);
+
+				state
+					.import_and_circulate_approval(ctx, metrics, MessageSource::Local, vote)
+					.await;
+			},
+		}
+	}
 }
 
 impl<Context> overseer::Subsystem<Context, SubsystemError> for ApprovalDistribution
@@ -1298,108 +1337,3 @@ where
 		SpawnedSubsystem { name: "approval-distribution-subsystem", future }
 	}
 }
-
-/// Approval Distribution metrics.
-#[derive(Default, Clone)]
-pub struct Metrics(Option<MetricsInner>);
-
-#[derive(Clone)]
-struct MetricsInner {
-	assignments_imported_total: prometheus::Counter<prometheus::U64>,
-	approvals_imported_total: prometheus::Counter<prometheus::U64>,
-	unified_with_peer_total: prometheus::Counter<prometheus::U64>,
-
-	time_unify_with_peer: prometheus::Histogram,
-	time_import_pending_now_known: prometheus::Histogram,
-	time_awaiting_approval_voting: prometheus::Histogram,
-}
-
-impl Metrics {
-	fn on_assignment_imported(&self) {
-		if let Some(metrics) = &self.0 {
-			metrics.assignments_imported_total.inc();
-		}
-	}
-
-	fn on_approval_imported(&self) {
-		if let Some(metrics) = &self.0 {
-			metrics.approvals_imported_total.inc();
-		}
-	}
-
-	fn on_unify_with_peer(&self) {
-		if let Some(metrics) = &self.0 {
-			metrics.unified_with_peer_total.inc();
-		}
-	}
-
-	fn time_unify_with_peer(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
-		self.0.as_ref().map(|metrics| metrics.time_unify_with_peer.start_timer())
-	}
-
-	fn time_import_pending_now_known(
-		&self,
-	) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
-		self.0
-			.as_ref()
-			.map(|metrics| metrics.time_import_pending_now_known.start_timer())
-	}
-
-	fn time_awaiting_approval_voting(
-		&self,
-	) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
-		self.0
-			.as_ref()
-			.map(|metrics| metrics.time_awaiting_approval_voting.start_timer())
-	}
-}
-
-impl metrics::Metrics for Metrics {
-	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
-		let metrics = MetricsInner {
-			assignments_imported_total: prometheus::register(
-				prometheus::Counter::new(
-					"polkadot_parachain_assignments_imported_total",
-					"Number of valid assignments imported locally or from other peers.",
-				)?,
-				registry,
-			)?,
-			approvals_imported_total: prometheus::register(
-				prometheus::Counter::new(
-					"polkadot_parachain_approvals_imported_total",
-					"Number of valid approvals imported locally or from other peers.",
-				)?,
-				registry,
-			)?,
-			unified_with_peer_total: prometheus::register(
-				prometheus::Counter::new(
-					"polkadot_parachain_unified_with_peer_total",
-					"Number of times `unify_with_peer` is called.",
-				)?,
-				registry,
-			)?,
-			time_unify_with_peer: prometheus::register(
-				prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
-					"polkadot_parachain_time_unify_with_peer",
-					"Time spent within fn `unify_with_peer`.",
-				))?,
-				registry,
-			)?,
-			time_import_pending_now_known: prometheus::register(
-				prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
-					"polkadot_parachain_time_import_pending_now_known",
-					"Time spent on importing pending assignments and approvals.",
-				))?,
-				registry,
-			)?,
-			time_awaiting_approval_voting: prometheus::register(
-				prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
-					"polkadot_parachain_time_awaiting_approval_voting",
-					"Time spent awaiting a reply from the Approval Voting Subsystem.",
-				))?,
-				registry,
-			)?,
-		};
-		Ok(Metrics(Some(metrics)))
-	}
-}
diff --git a/polkadot/node/network/approval-distribution/src/metrics.rs b/polkadot/node/network/approval-distribution/src/metrics.rs
new file mode 100644
index 00000000000..b96916a7f0e
--- /dev/null
+++ b/polkadot/node/network/approval-distribution/src/metrics.rs
@@ -0,0 +1,122 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+use polkadot_node_subsystem_util::metrics::{prometheus, Metrics as MetricsTrait};
+
+/// Approval Distribution metrics.
+#[derive(Default, Clone)]
+pub struct Metrics(Option<MetricsInner>);
+
+#[derive(Clone)]
+struct MetricsInner {
+	assignments_imported_total: prometheus::Counter<prometheus::U64>,
+	approvals_imported_total: prometheus::Counter<prometheus::U64>,
+	unified_with_peer_total: prometheus::Counter<prometheus::U64>,
+
+	time_unify_with_peer: prometheus::Histogram,
+	time_import_pending_now_known: prometheus::Histogram,
+	time_awaiting_approval_voting: prometheus::Histogram,
+}
+
+impl Metrics {
+	pub(crate) fn on_assignment_imported(&self) {
+		if let Some(metrics) = &self.0 {
+			metrics.assignments_imported_total.inc();
+		}
+	}
+
+	pub(crate) fn on_approval_imported(&self) {
+		if let Some(metrics) = &self.0 {
+			metrics.approvals_imported_total.inc();
+		}
+	}
+
+	pub(crate) fn on_unify_with_peer(&self) {
+		if let Some(metrics) = &self.0 {
+			metrics.unified_with_peer_total.inc();
+		}
+	}
+
+	pub(crate) fn time_unify_with_peer(&self) -> Option<prometheus::prometheus::HistogramTimer> {
+		self.0.as_ref().map(|metrics| metrics.time_unify_with_peer.start_timer())
+	}
+
+	pub(crate) fn time_import_pending_now_known(
+		&self,
+	) -> Option<prometheus::prometheus::HistogramTimer> {
+		self.0
+			.as_ref()
+			.map(|metrics| metrics.time_import_pending_now_known.start_timer())
+	}
+
+	pub(crate) fn time_awaiting_approval_voting(
+		&self,
+	) -> Option<prometheus::prometheus::HistogramTimer> {
+		self.0
+			.as_ref()
+			.map(|metrics| metrics.time_awaiting_approval_voting.start_timer())
+	}
+}
+
+impl MetricsTrait for Metrics {
+	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
+		let metrics = MetricsInner {
+			assignments_imported_total: prometheus::register(
+				prometheus::Counter::new(
+					"polkadot_parachain_assignments_imported_total",
+					"Number of valid assignments imported locally or from other peers.",
+				)?,
+				registry,
+			)?,
+			approvals_imported_total: prometheus::register(
+				prometheus::Counter::new(
+					"polkadot_parachain_approvals_imported_total",
+					"Number of valid approvals imported locally or from other peers.",
+				)?,
+				registry,
+			)?,
+			unified_with_peer_total: prometheus::register(
+				prometheus::Counter::new(
+					"polkadot_parachain_unified_with_peer_total",
+					"Number of times `unify_with_peer` is called.",
+				)?,
+				registry,
+			)?,
+			time_unify_with_peer: prometheus::register(
+				prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
+					"polkadot_parachain_time_unify_with_peer",
+					"Time spent within fn `unify_with_peer`.",
+				))?,
+				registry,
+			)?,
+			time_import_pending_now_known: prometheus::register(
+				prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
+					"polkadot_parachain_time_import_pending_now_known",
+					"Time spent on importing pending assignments and approvals.",
+				))?,
+				registry,
+			)?,
+			time_awaiting_approval_voting: prometheus::register(
+				prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
+					"polkadot_parachain_time_awaiting_approval_voting",
+					"Time spent awaiting a reply from the Approval Voting Subsystem.",
+				))?,
+				registry,
+			)?,
+		};
+		Ok(Metrics(Some(metrics)))
+	}
+}
-- 
GitLab