From 2388141b253c6117a28ada506b57b792de2ad163 Mon Sep 17 00:00:00 2001
From: Robert Habermeier <rphmeier@gmail.com>
Date: Thu, 25 Mar 2021 21:21:55 +0100
Subject: [PATCH] overseer: AllSubsystems magic and subsystem channel sizes
 metrics (#2711)

* overseer: AllSubsystems magic and report subsystem channel sizes to prometheus

* fix tests
---
 polkadot/node/overseer/src/lib.rs | 430 +++++++++++++++++++-----------
 1 file changed, 272 insertions(+), 158 deletions(-)

diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs
index 8fef61948c3..c2fe73514cc 100644
--- a/polkadot/node/overseer/src/lib.rs
+++ b/polkadot/node/overseer/src/lib.rs
@@ -511,62 +511,28 @@ impl<M> OverseenSubsystem<M> {
 
 /// The `Overseer` itself.
 pub struct Overseer<S> {
-	/// A candidate validation subsystem.
-	candidate_validation_subsystem: OverseenSubsystem<CandidateValidationMessage>,
-
-	/// A candidate backing subsystem.
-	candidate_backing_subsystem: OverseenSubsystem<CandidateBackingMessage>,
-
-	/// A candidate selection subsystem.
-	candidate_selection_subsystem: OverseenSubsystem<CandidateSelectionMessage>,
-
-	/// A statement distribution subsystem.
-	statement_distribution_subsystem: OverseenSubsystem<StatementDistributionMessage>,
-
-	/// An availability distribution subsystem.
-	availability_distribution_subsystem: OverseenSubsystem<AvailabilityDistributionMessage>,
-
-	/// An availability recovery subsystem.
-	availability_recovery_subsystem: OverseenSubsystem<AvailabilityRecoveryMessage>,
-
-	/// A bitfield signing subsystem.
-	bitfield_signing_subsystem: OverseenSubsystem<BitfieldSigningMessage>,
-
-	/// A bitfield distribution subsystem.
-	bitfield_distribution_subsystem: OverseenSubsystem<BitfieldDistributionMessage>,
-
-	/// A provisioner subsystem.
-	provisioner_subsystem: OverseenSubsystem<ProvisionerMessage>,
-
-	/// A PoV distribution subsystem.
-	pov_distribution_subsystem: OverseenSubsystem<PoVDistributionMessage>,
-
-	/// A runtime API subsystem.
-	runtime_api_subsystem: OverseenSubsystem<RuntimeApiMessage>,
-
-	/// An availability store subsystem.
-	availability_store_subsystem: OverseenSubsystem<AvailabilityStoreMessage>,
-
-	/// A network bridge subsystem.
-	network_bridge_subsystem: OverseenSubsystem<NetworkBridgeMessage>,
-
-	/// A Chain API subsystem.
-	chain_api_subsystem: OverseenSubsystem<ChainApiMessage>,
-
-	/// A Collation Generation subsystem.
-	collation_generation_subsystem: OverseenSubsystem<CollationGenerationMessage>,
-
-	/// A Collator Protocol subsystem.
-	collator_protocol_subsystem: OverseenSubsystem<CollatorProtocolMessage>,
-
-	/// An Approval Distribution subsystem.
-	approval_distribution_subsystem: OverseenSubsystem<ApprovalDistributionMessage>,
-
-	/// An Approval Voting subsystem.
-	approval_voting_subsystem: OverseenSubsystem<ApprovalVotingMessage>,
-
-	/// A Gossip Support subsystem.
-	gossip_support_subsystem: OverseenSubsystem<GossipSupportMessage>,
+	/// Handles to all subsystems.
+	subsystems: AllSubsystems<
+		OverseenSubsystem<CandidateValidationMessage>,
+		OverseenSubsystem<CandidateBackingMessage>,
+		OverseenSubsystem<CandidateSelectionMessage>,
+		OverseenSubsystem<StatementDistributionMessage>,
+		OverseenSubsystem<AvailabilityDistributionMessage>,
+		OverseenSubsystem<AvailabilityRecoveryMessage>,
+		OverseenSubsystem<BitfieldSigningMessage>,
+		OverseenSubsystem<BitfieldDistributionMessage>,
+		OverseenSubsystem<ProvisionerMessage>,
+		OverseenSubsystem<PoVDistributionMessage>,
+		OverseenSubsystem<RuntimeApiMessage>,
+		OverseenSubsystem<AvailabilityStoreMessage>,
+		OverseenSubsystem<NetworkBridgeMessage>,
+		OverseenSubsystem<ChainApiMessage>,
+		OverseenSubsystem<CollationGenerationMessage>,
+		OverseenSubsystem<CollatorProtocolMessage>,
+		OverseenSubsystem<ApprovalDistributionMessage>,
+		OverseenSubsystem<ApprovalVotingMessage>,
+		OverseenSubsystem<GossipSupportMessage>,
+	>,
 
 	/// Spawner to spawn tasks to.
 	s: S,
@@ -598,6 +564,20 @@ pub struct Overseer<S> {
 	metrics: Metrics,
 }
 
+trait MapSubsystem<T> {
+	type Output;
+
+	fn map_subsystem(&self, sub: T) -> Self::Output;
+}
+
+impl<F, T, U> MapSubsystem<T> for F where F: Fn(T) -> U {
+	type Output = U;
+
+	fn map_subsystem(&self, sub: T) -> U {
+		(self)(sub)
+	}
+}
+
 /// This struct is passed as an argument to create a new instance of an [`Overseer`].
 ///
 /// As any entity that satisfies the interface may act as a [`Subsystem`] this allows
@@ -1241,8 +1221,105 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP, ApD, ApV,
 			gossip_support,
 		}
 	}
+
+	fn as_ref(&self) -> AllSubsystems<&'_ CV, &'_ CB, &'_ CS, &'_ SD, &'_ AD, &'_ AR, &'_ BS, &'_ BD, &'_ P, &'_ PoVD, &'_ RA, &'_ AS, &'_ NB, &'_ CA, &'_ CG, &'_ CP, &'_ ApD, &'_ ApV, &'_ GS> {
+		AllSubsystems {
+			candidate_validation: &self.candidate_validation,
+			candidate_backing: &self.candidate_backing,
+			candidate_selection: &self.candidate_selection,
+			statement_distribution: &self.statement_distribution,
+			availability_distribution: &self.availability_distribution,
+			availability_recovery: &self.availability_recovery,
+			bitfield_signing: &self.bitfield_signing,
+			bitfield_distribution: &self.bitfield_distribution,
+			provisioner: &self.provisioner,
+			pov_distribution: &self.pov_distribution,
+			runtime_api: &self.runtime_api,
+			availability_store: &self.availability_store,
+			network_bridge: &self.network_bridge,
+			chain_api: &self.chain_api,
+			collation_generation: &self.collation_generation,
+			collator_protocol: &self.collator_protocol,
+			approval_distribution: &self.approval_distribution,
+			approval_voting: &self.approval_voting,
+			gossip_support: &self.gossip_support,
+		}
+	}
+
+	fn map_subsystems<M>(self, m: M)
+		-> AllSubsystems<
+			<M as MapSubsystem<CV>>::Output,
+			<M as MapSubsystem<CB>>::Output,
+			<M as MapSubsystem<CS>>::Output,
+			<M as MapSubsystem<SD>>::Output,
+			<M as MapSubsystem<AD>>::Output,
+			<M as MapSubsystem<AR>>::Output,
+			<M as MapSubsystem<BS>>::Output,
+			<M as MapSubsystem<BD>>::Output,
+			<M as MapSubsystem<P>>::Output,
+			<M as MapSubsystem<PoVD>>::Output,
+			<M as MapSubsystem<RA>>::Output,
+			<M as MapSubsystem<AS>>::Output,
+			<M as MapSubsystem<NB>>::Output,
+			<M as MapSubsystem<CA>>::Output,
+			<M as MapSubsystem<CG>>::Output,
+			<M as MapSubsystem<CP>>::Output,
+			<M as MapSubsystem<ApD>>::Output,
+			<M as MapSubsystem<ApV>>::Output,
+			<M as MapSubsystem<GS>>::Output,
+		>
+	where
+		M: MapSubsystem<CV>,
+		M: MapSubsystem<CB>,
+		M: MapSubsystem<CS>,
+		M: MapSubsystem<SD>,
+		M: MapSubsystem<AD>,
+		M: MapSubsystem<AR>,
+		M: MapSubsystem<BS>,
+		M: MapSubsystem<BD>,
+		M: MapSubsystem<P>,
+		M: MapSubsystem<PoVD>,
+		M: MapSubsystem<RA>,
+		M: MapSubsystem<AS>,
+		M: MapSubsystem<NB>,
+		M: MapSubsystem<CA>,
+		M: MapSubsystem<CG>,
+		M: MapSubsystem<CP>,
+		M: MapSubsystem<ApD>,
+		M: MapSubsystem<ApV>,
+		M: MapSubsystem<GS>,
+	{
+		AllSubsystems {
+			candidate_validation: m.map_subsystem(self.candidate_validation),
+			candidate_backing: m.map_subsystem(self.candidate_backing),
+			candidate_selection: m.map_subsystem(self.candidate_selection),
+			statement_distribution: m.map_subsystem(self.statement_distribution),
+			availability_distribution: m.map_subsystem(self.availability_distribution),
+			availability_recovery: m.map_subsystem(self.availability_recovery),
+			bitfield_signing: m.map_subsystem(self.bitfield_signing),
+			bitfield_distribution: m.map_subsystem(self.bitfield_distribution),
+			provisioner: m.map_subsystem(self.provisioner),
+			pov_distribution: m.map_subsystem(self.pov_distribution),
+			runtime_api: m.map_subsystem(self.runtime_api),
+			availability_store: m.map_subsystem(self.availability_store),
+			network_bridge: m.map_subsystem(self.network_bridge),
+			chain_api: m.map_subsystem(self.chain_api),
+			collation_generation: m.map_subsystem(self.collation_generation),
+			collator_protocol: m.map_subsystem(self.collator_protocol),
+			approval_distribution: m.map_subsystem(self.approval_distribution),
+			approval_voting: m.map_subsystem(self.approval_voting),
+			gossip_support: m.map_subsystem(self.gossip_support),
+		}
+	}
 }
 
