From ad0e42537dcc0291c9a40580fb18703a482960e1 Mon Sep 17 00:00:00 2001
From: Sergei Shulepov <sergei@parity.io>
Date: Fri, 20 Aug 2021 11:50:47 +0200
Subject: [PATCH] Introduce metrics into PVF validation host (#3603)
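
This change threads a `Metrics` handle through the PVF validation host and
its preparation and execution pipelines. The handle wraps an
`Option<MetricsInner>`: `Metrics::default()` yields an unregistered handle
whose submissions are all no-ops (the tests use exactly that), while a
registered handle exposes counters for enqueued/concluded jobs, counters for
worker spawn/retire events (labelled by flavor, "prepare" or "execute"), and
histograms of preparation and execution times.

A rough wiring sketch, assuming `registry` is the node's
`Option<&prometheus::Registry>` and the paths come from the subsystem config
(see overseer.rs below for the real call site):

    use polkadot_node_core_pvf::{start, Config, Metrics};
    use polkadot_node_subsystem_util::metrics::Metrics as _; // for `register`

    let pvf_metrics = Metrics::register(registry)?; // no-op handle when `None`
    let (validation_host, task) = start(
        Config::new(cache_path, program_path),
        pvf_metrics,
    );
    // `task` drives the host and must be spawned by the subsystem, e.g.:
    // ctx.spawn_blocking("pvf-validation-host", task.boxed())?;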

---
 polkadot/Cargo.lock                           |   2 +
 .../node/core/candidate-validation/src/lib.rs |  24 +-
 polkadot/node/core/pvf/Cargo.toml             |   1 +
 polkadot/node/core/pvf/src/execute/queue.rs   |  26 ++-
 polkadot/node/core/pvf/src/host.rs            |   9 +-
 polkadot/node/core/pvf/src/lib.rs             |   2 +
 polkadot/node/core/pvf/src/metrics.rs         | 214 ++++++++++++++++++
 polkadot/node/core/pvf/src/prepare/pool.rs    |  50 +++-
 polkadot/node/core/pvf/src/prepare/queue.rs   |  12 +-
 polkadot/node/core/pvf/tests/it/main.rs       |   4 +-
 polkadot/node/malus/Cargo.toml                |   1 +
 polkadot/node/malus/src/variant-a.rs          |   1 +
 polkadot/node/service/src/overseer.rs         |   3 +-
 13 files changed, 325 insertions(+), 24 deletions(-)
 create mode 100644 polkadot/node/core/pvf/src/metrics.rs

diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index c20fadac330..f55eca42ee6 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -6243,6 +6243,7 @@ dependencies = [
  "parity-scale-codec",
  "pin-project 1.0.8",
  "polkadot-core-primitives",
+ "polkadot-node-subsystem-util",
  "polkadot-parachain",
  "rand 0.8.4",
  "sc-executor",
@@ -7011,6 +7012,7 @@ dependencies = [
  "parity-util-mem",
  "polkadot-cli",
  "polkadot-node-core-candidate-validation",
+ "polkadot-node-core-pvf",
  "polkadot-node-subsystem",
  "polkadot-node-subsystem-util",
  "structopt",
diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs
index 9485908ac68..e8f1f5bfe19 100644
--- a/polkadot/node/core/candidate-validation/src/lib.rs
+++ b/polkadot/node/core/candidate-validation/src/lib.rs
@@ -70,6 +70,7 @@ pub struct Config {
 /// The candidate validation subsystem.
 pub struct CandidateValidationSubsystem {
 	metrics: Metrics,
+	pvf_metrics: polkadot_node_core_pvf::Metrics,
 	config: Config,
 }
 
@@ -78,8 +79,12 @@ impl CandidateValidationSubsystem {
 	/// strategy.
 	///
 	/// Check out [`IsolationStrategy`] to get more details.
-	pub fn with_config(config: Config, metrics: Metrics) -> Self {
-		CandidateValidationSubsystem { config, metrics }
+	pub fn with_config(
+		config: Config,
+		metrics: Metrics,
+		pvf_metrics: polkadot_node_core_pvf::Metrics,
+	) -> Self {
+		CandidateValidationSubsystem { config, metrics, pvf_metrics }
 	}
 }
 
@@ -89,10 +94,15 @@ where
 	Context: overseer::SubsystemContext<Message = CandidateValidationMessage>,
 {
 	fn start(self, ctx: Context) -> SpawnedSubsystem {
-		let future =
-			run(ctx, self.metrics, self.config.artifacts_cache_path, self.config.program_path)
-				.map_err(|e| SubsystemError::with_origin("candidate-validation", e))
-				.boxed();
+		let future = run(
+			ctx,
+			self.metrics,
+			self.pvf_metrics,
+			self.config.artifacts_cache_path,
+			self.config.program_path,
+		)
+		.map_err(|e| SubsystemError::with_origin("candidate-validation", e))
+		.boxed();
 		SpawnedSubsystem { name: "candidate-validation-subsystem", future }
 	}
 }
@@ -100,6 +110,7 @@ where
 async fn run<Context>(
 	mut ctx: Context,
 	metrics: Metrics,
+	pvf_metrics: polkadot_node_core_pvf::Metrics,
 	cache_path: PathBuf,
 	program_path: PathBuf,
 ) -> SubsystemResult<()>
@@ -109,6 +120,7 @@ where
 {
 	let (mut validation_host, task) = polkadot_node_core_pvf::start(
 		polkadot_node_core_pvf::Config::new(cache_path, program_path),
+		pvf_metrics,
 	);
 	ctx.spawn_blocking("pvf-validation-host", task.boxed())?;
 
diff --git a/polkadot/node/core/pvf/Cargo.toml b/polkadot/node/core/pvf/Cargo.toml
index a313bd5c200..809d8269ac2 100644
--- a/polkadot/node/core/pvf/Cargo.toml
+++ b/polkadot/node/core/pvf/Cargo.toml
@@ -23,6 +23,7 @@ rand = "0.8.3"
 parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] }
 polkadot-parachain = { path = "../../../parachain" }
 polkadot-core-primitives = { path = "../../../core-primitives" }
+polkadot-node-subsystem-util = { path = "../../subsystem-util" }
 sc-executor = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sc-executor-wasmtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sc-executor-common = { git = "https://github.com/paritytech/substrate", branch = "master" }
diff --git a/polkadot/node/core/pvf/src/execute/queue.rs b/polkadot/node/core/pvf/src/execute/queue.rs
index ca3c7c92180..09e84819682 100644
--- a/polkadot/node/core/pvf/src/execute/queue.rs
+++ b/polkadot/node/core/pvf/src/execute/queue.rs
@@ -20,6 +20,7 @@ use super::worker::Outcome;
 use crate::{
 	artifacts::{ArtifactId, ArtifactPathId},
 	host::ResultSender,
+	metrics::Metrics,
 	worker_common::{IdleWorker, WorkerHandle},
 	InvalidCandidate, ValidationError, LOG_TARGET,
 };
@@ -95,6 +96,8 @@ enum QueueEvent {
 type Mux = FuturesUnordered<BoxFuture<'static, QueueEvent>>;
 
 struct Queue {
+	metrics: Metrics,
+
 	/// The receiver that receives messages to the pool.
 	to_queue_rx: mpsc::Receiver<ToQueue>,
 
@@ -109,12 +112,14 @@ struct Queue {
 
 impl Queue {
 	fn new(
+		metrics: Metrics,
 		program_path: PathBuf,
 		worker_capacity: usize,
 		spawn_timeout: Duration,
 		to_queue_rx: mpsc::Receiver<ToQueue>,
 	) -> Self {
 		Self {
+			metrics,
 			program_path,
 			spawn_timeout,
 			to_queue_rx,
@@ -141,12 +146,12 @@ impl Queue {
 				ev = self.mux.select_next_some() => handle_mux(&mut self, ev).await,
 			}
 
-			purge_dead(&mut self.workers).await;
+			purge_dead(&self.metrics, &mut self.workers).await;
 		}
 	}
 }
 
-async fn purge_dead(workers: &mut Workers) {
+async fn purge_dead(metrics: &Metrics, workers: &mut Workers) {
 	let mut to_remove = vec![];
 	for (worker, data) in workers.running.iter_mut() {
 		if futures::poll!(&mut data.handle).is_ready() {
@@ -155,7 +160,9 @@ async fn purge_dead(workers: &mut Workers) {
 		}
 	}
 	for w in to_remove {
-		let _ = workers.running.remove(w);
+		if workers.running.remove(w).is_some() {
+			metrics.execute_worker().on_retired();
+		}
 	}
 }
 
@@ -166,6 +173,7 @@ fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) {
 		validation_code_hash = ?artifact.id.code_hash,
 		"enqueueing an artifact for execution",
 	);
+	queue.metrics.execute_enqueued();
 	let job = ExecuteJob { artifact, params, result_tx };
 
 	if let Some(available) = queue.workers.find_available() {
@@ -190,6 +198,7 @@ async fn handle_mux(queue: &mut Queue, event: QueueEvent) {
 }
 
 fn handle_worker_spawned(queue: &mut Queue, idle: IdleWorker, handle: WorkerHandle) {
+	queue.metrics.execute_worker().on_spawned();
 	queue.workers.spawn_inflight -= 1;
 	let worker = queue.workers.running.insert(WorkerData { idle: Some(idle), handle });
 
@@ -228,6 +237,7 @@ fn handle_job_finish(
 			(None, Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbigiousWorkerDeath))),
 	};
 
+	queue.metrics.execute_finished();
 	tracing::debug!(
 		target: LOG_TARGET,
 		validation_code_hash = ?artifact_id.code_hash,
@@ -257,7 +267,9 @@ fn handle_job_finish(
 		}
 	} else {
 		// Note it's possible that the worker was purged already by `purge_dead`
-		queue.workers.running.remove(worker);
+		if queue.workers.running.remove(worker).is_some() {
+			queue.metrics.execute_worker().on_retired();
+		}
 
 		if !queue.queue.is_empty() {
 			// The worker has died and we still have work we have to do. Request an extra worker.
@@ -269,6 +281,7 @@ fn handle_job_finish(
 }
 
 fn spawn_extra_worker(queue: &mut Queue) {
+	queue.metrics.execute_worker().on_begin_spawn();
 	tracing::debug!(target: LOG_TARGET, "spawning an extra worker");
 
 	queue
@@ -309,8 +322,10 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
 			thus claim_idle cannot return None;
 			qed.",
 	);
+	let execution_timer = queue.metrics.time_execution();
 	queue.mux.push(
 		async move {
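+			// Keep the timer alive for the whole job: dropping it when this
+			// block completes records the execution time in the histogram.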
+			let _timer = execution_timer;
 			let outcome = super::worker::start_work(idle, job.artifact.clone(), job.params).await;
 			QueueEvent::StartWork(worker, outcome, job.artifact.id, job.result_tx)
 		}
@@ -319,11 +334,12 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
 }
 
 pub fn start(
+	metrics: Metrics,
 	program_path: PathBuf,
 	worker_capacity: usize,
 	spawn_timeout: Duration,
 ) -> (mpsc::Sender<ToQueue>, impl Future<Output = ()>) {
 	let (to_queue_tx, to_queue_rx) = mpsc::channel(20);
-	let run = Queue::new(program_path, worker_capacity, spawn_timeout, to_queue_rx).run();
+	let run = Queue::new(metrics, program_path, worker_capacity, spawn_timeout, to_queue_rx).run();
 	(to_queue_tx, run)
 }
diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs
index 08fb573fd97..89b230bc90d 100644
--- a/polkadot/node/core/pvf/src/host.rs
+++ b/polkadot/node/core/pvf/src/host.rs
@@ -22,7 +22,9 @@
 
 use crate::{
 	artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts},
-	execute, prepare, Priority, Pvf, ValidationError, LOG_TARGET,
+	execute,
+	metrics::Metrics,
+	prepare, Priority, Pvf, ValidationError, LOG_TARGET,
 };
 use always_assert::never;
 use async_std::path::{Path, PathBuf};
@@ -134,18 +136,20 @@ impl Config {
 /// The future should not return normally but if it does then that indicates an unrecoverable error.
 /// In that case all pending requests will be canceled, dropping the result senders and new ones
 /// will be rejected.
-pub fn start(config: Config) -> (ValidationHost, impl Future<Output = ()>) {
+pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future<Output = ()>) {
 	let (to_host_tx, to_host_rx) = mpsc::channel(10);
 
 	let validation_host = ValidationHost { to_host_tx };
 
 	let (to_prepare_pool, from_prepare_pool, run_prepare_pool) = prepare::start_pool(
+		metrics.clone(),
 		config.prepare_worker_program_path.clone(),
 		config.cache_path.clone(),
 		config.prepare_worker_spawn_timeout,
 	);
 
 	let (to_prepare_queue_tx, from_prepare_queue_rx, run_prepare_queue) = prepare::start_queue(
+		metrics.clone(),
 		config.prepare_workers_soft_max_num,
 		config.prepare_workers_hard_max_num,
 		config.cache_path.clone(),
@@ -154,6 +158,7 @@ pub fn start(config: Config) -> (ValidationHost, impl Future<Output = ()>) {
 	);
 
 	let (to_execute_queue_tx, run_execute_queue) = execute::start(
+		metrics.clone(),
 		config.execute_worker_program_path.to_owned(),
 		config.execute_workers_max_num,
 		config.execute_worker_spawn_timeout,
diff --git a/polkadot/node/core/pvf/src/lib.rs b/polkadot/node/core/pvf/src/lib.rs
index bd72a5e1ed0..387d10e9606 100644
--- a/polkadot/node/core/pvf/src/lib.rs
+++ b/polkadot/node/core/pvf/src/lib.rs
@@ -80,6 +80,7 @@ mod error;
 mod execute;
 mod executor_intf;
 mod host;
+mod metrics;
 mod prepare;
 mod priority;
 mod pvf;
@@ -96,6 +97,7 @@ pub use priority::Priority;
 pub use pvf::Pvf;
 
 pub use host::{start, Config, ValidationHost};
+pub use metrics::Metrics;
 
 pub use execute::worker_entrypoint as execute_worker_entrypoint;
 pub use prepare::worker_entrypoint as prepare_worker_entrypoint;
diff --git a/polkadot/node/core/pvf/src/metrics.rs b/polkadot/node/core/pvf/src/metrics.rs
new file mode 100644
index 00000000000..031397158ef
--- /dev/null
+++ b/polkadot/node/core/pvf/src/metrics.rs
@@ -0,0 +1,214 @@
+// Copyright 2021 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! Prometheus metrics related to the validation host.
+
+use polkadot_node_subsystem_util::metrics::{self, prometheus};
+
+/// Validation host metrics.
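+///
+/// Wraps an `Option<MetricsInner>`: a handle created via `Metrics::default()`
+/// holds `None`, making every submission method a no-op, so call sites can
+/// record metrics unconditionally whether or not a Prometheus registry was
+/// supplied.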
+#[derive(Default, Clone)]
+pub struct Metrics(Option<MetricsInner>);
+
+impl Metrics {
+	/// Returns a handle for submitting prepare-worker metrics.
+	pub(crate) fn prepare_worker(&'_ self) -> WorkerRelatedMetrics<'_> {
+		WorkerRelatedMetrics { metrics: self, flavor: WorkerFlavor::Prepare }
+	}
+
+	/// Returns a handle for submitting execute-worker metrics.
+	pub(crate) fn execute_worker(&'_ self) -> WorkerRelatedMetrics<'_> {
+		WorkerRelatedMetrics { metrics: self, flavor: WorkerFlavor::Execute }
+	}
+
+	/// When the preparation pipeline had a new item enqueued.
+	pub(crate) fn prepare_enqueued(&self) {
+		if let Some(metrics) = &self.0 {
+			metrics.prepare_enqueued.inc();
+		}
+	}
+
+	/// When the preparation pipeline concluded working on an item.
+	pub(crate) fn prepare_concluded(&self) {
+		if let Some(metrics) = &self.0 {
+			metrics.prepare_concluded.inc();
+		}
+	}
+
+	/// When the execution pipeline had a new item enqueued.
+	pub(crate) fn execute_enqueued(&self) {
+		if let Some(metrics) = &self.0 {
+			metrics.execute_enqueued.inc();
+		}
+	}
+
+	/// When the execution pipeline finished executing a request.
+	pub(crate) fn execute_finished(&self) {
+		if let Some(metrics) = &self.0 {
+			metrics.execute_finished.inc();
+		}
+	}
+
+	/// Time from sending a preparation request to a worker until receiving the response.
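+	///
+	/// The returned `HistogramTimer` observes the elapsed time when it is
+	/// dropped, so callers keep it alive for exactly the span to be measured.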
+	pub(crate) fn time_preparation(
+		&self,
+	) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
+		self.0.as_ref().map(|metrics| metrics.preparation_time.start_timer())
+	}
+
+	/// Time from sending an execution request to a worker until receiving the response.
+	pub(crate) fn time_execution(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
+		self.0.as_ref().map(|metrics| metrics.execution_time.start_timer())
+	}
+}
+
+#[derive(Clone)]
+struct MetricsInner {
+	worker_spawning: prometheus::CounterVec<prometheus::U64>,
+	worker_spawned: prometheus::CounterVec<prometheus::U64>,
+	worker_retired: prometheus::CounterVec<prometheus::U64>,
+	prepare_enqueued: prometheus::Counter<prometheus::U64>,
+	prepare_concluded: prometheus::Counter<prometheus::U64>,
+	execute_enqueued: prometheus::Counter<prometheus::U64>,
+	execute_finished: prometheus::Counter<prometheus::U64>,
+	preparation_time: prometheus::Histogram,
+	execution_time: prometheus::Histogram,
+}
+
+impl metrics::Metrics for Metrics {
+	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
+		let inner = MetricsInner {
+			worker_spawning: prometheus::register(
+				prometheus::CounterVec::new(
+					prometheus::Opts::new(
+						"pvf_worker_spawning",
+						"The total number of workers began to spawn",
+					),
+					&["flavor"],
+				)?,
+				registry,
+			)?,
+			worker_spawned: prometheus::register(
+				prometheus::CounterVec::new(
+					prometheus::Opts::new(
+						"pvf_worker_spawned",
+						"The total number of workers spawned successfully",
+					),
+					&["flavor"],
+				)?,
+				registry,
+			)?,
+			worker_retired: prometheus::register(
+				prometheus::CounterVec::new(
+					prometheus::Opts::new(
+						"pvf_worker_retired",
+						"The total number of workers retired, either killed by the host or died on duty",
+					),
+					&["flavor"],
+				)?,
+				registry,
+			)?,
+			prepare_enqueued: prometheus::register(
+				prometheus::Counter::new(
+					"pvf_prepare_enqueued",
+					"The total number of jobs enqueued into the preparation pipeline"
+				)?,
+				registry,
+			)?,
+			prepare_concluded: prometheus::register(
+				prometheus::Counter::new(
+					"pvf_prepare_concluded",
+					"The total number of jobs concluded in the preparation pipeline"
+				)?,
+				registry,
+			)?,
+			execute_enqueued: prometheus::register(
+				prometheus::Counter::new(
+					"pvf_execute_enqueued",
+					"The total number of jobs enqueued into the execution pipeline"
+				)?,
+				registry,
+			)?,
+			execute_finished: prometheus::register(
+				prometheus::Counter::new(
+					"pvf_execute_finished",
+					"The total number of jobs done in the execution pipeline"
+				)?,
+				registry,
+			)?,
+			preparation_time: prometheus::register(
+				prometheus::Histogram::with_opts(
+					prometheus::HistogramOpts::new(
+						"pvf_preparation_time",
+						"Time spent in preparing PVF artifacts",
+					)
+				)?,
+				registry,
+			)?,
+			execution_time: prometheus::register(
+				prometheus::Histogram::with_opts(
+					prometheus::HistogramOpts::new(
+						"pvf_execution_time",
+						"Time spent in executing PVFs",
+					)
+				)?,
+				registry,
+			)?,
+		};
+		Ok(Metrics(Some(inner)))
+	}
+}
+
+enum WorkerFlavor {
+	Prepare,
+	Execute,
+}
+
+impl WorkerFlavor {
+	fn as_label(&self) -> &'static str {
+		match *self {
+			WorkerFlavor::Prepare => "prepare",
+			WorkerFlavor::Execute => "execute",
+		}
+	}
+}
+
+pub(crate) struct WorkerRelatedMetrics<'a> {
+	metrics: &'a Metrics,
+	flavor: WorkerFlavor,
+}
+
+impl<'a> WorkerRelatedMetrics<'a> {
+	/// When the spawning of a worker started.
+	pub(crate) fn on_begin_spawn(&self) {
+		if let Some(metrics) = &self.metrics.0 {
+			metrics.worker_spawning.with_label_values(&[self.flavor.as_label()]).inc();
+		}
+	}
+
+	/// When the worker successfully spawned.
+	pub(crate) fn on_spawned(&self) {
+		if let Some(metrics) = &self.metrics.0 {
+			metrics.worker_spawned.with_label_values(&[self.flavor.as_label()]).inc();
+		}
+	}
+
+	/// When the worker was killed or died.
+	pub(crate) fn on_retired(&self) {
+		if let Some(metrics) = &self.metrics.0 {
+			metrics.worker_retired.with_label_values(&[self.flavor.as_label()]).inc();
+		}
+	}
+}
diff --git a/polkadot/node/core/pvf/src/prepare/pool.rs b/polkadot/node/core/pvf/src/prepare/pool.rs
index f74ad78a9fe..035d799ac59 100644
--- a/polkadot/node/core/pvf/src/prepare/pool.rs
+++ b/polkadot/node/core/pvf/src/prepare/pool.rs
@@ -16,6 +16,7 @@
 
 use super::worker::{self, Outcome};
 use crate::{
+	metrics::Metrics,
 	worker_common::{IdleWorker, WorkerHandle},
 	LOG_TARGET,
 };
@@ -111,6 +112,7 @@ struct Pool {
 	from_pool: mpsc::UnboundedSender<FromPool>,
 	spawned: HopSlotMap<Worker, WorkerData>,
 	mux: Mux,
+	metrics: Metrics,
 }
 
 /// A fatal error that warrants stopping the event loop of the pool.
@@ -125,6 +127,7 @@ async fn run(
 		mut from_pool,
 		mut spawned,
 		mut mux,
+		metrics,
 	}: Pool,
 ) {
 	macro_rules! break_if_fatal {
@@ -143,6 +146,7 @@ async fn run(
 			to_pool = to_pool.next() => {
 				let to_pool = break_if_fatal!(to_pool.ok_or(Fatal));
 				handle_to_pool(
+					&metrics,
 					&program_path,
 					&cache_path,
 					spawn_timeout,
@@ -151,14 +155,17 @@ async fn run(
 					to_pool,
 				)
 			}
-			ev = mux.select_next_some() => break_if_fatal!(handle_mux(&mut from_pool, &mut spawned, ev)),
+			ev = mux.select_next_some() => {
+				break_if_fatal!(handle_mux(&metrics, &mut from_pool, &mut spawned, ev))
+			}
 		}
 
-		break_if_fatal!(purge_dead(&mut from_pool, &mut spawned).await);
+		break_if_fatal!(purge_dead(&metrics, &mut from_pool, &mut spawned).await);
 	}
 }
 
 async fn purge_dead(
+	metrics: &Metrics,
 	from_pool: &mut mpsc::UnboundedSender<FromPool>,
 	spawned: &mut HopSlotMap<Worker, WorkerData>,
 ) -> Result<(), Fatal> {
@@ -177,7 +184,7 @@ async fn purge_dead(
 		}
 	}
 	for w in to_remove {
-		if spawned.remove(w).is_some() {
+		if attempt_retire(metrics, spawned, w) {
 			reply(from_pool, FromPool::Rip(w))?;
 		}
 	}
@@ -185,6 +192,7 @@ async fn purge_dead(
 }
 
 fn handle_to_pool(
+	metrics: &Metrics,
 	program_path: &Path,
 	cache_path: &Path,
 	spawn_timeout: Duration,
@@ -195,11 +203,13 @@ fn handle_to_pool(
 	match to_pool {
 		ToPool::Spawn => {
 			tracing::debug!(target: LOG_TARGET, "spawning a new prepare worker");
+			metrics.prepare_worker().on_begin_spawn();
 			mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed());
 		},
 		ToPool::StartWork { worker, code, artifact_path, background_priority } => {
 			if let Some(data) = spawned.get_mut(worker) {
 				if let Some(idle) = data.idle.take() {
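+					// The timer starts here and is moved into the task below;
+					// it is observed on drop, i.e. once preparation concludes.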
+					let preparation_timer = metrics.time_preparation();
 					mux.push(
 						start_work_task(
 							worker,
@@ -208,6 +218,7 @@ fn handle_to_pool(
 							cache_path.to_owned(),
 							artifact_path,
 							background_priority,
+							preparation_timer,
 						)
 						.boxed(),
 					);
@@ -227,7 +238,7 @@ fn handle_to_pool(
 		ToPool::Kill(worker) => {
 			tracing::debug!(target: LOG_TARGET, ?worker, "killing prepare worker");
 			// It may be absent if it were previously already removed by `purge_dead`.
-			let _ = spawned.remove(worker);
+			let _ = attempt_retire(metrics, spawned, worker);
 		},
 		ToPool::BumpPriority(worker) =>
 			if let Some(data) = spawned.get(worker) {
@@ -252,13 +263,14 @@ async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> Po
 	}
 }
 
-async fn start_work_task(
+async fn start_work_task<Timer>(
 	worker: Worker,
 	idle: IdleWorker,
 	code: Arc<Vec<u8>>,
 	cache_path: PathBuf,
 	artifact_path: PathBuf,
 	background_priority: bool,
+	_preparation_timer: Option<Timer>,
 ) -> PoolEvent {
 	let outcome =
 		worker::start_work(idle, code, &cache_path, artifact_path, background_priority).await;
@@ -266,12 +278,15 @@ async fn start_work_task(
 }
 
 fn handle_mux(
+	metrics: &Metrics,
 	from_pool: &mut mpsc::UnboundedSender<FromPool>,
 	spawned: &mut HopSlotMap<Worker, WorkerData>,
 	event: PoolEvent,
 ) -> Result<(), Fatal> {
 	match event {
 		PoolEvent::Spawn(idle, handle) => {
+			metrics.prepare_worker().on_spawned();
+
 			let worker = spawned.insert(WorkerData { idle: Some(idle), handle });
 
 			reply(from_pool, FromPool::Spawned(worker))?;
@@ -300,14 +315,14 @@ fn handle_mux(
 					Ok(())
 				},
 				Outcome::Unreachable => {
-					if spawned.remove(worker).is_some() {
+					if attempt_retire(metrics, spawned, worker) {
 						reply(from_pool, FromPool::Rip(worker))?;
 					}
 
 					Ok(())
 				},
 				Outcome::DidntMakeIt => {
-					if spawned.remove(worker).is_some() {
+					if attempt_retire(metrics, spawned, worker) {
 						reply(from_pool, FromPool::Concluded(worker, true))?;
 					}
 
@@ -322,8 +337,28 @@ fn reply(from_pool: &mut mpsc::UnboundedSender<FromPool>, m: FromPool) -> Result
 	from_pool.unbounded_send(m).map_err(|_| Fatal)
 }
 
+/// Removes the given worker from the registry if it is there. This leads to dropping and hence
+/// to killing the worker process.
+///
+/// Returns `true` if the worker existed and was removed, in which case its process was killed.
+///
+/// This function takes care of updating the retired-workers metric.
+fn attempt_retire(
+	metrics: &Metrics,
+	spawned: &mut HopSlotMap<Worker, WorkerData>,
+	worker: Worker,
+) -> bool {
+	if spawned.remove(worker).is_some() {
+		metrics.prepare_worker().on_retired();
+		true
+	} else {
+		false
+	}
+}
+
 /// Spins up the pool and returns the future that should be polled to make the pool functional.
 pub fn start(
+	metrics: Metrics,
 	program_path: PathBuf,
 	cache_path: PathBuf,
 	spawn_timeout: Duration,
@@ -332,6 +367,7 @@ pub fn start(
 	let (from_pool_tx, from_pool_rx) = mpsc::unbounded();
 
 	let run = run(Pool {
+		metrics,
 		program_path,
 		cache_path,
 		spawn_timeout,
diff --git a/polkadot/node/core/pvf/src/prepare/queue.rs b/polkadot/node/core/pvf/src/prepare/queue.rs
index 7240152a941..4ffa21de435 100644
--- a/polkadot/node/core/pvf/src/prepare/queue.rs
+++ b/polkadot/node/core/pvf/src/prepare/queue.rs
@@ -17,7 +17,7 @@
 //! A queue that handles requests for PVF preparation.
 
 use super::pool::{self, Worker};
-use crate::{artifacts::ArtifactId, Priority, Pvf, LOG_TARGET};
+use crate::{artifacts::ArtifactId, metrics::Metrics, Priority, Pvf, LOG_TARGET};
 use always_assert::{always, never};
 use async_std::path::PathBuf;
 use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt};
@@ -127,6 +127,8 @@ impl Unscheduled {
 }
 
 struct Queue {
+	metrics: Metrics,
+
 	to_queue_rx: mpsc::Receiver<ToQueue>,
 	from_queue_tx: mpsc::UnboundedSender<FromQueue>,
 
@@ -155,6 +157,7 @@ struct Fatal;
 
 impl Queue {
 	fn new(
+		metrics: Metrics,
 		soft_capacity: usize,
 		hard_capacity: usize,
 		cache_path: PathBuf,
@@ -164,6 +167,7 @@ impl Queue {
 		from_pool_rx: mpsc::UnboundedReceiver<pool::FromPool>,
 	) -> Self {
 		Self {
+			metrics,
 			to_queue_rx,
 			from_queue_tx,
 			to_pool_tx,
@@ -218,6 +222,7 @@ async fn handle_enqueue(queue: &mut Queue, priority: Priority, pvf: Pvf) -> Resu
 		?priority,
 		"PVF is enqueued for preparation.",
 	);
+	queue.metrics.prepare_enqueued();
 
 	let artifact_id = pvf.as_artifact_id();
 	if never!(
@@ -316,6 +321,8 @@ async fn handle_worker_concluded(
 	worker: Worker,
 	rip: bool,
 ) -> Result<(), Fatal> {
+	queue.metrics.prepare_concluded();
+
 	macro_rules! never_none {
 		($expr:expr) => {
 			match $expr {
@@ -486,6 +493,7 @@ async fn send_pool(
 
 /// Spins up the queue and returns the future that should be polled to make the queue functional.
 pub fn start(
+	metrics: Metrics,
 	soft_capacity: usize,
 	hard_capacity: usize,
 	cache_path: PathBuf,
@@ -496,6 +504,7 @@ pub fn start(
 	let (from_queue_tx, from_queue_rx) = mpsc::unbounded();
 
 	let run = Queue::new(
+		metrics,
 		soft_capacity,
 		hard_capacity,
 		cache_path,
@@ -565,6 +574,7 @@ mod tests {
 			let workers: SlotMap<Worker, ()> = SlotMap::with_key();
 
 			let (to_queue_tx, from_queue_rx, run) = start(
+				Metrics::default(),
 				soft_capacity,
 				hard_capacity,
 				tempdir.path().to_owned().into(),
diff --git a/polkadot/node/core/pvf/tests/it/main.rs b/polkadot/node/core/pvf/tests/it/main.rs
index 57fea2e2ca3..3689217880e 100644
--- a/polkadot/node/core/pvf/tests/it/main.rs
+++ b/polkadot/node/core/pvf/tests/it/main.rs
@@ -17,7 +17,7 @@
 use async_std::sync::Mutex;
 use parity_scale_codec::Encode as _;
 use polkadot_node_core_pvf::{
-	start, Config, InvalidCandidate, Pvf, ValidationError, ValidationHost,
+	start, Config, InvalidCandidate, Metrics, Pvf, ValidationError, ValidationHost,
 };
 use polkadot_parachain::primitives::{BlockData, ValidationParams, ValidationResult};
 
@@ -44,7 +44,7 @@ impl TestHost {
 		let program_path = std::path::PathBuf::from(PUPPET_EXE);
 		let mut config = Config::new(cache_dir.path().to_owned(), program_path);
 		f(&mut config);
-		let (host, task) = start(config);
+		let (host, task) = start(config, Metrics::default());
 		let _ = async_std::task::spawn(task);
 		Self { _cache_dir: cache_dir, host: Mutex::new(host) }
 	}
diff --git a/polkadot/node/malus/Cargo.toml b/polkadot/node/malus/Cargo.toml
index 3f94c477b4a..7e5dbb42916 100644
--- a/polkadot/node/malus/Cargo.toml
+++ b/polkadot/node/malus/Cargo.toml
@@ -21,6 +21,7 @@ polkadot-cli = { path = "../../cli", default-features = false, features = [ "cli
 polkadot-node-subsystem = { path = "../subsystem" }
 polkadot-node-subsystem-util = { path = "../subsystem-util" }
 polkadot-node-core-candidate-validation = { path = "../core/candidate-validation" }
+polkadot-node-core-pvf = { path = "../core/pvf" }
 parity-util-mem = { version = "0.10.0", default-features = false, features = ["jemalloc-global"] }
 color-eyre = { version = "0.5.11", default-features = false }
 assert_matches = "1.5"
diff --git a/polkadot/node/malus/src/variant-a.rs b/polkadot/node/malus/src/variant-a.rs
index 014e2b59857..542f4e00ecb 100644
--- a/polkadot/node/malus/src/variant-a.rs
+++ b/polkadot/node/malus/src/variant-a.rs
@@ -92,6 +92,7 @@ impl OverseerGen for BehaveMaleficient {
 				CandidateValidationSubsystem::with_config(
 					candidate_validation_config,
 					Metrics::register(registry)?,
+					polkadot_node_core_pvf::Metrics::register(registry)?,
 				),
 				Skippy::default(),
 			),
diff --git a/polkadot/node/service/src/overseer.rs b/polkadot/node/service/src/overseer.rs
index 79585135cfa..fc82b7cf308 100644
--- a/polkadot/node/service/src/overseer.rs
+++ b/polkadot/node/service/src/overseer.rs
@@ -189,7 +189,8 @@ where
 		),
 		candidate_validation: CandidateValidationSubsystem::with_config(
 			candidate_validation_config,
-			Metrics::register(registry)?,
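+			// Both registrations go through the `metrics::Metrics` trait; the
+			// concrete type of each handle is inferred from `with_config`.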
+			Metrics::register(registry)?, // candidate-validation metrics
+			Metrics::register(registry)?, // validation host metrics
 		),
 		chain_api: ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?),
 		collation_generation: CollationGenerationSubsystem::new(Metrics::register(registry)?),
-- 
GitLab