From ad22fa6e785bccfb5e1ceb113c870e02f132ce46 Mon Sep 17 00:00:00 2001
From: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com>
Date: Tue, 28 May 2024 18:53:53 +0300
Subject: [PATCH] Add metric to measure the time it takes to gather enough
 assignments (#4587)

This helps us understand, with high granularity, how many assignment
tranches are triggered before we conclude that we have enough assignments.

This metric is important because the triggering of an assignment creates
a lot of work in the system for approving the candidate and gossiping
the necessary messages.

---------

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
Co-authored-by: ordian <write@reusable.software>
---
 .../node/core/approval-voting/src/import.rs   |   6 +-
 polkadot/node/core/approval-voting/src/lib.rs | 170 ++++++++++++-
 .../node/core/approval-voting/src/tests.rs    | 236 +++++++++++++++++-
 3 files changed, 406 insertions(+), 6 deletions(-)

diff --git a/polkadot/node/core/approval-voting/src/import.rs b/polkadot/node/core/approval-voting/src/import.rs
index f4be42a4845..13b0b1bae1b 100644
--- a/polkadot/node/core/approval-voting/src/import.rs
+++ b/polkadot/node/core/approval-voting/src/import.rs
@@ -607,7 +607,7 @@ pub(crate) mod tests {
 	use super::*;
 	use crate::{
 		approval_db::common::{load_block_entry, DbBackend},
-		RuntimeInfo, RuntimeInfoConfig,
+		RuntimeInfo, RuntimeInfoConfig, MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS,
 	};
 	use ::test_helpers::{dummy_candidate_receipt, dummy_hash};
 	use assert_matches::assert_matches;
@@ -622,6 +622,7 @@ pub(crate) mod tests {
 		node_features::FeatureIndex, ExecutorParams, Id as ParaId, IndexedVec, NodeFeatures,
 		SessionInfo, ValidatorId, ValidatorIndex,
 	};
+	use schnellru::{ByLength, LruMap};
 	pub(crate) use sp_consensus_babe::{
 		digests::{CompatibleDigestItem, PreDigest, SecondaryVRFPreDigest},
 		AllowedSlots, BabeEpochConfiguration, Epoch as BabeEpoch,
@@ -658,6 +659,9 @@ pub(crate) mod tests {
 			clock: Box::new(MockClock::default()),
 			assignment_criteria: Box::new(MockAssignmentCriteria::default()),
 			spans: HashMap::new(),
+			per_block_assignments_gathering_times: LruMap::new(ByLength::new(
+				MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS,
+			)),
 		}
 	}
 
diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs
index c667aee7361..eece6b15805 100644
--- a/polkadot/node/core/approval-voting/src/lib.rs
+++ b/polkadot/node/core/approval-voting/src/lib.rs
@@ -63,6 +63,12 @@ use sc_keystore::LocalKeystore;
 use sp_application_crypto::Pair;
 use sp_consensus::SyncOracle;
 use sp_consensus_slots::Slot;
+use std::time::Instant;
+
+// The max number of blocks for which we track assignment gathering times. Normally,
+// this would never be reached because we prune the data on finalization, but we also
+// need to ensure the data does not grow unnecessarily large.
+const MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS: u32 = 100;
 
 use futures::{
 	channel::oneshot,
@@ -182,6 +188,14 @@ struct MetricsInner {
 	time_recover_and_approve: prometheus::Histogram,
 	candidate_signatures_requests_total: prometheus::Counter<prometheus::U64>,
 	unapproved_candidates_in_unfinalized_chain: prometheus::Gauge<prometheus::U64>,
+	// The time it takes in each stage to gather enough assignments.
+	// We define a `stage` as being the entire process of gathering enough assignments to
+	// be able to approve a candidate.
+	// E.g.:
+	// - Stage 0: We wait for the needed_approvals assignments to be gathered.
+	// - Stage 1: We wait for enough tranches to cover all no-shows in stage 0.
+	// - Stage 2: We wait for enough tranches to cover all no-shows in stage 1.
+	assignments_gathering_time_by_stage: prometheus::HistogramVec,
 }
 
 /// Approval Voting metrics.
@@ -302,6 +316,20 @@ impl Metrics {
 			metrics.unapproved_candidates_in_unfinalized_chain.set(count as u64);
 		}
 	}
+
+	pub fn observe_assignment_gathering_time(&self, stage: usize, elapsed_as_millis: usize) {
+		if let Some(metrics) = &self.0 {
+			let stage_string = stage.to_string();
+			// We don't want too many metric entries with this label, to avoid putting unnecessary
+			// pressure on the metrics infrastructure, so we cap the stage at 10, which is
+			// equivalent to already having a finalization lag of 10 * no_show_slots, so it should
+			// be more than enough.
+			metrics
+				.assignments_gathering_time_by_stage
+				.with_label_values(&[if stage < 10 { stage_string.as_str() } else { "inf" }])
+				.observe(elapsed_as_millis as f64);
+		}
+	}
 }
 
 impl metrics::Metrics for Metrics {
@@ -431,6 +459,17 @@ impl metrics::Metrics for Metrics {
 				)?,
 				registry,
 			)?,
+			assignments_gathering_time_by_stage: prometheus::register(
+				prometheus::HistogramVec::new(
+					prometheus::HistogramOpts::new(
+						"polkadot_parachain_assignments_gather_time_by_stage_ms",
+						"The time in ms it takes for each stage to gather enough assignments needed for approval",
+					)
+					.buckets(vec![0.0, 250.0, 500.0, 1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 32000.0]),
+					&["stage"],
+				)?,
+				registry,
+			)?,
 		};
 
 		Ok(Metrics(Some(metrics)))
