Unverified Commit 76c0f402 authored by Sergey Pepyakin's avatar Sergey Pepyakin Committed by GitHub
Browse files

Introduce metrics into PVF validation host (#3603)

parent 2b0d32a7
Pipeline #153550 failed with stages
in 39 minutes and 12 seconds
......@@ -6243,6 +6243,7 @@ dependencies = [
"parity-scale-codec",
"pin-project 1.0.8",
"polkadot-core-primitives",
"polkadot-node-subsystem-util",
"polkadot-parachain",
"rand 0.8.4",
"sc-executor",
......@@ -7011,6 +7012,7 @@ dependencies = [
"parity-util-mem",
"polkadot-cli",
"polkadot-node-core-candidate-validation",
"polkadot-node-core-pvf",
"polkadot-node-subsystem",
"polkadot-node-subsystem-util",
"structopt",
......
......@@ -70,6 +70,7 @@ pub struct Config {
/// The candidate validation subsystem.
pub struct CandidateValidationSubsystem {
    // Subsystem-level Prometheus metrics (validation request counters/timers).
    metrics: Metrics,
    // Metrics handle threaded into the PVF validation host started by this subsystem.
    pvf_metrics: polkadot_node_core_pvf::Metrics,
    // Paths used to configure the PVF host; assumes it carries the artifact
    // cache path and worker program path — see `start` below. TODO confirm
    // against the `Config` definition (not fully visible here).
    config: Config,
}
......@@ -78,8 +79,12 @@ impl CandidateValidationSubsystem {
/// strategy.
///
/// Check out [`IsolationStrategy`] to get more details.
pub fn with_config(config: Config, metrics: Metrics) -> Self {
CandidateValidationSubsystem { config, metrics }
pub fn with_config(
config: Config,
metrics: Metrics,
pvf_metrics: polkadot_node_core_pvf::Metrics,
) -> Self {
CandidateValidationSubsystem { config, metrics, pvf_metrics }
}
}
......@@ -89,10 +94,15 @@ where
Context: overseer::SubsystemContext<Message = CandidateValidationMessage>,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
    // Hand both the subsystem metrics and the PVF host metrics to `run`;
    // errors from the main loop are tagged with this subsystem's origin
    // before being boxed into the spawned future.
    let future = run(
        ctx,
        self.metrics,
        self.pvf_metrics,
        self.config.artifacts_cache_path,
        self.config.program_path,
    )
    .map_err(|e| SubsystemError::with_origin("candidate-validation", e))
    .boxed();
    SpawnedSubsystem { name: "candidate-validation-subsystem", future }
}
}
......@@ -100,6 +110,7 @@ where
async fn run<Context>(
mut ctx: Context,
metrics: Metrics,
pvf_metrics: polkadot_node_core_pvf::Metrics,
cache_path: PathBuf,
program_path: PathBuf,
) -> SubsystemResult<()>
......@@ -109,6 +120,7 @@ where
{
let (mut validation_host, task) = polkadot_node_core_pvf::start(
polkadot_node_core_pvf::Config::new(cache_path, program_path),
pvf_metrics,
);
ctx.spawn_blocking("pvf-validation-host", task.boxed())?;
......
......@@ -23,6 +23,7 @@ rand = "0.8.3"
parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] }
polkadot-parachain = { path = "../../../parachain" }
polkadot-core-primitives = { path = "../../../core-primitives" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
sc-executor = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-executor-wasmtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-executor-common = { git = "https://github.com/paritytech/substrate", branch = "master" }
......
......@@ -20,6 +20,7 @@ use super::worker::Outcome;
use crate::{
artifacts::{ArtifactId, ArtifactPathId},
host::ResultSender,
metrics::Metrics,
worker_common::{IdleWorker, WorkerHandle},
InvalidCandidate, ValidationError, LOG_TARGET,
};
......@@ -95,6 +96,8 @@ enum QueueEvent {
type Mux = FuturesUnordered<BoxFuture<'static, QueueEvent>>;
struct Queue {
metrics: Metrics,
/// The receiver that receives messages to the pool.
to_queue_rx: mpsc::Receiver<ToQueue>,
......@@ -109,12 +112,14 @@ struct Queue {
impl Queue {
fn new(
metrics: Metrics,
program_path: PathBuf,
worker_capacity: usize,
spawn_timeout: Duration,
to_queue_rx: mpsc::Receiver<ToQueue>,
) -> Self {
Self {
metrics,
program_path,
spawn_timeout,
to_queue_rx,
......@@ -141,12 +146,12 @@ impl Queue {
ev = self.mux.select_next_some() => handle_mux(&mut self, ev).await,
}
purge_dead(&mut self.workers).await;
purge_dead(&self.metrics, &mut self.workers).await;
}
}
}
async fn purge_dead(workers: &mut Workers) {
async fn purge_dead(metrics: &Metrics, workers: &mut Workers) {
let mut to_remove = vec![];
for (worker, data) in workers.running.iter_mut() {
if futures::poll!(&mut data.handle).is_ready() {
......@@ -155,7 +160,9 @@ async fn purge_dead(workers: &mut Workers) {
}
}
for w in to_remove {
let _ = workers.running.remove(w);
if workers.running.remove(w).is_some() {
metrics.execute_worker().on_retired();
}
}
}
......@@ -166,6 +173,7 @@ fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) {
validation_code_hash = ?artifact.id.code_hash,
"enqueueing an artifact for execution",
);
queue.metrics.execute_enqueued();
let job = ExecuteJob { artifact, params, result_tx };
if let Some(available) = queue.workers.find_available() {
......@@ -190,6 +198,7 @@ async fn handle_mux(queue: &mut Queue, event: QueueEvent) {
}
fn handle_worker_spawned(queue: &mut Queue, idle: IdleWorker, handle: WorkerHandle) {
queue.metrics.execute_worker().on_spawned();
queue.workers.spawn_inflight -= 1;
let worker = queue.workers.running.insert(WorkerData { idle: Some(idle), handle });
......@@ -228,6 +237,7 @@ fn handle_job_finish(
(None, Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbigiousWorkerDeath))),
};
queue.metrics.execute_finished();
tracing::debug!(
target: LOG_TARGET,
validation_code_hash = ?artifact_id.code_hash,
......@@ -257,7 +267,9 @@ fn handle_job_finish(
}
} else {
// Note it's possible that the worker was purged already by `purge_dead`
queue.workers.running.remove(worker);
if queue.workers.running.remove(worker).is_some() {
queue.metrics.execute_worker().on_retired();
}
if !queue.queue.is_empty() {
// The worker has died and we still have work we have to do. Request an extra worker.
......@@ -269,6 +281,7 @@ fn handle_job_finish(
}
fn spawn_extra_worker(queue: &mut Queue) {
queue.metrics.execute_worker().on_begin_spawn();
tracing::debug!(target: LOG_TARGET, "spawning an extra worker");
queue
......@@ -309,8 +322,10 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
thus claim_idle cannot return None;
qed.",
);
let execution_timer = queue.metrics.time_execution();
queue.mux.push(
async move {
let _timer = execution_timer;
let outcome = super::worker::start_work(idle, job.artifact.clone(), job.params).await;
QueueEvent::StartWork(worker, outcome, job.artifact.id, job.result_tx)
}
......@@ -319,11 +334,12 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
}
/// Creates the execute queue and returns a sender for enqueueing jobs together
/// with the queue's event-loop future. The future must be polled for the queue
/// to make progress.
pub fn start(
    metrics: Metrics,
    program_path: PathBuf,
    worker_capacity: usize,
    spawn_timeout: Duration,
) -> (mpsc::Sender<ToQueue>, impl Future<Output = ()>) {
    // Bounded channel: provides back-pressure to producers when the queue is busy.
    let (to_queue_tx, to_queue_rx) = mpsc::channel(20);
    let run = Queue::new(metrics, program_path, worker_capacity, spawn_timeout, to_queue_rx).run();
    (to_queue_tx, run)
}
......@@ -22,7 +22,9 @@
use crate::{
artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts},
execute, prepare, Priority, Pvf, ValidationError, LOG_TARGET,
execute,
metrics::Metrics,
prepare, Priority, Pvf, ValidationError, LOG_TARGET,
};
use always_assert::never;
use async_std::path::{Path, PathBuf};
......@@ -134,18 +136,20 @@ impl Config {
/// The future should not return normally but if it does then that indicates an unrecoverable error.
/// In that case all pending requests will be canceled, dropping the result senders and new ones
/// will be rejected.
pub fn start(config: Config) -> (ValidationHost, impl Future<Output = ()>) {
pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future<Output = ()>) {
let (to_host_tx, to_host_rx) = mpsc::channel(10);
let validation_host = ValidationHost { to_host_tx };
let (to_prepare_pool, from_prepare_pool, run_prepare_pool) = prepare::start_pool(
metrics.clone(),
config.prepare_worker_program_path.clone(),
config.cache_path.clone(),
config.prepare_worker_spawn_timeout,
);
let (to_prepare_queue_tx, from_prepare_queue_rx, run_prepare_queue) = prepare::start_queue(
metrics.clone(),
config.prepare_workers_soft_max_num,
config.prepare_workers_hard_max_num,
config.cache_path.clone(),
......@@ -154,6 +158,7 @@ pub fn start(config: Config) -> (ValidationHost, impl Future<Output = ()>) {
);
let (to_execute_queue_tx, run_execute_queue) = execute::start(
metrics.clone(),
config.execute_worker_program_path.to_owned(),
config.execute_workers_max_num,
config.execute_worker_spawn_timeout,
......
......@@ -80,6 +80,7 @@ mod error;
mod execute;
mod executor_intf;
mod host;
mod metrics;
mod prepare;
mod priority;
mod pvf;
......@@ -96,6 +97,7 @@ pub use priority::Priority;
pub use pvf::Pvf;
pub use host::{start, Config, ValidationHost};
pub use metrics::Metrics;
pub use execute::worker_entrypoint as execute_worker_entrypoint;
pub use prepare::worker_entrypoint as prepare_worker_entrypoint;
......
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Prometheus metrics related to the validation host.
use polkadot_node_subsystem_util::metrics::{self, prometheus};
/// Validation host metrics.
///
/// Wraps an optional [`MetricsInner`]: `None` (the `Default`) means metrics
/// are disabled — e.g. no Prometheus registry was supplied — and every
/// recording method on this type becomes a no-op.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
    /// Returns a handle to submit prepare workers metrics.
    pub(crate) fn prepare_worker(&self) -> WorkerRelatedMetrics<'_> {
        WorkerRelatedMetrics { metrics: self, flavor: WorkerFlavor::Prepare }
    }

    /// Returns a handle to submit execute workers metrics.
    pub(crate) fn execute_worker(&self) -> WorkerRelatedMetrics<'_> {
        WorkerRelatedMetrics { metrics: self, flavor: WorkerFlavor::Execute }
    }

    /// When preparation pipeline had a new item enqueued.
    pub(crate) fn prepare_enqueued(&self) {
        if let Some(inner) = self.0.as_ref() {
            inner.prepare_enqueued.inc();
        }
    }

    /// When preparation pipeline concluded working on an item.
    pub(crate) fn prepare_concluded(&self) {
        if let Some(inner) = self.0.as_ref() {
            inner.prepare_concluded.inc();
        }
    }

    /// When execution pipeline had a new item enqueued.
    pub(crate) fn execute_enqueued(&self) {
        if let Some(inner) = self.0.as_ref() {
            inner.execute_enqueued.inc();
        }
    }

    /// When execution pipeline finished executing a request.
    pub(crate) fn execute_finished(&self) {
        if let Some(inner) = self.0.as_ref() {
            inner.execute_finished.inc();
        }
    }

    /// Time between sending preparation request to a worker to having the response.
    /// `None` when metrics are disabled; the timer observes on drop.
    pub(crate) fn time_preparation(
        &self,
    ) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
        self.0.as_ref().map(|inner| inner.preparation_time.start_timer())
    }

    /// Time between sending execution request to a worker to having the response.
    /// `None` when metrics are disabled; the timer observes on drop.
    pub(crate) fn time_execution(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
        self.0.as_ref().map(|inner| inner.execution_time.start_timer())
    }
}
#[derive(Clone)]
struct MetricsInner {
    // Worker lifecycle counters, labelled by flavor ("prepare" / "execute").
    worker_spawning: prometheus::CounterVec<prometheus::U64>,
    worker_spawned: prometheus::CounterVec<prometheus::U64>,
    worker_retired: prometheus::CounterVec<prometheus::U64>,
    // Preparation pipeline throughput counters.
    prepare_enqueued: prometheus::Counter<prometheus::U64>,
    prepare_concluded: prometheus::Counter<prometheus::U64>,
    // Execution pipeline throughput counters.
    execute_enqueued: prometheus::Counter<prometheus::U64>,
    execute_finished: prometheus::Counter<prometheus::U64>,
    // Request/response latency histograms for the two pipelines.
    preparation_time: prometheus::Histogram,
    execution_time: prometheus::Histogram,
}
impl metrics::Metrics for Metrics {
    // Registers every metric with the given registry; any registration failure
    // (e.g. a name collision) aborts the whole set and is returned to the caller.
    fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
        let inner = MetricsInner {
            // Worker lifecycle counters share the "flavor" label dimension so
            // prepare and execute workers can be distinguished in queries.
            worker_spawning: prometheus::register(
                prometheus::CounterVec::new(
                    prometheus::Opts::new(
                        "pvf_worker_spawning",
                        "The total number of workers began to spawn",
                    ),
                    &["flavor"],
                )?,
                registry,
            )?,
            worker_spawned: prometheus::register(
                prometheus::CounterVec::new(
                    prometheus::Opts::new(
                        "pvf_worker_spawned",
                        "The total number of workers spawned successfully",
                    ),
                    &["flavor"],
                )?,
                registry,
            )?,
            worker_retired: prometheus::register(
                prometheus::CounterVec::new(
                    prometheus::Opts::new(
                        "pvf_worker_retired",
                        "The total number of workers retired, either killed by the host or died on duty",
                    ),
                    &["flavor"],
                )?,
                registry,
            )?,
            // Pipeline throughput counters (unlabelled).
            prepare_enqueued: prometheus::register(
                prometheus::Counter::new(
                    "pvf_prepare_enqueued",
                    "The total number of jobs enqueued into the preparation pipeline"
                )?,
                registry,
            )?,
            prepare_concluded: prometheus::register(
                prometheus::Counter::new(
                    "pvf_prepare_concluded",
                    "The total number of jobs concluded in the preparation pipeline"
                )?,
                registry,
            )?,
            execute_enqueued: prometheus::register(
                prometheus::Counter::new(
                    "pvf_execute_enqueued",
                    "The total number of jobs enqueued into the execution pipeline"
                )?,
                registry,
            )?,
            execute_finished: prometheus::register(
                prometheus::Counter::new(
                    "pvf_execute_finished",
                    "The total number of jobs done in the execution pipeline"
                )?,
                registry,
            )?,
            // Latency histograms use the default buckets (no explicit buckets set).
            preparation_time: prometheus::register(
                prometheus::Histogram::with_opts(
                    prometheus::HistogramOpts::new(
                        "pvf_preparation_time",
                        "Time spent in preparing PVF artifacts",
                    )
                )?,
                registry,
            )?,
            execution_time: prometheus::register(
                prometheus::Histogram::with_opts(
                    prometheus::HistogramOpts::new(
                        "pvf_execution_time",
                        "Time spent in executing PVFs",
                    )
                )?,
                registry,
            )?,
        };
        Ok(Metrics(Some(inner)))
    }
}
/// The two kinds of workers the validation host runs; used as the value of the
/// Prometheus "flavor" label on the worker lifecycle counters.
enum WorkerFlavor {
    Prepare,
    Execute,
}

impl WorkerFlavor {
    /// The label value this flavor is recorded under.
    fn as_label(&self) -> &'static str {
        match self {
            WorkerFlavor::Prepare => "prepare",
            WorkerFlavor::Execute => "execute",
        }
    }
}
/// A borrow of [`Metrics`] scoped to one worker flavor. Routes worker lifecycle
/// events to the counter series labelled with that flavor.
pub(crate) struct WorkerRelatedMetrics<'a> {
    // Underlying (possibly disabled) metrics handle.
    metrics: &'a Metrics,
    // Which "flavor" label value events are recorded under.
    flavor: WorkerFlavor,
}
impl<'a> WorkerRelatedMetrics<'a> {
    /// When the spawning of a worker started.
    pub(crate) fn on_begin_spawn(&self) {
        if let Some(metrics) = &self.metrics.0 {
            metrics.worker_spawning.with_label_values(&[self.flavor.as_label()]).inc();
        }
    }

    /// When the worker successfully spawned.
    pub(crate) fn on_spawned(&self) {
        if let Some(metrics) = &self.metrics.0 {
            metrics.worker_spawned.with_label_values(&[self.flavor.as_label()]).inc();
        }
    }

    /// When the worker was killed or died.
    pub(crate) fn on_retired(&self) {
        if let Some(metrics) = &self.metrics.0 {
            // FIX: previously incremented `worker_spawned` (copy-paste error),
            // which double-counted spawns and left `pvf_worker_retired`
            // permanently at zero.
            metrics.worker_retired.with_label_values(&[self.flavor.as_label()]).inc();
        }
    }
}
......@@ -16,6 +16,7 @@
use super::worker::{self, Outcome};
use crate::{
metrics::Metrics,
worker_common::{IdleWorker, WorkerHandle},
LOG_TARGET,
};
......@@ -111,6 +112,7 @@ struct Pool {
from_pool: mpsc::UnboundedSender<FromPool>,
spawned: HopSlotMap<Worker, WorkerData>,
mux: Mux,
metrics: Metrics,
}
/// A fatal error that warrants stopping the event loop of the pool.
......@@ -125,6 +127,7 @@ async fn run(
mut from_pool,
mut spawned,
mut mux,
metrics,
}: Pool,
) {
macro_rules! break_if_fatal {
......@@ -143,6 +146,7 @@ async fn run(
to_pool = to_pool.next() => {
let to_pool = break_if_fatal!(to_pool.ok_or(Fatal));
handle_to_pool(
&metrics,
&program_path,
&cache_path,
spawn_timeout,
......@@ -151,14 +155,17 @@ async fn run(
to_pool,
)
}
ev = mux.select_next_some() => break_if_fatal!(handle_mux(&mut from_pool, &mut spawned, ev)),
ev = mux.select_next_some() => {
break_if_fatal!(handle_mux(&metrics, &mut from_pool, &mut spawned, ev))
}
}
break_if_fatal!(purge_dead(&mut from_pool, &mut spawned).await);
break_if_fatal!(purge_dead(&metrics, &mut from_pool, &mut spawned).await);
}
}
async fn purge_dead(
metrics: &Metrics,
from_pool: &mut mpsc::UnboundedSender<FromPool>,
spawned: &mut HopSlotMap<Worker, WorkerData>,
) -> Result<(), Fatal> {
......@@ -177,7 +184,7 @@ async fn purge_dead(
}
}
for w in to_remove {
if spawned.remove(w).is_some() {
if attempt_retire(metrics, spawned, w) {
reply(from_pool, FromPool::Rip(w))?;
}
}
......@@ -185,6 +192,7 @@ async fn purge_dead(
}
fn handle_to_pool(
metrics: &Metrics,
program_path: &Path,
cache_path: &Path,
spawn_timeout: Duration,
......@@ -195,11 +203,13 @@ fn handle_to_pool(
match to_pool {
ToPool::Spawn => {
tracing::debug!(target: LOG_TARGET, "spawning a new prepare worker");
metrics.prepare_worker().on_begin_spawn();
mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed());
},
ToPool::StartWork { worker, code, artifact_path, background_priority } => {
if let Some(data) = spawned.get_mut(worker) {
if let Some(idle) = data.idle.take() {
let preparation_timer = metrics.time_preparation();
mux.push(
start_work_task(
worker,
......@@ -208,6 +218,7 @@ fn handle_to_pool(
cache_path.to_owned(),
artifact_path,
background_priority,
preparation_timer,
)
.boxed(),
);
......@@ -227,7 +238,7 @@ fn handle_to_pool(
ToPool::Kill(worker) => {
tracing::debug!(target: LOG_TARGET, ?worker, "killing prepare worker");
// It may be absent if it were previously already removed by `purge_dead`.
let _ = spawned.remove(worker);
let _ = attempt_retire(metrics, spawned, worker);
},
ToPool::BumpPriority(worker) =>
if let Some(data) = spawned.get(worker) {
......@@ -252,13 +263,14 @@ async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> Po
}
}
async fn start_work_task(
async fn start_work_task<Timer>(
worker: Worker,
idle: IdleWorker,
code: Arc<Vec<u8>>,
cache_path: PathBuf,
artifact_path: PathBuf,
background_priority: bool,
_preparation_timer: Option<Timer>,
) -> PoolEvent {
let outcome =
worker::start_work(idle, code, &cache_path, artifact_path, background_priority).await;
......@@ -266,12 +278,15 @@ async fn start_work_task(
}
fn handle_mux(
metrics: &Metrics,
from_pool: &mut mpsc::UnboundedSender<FromPool>,
spawned: &mut HopSlotMap<Worker, WorkerData>,
event: PoolEvent,
) -> Result<(), Fatal> {
match event {
PoolEvent::Spawn(idle, handle) => {
metrics.prepare_worker().on_spawned();
let worker = spawned.insert(WorkerData { idle: Some(idle), handle });
reply(from_pool, FromPool::Spawned(worker))?;
......@@ -300,14 +315,14 @@ fn handle_mux(
Ok(())
},
Outcome::Unreachable => {
if spawned.remove(worker).is_some() {
if attempt_retire(metrics, spawned, worker) {
reply(from_pool, FromPool::Rip(worker))?;
}
Ok(())
},
Outcome::DidntMakeIt => {
if spawned.remove(worker).is_some() {
if attempt_retire(metrics, spawned, worker) {