Unverified Commit e655654e authored by Peter Goodspeed-Niklaus, committed by GitHub

Add Prometheus timers to the subsystems (#1923)

* reexport prometheus-super for ease of use by other subsystems

* add some prometheus timers for collation generation subsystem

* add timing metrics to av-store

* add metrics to candidate backing

* add timing metric to bitfield signing

* add timing metrics to candidate selection

* add timing metrics to candidate-validation

* add timing metrics to chain-api

* add timing metrics to provisioner

* add timing metrics to runtime-api

* add timing metrics to availability-distribution

* add timing metrics to bitfield-distribution

* add timing metrics to collator protocol: collator side

* add timing metrics to collator protocol: validator side

* fix candidate validation test failures

* add timing metrics to pov distribution

* add timing metrics to statement-distribution

* use substrate_prometheus_endpoint prometheus reexport instead of prometheus_super

* don't include JOB_DELAY in bitfield-signing metrics

* give adder-collator the ability to easily export its genesis state and validation code

* wip: adder-collator pushbutton script

* don't attempt to register the adder-collator automatically

Instead, get these values with

```sh
target/release/adder-collator export-genesis-state
target/release/adder-collator export-genesis-wasm
```

Then register the parachain on https://polkadot.js.org/apps/?rpc=ws%3A%2F%2F127.0.0.1%3A9944#/explorer

To collect Prometheus data after running the script, create `prometheus.yml` per the instructions
at https://www.notion.so/paritytechnologies/Setting-up-Prometheus-locally-835cb3a9df7541a781c381006252b5ff
and then run:

```sh
docker run -v `pwd`/prometheus.yml:/etc/prometheus/prometheus.yml:z --network host prom/prometheus
```
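
For reference, a minimal `prometheus.yml` along these lines should be enough for local scraping. This is only a sketch: it assumes the node exposes metrics on Substrate's default Prometheus port (9615); the Notion page above is the authoritative reference.

```sh
cat > prometheus.yml <<'EOF'
# Sketch config: scrape the local node every 5 seconds.
global:
  scrape_interval: 5s

scrape_configs:
  - job_name: polkadot
    static_configs:
      # Assumes the default Substrate Prometheus port on the host network.
      - targets: ['127.0.0.1:9615']
EOF
```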

This demonstrates that data makes it across to Prometheus, though it will likely be useful in the future to tweak the histogram buckets.
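
When that tweaking happens, the buckets can be supplied at registration time. A rough sketch using the plain `prometheus` crate API (the subsystems go through the `substrate_prometheus_endpoint` re-export, so the paths differ slightly, and the bucket boundaries below are purely illustrative):

```rust
use prometheus::{Histogram, HistogramOpts, Registry};

fn register_tuned_histogram(registry: &Registry) -> prometheus::Result<Histogram> {
    let histogram = Histogram::with_opts(
        HistogramOpts::new(
            "parachain_collation_generation_new_activations",
            "Time spent within fn handle_new_activations",
        )
        // Bucket boundaries are in seconds and purely illustrative; pick real
        // ones once some timing data has been collected.
        .buckets(vec![0.001, 0.005, 0.025, 0.1, 0.5, 1.0, 2.5]),
    )?;
    // Histograms register like any other collector.
    registry.register(Box::new(histogram.clone()))?;
    Ok(histogram)
}
```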

* Update parachain/test-parachains/adder/collator/src/cli.rs

Co-authored-by: Andronik Ordian <write@reusable.software>

* use the grandpa-pause parameter

* skip metrics in tracing instrumentation

* remove unnecessary grandpa_pause cli param

Co-authored-by: Andronik Ordian <write@reusable.software>
parent f7ea3d07
......@@ -189,7 +189,11 @@ async fn handle_new_activations<Context: SubsystemContext>(
// follow the procedure from the guide:
// https://w3f.github.io/parachain-implementers-guide/node/collators/collation-generation.html
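// Holding the returned (optional) `HistogramTimer` in a local binding means the
// elapsed time is observed into the histogram when it is dropped at scope exit.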
let _overall_timer = metrics.time_new_activations();
for relay_parent in activated.iter().copied() {
let _relay_parent_timer = metrics.time_new_activations_relay_parent();
// double-future magic happens here: the first layer of requests takes a mutable borrow of the context, and
// returns a receiver. The second layer of requests actually polls those receivers to completion.
let (availability_cores, validators) = join!(
......@@ -201,6 +205,8 @@ async fn handle_new_activations<Context: SubsystemContext>(
let n_validators = validators??.len();
for core in availability_cores {
let _availability_core_timer = metrics.time_new_activations_availability_core();
let (scheduled_core, assumption) = match core {
CoreState::Scheduled(scheduled_core) => {
(scheduled_core, OccupiedCoreAssumption::Free)
......@@ -335,6 +341,9 @@ fn erasure_root(
#[derive(Clone)]
struct MetricsInner {
collations_generated_total: prometheus::Counter<prometheus::U64>,
new_activations_overall: prometheus::Histogram,
new_activations_per_relay_parent: prometheus::Histogram,
new_activations_per_availability_core: prometheus::Histogram,
}
/// CollationGenerationSubsystem metrics.
......@@ -347,6 +356,21 @@ impl Metrics {
metrics.collations_generated_total.inc();
}
}
/// Provide a timer for new activations which updates on drop.
fn time_new_activations(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.new_activations_overall.start_timer())
}
/// Provide a timer per relay parent which updates on drop.
fn time_new_activations_relay_parent(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.new_activations_per_relay_parent.start_timer())
}
/// Provide a timer per availability core which updates on drop.
fn time_new_activations_availability_core(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.new_activations_per_availability_core.start_timer())
}
}
impl metrics::Metrics for Metrics {
......@@ -359,6 +383,33 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
new_activations_overall: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_collation_generation_new_activations",
"Time spent within fn handle_new_activations",
)
)?,
registry,
)?,
new_activations_per_relay_parent: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_collation_generation_per_relay_parent",
"Time spent handling a particular relay parent within fn handle_new_activations"
)
)?,
registry,
)?,
new_activations_per_availability_core: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_collation_generation_per_availability_core",
"Time spent handling a particular availability core for a relay parent in fn handle_new_activations",
)
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
......
......@@ -313,6 +313,8 @@ impl AvailabilityStoreSubsystem {
// Perform pruning of PoVs
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn prune_povs(&self) -> Result<(), Error> {
let _timer = self.metrics.time_prune_povs();
let mut tx = DBTransaction::new();
let mut pov_pruning = pov_pruning(&self.inner).unwrap_or_default();
let now = PruningDelay::now()?;
......@@ -338,6 +340,8 @@ impl AvailabilityStoreSubsystem {
// Perform pruning of chunks.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn prune_chunks(&self) -> Result<(), Error> {
let _timer = self.metrics.time_prune_chunks();
let mut tx = DBTransaction::new();
let mut chunk_pruning = chunk_pruning(&self.inner).unwrap_or_default();
let now = PruningDelay::now()?;
......@@ -522,7 +526,7 @@ where
ActiveLeavesUpdate { activated, .. })
) => {
for activated in activated.into_iter() {
process_block_activated(ctx, &subsystem.inner, activated).await?;
process_block_activated(ctx, &subsystem.inner, activated, &subsystem.metrics).await?;
}
}
FromOverseer::Signal(OverseerSignal::BlockFinalized(hash)) => {
......@@ -561,6 +565,8 @@ async fn process_block_finalized<Context>(
where
Context: SubsystemContext<Message=AvailabilityStoreMessage>
{
let _timer = subsystem.metrics.time_process_block_finalized();
let block_number = get_block_number(ctx, hash).await?;
if let Some(mut pov_pruning) = pov_pruning(db) {
......@@ -606,15 +612,18 @@ where
Ok(())
}
#[tracing::instrument(level = "trace", skip(ctx, db), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(ctx, db, metrics), fields(subsystem = LOG_TARGET))]
async fn process_block_activated<Context>(
ctx: &mut Context,
db: &Arc<dyn KeyValueDB>,
hash: Hash,
metrics: &Metrics,
) -> Result<(), Error>
where
Context: SubsystemContext<Message=AvailabilityStoreMessage>
{
let _timer = metrics.time_block_activated();
let events = match request_candidate_events(ctx, hash).await {
Ok(events) => events,
Err(err) => {
......@@ -697,6 +706,8 @@ where
{
use AvailabilityStoreMessage::*;
let _timer = subsystem.metrics.time_process_message();
match msg {
QueryAvailableData(hash, tx) => {
tx.send(available_data(&subsystem.inner, &hash).map(|d| d.data))
......@@ -860,6 +871,8 @@ fn store_available_data(
n_validators: u32,
available_data: AvailableData,
) -> Result<(), Error> {
let _timer = subsystem.metrics.time_store_available_data();
let mut tx = DBTransaction::new();
let block_number = available_data.validation_data.block_number;
......@@ -927,6 +940,8 @@ fn store_chunk(
chunk: ErasureChunk,
block_number: BlockNumber,
) -> Result<(), Error> {
let _timer = subsystem.metrics.time_store_chunk();
let mut tx = DBTransaction::new();
let dbkey = erasure_chunk_key(candidate_hash, chunk.index);
......@@ -977,6 +992,8 @@ fn get_chunk(
candidate_hash: &CandidateHash,
index: u32,
) -> Result<Option<ErasureChunk>, Error> {
let _timer = subsystem.metrics.time_get_chunk();
if let Some(chunk) = query_inner(
&subsystem.inner,
columns::DATA,
......@@ -1059,6 +1076,14 @@ fn get_chunks(data: &AvailableData, n_validators: usize, metrics: &Metrics) -> R
#[derive(Clone)]
struct MetricsInner {
received_availability_chunks_total: prometheus::Counter<prometheus::U64>,
prune_povs: prometheus::Histogram,
prune_chunks: prometheus::Histogram,
process_block_finalized: prometheus::Histogram,
block_activated: prometheus::Histogram,
process_message: prometheus::Histogram,
store_available_data: prometheus::Histogram,
store_chunk: prometheus::Histogram,
get_chunk: prometheus::Histogram,
}
/// Availability metrics.
......@@ -1074,6 +1099,46 @@ impl Metrics {
metrics.received_availability_chunks_total.inc_by(by);
}
}
/// Provide a timer for `prune_povs` which observes on drop.
fn time_prune_povs(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.prune_povs.start_timer())
}
/// Provide a timer for `prune_chunks` which observes on drop.
fn time_prune_chunks(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.prune_chunks.start_timer())
}
/// Provide a timer for `process_block_finalized` which observes on drop.
fn time_process_block_finalized(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.process_block_finalized.start_timer())
}
/// Provide a timer for `block_activated` which observes on drop.
fn time_block_activated(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.block_activated.start_timer())
}
/// Provide a timer for `process_message` which observes on drop.
fn time_process_message(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.process_message.start_timer())
}
/// Provide a timer for `store_available_data` which observes on drop.
fn time_store_available_data(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.store_available_data.start_timer())
}
/// Provide a timer for `store_chunk` which observes on drop.
fn time_store_chunk(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.store_chunk.start_timer())
}
/// Provide a timer for `get_chunk` which observes on drop.
fn time_get_chunk(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.get_chunk.start_timer())
}
}
impl metrics::Metrics for Metrics {
......@@ -1086,6 +1151,78 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
prune_povs: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_av_store_prune_povs",
"Time spent within `av_store::prune_povs`",
)
)?,
registry,
)?,
prune_chunks: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_av_store_prune_chunks",
"Time spent within `av_store::prune_chunks`",
)
)?,
registry,
)?,
process_block_finalized: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_av_store_process_block_finalized",
"Time spent within `av_store::block_finalized`",
)
)?,
registry,
)?,
block_activated: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_av_store_block_activated",
"Time spent within `av_store::block_activated`",
)
)?,
registry,
)?,
process_message: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_av_store_process_message",
"Time spent within `av_store::process_message`",
)
)?,
registry,
)?,
store_available_data: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_av_store_store_available_data",
"Time spent within `av_store::store_available_data`",
)
)?,
registry,
)?,
store_chunk: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_av_store_store_chunk",
"Time spent within `av_store::store_chunk`",
)
)?,
registry,
)?,
get_chunk: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_av_store_get_chunk",
"Time spent within `av_store::get_chunk`",
)
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
......
......@@ -481,8 +481,11 @@ impl CandidateBackingJob {
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn process_msg(&mut self, msg: CandidateBackingMessage) -> Result<(), Error> {
match msg {
CandidateBackingMessage::Second(_, candidate, pov) => {
let _timer = self.metrics.time_process_second();
// Sanity check that candidate is from our assignment.
if candidate.descriptor().para_id != self.assignment {
return Ok(());
......@@ -509,6 +512,8 @@ impl CandidateBackingJob {
}
}
CandidateBackingMessage::Statement(_, statement) => {
let _timer = self.metrics.time_process_statement();
self.check_statement_signature(&statement)?;
match self.maybe_validate_and_import(statement).await {
Err(Error::ValidationFailed(_)) => return Ok(()),
......@@ -517,6 +522,8 @@ impl CandidateBackingJob {
}
}
CandidateBackingMessage::GetBackedCandidates(_, tx) => {
let _timer = self.metrics.time_get_backed_candidates();
let backed = self.get_backed();
tx.send(backed).map_err(|data| Error::Send(data))?;
......@@ -898,7 +905,10 @@ impl util::JobTrait for CandidateBackingJob {
#[derive(Clone)]
struct MetricsInner {
signed_statements_total: prometheus::Counter<prometheus::U64>,
candidates_seconded_total: prometheus::Counter<prometheus::U64>
candidates_seconded_total: prometheus::Counter<prometheus::U64>,
process_second: prometheus::Histogram,
process_statement: prometheus::Histogram,
get_backed_candidates: prometheus::Histogram,
}
/// Candidate backing metrics.
......@@ -917,6 +927,21 @@ impl Metrics {
metrics.candidates_seconded_total.inc();
}
}
/// Provide a timer for handling `CandidateBackingMessage::Second` which observes on drop.
fn time_process_second(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.process_second.start_timer())
}
/// Provide a timer for handling `CandidateBackingMessage::Statement` which observes on drop.
fn time_process_statement(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.process_statement.start_timer())
}
/// Provide a timer for handling `CandidateBackingMessage::GetBackedCandidates` which observes on drop.
fn time_get_backed_candidates(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.get_backed_candidates.start_timer())
}
}
impl metrics::Metrics for Metrics {
......@@ -924,18 +949,45 @@ impl metrics::Metrics for Metrics {
let metrics = MetricsInner {
signed_statements_total: prometheus::register(
prometheus::Counter::new(
"parachain_signed_statements_total",
"parachain_candidate_backing_signed_statements_total",
"Number of statements signed.",
)?,
registry,
)?,
candidates_seconded_total: prometheus::register(
prometheus::Counter::new(
"parachain_candidates_seconded_total",
"parachain_candidate_backing_candidates_seconded_total",
"Number of candidates seconded.",
)?,
registry,
)?,
process_second: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_candidate_backing_process_second",
"Time spent within `candidate_backing::process_second`",
)
)?,
registry,
)?,
process_statement: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_candidate_backing_process_statement",
"Time spent within `candidate_backing::process_statement`",
)
)?,
registry,
)?,
get_backed_candidates: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_candidate_backing_get_backed_candidates",
"Time spent within `candidate_backing::get_backed_candidates`",
)
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
......
......@@ -230,6 +230,7 @@ async fn construct_availability_bitfield(
#[derive(Clone)]
struct MetricsInner {
bitfields_signed_total: prometheus::Counter<prometheus::U64>,
run: prometheus::Histogram,
}
/// Bitfield signing metrics.
......@@ -242,6 +243,11 @@ impl Metrics {
metrics.bitfields_signed_total.inc();
}
}
/// Provide a timer for `run` which observes on drop.
fn time_run(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.run.start_timer())
}
}
impl metrics::Metrics for Metrics {
......@@ -254,6 +260,15 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
run: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_bitfield_signing_run",
"Time spent within `bitfield_signing::run`",
)
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
......@@ -277,6 +292,7 @@ impl JobTrait for BitfieldSigningJob {
_receiver: mpsc::Receiver<ToJob>,
mut sender: mpsc::Sender<FromJob>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
let metrics = metrics.clone();
async move {
let wait_until = Instant::now() + JOB_DELAY;
......@@ -291,6 +307,10 @@ impl JobTrait for BitfieldSigningJob {
// wait a bit before doing anything else
Delay::new_at(wait_until).await?;
// this timer does not appear at the head of the function because we don't want to include
// JOB_DELAY each time.
let _timer = metrics.time_run();
let bitfield =
match construct_availability_bitfield(relay_parent, validator.index(), &mut sender).await
{
......
......@@ -204,6 +204,8 @@ impl CandidateSelectionJob {
para_id: ParaId,
collator_id: CollatorId,
) {
let _timer = self.metrics.time_handle_collation();
if self.seconded_candidate.is_none() {
let (candidate_receipt, pov) =
match get_collation(
......@@ -240,6 +242,8 @@ impl CandidateSelectionJob {
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn handle_invalid(&mut self, candidate_receipt: CandidateReceipt) {
let _timer = self.metrics.time_handle_invalid();
let received_from = match &self.seconded_candidate {
Some(peer) => peer,
None => {
......@@ -336,6 +340,8 @@ async fn forward_invalidity_note(
struct MetricsInner {
seconds: prometheus::CounterVec<prometheus::U64>,
invalid_selections: prometheus::CounterVec<prometheus::U64>,
handle_collation: prometheus::Histogram,
handle_invalid: prometheus::Histogram,
}
/// Candidate selection metrics.
......@@ -356,6 +362,16 @@ impl Metrics {
metrics.invalid_selections.with_label_values(&[label]).inc();
}
}
/// Provide a timer for `handle_collation` which observes on drop.
fn time_handle_collation(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.handle_collation.start_timer())
}
/// Provide a timer for `handle_invalid` which observes on drop.
fn time_handle_invalid(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.handle_invalid.start_timer())
}
}
impl metrics::Metrics for Metrics {
......@@ -381,6 +397,24 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
handle_collation: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_candidate_selection_handle_collation",
"Time spent within `candidate_selection::handle_collation`",
)
)?,
registry,
)?,
handle_invalid: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_candidate_selection:handle_invalid",
"Time spent within `candidate_selection::handle_invalid`",
)
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
......
......@@ -103,12 +103,15 @@ async fn run(
pov,
response_sender,
) => {
let _timer = metrics.time_validate_from_chain_state();
let res = spawn_validate_from_chain_state(
&mut ctx,
isolation_strategy.clone(),
descriptor,
pov,
spawn.clone(),
&metrics,
).await;
match res {
......@@ -126,6 +129,8 @@ async fn run(
pov,
response_sender,
) => {
let _timer = metrics.time_validate_from_exhaustive();
let res = spawn_validate_exhaustive(
&mut ctx,
isolation_strategy.clone(),
......@@ -134,6 +139,7 @@ async fn run(
descriptor,
pov,
spawn.clone(),
&metrics,
).await;
match res {
......@@ -260,13 +266,14 @@ async fn find_assumed_validation_data(
Ok(AssumptionCheckOutcome::DoesNotMatch)
}
#[tracing::instrument(level = "trace", skip(ctx, pov, spawn), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(ctx, pov, spawn, metrics), fields(subsystem = LOG_TARGET))]
async fn spawn_validate_from_chain_state(
ctx: &mut impl SubsystemContext<Message = CandidateValidationMessage>,
isolation_strategy: IsolationStrategy,
descriptor: CandidateDescriptor,
pov: Arc<PoV>,
spawn: impl SpawnNamed + 'static,
metrics: &Metrics,
) -> SubsystemResult<Result<ValidationResult, ValidationFailed>> {
let (validation_data, validation_code) =
match find_assumed_validation_data(ctx, &descriptor).await? {
......@@ -292,6 +299,7 @@ async fn spawn_validate_from_chain_state(
descriptor.clone(),
pov,
spawn,
metrics,
)
.await;
......@@ -320,7 +328,7 @@ async fn spawn_validate_from_chain_state(
validation_result
}
#[tracing::instrument(level = "trace", skip(ctx, validation_code, pov, spawn), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(ctx, validation_code, pov, spawn, metrics), fields(subsystem = LOG_TARGET))]
async fn spawn_validate_exhaustive(
ctx: &mut impl SubsystemContext<Message = CandidateValidationMessage>,