@@ -788,6 +827,28 @@ struct State {
 	clock: Box<dyn Clock + Send + Sync>,
 	assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
 	spans: HashMap<Hash, jaeger::PerLeafSpan>,
+	// Per-block, per-candidate records of how long we take to gather enough
+	// assignments. This is relevant because it gives us a good idea about how many
+	// tranches we trigger and why.
+	per_block_assignments_gathering_times:
+		LruMap<BlockNumber, HashMap<(Hash, CandidateHash), AssignmentGatheringRecord>>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+struct AssignmentGatheringRecord {
+	// The stage we are in.
+	// Candidate assignment gathering goes in stages: first we wait for needed_approvals (stage 0),
+	// then if we have no-shows, we move into stage 1 and wait for enough tranches to cover all
+	// no-shows.
+	stage: usize,
+	// The time we started the stage.
+	stage_start: Option<Instant>,
+}
+
+impl Default for AssignmentGatheringRecord {
+	fn default() -> Self {
+		AssignmentGatheringRecord { stage: 0, stage_start: Some(Instant::now()) }
+	}
 }
 
 #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
@@ -893,6 +954,96 @@ impl State {
 			},
 		}
 	}
+
+	fn mark_begining_of_gathering_assignments(
+		&mut self,
+		block_number: BlockNumber,
+		block_hash: Hash,
+		candidate: CandidateHash,
+	) {
+		if let Some(record) = self
+			.per_block_assignments_gathering_times
+			.get_or_insert(block_number, HashMap::new)
+			.and_then(|records| Some(records.entry((block_hash, candidate)).or_default()))
+		{
+			if record.stage_start.is_none() {
+				record.stage += 1;
+				gum::debug!(
+					target: LOG_TARGET,
+					stage = ?record.stage,
+					?block_hash,
+					?candidate,
+					"Started a new assignment gathering stage",
+				);
+				record.stage_start = Some(Instant::now());
+			}
+		}
+	}
+
+	fn mark_gathered_enough_assignments(
+		&mut self,
+		block_number: BlockNumber,
+		block_hash: Hash,
+		candidate: CandidateHash,
+	) -> AssignmentGatheringRecord {
+		let record = self
+			.per_block_assignments_gathering_times
+			.get(&block_number)
+			.and_then(|entry| entry.get_mut(&(block_hash, candidate)));
+		let stage = record.as_ref().map(|record| record.stage).unwrap_or_default();
+		AssignmentGatheringRecord {
+			stage,
+			stage_start: record.and_then(|record| record.stage_start.take()),
+		}
+	}
+
+	fn cleanup_assignments_gathering_timestamp(&mut self, remove_lower_than: BlockNumber) {
+		while let Some((block_number, _)) = self.per_block_assignments_gathering_times.peek_oldest()
+		{
+			if *block_number < remove_lower_than {
+				self.per_block_assignments_gathering_times.pop_oldest();
+			} else {
+				break
+			}
+		}
+	}
+
+	fn observe_assignment_gathering_status(
+		&mut self,
+		metrics: &Metrics,
+		required_tranches: &RequiredTranches,
+		block_hash: Hash,
+		block_number: BlockNumber,
+		candidate_hash: CandidateHash,
+	) {
+		match required_tranches {
+			RequiredTranches::All | RequiredTranches::Pending { .. } => {
+				self.mark_begining_of_gathering_assignments(
+					block_number,
+					block_hash,
+					candidate_hash,
+				);
+			},
+			RequiredTranches::Exact { .. } => {
+				let time_to_gather =
+					self.mark_gathered_enough_assignments(block_number, block_hash, candidate_hash);
+				if let Some(gathering_started) = time_to_gather.stage_start {
+					if gathering_started.elapsed().as_millis() > 6000 {
+						gum::trace!(
+							target: LOG_TARGET,
+							?block_hash,
+							?candidate_hash,
+							"Long assignment gathering time",
+						);
+					}
+					metrics.observe_assignment_gathering_time(
+						time_to_gather.stage,
+						gathering_started.elapsed().as_millis() as usize,
+					)
+				}
+			},
+		}
+	}
 }
 
 #[derive(Debug, Clone)]
