Unverified Commit 804958ab authored by Andronik Ordian, committed by GitHub

initial prometheus metrics (#1536)

* service-new: cosmetic changes

* overseer: draft of prometheus metrics

* metrics: update active_leaves metrics

* metrics: extract into functions

* metrics: resolve XXX

* metrics: it's ugly, but it works

* Bump Substrate

* metrics: move a bunch of code around

* Bump Substrate again

* metrics: fix a warning

* fix a warning in runtime

* metrics: statements signed

* metrics: statements impl RegisterMetrics

* metrics: refactor Metrics trait

* metrics: add Metrics assoc type to JobTrait

* metrics: move Metrics trait to util

* metrics: fix overseer

* metrics: fix backing

* metrics: fix candidate validation

* metrics: derive Default

* metrics: docs

* metrics: add stubs for other subsystems

* metrics: add more stubs and fix compilation

* metrics: fix doctest

* metrics: move to subsystem

* metrics: fix candidate validation

* metrics: bitfield signing

* metrics: av store

* metrics: chain API

* metrics: runtime API

* metrics: stub for avad

* metrics: candidates seconded

* metrics: ok I gave up

* metrics: provisioner

* metrics: remove a clone by requiring Metrics: Sync

* metrics: YAGNI

* metrics: remove another TODO

* metrics: for later

* metrics: add parachain_ prefix

* metrics: s/signed_statement/signed_statements

* utils: add a comment for job metrics

* metrics: address review comments

* metrics: oops

* metrics: make sure to save files before commit 😅



* use _total suffix for requests metrics
Co-authored-by: Max Inden <mail@max-inden.de>

* metrics: add tests for overseer

* update Cargo.lock

* overseer: add a test for CollationGeneration

* collation-generation: impl metrics

* collation-generation: use kebab-case for name

* collation-generation: add a constructor
Co-authored-by: Gav Wood <gavin@parity.io>
Co-authored-by: Ashley Ruglys <ashley.ruglys@gmail.com>
Co-authored-by: Max Inden <mail@max-inden.de>
parent 62b9d4a5
Pipeline #104132 failed
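
The diff below adds the same Prometheus pattern to each subsystem: counters live in a private `MetricsInner` struct, wrapped in a public `Metrics(Option<MetricsInner>)` so that a node running without a metrics registry gets a no-op `Metrics::default()`. A minimal self-contained sketch of that pattern, written against the plain `prometheus` crate rather than the `substrate-prometheus-endpoint` re-exports the code imports as `metrics::{self, prometheus}`:

    use prometheus::{Error as PrometheusError, IntCounter, Registry};

    #[derive(Clone)]
    struct MetricsInner {
        collations_generated_total: IntCounter,
    }

    /// Every recording helper is a no-op when no registry was supplied.
    #[derive(Default, Clone)]
    pub struct Metrics(Option<MetricsInner>);

    impl Metrics {
        fn try_register(registry: &Registry) -> Result<Self, PrometheusError> {
            let counter = IntCounter::new(
                "parachain_collations_generated_total",
                "Number of collations generated.",
            )?;
            // Register a clone; the counter keeps working if registration is skipped.
            registry.register(Box::new(counter.clone()))?;
            Ok(Metrics(Some(MetricsInner { collations_generated_total: counter })))
        }

        fn on_collation_generated(&self) {
            if let Some(inner) = &self.0 {
                inner.collations_generated_total.inc();
            }
        }
    }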
@@ -4861,6 +4861,7 @@ dependencies = [
  "sc-network",
  "smallvec 1.4.1",
  "sp-core",
+ "substrate-prometheus-endpoint",
 ]

 [[package]]
...
@@ -31,6 +31,7 @@ use polkadot_node_subsystem::{
 	errors::RuntimeApiError,
 	messages::{AllMessages, CollationGenerationMessage, CollatorProtocolMessage},
 	FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
+	metrics::{self, prometheus},
 };
 use polkadot_node_subsystem_util::{
 	self as util, request_availability_cores_ctx, request_global_validation_data_ctx,
@@ -47,9 +48,18 @@ use std::sync::Arc;
 /// Collation Generation Subsystem
 pub struct CollationGenerationSubsystem {
 	config: Option<Arc<CollationGenerationConfig>>,
+	metrics: Metrics,
 }

 impl CollationGenerationSubsystem {
+	/// Create a new instance of the `CollationGenerationSubsystem`.
+	pub fn new(metrics: Metrics) -> Self {
+		Self {
+			config: None,
+			metrics,
+		}
+	}
+
 	/// Run this subsystem
 	///
 	/// Conceptually, this is very simple: it just loops forever.
@@ -112,8 +122,9 @@ impl CollationGenerationSubsystem {
 			Ok(Signal(ActiveLeaves(ActiveLeavesUpdate { activated, .. }))) => {
 				// follow the procedure from the guide
 				if let Some(config) = &self.config {
+					let metrics = self.metrics.clone();
 					if let Err(err) =
-						handle_new_activations(config.clone(), &activated, ctx, sender).await
+						handle_new_activations(config.clone(), &activated, ctx, metrics, sender).await
 					{
 						log::warn!(target: "collation_generation", "failed to handle new activations: {:?}", err);
 						return true;
@@ -146,13 +157,13 @@ impl<Context> Subsystem<Context> for CollationGenerationSubsystem
 where
 	Context: SubsystemContext<Message = CollationGenerationMessage>,
 {
-	fn start(self, ctx: Context) -> SpawnedSubsystem {
-		let subsystem = CollationGenerationSubsystem { config: None };
+	type Metrics = Metrics;

-		let future = Box::pin(subsystem.run(ctx));
+	fn start(self, ctx: Context) -> SpawnedSubsystem {
+		let future = Box::pin(self.run(ctx));

 		SpawnedSubsystem {
-			name: "CollationGenerationSubsystem",
+			name: "collation-generation-subsystem",
 			future,
 		}
 	}
@@ -178,6 +189,7 @@ async fn handle_new_activations<Context: SubsystemContext>(
 	config: Arc<CollationGenerationConfig>,
 	activated: &[Hash],
 	ctx: &mut Context,
+	metrics: Metrics,
 	sender: &mpsc::Sender<AllMessages>,
 ) -> Result<()> {
 	// follow the procedure from the guide:
@@ -230,6 +242,7 @@ async fn handle_new_activations<Context: SubsystemContext>(
 			let task_global_validation_data = global_validation_data.clone();
 			let task_config = config.clone();
 			let mut task_sender = sender.clone();
+			let metrics = metrics.clone();
 			ctx.spawn("collation generation collation builder", Box::pin(async move {
 				let validation_data_hash =
 					validation_data_hash(&task_global_validation_data, &local_validation_data);
@@ -273,6 +286,8 @@ async fn handle_new_activations<Context: SubsystemContext>(
 					},
 				};

+				metrics.on_collation_generated();
+
 				if let Err(err) = task_sender.send(AllMessages::CollatorProtocol(
 					CollatorProtocolMessage::DistributeCollation(ccr, collation.proof_of_validity)
 				)).await {
@@ -305,6 +320,38 @@ fn erasure_root(
 	Ok(polkadot_erasure_coding::branches(&chunks).root())
 }

+#[derive(Clone)]
+struct MetricsInner {
+	collations_generated_total: prometheus::Counter<prometheus::U64>,
+}
+
+/// CollationGenerationSubsystem metrics.
+#[derive(Default, Clone)]
+pub struct Metrics(Option<MetricsInner>);
+
+impl Metrics {
+	fn on_collation_generated(&self) {
+		if let Some(metrics) = &self.0 {
+			metrics.collations_generated_total.inc();
+		}
+	}
+}
+
+impl metrics::Metrics for Metrics {
+	fn try_register(registry: &prometheus::Registry) -> std::result::Result<Self, prometheus::PrometheusError> {
+		let metrics = MetricsInner {
+			collations_generated_total: prometheus::register(
+				prometheus::Counter::new(
+					"parachain_collations_generated_total",
+					"Number of collations generated."
+				)?,
+				registry,
+			)?,
+		};
+		Ok(Metrics(Some(metrics)))
+	}
+}
+
 #[cfg(test)]
 mod tests {
 	mod handle_new_activations {
@@ -411,6 +458,7 @@ mod tests {
 				test_config(123),
 				&subsystem_activated_hashes,
 				&mut ctx,
+				Metrics(None),
 				&tx,
 			)
 			.await
@@ -498,7 +546,7 @@ mod tests {
 		let (tx, _rx) = mpsc::channel(0);

 		subsystem_test_harness(overseer, |mut ctx| async move {
-			handle_new_activations(test_config(16), &activated_hashes, &mut ctx, &tx)
+			handle_new_activations(test_config(16), &activated_hashes, &mut ctx, Metrics(None), &tx)
 				.await
 				.unwrap();
 		});
@@ -581,7 +629,7 @@ mod tests {
 		let sent_messages = Arc::new(Mutex::new(Vec::new()));
 		let subsystem_sent_messages = sent_messages.clone();
 		subsystem_test_harness(overseer, |mut ctx| async move {
-			handle_new_activations(subsystem_config, &activated_hashes, &mut ctx, &tx)
+			handle_new_activations(subsystem_config, &activated_hashes, &mut ctx, Metrics(None), &tx)
 				.await
 				.unwrap();
...
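
With the constructor added above, wiring the subsystem into the node amounts to registering the metrics first. A hypothetical sketch (the optional registry handle and the error plumbing are assumptions here, not part of this diff):

    use substrate_prometheus_endpoint::{PrometheusError, Registry};

    // Hypothetical wiring: in the real node the registry comes out of the
    // service configuration. Falling back to `Metrics::default()` keeps the
    // subsystem usable without Prometheus, e.g. in tests.
    fn collation_generation_metrics(
        registry: Option<&Registry>,
    ) -> Result<Metrics, PrometheusError> {
        Ok(match registry {
            Some(registry) => Metrics::try_register(registry)?,
            None => Metrics::default(),
        })
    }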
@@ -34,6 +34,7 @@ use polkadot_primitives::v1::{
 };
 use polkadot_subsystem::{
 	FromOverseer, SubsystemError, Subsystem, SubsystemContext, SpawnedSubsystem,
+	metrics::{self, prometheus},
 };
 use polkadot_subsystem::messages::AvailabilityStoreMessage;
@@ -59,6 +60,7 @@ enum Error {
 /// An implementation of the Availability Store subsystem.
 pub struct AvailabilityStoreSubsystem {
 	inner: Arc<dyn KeyValueDB>,
+	metrics: Metrics,
 }

 fn available_data_key(candidate_hash: &Hash) -> Vec<u8> {
@@ -85,7 +87,7 @@ pub struct Config {
 impl AvailabilityStoreSubsystem {
 	/// Create a new `AvailabilityStoreSubsystem` with a given config on disk.
-	pub fn new_on_disk(config: Config) -> io::Result<Self> {
+	pub fn new_on_disk(config: Config, metrics: Metrics) -> io::Result<Self> {
 		let mut db_config = DatabaseConfig::with_columns(columns::NUM_COLUMNS);

 		if let Some(cache_size) = config.cache_size {
@@ -106,6 +108,7 @@ impl AvailabilityStoreSubsystem {
 		Ok(Self {
 			inner: Arc::new(db),
+			metrics,
 		})
 	}
@@ -113,6 +116,7 @@ impl AvailabilityStoreSubsystem {
 	fn new_in_memory(inner: Arc<dyn KeyValueDB>) -> Self {
 		Self {
 			inner,
+			metrics: Metrics(None),
 		}
 	}
 }
@@ -130,7 +134,7 @@ where
 			Ok(FromOverseer::Signal(Conclude)) => break,
 			Ok(FromOverseer::Signal(_)) => (),
 			Ok(FromOverseer::Communication { msg }) => {
-				process_message(&subsystem.inner, msg)?;
+				process_message(&subsystem.inner, &subsystem.metrics, msg)?;
 			}
 			Err(_) => break,
 		}
@@ -142,7 +146,7 @@ where
 	Ok(())
 }

-fn process_message(db: &Arc<dyn KeyValueDB>, msg: AvailabilityStoreMessage) -> Result<(), Error> {
+fn process_message(db: &Arc<dyn KeyValueDB>, metrics: &Metrics, msg: AvailabilityStoreMessage) -> Result<(), Error> {
 	use AvailabilityStoreMessage::*;
 	match msg {
 		QueryAvailableData(hash, tx) => {
@@ -152,10 +156,10 @@ fn process_message(db: &Arc<dyn KeyValueDB>, msg: AvailabilityStoreMessage) -> Result<(), Error> {
 			tx.send(available_data(db, &hash).is_some()).map_err(|_| oneshot::Canceled)?;
 		}
 		QueryChunk(hash, id, tx) => {
-			tx.send(get_chunk(db, &hash, id)?).map_err(|_| oneshot::Canceled)?;
+			tx.send(get_chunk(db, &hash, id, metrics)?).map_err(|_| oneshot::Canceled)?;
 		}
 		QueryChunkAvailability(hash, id, tx) => {
-			tx.send(get_chunk(db, &hash, id)?.is_some()).map_err(|_| oneshot::Canceled)?;
+			tx.send(get_chunk(db, &hash, id, metrics)?.is_some()).map_err(|_| oneshot::Canceled)?;
 		}
 		StoreChunk(hash, id, chunk, tx) => {
 			match store_chunk(db, &hash, id, chunk) {
@@ -169,7 +173,7 @@ fn process_message(db: &Arc<dyn KeyValueDB>, msg: AvailabilityStoreMessage) -> Result<(), Error> {
 			}
 		}
 		StoreAvailableData(hash, id, n_validators, av_data, tx) => {
-			match store_available_data(db, &hash, id, n_validators, av_data) {
+			match store_available_data(db, &hash, id, n_validators, av_data, metrics) {
 				Err(e) => {
 					tx.send(Err(())).map_err(|_| oneshot::Canceled)?;
 					return Err(e);
@@ -194,11 +198,12 @@ fn store_available_data(
 	id: Option<ValidatorIndex>,
 	n_validators: u32,
 	available_data: AvailableData,
+	metrics: &Metrics,
 ) -> Result<(), Error> {
 	let mut tx = DBTransaction::new();

 	if let Some(index) = id {
-		let chunks = get_chunks(&available_data, n_validators as usize)?;
+		let chunks = get_chunks(&available_data, n_validators as usize, metrics)?;
 		store_chunk(db, candidate_hash, n_validators, chunks[index as usize].clone())?;
 	}
@@ -231,7 +236,7 @@ fn store_chunk(db: &Arc<dyn KeyValueDB>, candidate_hash: &Hash, _n_validators: u
 	Ok(())
 }

-fn get_chunk(db: &Arc<dyn KeyValueDB>, candidate_hash: &Hash, index: u32)
+fn get_chunk(db: &Arc<dyn KeyValueDB>, candidate_hash: &Hash, index: u32, metrics: &Metrics)
 	-> Result<Option<ErasureChunk>, Error>
 {
 	if let Some(chunk) = query_inner(
@@ -242,7 +247,7 @@ fn get_chunk(db: &Arc<dyn KeyValueDB>, candidate_hash: &Hash, index: u32)
 	}

 	if let Some(data) = available_data(db, candidate_hash) {
-		let mut chunks = get_chunks(&data.data, data.n_validators as usize)?;
+		let mut chunks = get_chunks(&data.data, data.n_validators as usize, metrics)?;
 		let desired_chunk = chunks.get(index as usize).cloned();
 		for chunk in chunks.drain(..) {
 			store_chunk(db, candidate_hash, data.n_validators, chunk)?;
@@ -271,6 +276,8 @@ impl<Context> Subsystem<Context> for AvailabilityStoreSubsystem
 where
 	Context: SubsystemContext<Message=AvailabilityStoreMessage>,
 {
+	type Metrics = Metrics;
+
 	fn start(self, ctx: Context) -> SpawnedSubsystem {
 		let future = Box::pin(async move {
 			if let Err(e) = run(self, ctx).await {
@@ -285,8 +292,9 @@ impl<Context> Subsystem<Context> for AvailabilityStoreSubsystem
 	}
 }

-fn get_chunks(data: &AvailableData, n_validators: usize) -> Result<Vec<ErasureChunk>, Error> {
+fn get_chunks(data: &AvailableData, n_validators: usize, metrics: &Metrics) -> Result<Vec<ErasureChunk>, Error> {
 	let chunks = erasure::obtain_chunks_v1(n_validators, data)?;
+	metrics.on_chunks_received(chunks.len());
 	let branches = erasure::branches(chunks.as_ref());
 	Ok(chunks
@@ -302,6 +310,41 @@ fn get_chunks(data: &AvailableData, n_validators: usize) -> Result<Vec<ErasureChunk>, Error> {
 	)
 }

+#[derive(Clone)]
+struct MetricsInner {
+	received_availability_chunks_total: prometheus::Counter<prometheus::U64>,
+}
+
+/// Availability metrics.
+#[derive(Default, Clone)]
+pub struct Metrics(Option<MetricsInner>);
+
+impl Metrics {
+	fn on_chunks_received(&self, count: usize) {
+		if let Some(metrics) = &self.0 {
+			use core::convert::TryFrom as _;
+			// assume usize fits into u64
+			let by = u64::try_from(count).unwrap_or_default();
+			metrics.received_availability_chunks_total.inc_by(by);
+		}
+	}
+}
+
+impl metrics::Metrics for Metrics {
+	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
+		let metrics = MetricsInner {
+			received_availability_chunks_total: prometheus::register(
+				prometheus::Counter::new(
+					"parachain_received_availability_chunks_total",
+					"Number of availability chunks received.",
+				)?,
+				registry,
+			)?,
+		};
+		Ok(Metrics(Some(metrics)))
+	}
+}
+
 #[cfg(test)]
 mod tests {
 	use super::*;
@@ -501,7 +544,8 @@ mod tests {
 			omitted_validation,
 		};

-		let chunks_expected = get_chunks(&available_data, n_validators as usize).unwrap();
+		let no_metrics = Metrics(None);
+		let chunks_expected = get_chunks(&available_data, n_validators as usize, &no_metrics).unwrap();

 		let (tx, rx) = oneshot::channel();
 		let block_msg = AvailabilityStoreMessage::StoreAvailableData(
...
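
One detail in `on_chunks_received` above: `inc_by` takes a `u64` while chunk counts are `usize`, hence the `TryFrom` conversion that silently counts zero if conversion ever failed. A standalone sketch of that conversion, assuming the plain `prometheus` crate:

    use std::convert::TryFrom;
    use prometheus::IntCounter;

    fn count_chunks(counter: &IntCounter, chunks: usize) {
        // usize -> u64 cannot fail on 64-bit targets; fall back to adding 0
        // rather than panicking, mirroring `unwrap_or_default()` in the diff.
        counter.inc_by(u64::try_from(chunks).unwrap_or_default());
    }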
@@ -45,6 +45,7 @@ use polkadot_subsystem::{
 		ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage, ValidationFailed,
 		RuntimeApiRequest,
 	},
+	metrics::{self, prometheus},
 };
 use polkadot_node_subsystem_util::{
 	self as util,
@@ -100,6 +101,7 @@ struct CandidateBackingJob {
 	reported_misbehavior_for: HashSet<ValidatorIndex>,
 	table: Table<TableContext>,
 	table_context: TableContext,
+	metrics: Metrics,
 }

 const fn group_quorum(n_validators: usize) -> usize {
@@ -432,6 +434,7 @@ impl CandidateBackingJob {
 			&candidate,
 			pov,
 		).await {
+			self.metrics.on_candidate_seconded();
 			self.seconded = Some(candidate_hash);
 		}
 	}
@@ -528,7 +531,9 @@ impl CandidateBackingJob {
 	}

 	fn sign_statement(&self, statement: Statement) -> Option<SignedFullStatement> {
-		Some(self.table_context.validator.as_ref()?.sign(statement))
+		let signed = self.table_context.validator.as_ref()?.sign(statement);
+		self.metrics.on_statement_signed();
+		Some(signed)
 	}

 	fn check_statement_signature(&self, statement: &SignedFullStatement) -> Result<(), Error> {
@@ -672,12 +677,14 @@ impl util::JobTrait for CandidateBackingJob {
 	type FromJob = FromJob;
 	type Error = Error;
 	type RunArgs = KeyStorePtr;
+	type Metrics = Metrics;

 	const NAME: &'static str = "CandidateBackingJob";

 	fn run(
 		parent: Hash,
 		keystore: KeyStorePtr,
+		metrics: Metrics,
 		rx_to: mpsc::Receiver<Self::ToJob>,
 		mut tx_from: mpsc::Sender<Self::FromJob>,
 	) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
@@ -764,6 +771,7 @@ impl util::JobTrait for CandidateBackingJob {
 				reported_misbehavior_for: HashSet::new(),
 				table: Table::default(),
 				table_context,
+				metrics,
 			};

 			job.run_loop().await
@@ -772,7 +780,53 @@ impl util::JobTrait for CandidateBackingJob {
 	}
 }

-delegated_subsystem!(CandidateBackingJob(KeyStorePtr) <- ToJob as CandidateBackingSubsystem);
+#[derive(Clone)]
+struct MetricsInner {
+	signed_statements_total: prometheus::Counter<prometheus::U64>,
+	candidates_seconded_total: prometheus::Counter<prometheus::U64>,
+}
+
+/// Candidate backing metrics.
+#[derive(Default, Clone)]
+pub struct Metrics(Option<MetricsInner>);
+
+impl Metrics {
+	fn on_statement_signed(&self) {
+		if let Some(metrics) = &self.0 {
+			metrics.signed_statements_total.inc();
+		}
+	}
+
+	fn on_candidate_seconded(&self) {
+		if let Some(metrics) = &self.0 {
+			metrics.candidates_seconded_total.inc();
+		}
+	}
+}
+
+impl metrics::Metrics for Metrics {
+	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
+		let metrics = MetricsInner {
+			signed_statements_total: prometheus::register(
+				prometheus::Counter::new(
+					"parachain_signed_statements_total",
+					"Number of statements signed.",
+				)?,
+				registry,
+			)?,
+			candidates_seconded_total: prometheus::register(
+				prometheus::Counter::new(
+					"parachain_candidates_seconded_total",
+					"Number of candidates seconded.",
+				)?,
+				registry,
+			)?,
+		};
+		Ok(Metrics(Some(metrics)))
+	}
+}
+
+delegated_subsystem!(CandidateBackingJob(KeyStorePtr, Metrics) <- ToJob as CandidateBackingSubsystem);

 #[cfg(test)]
 mod tests {
@@ -904,7 +958,7 @@ mod tests {
 		let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool.clone());

-		let subsystem = CandidateBackingSubsystem::run(context, keystore, pool.clone());
+		let subsystem = CandidateBackingSubsystem::run(context, keystore, Metrics(None), pool.clone());

 		let test_fut = test(TestHarness {
 			virtual_overseer,
...
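
The updated tests construct the job with `Metrics(None)`, which is the same value `Metrics::default()` produces, so no registry is needed in test code. A sketch of why that is safe, assuming the `Metrics` type from the backing diff above:

    #[test]
    fn noop_metrics_are_safe() {
        // With no registry, every helper takes the `None` branch:
        // nothing is recorded and nothing panics.
        let metrics = Metrics::default();
        metrics.on_statement_signed();
        metrics.on_candidate_seconded();
    }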
@@ -29,6 +29,7 @@ use polkadot_node_subsystem::{
 		BitfieldSigningMessage, CandidateBackingMessage, RuntimeApiMessage,
 	},
 	errors::RuntimeApiError,
+	metrics::{self, prometheus},
 };
 use polkadot_node_subsystem_util::{
 	self as util, JobManager, JobTrait, ToJobTrait, Validator
@@ -252,11 +253,44 @@ async fn construct_availability_bitfield(
 	}
 }

+#[derive(Clone)]
+struct MetricsInner {
+	bitfields_signed_total: prometheus::Counter<prometheus::U64>,
+}
+
+/// Bitfield signing metrics.
+#[derive(Default, Clone)]
+pub struct Metrics(Option<MetricsInner>);
+
+impl Metrics {
+	fn on_bitfield_signed(&self) {
+		if let Some(metrics) = &self.0 {
+			metrics.bitfields_signed_total.inc();
+		}
+	}
+}
+
+impl metrics::Metrics for Metrics {
+	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
+		let metrics = MetricsInner {
+			bitfields_signed_total: prometheus::register(
+				prometheus::Counter::new(
+					"parachain_bitfields_signed_total",
+					"Number of bitfields signed.",
+				)?,
+				registry,
+			)?,
+		};
+		Ok(Metrics(Some(metrics)))
+	}
+}
+
 impl JobTrait for BitfieldSigningJob {
 	type ToJob = ToJob;
 	type FromJob = FromJob;
 	type Error = Error;
 	type RunArgs = KeyStorePtr;
+	type Metrics = Metrics;

 	const NAME: &'static str = "BitfieldSigningJob";
@@ -264,6 +298,7 @@ impl JobTrait for BitfieldSigningJob {
 	fn run(
 		relay_parent: Hash,
 		keystore: Self::RunArgs,
+		metrics: Self::Metrics,
 		_receiver: mpsc::Receiver<ToJob>,
 		mut sender: mpsc::Sender<FromJob>,
 	) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
@@ -295,6 +330,7 @@ impl JobTrait for BitfieldSigningJob {
 			};

 			let signed_bitfield = validator.sign(bitfield);
+			metrics.on_bitfield_signed();

 			// make an anonymous scope to contain some use statements to simplify creating the outbound message
 			{
...
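
The candidate-validation diff below is the one place that uses a labeled `CounterVec` instead of plain counters: a single counter family keyed by a validity label that takes the values valid, invalid, or failed. A minimal sketch of that shape with the plain `prometheus` crate:

    use prometheus::{IntCounterVec, Opts, Registry};

    fn register_validation_requests(registry: &Registry) -> prometheus::Result<IntCounterVec> {
        // One label name ("validity"), three observed values.
        let requests = IntCounterVec::new(
            Opts::new(
                "parachain_validation_requests_total",
                "Number of validation requests served.",
            ),
            &["validity"],
        )?;
        registry.register(Box::new(requests.clone()))?;
        requests.with_label_values(&["valid"]).inc(); // record one valid result
        Ok(requests)
    }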
@@ -23,9 +23,11 @@
 use polkadot_subsystem::{
 	Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemResult,
 	FromOverseer, OverseerSignal,
-};
-use polkadot_subsystem::messages::{
-	AllMessages, CandidateValidationMessage, RuntimeApiMessage, ValidationFailed, RuntimeApiRequest,
+	messages::{
+		AllMessages, CandidateValidationMessage, RuntimeApiMessage,
+		ValidationFailed, RuntimeApiRequest,
+	},
+	metrics::{self, prometheus},
 };
 use polkadot_subsystem::errors::RuntimeApiError;
 use polkadot_node_primitives::{ValidationResult, ValidationOutputs, InvalidCandidate};
@@ -45,13 +47,63 @@ use futures::prelude::*;
 use std::sync::Arc;

 const LOG_TARGET: &'static str = "candidate_validation";

 /// The candidate validation subsystem.
-pub struct CandidateValidationSubsystem<S>(S);
+pub struct CandidateValidationSubsystem<S> {
+	spawn: S,
+	metrics: Metrics,
+}
+
+#[derive(Clone)]
+struct MetricsInner {
+	validation_requests: prometheus::CounterVec<prometheus::U64>,
+}
+
+/// Candidate validation metrics.
+#[derive(Default, Clone)]
+pub struct Metrics(Option<MetricsInner>);
+
+impl Metrics {
+	fn on_validation_event(&self, event: &Result<ValidationResult, ValidationFailed>) {
+		if let Some(metrics) = &self.0 {
+			match event {
+				Ok(ValidationResult::Valid(_)) => {
+					metrics.validation_requests.with_label_values(&["valid"]).inc();
+				},
+				Ok(ValidationResult::Invalid(_)) => {
+					metrics.validation_requests.with_label_values(&["invalid"]).inc();
+				},
+				Err(_) => {
+					metrics.validation_requests.with_label_values(&["failed"]).inc();
+				},
+			}
+		}
+	}
+}
+
+impl metrics::Metrics for Metrics {
+	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
+		let metrics = MetricsInner {
+			validation_requests: prometheus::register(
+				prometheus::CounterVec::new(
+					prometheus::Opts::new(
+						"parachain_validation_requests_total",
+						"Number of validation requests served.",
+					),
&["valid", "invalid", "failed"],
+				)?,
+				registry,
+			)?,
+		};
+		Ok(Metrics(Some(metrics)))
+	}
+}
+
 impl<S> CandidateValidationSubsystem<S> {
 	/// Create a new `CandidateValidationSubsystem` with the given task spawner.
-	pub fn new(spawn: S) -> Self {
-		CandidateValidationSubsystem(spawn)
+	pub fn new(spawn: S, metrics: Metrics) -> Self {
+		CandidateValidationSubsystem { spawn, metrics }
 	}
 }
@@ -59,10 +111,12 @@ impl<S, C> Subsystem<C> for CandidateValidationSubsystem<S> where
 	C: SubsystemContext<Message = CandidateValidationMessage>,
 	S: SpawnNamed + Clone + 'static,
 {
+	type Metrics = Metrics;
+
 	fn start(self, ctx: C) -> SpawnedSubsystem {
 		SpawnedSubsystem {