+type AllSubsystemsSame<T> = AllSubsystems<
+	T, T, T, T, T,
+	T, T, T, T, T,
+	T, T, T, T, T,
+	T, T, T, T,
+>;
+
 /// Overseer Prometheus metrics.
 #[derive(Clone)]
 struct MetricsInner {
@@ -1251,7 +1328,7 @@ struct MetricsInner {
 	messages_relayed_total: prometheus::Counter<prometheus::U64>,
 	message_relay_timings: prometheus::Histogram,
 	to_overseer_channel_queue_size: prometheus::Gauge<prometheus::U64>,
-	from_overseer_channel_queue_size: prometheus::Gauge<prometheus::U64>,
+	from_overseer_channel_queue_size: prometheus::GaugeVec<prometheus::U64>,
 }
 
 #[derive(Default, Clone)]
@@ -1281,9 +1358,17 @@ impl Metrics {
 		self.0.as_ref().map(|metrics| metrics.message_relay_timings.start_timer())
 	}
 
-	fn channel_fill_level_snapshot(&self, from_overseer: usize, to_overseer: usize) {
+	fn channel_fill_level_snapshot(
+		&self,
+		from_overseer: AllSubsystemsSame<(&'static str, usize)>,
+		to_overseer: usize,
+	) {
+		self.0.as_ref().map(|metrics| {
+			from_overseer.map_subsystems(|(name, queue_size): (_, usize)| {
+				metrics.from_overseer_channel_queue_size.with_label_values(&[name]).set(queue_size as u64);
+			})
+		});
 		self.0.as_ref().map(|metrics| metrics.to_overseer_channel_queue_size.set(to_overseer as u64));
-		self.0.as_ref().map(|metrics| metrics.from_overseer_channel_queue_size.set(from_overseer as u64));
 	}
 }
 
@@ -1334,11 +1419,14 @@ impl metrics::Metrics for Metrics {
 				registry,
 			)?,
 			from_overseer_channel_queue_size: prometheus::register(
-				prometheus::Gauge::<prometheus::U64>::with_opts(
+				prometheus::GaugeVec::<prometheus::U64>::new(
 					prometheus::Opts::new(
 						"parachain_from_overseer_channel_queue_size",
-						"Number of elements sitting in the channel waiting to be processed.",
+						"Number of elements sitting in the channel from the overseer waiting to be processed.",
 					),
+					&[
+						"subsystem_name",
+					],
 				)?,
 				registry,
 			)?,
@@ -1346,7 +1434,7 @@ impl metrics::Metrics for Metrics {
 				prometheus::Gauge::<prometheus::U64>::with_opts(
 					prometheus::Opts::new(
 						"parachain_to_overseer_channel_queue_size",
-						"Number of elements sitting in the channel waiting to be processed.",
+						"Number of elements sitting in the channel to the overseer waiting to be processed.",
 					),
 				)?,
 				registry,
@@ -1486,21 +1574,6 @@ where
 
 		let (to_overseer_tx, to_overseer_rx) = metered::unbounded("to_overseer");
 
-		{
-			let meter_from_overseer = events_rx.meter().clone();
-			let meter_to_overseer = to_overseer_rx.meter().clone();
-			let metronome_metrics = metrics.clone();
-			let metronome = Metronome::new(std::time::Duration::from_millis(950))
-			.for_each(move |_| {
-				metronome_metrics.channel_fill_level_snapshot(meter_from_overseer.queue_count(), meter_to_overseer.queue_count());
-
-				async move {
-					()
-				}
-			});
-			s.spawn("metrics_metronome", Box::pin(metronome));
-		}
-
 		let mut running_subsystems = FuturesUnordered::new();
 
 		let mut seed = 0x533d; // arbitrary
@@ -1704,26 +1777,68 @@ where
 		let active_leaves = HashMap::new();
 		let activation_external_listeners = HashMap::new();
 
+		let subsystems = AllSubsystems {
+			candidate_validation: candidate_validation_subsystem,
+			candidate_backing: candidate_backing_subsystem,
+			candidate_selection: candidate_selection_subsystem,
+			statement_distribution: statement_distribution_subsystem,
+			availability_distribution: availability_distribution_subsystem,
+			availability_recovery: availability_recovery_subsystem,
+			bitfield_signing: bitfield_signing_subsystem,
+			bitfield_distribution: bitfield_distribution_subsystem,
+			provisioner: provisioner_subsystem,
+			pov_distribution: pov_distribution_subsystem,
+			runtime_api: runtime_api_subsystem,
+			availability_store: availability_store_subsystem,
+			network_bridge: network_bridge_subsystem,
+			chain_api: chain_api_subsystem,
+			collation_generation: collation_generation_subsystem,
+			collator_protocol: collator_protocol_subsystem,
+			approval_distribution: approval_distribution_subsystem,
+			approval_voting: approval_voting_subsystem,
+			gossip_support: gossip_support_subsystem,
+		};
+
+		{
+			struct ExtractNameAndMeter;
+			impl<'a, T: 'a> MapSubsystem<&'a OverseenSubsystem<T>> for ExtractNameAndMeter {
+				type Output = (&'static str, metered::Meter);
+
+				fn map_subsystem(&self, subsystem: &'a OverseenSubsystem<T>) -> Self::Output {
+					let instance = subsystem.instance.as_ref()
+						.expect("Extraction is done directly after spawning when subsystems\
+						have not concluded; qed");
+
+					(instance.name, instance.tx.meter().clone())
+				}
+			}
+
+			let meter_external_to_overseer = events_rx.meter().clone();
+			let meter_subsystem_to_overseer = to_overseer_rx.meter().clone();
+			let subsystem_meters = subsystems.as_ref().map_subsystems(ExtractNameAndMeter);
+			let metronome_metrics = metrics.clone();
+			let metronome = Metronome::new(std::time::Duration::from_millis(950))
+				.for_each(move |_| {
+					let to_subsystem_counts = subsystem_meters.as_ref()
+						.map_subsystems(|&(name, ref meter): &(_, metered::Meter)| (name, meter.queue_count()));
+
+					// We combine the amount of messages from subsystems to the overseer
+					// as well as the amount of messages from external sources to the overseer
+					// into one to_overseer value.
+					metronome_metrics.channel_fill_level_snapshot(
+						to_subsystem_counts,
+						meter_subsystem_to_overseer.queue_count() + meter_external_to_overseer.queue_count(),
+					);
+
+					async move {
+						()
+					}
+				});
+			s.spawn("metrics_metronome", Box::pin(metronome));
+		}
+
 		let this = Self {
-			candidate_validation_subsystem,
-			candidate_backing_subsystem,
-			candidate_selection_subsystem,
-			statement_distribution_subsystem,
-			availability_distribution_subsystem,
-			availability_recovery_subsystem,
-			bitfield_signing_subsystem,
-			bitfield_distribution_subsystem,
-			provisioner_subsystem,
-			pov_distribution_subsystem,
-			runtime_api_subsystem,
-			availability_store_subsystem,
-			network_bridge_subsystem,
-			chain_api_subsystem,
-			collation_generation_subsystem,
-			collator_protocol_subsystem,
-			approval_distribution_subsystem,
-			approval_voting_subsystem,
-			gossip_support_subsystem,
+			subsystems,
 			s,
 			running_subsystems,
 			to_overseer_rx: to_overseer_rx.fuse(),
@@ -1740,25 +1855,25 @@ where
 
 	// Stop the overseer.
 	async fn stop(mut self) {
-		let _ = self.candidate_validation_subsystem.send_signal(OverseerSignal::Conclude).await;
-		let _ = self.candidate_backing_subsystem.send_signal(OverseerSignal::Conclude).await;
-		let _ = self.candidate_selection_subsystem.send_signal(OverseerSignal::Conclude).await;
-		let _ = self.statement_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
-		let _ = self.availability_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
-		let _ = self.availability_recovery_subsystem.send_signal(OverseerSignal::Conclude).await;
-		let _ = self.bitfield_signing_subsystem.send_signal(OverseerSignal::Conclude).await;
-		let _ = self.bitfield_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
-		let _ = self.provisioner_subsystem.send_signal(OverseerSignal::Conclude).await;
-		let _ = self.pov_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
-		let _ = self.runtime_api_subsystem.send_signal(OverseerSignal::Conclude).await;
-		let _ = self.availability_store_subsystem.send_signal(OverseerSignal::Conclude).await;
-		let _ = self.network_bridge_subsystem.send_signal(OverseerSignal::Conclude).await;
-		let _ = self.chain_api_subsystem.send_signal(OverseerSignal::Conclude).await;
-		let _ = self.collator_protocol_subsystem.send_signal(OverseerSignal::Conclude).await;
-		let _ = self.collation_generation_subsystem.send_signal(OverseerSignal::Conclude).await;
-		let _ = self.approval_distribution_subsystem.send_signal(OverseerSignal::Conclude).await;
-		let _ = self.approval_voting_subsystem.send_signal(OverseerSignal::Conclude).await;
-		let _ = self.gossip_support_subsystem.send_signal(OverseerSignal::Conclude).await;
+		let _ = self.subsystems.candidate_validation.send_signal(OverseerSignal::Conclude).await;
+		let _ = self.subsystems.candidate_backing.send_signal(OverseerSignal::Conclude).await;
+		let _ = self.subsystems.candidate_selection.send_signal(OverseerSignal::Conclude).await;
+		let _ = self.subsystems.statement_distribution.send_signal(OverseerSignal::Conclude).await;
+		let _ = self.subsystems.availability_distribution.send_signal(OverseerSignal::Conclude).await;
+		let _ = self.subsystems.availability_recovery.send_signal(OverseerSignal::Conclude).await;
+		let _ = self.subsystems.bitfield_signing.send_signal(OverseerSignal::Conclude).await;
+		let _ = self.subsystems.bitfield_distribution.send_signal(OverseerSignal::Conclude).await;
+		let _ = self.subsystems.provisioner.send_signal(OverseerSignal::Conclude).await;
+		let _ = self.subsystems.pov_distribution.send_signal(OverseerSignal::Conclude).await;
+		let _ = self.subsystems.runtime_api.send_signal(OverseerSignal::Conclude).await;
+		let _ = self.subsystems.availability_store.send_signal(OverseerSignal::Conclude).await;
+		let _ = self.subsystems.network_bridge.send_signal(OverseerSignal::Conclude).await;
+		let _ = self.subsystems.chain_api.send_signal(OverseerSignal::Conclude).await;
+		let _ = self.subsystems.collator_protocol.send_signal(OverseerSignal::Conclude).await;
+		let _ = self.subsystems.collation_generation.send_signal(OverseerSignal::Conclude).await;
+		let _ = self.subsystems.approval_distribution.send_signal(OverseerSignal::Conclude).await;
+		let _ = self.subsystems.approval_voting.send_signal(OverseerSignal::Conclude).await;
+		let _ = self.subsystems.gossip_support.send_signal(OverseerSignal::Conclude).await;
 
 		let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse();
 
@@ -1911,25 +2026,25 @@ where
 
 	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
 	async fn broadcast_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> {
-		self.candidate_validation_subsystem.send_signal(signal.clone()).await?;
-		self.candidate_backing_subsystem.send_signal(signal.clone()).await?;
-		self.candidate_selection_subsystem.send_signal(signal.clone()).await?;
-		self.statement_distribution_subsystem.send_signal(signal.clone()).await?;
-		self.availability_distribution_subsystem.send_signal(signal.clone()).await?;
-		self.availability_recovery_subsystem.send_signal(signal.clone()).await?;
-		self.bitfield_signing_subsystem.send_signal(signal.clone()).await?;
-		self.bitfield_distribution_subsystem.send_signal(signal.clone()).await?;
-		self.provisioner_subsystem.send_signal(signal.clone()).await?;
-		self.pov_distribution_subsystem.send_signal(signal.clone()).await?;
-		self.runtime_api_subsystem.send_signal(signal.clone()).await?;
-		self.availability_store_subsystem.send_signal(signal.clone()).await?;
-		self.network_bridge_subsystem.send_signal(signal.clone()).await?;
-		self.chain_api_subsystem.send_signal(signal.clone()).await?;
-		self.collator_protocol_subsystem.send_signal(signal.clone()).await?;
-		self.collation_generation_subsystem.send_signal(signal.clone()).await?;
-		self.approval_distribution_subsystem.send_signal(signal.clone()).await?;
-		self.approval_voting_subsystem.send_signal(signal.clone()).await?;
-		self.gossip_support_subsystem.send_signal(signal).await?;
+		self.subsystems.candidate_validation.send_signal(signal.clone()).await?;
+		self.subsystems.candidate_backing.send_signal(signal.clone()).await?;
+		self.subsystems.candidate_selection.send_signal(signal.clone()).await?;
+		self.subsystems.statement_distribution.send_signal(signal.clone()).await?;
+		self.subsystems.availability_distribution.send_signal(signal.clone()).await?;
+		self.subsystems.availability_recovery.send_signal(signal.clone()).await?;
+		self.subsystems.bitfield_signing.send_signal(signal.clone()).await?;
+		self.subsystems.bitfield_distribution.send_signal(signal.clone()).await?;
+		self.subsystems.provisioner.send_signal(signal.clone()).await?;
+		self.subsystems.pov_distribution.send_signal(signal.clone()).await?;
+		self.subsystems.runtime_api.send_signal(signal.clone()).await?;
+		self.subsystems.availability_store.send_signal(signal.clone()).await?;
+		self.subsystems.network_bridge.send_signal(signal.clone()).await?;
+		self.subsystems.chain_api.send_signal(signal.clone()).await?;
+		self.subsystems.collator_protocol.send_signal(signal.clone()).await?;
+		self.subsystems.collation_generation.send_signal(signal.clone()).await?;
+		self.subsystems.approval_distribution.send_signal(signal.clone()).await?;
+		self.subsystems.approval_voting.send_signal(signal.clone()).await?;
+		self.subsystems.gossip_support.send_signal(signal).await?;
 
 		Ok(())
 	}
@@ -1940,61 +2055,61 @@ where
 		self.metrics.on_message_relayed();
 		match msg {
 			AllMessages::CandidateValidation(msg) => {
-				self.candidate_validation_subsystem.send_message(msg).await?;
+				self.subsystems.candidate_validation.send_message(msg).await?;
 			},
 			AllMessages::CandidateBacking(msg) => {
-				self.candidate_backing_subsystem.send_message(msg).await?;
+				self.subsystems.candidate_backing.send_message(msg).await?;
 			},
 			AllMessages::CandidateSelection(msg) => {
-				self.candidate_selection_subsystem.send_message(msg).await?;
+				self.subsystems.candidate_selection.send_message(msg).await?;
 			},
 			AllMessages::StatementDistribution(msg) => {
-				self.statement_distribution_subsystem.send_message(msg).await?;
+				self.subsystems.statement_distribution.send_message(msg).await?;
 			},
 			AllMessages::AvailabilityDistribution(msg) => {
-				self.availability_distribution_subsystem.send_message(msg).await?;
+				self.subsystems.availability_distribution.send_message(msg).await?;
 			},
 			AllMessages::AvailabilityRecovery(msg) => {
-				self.availability_recovery_subsystem.send_message(msg).await?;
+				self.subsystems.availability_recovery.send_message(msg).await?;
 			},
 			AllMessages::BitfieldDistribution(msg) => {
-				self.bitfield_distribution_subsystem.send_message(msg).await?;
+				self.subsystems.bitfield_distribution.send_message(msg).await?;
 			},
 			AllMessages::BitfieldSigning(msg) => {
-				self.bitfield_signing_subsystem.send_message(msg).await?;
+				self.subsystems.bitfield_signing.send_message(msg).await?;
 			},
 			AllMessages::Provisioner(msg) => {
-				self.provisioner_subsystem.send_message(msg).await?;
+				self.subsystems.provisioner.send_message(msg).await?;
 			},
 			AllMessages::PoVDistribution(msg) => {
-				self.pov_distribution_subsystem.send_message(msg).await?;
+				self.subsystems.pov_distribution.send_message(msg).await?;
 			},
 			AllMessages::RuntimeApi(msg) => {
-				self.runtime_api_subsystem.send_message(msg).await?;
+				self.subsystems.runtime_api.send_message(msg).await?;
 			},
 			AllMessages::AvailabilityStore(msg) => {
-				self.availability_store_subsystem.send_message(msg).await?;
+				self.subsystems.availability_store.send_message(msg).await?;
 			},
 			AllMessages::NetworkBridge(msg) => {
-				self.network_bridge_subsystem.send_message(msg).await?;
+				self.subsystems.network_bridge.send_message(msg).await?;
 			},
 			AllMessages::ChainApi(msg) => {
-				self.chain_api_subsystem.send_message(msg).await?;
+				self.subsystems.chain_api.send_message(msg).await?;
 			},
 			AllMessages::CollationGeneration(msg) => {
-				self.collation_generation_subsystem.send_message(msg).await?;
+				self.subsystems.collation_generation.send_message(msg).await?;
 			},
 			AllMessages::CollatorProtocol(msg) => {
-				self.collator_protocol_subsystem.send_message(msg).await?;
+				self.subsystems.collator_protocol.send_message(msg).await?;
 			},
 			AllMessages::ApprovalDistribution(msg) => {
-				self.approval_distribution_subsystem.send_message(msg).await?;
+				self.subsystems.approval_distribution.send_message(msg).await?;
 			},
 			AllMessages::ApprovalVoting(msg) => {
-				self.approval_voting_subsystem.send_message(msg).await?;
+				self.subsystems.approval_voting.send_message(msg).await?;
 			},
 			AllMessages::GossipSupport(msg) => {
-				self.gossip_support_subsystem.send_message(msg).await?;
+				self.subsystems.gossip_support.send_message(msg).await?;
 			},
 		}
 
@@ -2342,13 +2457,12 @@ mod tests {
 		let gather = registry.gather();
 		assert_eq!(gather[0].get_name(), "parachain_activated_heads_total");
 		assert_eq!(gather[1].get_name(), "parachain_deactivated_heads_total");
-		assert_eq!(gather[2].get_name(), "parachain_from_overseer_channel_queue_size");
-		assert_eq!(gather[3].get_name(), "parachain_messages_relayed_total");
-		assert_eq!(gather[4].get_name(), "parachain_overseer_messages_relay_timings");
-		assert_eq!(gather[5].get_name(), "parachain_to_overseer_channel_queue_size");
+		assert_eq!(gather[2].get_name(), "parachain_messages_relayed_total");
+		assert_eq!(gather[3].get_name(), "parachain_overseer_messages_relay_timings");
+		assert_eq!(gather[4].get_name(), "parachain_to_overseer_channel_queue_size");
 		let activated = gather[0].get_metric()[0].get_counter().get_value() as u64;
 		let deactivated = gather[1].get_metric()[0].get_counter().get_value() as u64;
-		let relayed = gather[3].get_metric()[0].get_counter().get_value() as u64;
+		let relayed = gather[2].get_metric()[0].get_counter().get_value() as u64;
 		let mut result = HashMap::new();
 		result.insert("activated", activated);
 		result.insert("deactivated", deactivated);
-- 
GitLab