@@ -942,6 +1093,9 @@ where
 		clock: subsystem.clock,
 		assignment_criteria,
 		spans: HashMap::new(),
+		per_block_assignments_gathering_times: LruMap::new(ByLength::new(
+			MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS,
+		)),
 	};
 
 	// `None` on start-up. Gets initialized/updated on leaf update
@@ -973,7 +1127,7 @@ where
 				subsystem.metrics.on_wakeup();
 				process_wakeup(
 					&mut ctx,
-					&state,
+					&mut state,
 					&mut overlayed_db,
 					&mut session_info_provider,
 					woken_block,
@@ -1632,6 +1786,7 @@ async fn handle_from_overseer<Context>(
 			// `prune_finalized_wakeups` prunes all finalized block hashes. We prune spans
 			// accordingly.
 			wakeups.prune_finalized_wakeups(block_number, &mut state.spans);
+			state.cleanup_assignments_gathering_timestamp(block_number);
 
 			// // `prune_finalized_wakeups` prunes all finalized block hashes. We prune spans
 			// accordingly. let hash_set =
@@ -2478,7 +2633,7 @@ where
 
 async fn check_and_import_approval<T, Sender>(
 	sender: &mut Sender,
-	state: &State,
+	state: &mut State,
 	db: &mut OverlayedBackend<'_, impl Backend>,
 	session_info_provider: &mut RuntimeInfo,
 	metrics: &Metrics,
@@ -2710,7 +2865,7 @@ impl ApprovalStateTransition {
 // as necessary and schedules any further wakeups.
 async fn advance_approval_state<Sender>(
 	sender: &mut Sender,
-	state: &State,
+	state: &mut State,
 	db: &mut OverlayedBackend<'_, impl Backend>,
 	session_info_provider: &mut RuntimeInfo,
 	metrics: &Metrics,
@@ -2761,6 +2916,13 @@ where
 			approval_entry,
 			status.required_tranches.clone(),
 		);
+		state.observe_assignment_gathering_status(
+			&metrics,
+			&status.required_tranches,
+			block_hash,
+			block_entry.block_number(),
+			candidate_hash,
+		);
 
 		// Check whether this is approved, while allowing a maximum
 		// assignment tick of `now - APPROVAL_DELAY` - that is, that
@@ -2941,7 +3103,7 @@ fn should_trigger_assignment(
 #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
 async fn process_wakeup<Context>(
 	ctx: &mut Context,
-	state: &State,
+	state: &mut State,
 	db: &mut OverlayedBackend<'_, impl Backend>,
 	session_info_provider: &mut RuntimeInfo,
 	relay_block: Hash,
diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs
index c3709de59e8..43af8d476a6 100644
--- a/polkadot/node/core/approval-voting/src/tests.rs
+++ b/polkadot/node/core/approval-voting/src/tests.rs
@@ -17,6 +17,10 @@
 use self::test_helpers::mock::new_leaf;
 use super::*;
 use crate::backend::V1ReadBackend;
+use overseer::prometheus::{
+	prometheus::{IntCounter, IntCounterVec},
+	Histogram, HistogramOpts, HistogramVec, Opts,
+};
 use polkadot_node_primitives::{
 	approval::{
 		v1::{
@@ -40,7 +44,7 @@ use polkadot_primitives::{
 	ApprovalVote, CandidateCommitments, CandidateEvent, CoreIndex, GroupIndex, Header,
 	Id as ParaId, IndexedVec, NodeFeatures, ValidationCode, ValidatorSignature,
 };
-use std::time::Duration;
+use std::{cmp::max, time::Duration};
 
 use assert_matches::assert_matches;
 use async_trait::async_trait;
@@ -5049,3 +5053,233 @@ fn subsystem_sends_pending_approvals_on_approval_restart() {
 		virtual_overseer
 	});
 }
+
+// Test we correctly update the timer when we mark the beginning of gathering assignments.
+#[test]
+fn test_gathering_assignments_statements() {
+	let mut state = State {
+		keystore: Arc::new(LocalKeystore::in_memory()),
+		slot_duration_millis: 6_000,
+		clock: Box::new(MockClock::default()),
+		assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|_| Ok(0))),
+		spans: HashMap::new(),
+		per_block_assignments_gathering_times: LruMap::new(ByLength::new(
+			MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS,
+		)),
+	};
+
+	for i in 0..200i32 {
+		state.mark_begining_of_gathering_assignments(
+			i as u32,
+			Hash::repeat_byte(i as u8),
+			CandidateHash(Hash::repeat_byte(i as u8)),
+		);
+		assert!(
+			state.per_block_assignments_gathering_times.len() <=
+				MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS as usize
+		);
+
+		assert_eq!(
+			state
+				.per_block_assignments_gathering_times
+				.iter()
+				.map(|(block_number, _)| block_number)
+				.min(),
+			Some(max(0, i - MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS as i32 + 1) as u32).as_ref()
+		)
+	}
+	assert_eq!(
+		state.per_block_assignments_gathering_times.len(),
+		MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS as usize
+	);
+
+	let nothing_changes = state
+		.per_block_assignments_gathering_times
+		.iter()
+		.map(|(block_number, _)| *block_number)
+		.sorted()
+		.collect::<Vec<_>>();
+
+	for i in 150..200i32 {
+		state.mark_begining_of_gathering_assignments(
+			i as u32,
+			Hash::repeat_byte(i as u8),
+			CandidateHash(Hash::repeat_byte(i as u8)),
+		);
+		assert_eq!(
+			nothing_changes,
+			state
+				.per_block_assignments_gathering_times
+				.iter()
+				.map(|(block_number, _)| *block_number)
+				.sorted()
+				.collect::<Vec<_>>()
+		);
+	}
+
+	for i in 110..120 {
+		let block_hash = Hash::repeat_byte(i as u8);
+		let candidate_hash = CandidateHash(Hash::repeat_byte(i as u8));
+
+		state.mark_gathered_enough_assignments(i as u32, block_hash, candidate_hash);
+
+		assert!(state
+			.per_block_assignments_gathering_times
+			.get(&i)
+			.unwrap()
+			.get(&(block_hash, candidate_hash))
+			.unwrap()
+			.stage_start
+			.is_none());
+		state.mark_begining_of_gathering_assignments(i as u32, block_hash, candidate_hash);
+		let record = state
+			.per_block_assignments_gathering_times
+			.get(&i)
+			.unwrap()
+			.get(&(block_hash, candidate_hash))
+			.unwrap();
+
+		assert!(record.stage_start.is_some());
+		assert_eq!(record.stage, 1);
+	}
+
+	state.cleanup_assignments_gathering_timestamp(200);
+	assert_eq!(state.per_block_assignments_gathering_times.len(), 0);
+}
+
+// Test we note the time we took to transition `RequiredTranches` from Pending to Exact and
+// that we increase the stage when we transition from Exact to Pending.
+#[test]
+fn test_observe_assignment_gathering_status() {
+	let mut state = State {
+		keystore: Arc::new(LocalKeystore::in_memory()),
+		slot_duration_millis: 6_000,
+		clock: Box::new(MockClock::default()),
+		assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|_| Ok(0))),
+		spans: HashMap::new(),
+		per_block_assignments_gathering_times: LruMap::new(ByLength::new(
+			MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS,
+		)),
+	};
+
+	let metrics_inner = MetricsInner {
+		imported_candidates_total: IntCounter::new("dummy", "dummy").unwrap(),
+		assignments_produced: Histogram::with_opts(HistogramOpts::new("dummy", "dummy")).unwrap(),
+		approvals_produced_total: IntCounterVec::new(Opts::new("dummy", "dummy"), &["dummy"])
+			.unwrap(),
+		no_shows_total: IntCounter::new("dummy", "dummy").unwrap(),
+		observed_no_shows: IntCounter::new("dummy", "dummy").unwrap(),
+		approved_by_one_third: IntCounter::new("dummy", "dummy").unwrap(),
+		wakeups_triggered_total: IntCounter::new("dummy", "dummy").unwrap(),
+		coalesced_approvals_buckets: Histogram::with_opts(HistogramOpts::new("dummy", "dummy"))
+			.unwrap(),
+		coalesced_approvals_delay: Histogram::with_opts(HistogramOpts::new("dummy", "dummy"))
+			.unwrap(),
+		candidate_approval_time_ticks: Histogram::with_opts(HistogramOpts::new("dummy", "dummy"))
+			.unwrap(),
+		block_approval_time_ticks: Histogram::with_opts(HistogramOpts::new("dummy", "dummy"))
+			.unwrap(),
+		time_db_transaction: Histogram::with_opts(HistogramOpts::new("dummy", "dummy")).unwrap(),
+		time_recover_and_approve: Histogram::with_opts(HistogramOpts::new("dummy", "dummy"))
+			.unwrap(),
+		candidate_signatures_requests_total: IntCounter::new("dummy", "dummy").unwrap(),
+		unapproved_candidates_in_unfinalized_chain: prometheus::Gauge::<prometheus::U64>::new(
+			"dummy", "dummy",
+		)
+		.unwrap(),
+		assignments_gathering_time_by_stage: HistogramVec::new(
+			HistogramOpts::new("test", "test"),
+			&["stage"],
+		)
+		.unwrap(),
+	};
+
+	let metrics = Metrics(Some(metrics_inner));
+	let block_hash = Hash::repeat_byte(1);
+	let candidate_hash = CandidateHash(Hash::repeat_byte(1));
+	let block_number = 1;
+
+	// Transition from Pending to Exact and check stage 0 time is recorded.
+	state.observe_assignment_gathering_status(
+		&metrics,
+		&RequiredTranches::Pending {
+			considered: 0,
+			next_no_show: None,
+			maximum_broadcast: 0,
+			clock_drift: 0,
+		},
+		block_hash,
+		block_number,
+		candidate_hash,
+	);
+
+	state.observe_assignment_gathering_status(
+		&metrics,
+		&RequiredTranches::Exact {
+			needed: 2,
+			tolerated_missing: 2,
+			next_no_show: None,
+			last_assignment_tick: None,
+		},
+		block_hash,
+		block_number,
+		candidate_hash,
+	);
+
+	let value = metrics
+		.0
+		.as_ref()
+		.unwrap()
+		.assignments_gathering_time_by_stage
+		.get_metric_with_label_values(&["0"])
+		.unwrap();
+
+	assert_eq!(value.get_sample_count(), 1);
+
+	// Transition from Exact to Pending to Exact and check stage 1 time is recorded.
+	state.observe_assignment_gathering_status(
+		&metrics,
+		&RequiredTranches::Pending {
+			considered: 0,
+			next_no_show: None,
+			maximum_broadcast: 0,
+			clock_drift: 0,
+		},
+		block_hash,
+		block_number,
+		candidate_hash,
+	);
+
+	state.observe_assignment_gathering_status(
+		&metrics,
+		&RequiredTranches::Exact {
+			needed: 2,
+			tolerated_missing: 2,
+			next_no_show: None,
+			last_assignment_tick: None,
+		},
+		block_hash,
+		block_number,
+		candidate_hash,
+	);
+
+	let value = metrics
+		.0
+		.as_ref()
+		.unwrap()
+		.assignments_gathering_time_by_stage
+		.get_metric_with_label_values(&["0"])
+		.unwrap();
+
+	assert_eq!(value.get_sample_count(), 1);
+
+	let value = metrics
+		.0
+		.as_ref()
+		.unwrap()
+		.assignments_gathering_time_by_stage
+		.get_metric_with_label_values(&["1"])
+		.unwrap();
+
+	assert_eq!(value.get_sample_count(), 1);
+}
-- 
GitLab