diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index c20fadac3306505860e0a818118a2dbc2aff3aa6..f55eca42ee6530805f8e33aaf1ccbb471712a6c0 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -6243,6 +6243,7 @@ dependencies = [ "parity-scale-codec", "pin-project 1.0.8", "polkadot-core-primitives", + "polkadot-node-subsystem-util", "polkadot-parachain", "rand 0.8.4", "sc-executor", @@ -7011,6 +7012,7 @@ dependencies = [ "parity-util-mem", "polkadot-cli", "polkadot-node-core-candidate-validation", + "polkadot-node-core-pvf", "polkadot-node-subsystem", "polkadot-node-subsystem-util", "structopt", diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index 9485908ac6830f33ac6a318db11e72253e06e402..e8f1f5bfe19198f0f968ce3b626489ea32ea68d4 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -70,6 +70,7 @@ pub struct Config { /// The candidate validation subsystem. pub struct CandidateValidationSubsystem { metrics: Metrics, + pvf_metrics: polkadot_node_core_pvf::Metrics, config: Config, } @@ -78,8 +79,12 @@ impl CandidateValidationSubsystem { /// strategy. /// /// Check out [`IsolationStrategy`] to get more details. - pub fn with_config(config: Config, metrics: Metrics) -> Self { - CandidateValidationSubsystem { config, metrics } + pub fn with_config( + config: Config, + metrics: Metrics, + pvf_metrics: polkadot_node_core_pvf::Metrics, + ) -> Self { + CandidateValidationSubsystem { config, metrics, pvf_metrics } } } @@ -89,10 +94,15 @@ where Context: overseer::SubsystemContext<Message = CandidateValidationMessage>, { fn start(self, ctx: Context) -> SpawnedSubsystem { - let future = - run(ctx, self.metrics, self.config.artifacts_cache_path, self.config.program_path) - .map_err(|e| SubsystemError::with_origin("candidate-validation", e)) - .boxed(); + let future = run( + ctx, + self.metrics, + self.pvf_metrics, + self.config.artifacts_cache_path, + self.config.program_path, + ) + .map_err(|e| SubsystemError::with_origin("candidate-validation", e)) + .boxed(); SpawnedSubsystem { name: "candidate-validation-subsystem", future } } } @@ -100,6 +110,7 @@ where async fn run<Context>( mut ctx: Context, metrics: Metrics, + pvf_metrics: polkadot_node_core_pvf::Metrics, cache_path: PathBuf, program_path: PathBuf, ) -> SubsystemResult<()> @@ -109,6 +120,7 @@ where { let (mut validation_host, task) = polkadot_node_core_pvf::start( polkadot_node_core_pvf::Config::new(cache_path, program_path), + pvf_metrics, ); ctx.spawn_blocking("pvf-validation-host", task.boxed())?; diff --git a/polkadot/node/core/pvf/Cargo.toml b/polkadot/node/core/pvf/Cargo.toml index a313bd5c20077494280c5d97b366d2bc2f618f56..809d8269ac2e743d595aed2418ca6a7b74f34d8f 100644 --- a/polkadot/node/core/pvf/Cargo.toml +++ b/polkadot/node/core/pvf/Cargo.toml @@ -23,6 +23,7 @@ rand = "0.8.3" parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] } polkadot-parachain = { path = "../../../parachain" } polkadot-core-primitives = { path = "../../../core-primitives" } +polkadot-node-subsystem-util = { path = "../../subsystem-util"} sc-executor = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-executor-wasmtime = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-executor-common = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/core/pvf/src/execute/queue.rs b/polkadot/node/core/pvf/src/execute/queue.rs index ca3c7c92180376d79330313001b6c23b1eaf3690..09e848196820226cb581e116afdf9d4611d0285a 100644 --- a/polkadot/node/core/pvf/src/execute/queue.rs +++ b/polkadot/node/core/pvf/src/execute/queue.rs @@ -20,6 +20,7 @@ use super::worker::Outcome; use crate::{ artifacts::{ArtifactId, ArtifactPathId}, host::ResultSender, + metrics::Metrics, worker_common::{IdleWorker, WorkerHandle}, InvalidCandidate, ValidationError, LOG_TARGET, }; @@ -95,6 +96,8 @@ enum QueueEvent { type Mux = FuturesUnordered<BoxFuture<'static, QueueEvent>>; struct Queue { + metrics: Metrics, + /// The receiver that receives messages to the pool. to_queue_rx: mpsc::Receiver<ToQueue>, @@ -109,12 +112,14 @@ struct Queue { impl Queue { fn new( + metrics: Metrics, program_path: PathBuf, worker_capacity: usize, spawn_timeout: Duration, to_queue_rx: mpsc::Receiver<ToQueue>, ) -> Self { Self { + metrics, program_path, spawn_timeout, to_queue_rx, @@ -141,12 +146,12 @@ impl Queue { ev = self.mux.select_next_some() => handle_mux(&mut self, ev).await, } - purge_dead(&mut self.workers).await; + purge_dead(&self.metrics, &mut self.workers).await; } } } -async fn purge_dead(workers: &mut Workers) { +async fn purge_dead(metrics: &Metrics, workers: &mut Workers) { let mut to_remove = vec![]; for (worker, data) in workers.running.iter_mut() { if futures::poll!(&mut data.handle).is_ready() { @@ -155,7 +160,9 @@ async fn purge_dead(workers: &mut Workers) { } } for w in to_remove { - let _ = workers.running.remove(w); + if workers.running.remove(w).is_some() { + metrics.execute_worker().on_retired(); + } } } @@ -166,6 +173,7 @@ fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) { validation_code_hash = ?artifact.id.code_hash, "enqueueing an artifact for execution", ); + queue.metrics.execute_enqueued(); let job = ExecuteJob { artifact, params, result_tx }; if let Some(available) = queue.workers.find_available() { @@ -190,6 +198,7 @@ async fn handle_mux(queue: &mut Queue, event: QueueEvent) { } fn handle_worker_spawned(queue: &mut Queue, idle: IdleWorker, handle: WorkerHandle) { + queue.metrics.execute_worker().on_spawned(); queue.workers.spawn_inflight -= 1; let worker = queue.workers.running.insert(WorkerData { idle: Some(idle), handle }); @@ -228,6 +237,7 @@ fn handle_job_finish( (None, Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbigiousWorkerDeath))), }; + queue.metrics.execute_finished(); tracing::debug!( target: LOG_TARGET, validation_code_hash = ?artifact_id.code_hash, @@ -257,7 +267,9 @@ fn handle_job_finish( } } else { // Note it's possible that the worker was purged already by `purge_dead` - queue.workers.running.remove(worker); + if queue.workers.running.remove(worker).is_some() { + queue.metrics.execute_worker().on_retired(); + } if !queue.queue.is_empty() { // The worker has died and we still have work we have to do. Request an extra worker. @@ -269,6 +281,7 @@ fn handle_job_finish( } fn spawn_extra_worker(queue: &mut Queue) { + queue.metrics.execute_worker().on_begin_spawn(); tracing::debug!(target: LOG_TARGET, "spawning an extra worker"); queue @@ -309,8 +322,10 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) { thus claim_idle cannot return None; qed.", ); + let execution_timer = queue.metrics.time_execution(); queue.mux.push( async move { + let _timer = execution_timer; let outcome = super::worker::start_work(idle, job.artifact.clone(), job.params).await; QueueEvent::StartWork(worker, outcome, job.artifact.id, job.result_tx) } @@ -319,11 +334,12 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) { } pub fn start( + metrics: Metrics, program_path: PathBuf, worker_capacity: usize, spawn_timeout: Duration, ) -> (mpsc::Sender<ToQueue>, impl Future<Output = ()>) { let (to_queue_tx, to_queue_rx) = mpsc::channel(20); - let run = Queue::new(program_path, worker_capacity, spawn_timeout, to_queue_rx).run(); + let run = Queue::new(metrics, program_path, worker_capacity, spawn_timeout, to_queue_rx).run(); (to_queue_tx, run) } diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs index 08fb573fd9786d25f0b35d0e43c1fdfbbafeecf7..89b230bc90d791989f37dcb35a01fba60a5b86e9 100644 --- a/polkadot/node/core/pvf/src/host.rs +++ b/polkadot/node/core/pvf/src/host.rs @@ -22,7 +22,9 @@ use crate::{ artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts}, - execute, prepare, Priority, Pvf, ValidationError, LOG_TARGET, + execute, + metrics::Metrics, + prepare, Priority, Pvf, ValidationError, LOG_TARGET, }; use always_assert::never; use async_std::path::{Path, PathBuf}; @@ -134,18 +136,20 @@ impl Config { /// The future should not return normally but if it does then that indicates an unrecoverable error. /// In that case all pending requests will be canceled, dropping the result senders and new ones /// will be rejected. -pub fn start(config: Config) -> (ValidationHost, impl Future<Output = ()>) { +pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future<Output = ()>) { let (to_host_tx, to_host_rx) = mpsc::channel(10); let validation_host = ValidationHost { to_host_tx }; let (to_prepare_pool, from_prepare_pool, run_prepare_pool) = prepare::start_pool( + metrics.clone(), config.prepare_worker_program_path.clone(), config.cache_path.clone(), config.prepare_worker_spawn_timeout, ); let (to_prepare_queue_tx, from_prepare_queue_rx, run_prepare_queue) = prepare::start_queue( + metrics.clone(), config.prepare_workers_soft_max_num, config.prepare_workers_hard_max_num, config.cache_path.clone(), @@ -154,6 +158,7 @@ pub fn start(config: Config) -> (ValidationHost, impl Future<Output = ()>) { ); let (to_execute_queue_tx, run_execute_queue) = execute::start( + metrics.clone(), config.execute_worker_program_path.to_owned(), config.execute_workers_max_num, config.execute_worker_spawn_timeout, diff --git a/polkadot/node/core/pvf/src/lib.rs b/polkadot/node/core/pvf/src/lib.rs index bd72a5e1ed0885613035131e614f160e2a00fa22..387d10e96061e0fbaa8d40c6c1bf3548807fec1e 100644 --- a/polkadot/node/core/pvf/src/lib.rs +++ b/polkadot/node/core/pvf/src/lib.rs @@ -80,6 +80,7 @@ mod error; mod execute; mod executor_intf; mod host; +mod metrics; mod prepare; mod priority; mod pvf; @@ -96,6 +97,7 @@ pub use priority::Priority; pub use pvf::Pvf; pub use host::{start, Config, ValidationHost}; +pub use metrics::Metrics; pub use execute::worker_entrypoint as execute_worker_entrypoint; pub use prepare::worker_entrypoint as prepare_worker_entrypoint; diff --git a/polkadot/node/core/pvf/src/metrics.rs b/polkadot/node/core/pvf/src/metrics.rs new file mode 100644 index 0000000000000000000000000000000000000000..031397158efc8d637e69a8d4a561264832a71f6a --- /dev/null +++ b/polkadot/node/core/pvf/src/metrics.rs @@ -0,0 +1,214 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +//! Prometheus metrics related to the validation host. + +use polkadot_node_subsystem_util::metrics::{self, prometheus}; + +/// Validation host metrics. +#[derive(Default, Clone)] +pub struct Metrics(Option<MetricsInner>); + +impl Metrics { + /// Returns a handle to submit prepare workers metrics. + pub(crate) fn prepare_worker(&'_ self) -> WorkerRelatedMetrics<'_> { + WorkerRelatedMetrics { metrics: self, flavor: WorkerFlavor::Prepare } + } + + /// Returns a handle to submit execute workers metrics. + pub(crate) fn execute_worker(&'_ self) -> WorkerRelatedMetrics<'_> { + WorkerRelatedMetrics { metrics: self, flavor: WorkerFlavor::Execute } + } + + /// When preparation pipeline had a new item enqueued. + pub(crate) fn prepare_enqueued(&self) { + if let Some(metrics) = &self.0 { + metrics.prepare_enqueued.inc(); + } + } + + /// When preparation pipeline concluded working on an item. + pub(crate) fn prepare_concluded(&self) { + if let Some(metrics) = &self.0 { + metrics.prepare_concluded.inc(); + } + } + + /// When execution pipeline had a new item enqueued. + pub(crate) fn execute_enqueued(&self) { + if let Some(metrics) = &self.0 { + metrics.execute_enqueued.inc(); + } + } + + /// When execution pipeline finished executing a request. + pub(crate) fn execute_finished(&self) { + if let Some(metrics) = &self.0 { + metrics.execute_finished.inc(); + } + } + + /// Time between sending preparation request to a worker to having the response. + pub(crate) fn time_preparation( + &self, + ) -> Option<metrics::prometheus::prometheus::HistogramTimer> { + self.0.as_ref().map(|metrics| metrics.preparation_time.start_timer()) + } + + /// Time between sending execution request to a worker to having the response. + pub(crate) fn time_execution(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> { + self.0.as_ref().map(|metrics| metrics.execution_time.start_timer()) + } +} + +#[derive(Clone)] +struct MetricsInner { + worker_spawning: prometheus::CounterVec<prometheus::U64>, + worker_spawned: prometheus::CounterVec<prometheus::U64>, + worker_retired: prometheus::CounterVec<prometheus::U64>, + prepare_enqueued: prometheus::Counter<prometheus::U64>, + prepare_concluded: prometheus::Counter<prometheus::U64>, + execute_enqueued: prometheus::Counter<prometheus::U64>, + execute_finished: prometheus::Counter<prometheus::U64>, + preparation_time: prometheus::Histogram, + execution_time: prometheus::Histogram, +} + +impl metrics::Metrics for Metrics { + fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> { + let inner = MetricsInner { + worker_spawning: prometheus::register( + prometheus::CounterVec::new( + prometheus::Opts::new( + "pvf_worker_spawning", + "The total number of workers began to spawn", + ), + &["flavor"], + )?, + registry, + )?, + worker_spawned: prometheus::register( + prometheus::CounterVec::new( + prometheus::Opts::new( + "pvf_worker_spawned", + "The total number of workers spawned successfully", + ), + &["flavor"], + )?, + registry, + )?, + worker_retired: prometheus::register( + prometheus::CounterVec::new( + prometheus::Opts::new( + "pvf_worker_retired", + "The total number of workers retired, either killed by the host or died on duty", + ), + &["flavor"], + )?, + registry, + )?, + prepare_enqueued: prometheus::register( + prometheus::Counter::new( + "pvf_prepare_enqueued", + "The total number of jobs enqueued into the preparation pipeline" + )?, + registry, + )?, + prepare_concluded: prometheus::register( + prometheus::Counter::new( + "pvf_prepare_concluded", + "The total number of jobs concluded in the preparation pipeline" + )?, + registry, + )?, + execute_enqueued: prometheus::register( + prometheus::Counter::new( + "pvf_execute_enqueued", + "The total number of jobs enqueued into the execution pipeline" + )?, + registry, + )?, + execute_finished: prometheus::register( + prometheus::Counter::new( + "pvf_execute_finished", + "The total number of jobs done in the execution pipeline" + )?, + registry, + )?, + preparation_time: prometheus::register( + prometheus::Histogram::with_opts( + prometheus::HistogramOpts::new( + "pvf_preparation_time", + "Time spent in preparing PVF artifacts", + ) + )?, + registry, + )?, + execution_time: prometheus::register( + prometheus::Histogram::with_opts( + prometheus::HistogramOpts::new( + "pvf_execution_time", + "Time spent in executing PVFs", + ) + )?, + registry, + )?, + }; + Ok(Metrics(Some(inner))) + } +} + +enum WorkerFlavor { + Prepare, + Execute, +} + +impl WorkerFlavor { + fn as_label(&self) -> &'static str { + match *self { + WorkerFlavor::Prepare => "prepare", + WorkerFlavor::Execute => "execute", + } + } +} + +pub(crate) struct WorkerRelatedMetrics<'a> { + metrics: &'a Metrics, + flavor: WorkerFlavor, +} + +impl<'a> WorkerRelatedMetrics<'a> { + /// When the spawning of a worker started. + pub(crate) fn on_begin_spawn(&self) { + if let Some(metrics) = &self.metrics.0 { + metrics.worker_spawning.with_label_values(&[self.flavor.as_label()]).inc(); + } + } + + /// When the worker successfully spawned. + pub(crate) fn on_spawned(&self) { + if let Some(metrics) = &self.metrics.0 { + metrics.worker_spawned.with_label_values(&[self.flavor.as_label()]).inc(); + } + } + + /// When the worker was killed or died. + pub(crate) fn on_retired(&self) { + if let Some(metrics) = &self.metrics.0 { + metrics.worker_spawned.with_label_values(&[self.flavor.as_label()]).inc(); + } + } +} diff --git a/polkadot/node/core/pvf/src/prepare/pool.rs b/polkadot/node/core/pvf/src/prepare/pool.rs index f74ad78a9fee53312c32ac1dca85144f2a729403..035d799ac5941c963d64746edad36621a832e8de 100644 --- a/polkadot/node/core/pvf/src/prepare/pool.rs +++ b/polkadot/node/core/pvf/src/prepare/pool.rs @@ -16,6 +16,7 @@ use super::worker::{self, Outcome}; use crate::{ + metrics::Metrics, worker_common::{IdleWorker, WorkerHandle}, LOG_TARGET, }; @@ -111,6 +112,7 @@ struct Pool { from_pool: mpsc::UnboundedSender<FromPool>, spawned: HopSlotMap<Worker, WorkerData>, mux: Mux, + metrics: Metrics, } /// A fatal error that warrants stopping the event loop of the pool. @@ -125,6 +127,7 @@ async fn run( mut from_pool, mut spawned, mut mux, + metrics, }: Pool, ) { macro_rules! break_if_fatal { @@ -143,6 +146,7 @@ async fn run( to_pool = to_pool.next() => { let to_pool = break_if_fatal!(to_pool.ok_or(Fatal)); handle_to_pool( + &metrics, &program_path, &cache_path, spawn_timeout, @@ -151,14 +155,17 @@ async fn run( to_pool, ) } - ev = mux.select_next_some() => break_if_fatal!(handle_mux(&mut from_pool, &mut spawned, ev)), + ev = mux.select_next_some() => { + break_if_fatal!(handle_mux(&metrics, &mut from_pool, &mut spawned, ev)) + } } - break_if_fatal!(purge_dead(&mut from_pool, &mut spawned).await); + break_if_fatal!(purge_dead(&metrics, &mut from_pool, &mut spawned).await); } } async fn purge_dead( + metrics: &Metrics, from_pool: &mut mpsc::UnboundedSender<FromPool>, spawned: &mut HopSlotMap<Worker, WorkerData>, ) -> Result<(), Fatal> { @@ -177,7 +184,7 @@ async fn purge_dead( } } for w in to_remove { - if spawned.remove(w).is_some() { + if attempt_retire(metrics, spawned, w) { reply(from_pool, FromPool::Rip(w))?; } } @@ -185,6 +192,7 @@ async fn purge_dead( } fn handle_to_pool( + metrics: &Metrics, program_path: &Path, cache_path: &Path, spawn_timeout: Duration, @@ -195,11 +203,13 @@ fn handle_to_pool( match to_pool { ToPool::Spawn => { tracing::debug!(target: LOG_TARGET, "spawning a new prepare worker"); + metrics.prepare_worker().on_begin_spawn(); mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed()); }, ToPool::StartWork { worker, code, artifact_path, background_priority } => { if let Some(data) = spawned.get_mut(worker) { if let Some(idle) = data.idle.take() { + let preparation_timer = metrics.time_preparation(); mux.push( start_work_task( worker, @@ -208,6 +218,7 @@ fn handle_to_pool( cache_path.to_owned(), artifact_path, background_priority, + preparation_timer, ) .boxed(), ); @@ -227,7 +238,7 @@ fn handle_to_pool( ToPool::Kill(worker) => { tracing::debug!(target: LOG_TARGET, ?worker, "killing prepare worker"); // It may be absent if it were previously already removed by `purge_dead`. - let _ = spawned.remove(worker); + let _ = attempt_retire(metrics, spawned, worker); }, ToPool::BumpPriority(worker) => if let Some(data) = spawned.get(worker) { @@ -252,13 +263,14 @@ async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> Po } } -async fn start_work_task( +async fn start_work_task<Timer>( worker: Worker, idle: IdleWorker, code: Arc<Vec<u8>>, cache_path: PathBuf, artifact_path: PathBuf, background_priority: bool, + _preparation_timer: Option<Timer>, ) -> PoolEvent { let outcome = worker::start_work(idle, code, &cache_path, artifact_path, background_priority).await; @@ -266,12 +278,15 @@ async fn start_work_task( } fn handle_mux( + metrics: &Metrics, from_pool: &mut mpsc::UnboundedSender<FromPool>, spawned: &mut HopSlotMap<Worker, WorkerData>, event: PoolEvent, ) -> Result<(), Fatal> { match event { PoolEvent::Spawn(idle, handle) => { + metrics.prepare_worker().on_spawned(); + let worker = spawned.insert(WorkerData { idle: Some(idle), handle }); reply(from_pool, FromPool::Spawned(worker))?; @@ -300,14 +315,14 @@ fn handle_mux( Ok(()) }, Outcome::Unreachable => { - if spawned.remove(worker).is_some() { + if attempt_retire(metrics, spawned, worker) { reply(from_pool, FromPool::Rip(worker))?; } Ok(()) }, Outcome::DidntMakeIt => { - if spawned.remove(worker).is_some() { + if attempt_retire(metrics, spawned, worker) { reply(from_pool, FromPool::Concluded(worker, true))?; } @@ -322,8 +337,28 @@ fn reply(from_pool: &mut mpsc::UnboundedSender<FromPool>, m: FromPool) -> Result from_pool.unbounded_send(m).map_err(|_| Fatal) } +/// Removes the given worker from the registry if it there. This will lead to dropping and hence +/// to killing the worker process. +/// +/// Returns `true` if the worker exists and was removed and the process was killed. +/// +/// This function takes care about counting the retired workers metric. +fn attempt_retire( + metrics: &Metrics, + spawned: &mut HopSlotMap<Worker, WorkerData>, + worker: Worker, +) -> bool { + if spawned.remove(worker).is_some() { + metrics.prepare_worker().on_retired(); + true + } else { + false + } +} + /// Spins up the pool and returns the future that should be polled to make the pool functional. pub fn start( + metrics: Metrics, program_path: PathBuf, cache_path: PathBuf, spawn_timeout: Duration, @@ -332,6 +367,7 @@ pub fn start( let (from_pool_tx, from_pool_rx) = mpsc::unbounded(); let run = run(Pool { + metrics, program_path, cache_path, spawn_timeout, diff --git a/polkadot/node/core/pvf/src/prepare/queue.rs b/polkadot/node/core/pvf/src/prepare/queue.rs index 7240152a941700f78b53bcdfa473505e0ceb49d7..4ffa21de435b44fcc2f4241f6ed19c83945bf344 100644 --- a/polkadot/node/core/pvf/src/prepare/queue.rs +++ b/polkadot/node/core/pvf/src/prepare/queue.rs @@ -17,7 +17,7 @@ //! A queue that handles requests for PVF preparation. use super::pool::{self, Worker}; -use crate::{artifacts::ArtifactId, Priority, Pvf, LOG_TARGET}; +use crate::{artifacts::ArtifactId, metrics::Metrics, Priority, Pvf, LOG_TARGET}; use always_assert::{always, never}; use async_std::path::PathBuf; use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt}; @@ -127,6 +127,8 @@ impl Unscheduled { } struct Queue { + metrics: Metrics, + to_queue_rx: mpsc::Receiver<ToQueue>, from_queue_tx: mpsc::UnboundedSender<FromQueue>, @@ -155,6 +157,7 @@ struct Fatal; impl Queue { fn new( + metrics: Metrics, soft_capacity: usize, hard_capacity: usize, cache_path: PathBuf, @@ -164,6 +167,7 @@ impl Queue { from_pool_rx: mpsc::UnboundedReceiver<pool::FromPool>, ) -> Self { Self { + metrics, to_queue_rx, from_queue_tx, to_pool_tx, @@ -218,6 +222,7 @@ async fn handle_enqueue(queue: &mut Queue, priority: Priority, pvf: Pvf) -> Resu ?priority, "PVF is enqueued for preparation.", ); + queue.metrics.prepare_enqueued(); let artifact_id = pvf.as_artifact_id(); if never!( @@ -316,6 +321,8 @@ async fn handle_worker_concluded( worker: Worker, rip: bool, ) -> Result<(), Fatal> { + queue.metrics.prepare_concluded(); + macro_rules! never_none { ($expr:expr) => { match $expr { @@ -486,6 +493,7 @@ async fn send_pool( /// Spins up the queue and returns the future that should be polled to make the queue functional. pub fn start( + metrics: Metrics, soft_capacity: usize, hard_capacity: usize, cache_path: PathBuf, @@ -496,6 +504,7 @@ pub fn start( let (from_queue_tx, from_queue_rx) = mpsc::unbounded(); let run = Queue::new( + metrics, soft_capacity, hard_capacity, cache_path, @@ -565,6 +574,7 @@ mod tests { let workers: SlotMap<Worker, ()> = SlotMap::with_key(); let (to_queue_tx, from_queue_rx, run) = start( + Metrics::default(), soft_capacity, hard_capacity, tempdir.path().to_owned().into(), diff --git a/polkadot/node/core/pvf/tests/it/main.rs b/polkadot/node/core/pvf/tests/it/main.rs index 57fea2e2ca34a7995d9cc9f454bc83b62a9b1492..3689217880efd31f28e9f3f6e84198362145e83c 100644 --- a/polkadot/node/core/pvf/tests/it/main.rs +++ b/polkadot/node/core/pvf/tests/it/main.rs @@ -17,7 +17,7 @@ use async_std::sync::Mutex; use parity_scale_codec::Encode as _; use polkadot_node_core_pvf::{ - start, Config, InvalidCandidate, Pvf, ValidationError, ValidationHost, + start, Config, InvalidCandidate, Metrics, Pvf, ValidationError, ValidationHost, }; use polkadot_parachain::primitives::{BlockData, ValidationParams, ValidationResult}; @@ -44,7 +44,7 @@ impl TestHost { let program_path = std::path::PathBuf::from(PUPPET_EXE); let mut config = Config::new(cache_dir.path().to_owned(), program_path); f(&mut config); - let (host, task) = start(config); + let (host, task) = start(config, Metrics::default()); let _ = async_std::task::spawn(task); Self { _cache_dir: cache_dir, host: Mutex::new(host) } } diff --git a/polkadot/node/malus/Cargo.toml b/polkadot/node/malus/Cargo.toml index 3f94c477b4a7304d56ccdf0cbc5d5a1ba6339dae..7e5dbb4291688e3e4069f3b264af214593e2c90a 100644 --- a/polkadot/node/malus/Cargo.toml +++ b/polkadot/node/malus/Cargo.toml @@ -21,6 +21,7 @@ polkadot-cli = { path = "../../cli", default-features = false, features = [ "cli polkadot-node-subsystem = { path = "../subsystem" } polkadot-node-subsystem-util = { path = "../subsystem-util" } polkadot-node-core-candidate-validation = { path = "../core/candidate-validation" } +polkadot-node-core-pvf = { path = "../core/pvf" } parity-util-mem = { version = "0.10.0", default-features = false, features = ["jemalloc-global"] } color-eyre = { version = "0.5.11", default-features = false } assert_matches = "1.5" diff --git a/polkadot/node/malus/src/variant-a.rs b/polkadot/node/malus/src/variant-a.rs index 014e2b59857464ed8a8b0449592ae116fed46490..542f4e00ecb50a53d5427ccaa6e64d2022af1cd6 100644 --- a/polkadot/node/malus/src/variant-a.rs +++ b/polkadot/node/malus/src/variant-a.rs @@ -92,6 +92,7 @@ impl OverseerGen for BehaveMaleficient { CandidateValidationSubsystem::with_config( candidate_validation_config, Metrics::register(registry)?, + polkadot_node_core_pvf::Metrics::register(registry)?, ), Skippy::default(), ), diff --git a/polkadot/node/service/src/overseer.rs b/polkadot/node/service/src/overseer.rs index 79585135cfaed856071e171b4fdab1b030edafb9..fc82b7cf308a9e4762a1688d45bf1083761762a2 100644 --- a/polkadot/node/service/src/overseer.rs +++ b/polkadot/node/service/src/overseer.rs @@ -189,7 +189,8 @@ where ), candidate_validation: CandidateValidationSubsystem::with_config( candidate_validation_config, - Metrics::register(registry)?, + Metrics::register(registry)?, // candidate-validation metrics + Metrics::register(registry)?, // validation host metrics ), chain_api: ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?), collation_generation: CollationGenerationSubsystem::new(Metrics::register(registry)?),