Unverified Commit 547bc8a1 authored by Bernhard Schuster's avatar Bernhard Schuster Committed by GitHub
Browse files

availability recovery type name clarifications (#4203)

* minor changes

* fmt

* rename to expressive types

* chore: fixup

* chore: remove `Data` prefixes

* address review comments

* guide items

* sourcer -> source, add `FromValidators` suffix
parent e5530856
Pipeline #165038 passed with stages
in 45 minutes and 1 second
......@@ -33,10 +33,7 @@ use parity_scale_codec::{Decode, Encode, Error as CodecError, Input};
use bitvec::{order::Lsb0 as BitOrderLsb0, vec::BitVec};
use polkadot_node_primitives::{AvailableData, ErasureChunk};
use polkadot_node_subsystem_util::{
self as util,
metrics::{self, prometheus},
};
use polkadot_node_subsystem_util as util;
use polkadot_primitives::v1::{
BlockNumber, CandidateEvent, CandidateHash, CandidateReceipt, Hash, Header, ValidatorIndex,
};
......@@ -47,6 +44,9 @@ use polkadot_subsystem::{
SubsystemError,
};
mod metrics;
pub use self::metrics::*;
#[cfg(test)]
mod tests;
......@@ -1273,131 +1273,3 @@ fn prune_all(db: &Arc<dyn KeyValueDB>, config: &Config, clock: &dyn Clock) -> Re
db.write(tx)?;
Ok(())
}
/// Prometheus collectors backing `Metrics`; built and registered in `try_register`.
#[derive(Clone)]
struct MetricsInner {
/// Counter advanced by `Metrics::on_chunks_received`.
received_availability_chunks_total: prometheus::Counter<prometheus::U64>,
/// Histogram behind `Metrics::time_pruning`.
pruning: prometheus::Histogram,
/// Histogram behind `Metrics::time_process_block_finalized`.
process_block_finalized: prometheus::Histogram,
/// Histogram behind `Metrics::time_block_activated`.
block_activated: prometheus::Histogram,
/// Histogram behind `Metrics::time_process_message`.
process_message: prometheus::Histogram,
/// Histogram behind `Metrics::time_store_available_data`.
store_available_data: prometheus::Histogram,
/// Histogram behind `Metrics::time_store_chunk`.
store_chunk: prometheus::Histogram,
/// Histogram behind `Metrics::time_get_chunk`.
get_chunk: prometheus::Histogram,
}
/// Availability metrics.
///
/// Wraps an optional `MetricsInner`. The derived `Default` produces `Metrics(None)`; in that
/// state `on_chunks_received` does nothing and every `time_*` method returns `None`.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
    /// Add `count` to the counter of received availability chunks.
    ///
    /// Does nothing when no collectors are registered.
    fn on_chunks_received(&self, count: usize) {
        use core::convert::TryFrom as _;

        let inner = match self.0.as_ref() {
            Some(inner) => inner,
            None => return,
        };
        // A `usize` is expected to fit into `u64`; should it ever not, nothing is counted.
        let increment = u64::try_from(count).unwrap_or_default();
        inner.received_availability_chunks_total.inc_by(increment);
    }

    /// Provide a timer for `prune_all` which observes on drop.
    ///
    /// Returns `None` when no collectors are registered.
    fn time_pruning(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
        let inner = self.0.as_ref()?;
        Some(inner.pruning.start_timer())
    }

    /// Provide a timer for `process_block_finalized` which observes on drop.
    ///
    /// Returns `None` when no collectors are registered.
    fn time_process_block_finalized(
        &self,
    ) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
        let inner = self.0.as_ref()?;
        Some(inner.process_block_finalized.start_timer())
    }

    /// Provide a timer for block activation handling which observes on drop.
    ///
    /// Returns `None` when no collectors are registered.
    fn time_block_activated(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
        let inner = self.0.as_ref()?;
        Some(inner.block_activated.start_timer())
    }

    /// Provide a timer for `process_message` which observes on drop.
    ///
    /// Returns `None` when no collectors are registered.
    fn time_process_message(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
        let inner = self.0.as_ref()?;
        Some(inner.process_message.start_timer())
    }

    /// Provide a timer for `store_available_data` which observes on drop.
    ///
    /// Returns `None` when no collectors are registered.
    fn time_store_available_data(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
        let inner = self.0.as_ref()?;
        Some(inner.store_available_data.start_timer())
    }

    /// Provide a timer for `store_chunk` which observes on drop.
    ///
    /// Returns `None` when no collectors are registered.
    fn time_store_chunk(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
        let inner = self.0.as_ref()?;
        Some(inner.store_chunk.start_timer())
    }

    /// Provide a timer for `get_chunk` which observes on drop.
    ///
    /// Returns `None` when no collectors are registered.
    fn time_get_chunk(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
        let inner = self.0.as_ref()?;
        Some(inner.get_chunk.start_timer())
    }
}
impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
received_availability_chunks_total: prometheus::register(
prometheus::Counter::new(
"parachain_received_availability_chunks_total",
"Number of availability chunks received.",
)?,
registry,
)?,
pruning: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_pruning",
"Time spent within `av_store::prune_all`",
))?,
registry,
)?,
process_block_finalized: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_process_block_finalized",
"Time spent within `av_store::process_block_finalized`",
))?,
registry,
)?,
block_activated: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_block_activated",
"Time spent within `av_store::process_block_activated`",
))?,
registry,
)?,
process_message: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_process_message",
"Time spent within `av_store::process_message`",
))?,
registry,
)?,
store_available_data: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_store_available_data",
"Time spent within `av_store::store_available_data`",
))?,
registry,
)?,
store_chunk: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_store_chunk",
"Time spent within `av_store::store_chunk`",
))?,
registry,
)?,
get_chunk: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_get_chunk",
"Time spent fetching requested chunks.`",
))?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
}
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use polkadot_node_subsystem_util::metrics::{self, prometheus};
/// Prometheus collectors backing `Metrics`; built and registered in `try_register`.
#[derive(Clone)]
pub(crate) struct MetricsInner {
/// Counter advanced by `Metrics::on_chunks_received`.
received_availability_chunks_total: prometheus::Counter<prometheus::U64>,
/// Histogram behind `Metrics::time_pruning`.
pruning: prometheus::Histogram,
/// Histogram behind `Metrics::time_process_block_finalized`.
process_block_finalized: prometheus::Histogram,
/// Histogram behind `Metrics::time_block_activated`.
block_activated: prometheus::Histogram,
/// Histogram behind `Metrics::time_process_message`.
process_message: prometheus::Histogram,
/// Histogram behind `Metrics::time_store_available_data`.
store_available_data: prometheus::Histogram,
/// Histogram behind `Metrics::time_store_chunk`.
store_chunk: prometheus::Histogram,
/// Histogram behind `Metrics::time_get_chunk`.
get_chunk: prometheus::Histogram,
}
/// Availability metrics.
///
/// Wraps an optional `MetricsInner`. The derived `Default` produces `Metrics(None)`; in that
/// state `on_chunks_received` does nothing and every `time_*` method returns `None`.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
    /// Add `count` to the counter of received availability chunks.
    ///
    /// Does nothing when no collectors are registered.
    pub(crate) fn on_chunks_received(&self, count: usize) {
        use core::convert::TryFrom as _;

        let inner = match self.0.as_ref() {
            Some(inner) => inner,
            None => return,
        };
        // A `usize` is expected to fit into `u64`; should it ever not, nothing is counted.
        let increment = u64::try_from(count).unwrap_or_default();
        inner.received_availability_chunks_total.inc_by(increment);
    }

    /// Provide a timer for `prune_all` which observes on drop.
    ///
    /// Returns `None` when no collectors are registered.
    pub(crate) fn time_pruning(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
        let inner = self.0.as_ref()?;
        Some(inner.pruning.start_timer())
    }

    /// Provide a timer for `process_block_finalized` which observes on drop.
    ///
    /// Returns `None` when no collectors are registered.
    pub(crate) fn time_process_block_finalized(
        &self,
    ) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
        let inner = self.0.as_ref()?;
        Some(inner.process_block_finalized.start_timer())
    }

    /// Provide a timer for block activation handling which observes on drop.
    ///
    /// Returns `None` when no collectors are registered.
    pub(crate) fn time_block_activated(
        &self,
    ) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
        let inner = self.0.as_ref()?;
        Some(inner.block_activated.start_timer())
    }

    /// Provide a timer for `process_message` which observes on drop.
    ///
    /// Returns `None` when no collectors are registered.
    pub(crate) fn time_process_message(
        &self,
    ) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
        let inner = self.0.as_ref()?;
        Some(inner.process_message.start_timer())
    }

    /// Provide a timer for `store_available_data` which observes on drop.
    ///
    /// Returns `None` when no collectors are registered.
    pub(crate) fn time_store_available_data(
        &self,
    ) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
        let inner = self.0.as_ref()?;
        Some(inner.store_available_data.start_timer())
    }

    /// Provide a timer for `store_chunk` which observes on drop.
    ///
    /// Returns `None` when no collectors are registered.
    pub(crate) fn time_store_chunk(
        &self,
    ) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
        let inner = self.0.as_ref()?;
        Some(inner.store_chunk.start_timer())
    }

    /// Provide a timer for `get_chunk` which observes on drop.
    ///
    /// Returns `None` when no collectors are registered.
    pub(crate) fn time_get_chunk(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
        let inner = self.0.as_ref()?;
        Some(inner.get_chunk.start_timer())
    }
}
impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
received_availability_chunks_total: prometheus::register(
prometheus::Counter::new(
"parachain_received_availability_chunks_total",
"Number of availability chunks received.",
)?,
registry,
)?,
pruning: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_pruning",
"Time spent within `av_store::prune_all`",
))?,
registry,
)?,
process_block_finalized: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_process_block_finalized",
"Time spent within `av_store::process_block_finalized`",
))?,
registry,
)?,
block_activated: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_block_activated",
"Time spent within `av_store::process_block_activated`",
))?,
registry,
)?,
process_message: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_process_message",
"Time spent within `av_store::process_message`",
))?,
registry,
)?,
store_available_data: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_store_available_data",
"Time spent within `av_store::store_available_data`",
))?,
registry,
)?,
store_chunk: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_store_chunk",
"Time spent within `av_store::store_chunk`",
))?,
registry,
)?,
get_chunk: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_av_store_get_chunk",
"Time spent fetching requested chunks.`",
))?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
}
......@@ -73,7 +73,7 @@ mod tests;
const LOG_TARGET: &str = "parachain::availability-recovery";
// How many parallel requests interaction should have going at once.
// How many parallel recovery tasks should be running at once.
const N_PARALLEL: usize = 50;
// Size of the LRU cache where we keep recovered data.
......@@ -104,13 +104,13 @@ pub struct AvailabilityRecoverySubsystem {
metrics: Metrics,
}
struct RequestFromBackersPhase {
struct RequestFromBackers {
// a random shuffling of the validators from the backing group which indicates the order
// in which we connect to them and request the chunk.
shuffled_backers: Vec<ValidatorIndex>,
}
struct RequestChunksPhase {
struct RequestChunksFromValidators {
/// How many requests have been unsuccessful so far.
error_count: usize,
/// Total number of responses that have been received.
......@@ -125,11 +125,11 @@ struct RequestChunksPhase {
requesting_chunks: FuturesUndead<Result<Option<ErasureChunk>, (ValidatorIndex, RequestError)>>,
}
struct InteractionParams {
struct RecoveryParams {
/// Discovery ids of `validators`.
validator_authority_keys: Vec<AuthorityDiscoveryId>,
/// Validators relevant to this `Interaction`.
/// Validators relevant to this `RecoveryTask`.
validators: Vec<ValidatorId>,
/// The number of pieces needed.
......@@ -145,33 +145,37 @@ struct InteractionParams {
metrics: Metrics,
}
enum InteractionPhase {
RequestFromBackers(RequestFromBackersPhase),
RequestChunks(RequestChunksPhase),
/// Source the availability data either by means
/// of direct request response protocol to
/// backers (a.k.a. fast-path), or recover from chunks.
enum Source {
RequestFromBackers(RequestFromBackers),
RequestChunks(RequestChunksFromValidators),
}
/// A state of a single interaction reconstructing an available data.
struct Interaction<S> {
/// A stateful reconstruction of availability data in reference to
/// a candidate hash.
struct RecoveryTask<S> {
sender: S,
/// The parameters of the interaction.
params: InteractionParams,
/// The parameters of the recovery process.
params: RecoveryParams,
/// The phase of the interaction.
phase: InteractionPhase,
/// The source to obtain the availability data from.
source: Source,
}
impl RequestFromBackersPhase {
impl RequestFromBackers {
fn new(mut backers: Vec<ValidatorIndex>) -> Self {
backers.shuffle(&mut rand::thread_rng());
RequestFromBackersPhase { shuffled_backers: backers }
RequestFromBackers { shuffled_backers: backers }
}
// Run this phase to completion.
async fn run(
&mut self,
params: &InteractionParams,
params: &RecoveryParams,
sender: &mut impl SubsystemSender,
) -> Result<AvailableData, RecoveryError> {
tracing::trace!(
......@@ -186,7 +190,7 @@ impl RequestFromBackersPhase {
self.shuffled_backers.pop().ok_or_else(|| RecoveryError::Unavailable)?;
// Request data.
let (req, res) = OutgoingRequest::new(
let (req, response) = OutgoingRequest::new(
Recipient::Authority(
params.validator_authority_keys[validator_index.0 as usize].clone(),
),
......@@ -203,7 +207,7 @@ impl RequestFromBackersPhase {
)
.await;
match res.await {
match response.await {
Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => {
if reconstructed_data_matches_root(
params.validators.len(),
......@@ -241,12 +245,12 @@ impl RequestFromBackersPhase {
}
}
impl RequestChunksPhase {
impl RequestChunksFromValidators {
fn new(n_validators: u32) -> Self {
let mut shuffling: Vec<_> = (0..n_validators).map(ValidatorIndex).collect();
shuffling.shuffle(&mut rand::thread_rng());
RequestChunksPhase {
RequestChunksFromValidators {
error_count: 0,
total_received_responses: 0,
shuffling: shuffling.into(),
......@@ -255,7 +259,7 @@ impl RequestChunksPhase {
}
}
fn is_unavailable(&self, params: &InteractionParams) -> bool {
fn is_unavailable(&self, params: &RecoveryParams) -> bool {
is_unavailable(
self.received_chunks.len(),
self.requesting_chunks.total_len(),
......@@ -264,7 +268,7 @@ impl RequestChunksPhase {
)
}
fn can_conclude(&self, params: &InteractionParams) -> bool {
fn can_conclude(&self, params: &RecoveryParams) -> bool {
self.received_chunks.len() >= params.threshold || self.is_unavailable(params)
}
......@@ -295,7 +299,7 @@ impl RequestChunksPhase {
async fn launch_parallel_requests(
&mut self,
params: &InteractionParams,
params: &RecoveryParams,
sender: &mut impl SubsystemSender,
) {
let num_requests = self.get_desired_request_count(params.threshold);
......@@ -346,7 +350,8 @@ impl RequestChunksPhase {
.await;
}
async fn wait_for_chunks(&mut self, params: &InteractionParams) {
/// Wait for a sufficient amount of chunks to reconstruct according to the provided `params`.
async fn wait_for_chunks(&mut self, params: &RecoveryParams) {
let metrics = &params.metrics;
// Wait for all current requests to conclude or time-out, or until we reach enough chunks.
......@@ -448,7 +453,7 @@ impl RequestChunksPhase {
async fn run(
&mut self,
params: &InteractionParams,
params: &RecoveryParams,
sender: &mut impl SubsystemSender,
) -> Result<AvailableData, RecoveryError> {
// First query the store for any chunks we've got.
......@@ -559,6 +564,9 @@ const fn is_unavailable(
received_chunks + requesting_chunks + unrequested_validators < threshold
}
/// Re-encode the data into erasure chunks in order to verify
/// the root hash of the provided merkle tree, which is built
/// on-top of the encoded chunks.
fn reconstructed_data_matches_root(
n_validators: usize,
expected_root: &Hash,
......@@ -581,7 +589,7 @@ fn reconstructed_data_matches_root(
branches.root() == *expected_root
}
impl<S: SubsystemSender> Interaction<S> {
impl<S: SubsystemSender> RecoveryTask<S> {
async fn run(mut self) -> Result<AvailableData, RecoveryError> {
// First just see if we have the data available locally.
{
......@@ -609,18 +617,18 @@ impl<S: SubsystemSender> Interaction<S> {
loop {
// These only fail if we cannot reach the underlying subsystem, in which case there is nothing
// meaningful we can do.
match self.phase {
InteractionPhase::RequestFromBackers(ref mut from_backers) => {
match self.source {
Source::RequestFromBackers(ref mut from_backers) => {
match from_backers.run(&self.params, &mut self.sender).await {
Ok(data) => break Ok(data),
Err(RecoveryError::Invalid) => break Err(RecoveryError::Invalid),
Err(RecoveryError::Unavailable) =>
self.phase = InteractionPhase::RequestChunks(RequestChunksPhase::new(
self.source = Source::RequestChunks(RequestChunksFromValidators::new(
self.params.validators.len() as _,
)),
}
},
InteractionPhase::RequestChunks(ref mut from_all) =>
Source::RequestChunks(ref mut from_all) =>
break from_all.run(&self.params, &mut self.sender).await,
}
}
......@@ -628,13 +636,13 @@ impl<S: SubsystemSender> Interaction<S> {
}
/// Accumulate all awaiting sides for some particular `AvailableData`.
struct InteractionHandle {
struct RecoveryHandle {
candidate_hash: CandidateHash,
remote: RemoteHandle<Result<AvailableData, RecoveryError>>,
awaiting: Vec<oneshot::Sender<Result<AvailableData, RecoveryError>>>,
}
impl Future for InteractionHandle {
impl Future for RecoveryHandle {
type Output = Option<(CandidateHash, Result<AvailableData, RecoveryError>)>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
......@@ -679,9 +687,9 @@ impl Future for InteractionHandle {
}
struct State {
/// Each interaction is implemented as its own async task,
/// Each recovery task is implemented as its own async task,
/// and these handles are for communicating with them.
interactions: FuturesUnordered<InteractionHandle>,
ongoing_recoveries: FuturesUnordered<RecoveryHandle>,
/// A recent block hash for which state should be available.
live_block: (BlockNumber, Hash),
......@@ -693,7 +701,7 @@ struct State {
impl Default for State {
fn default() -> Self {
Self {
interactions: FuturesUnordered::new(),
ongoing_recoveries: FuturesUnordered::new(),
live_block: (0, Hash::default()),
availability_lru: LruCache::new(LRU_SIZE),
}
......@@ -732,8 +740,8 @@ async fn handle_signal(state: &mut State, signal: OverseerSignal) -> SubsystemRe
}
}
/// Machinery around launching interactions into the background.
async fn launch_interaction<Context>(
/// Machinery around launching recovery tasks into the background.
async fn launch_recovery_task<Context>(
state: &mut State,
ctx: &mut Context,
session_info: SessionInfo,
......@@ -748,7 +756,7 @@ where
{
let candidate_hash = receipt.hash();
let params = InteractionParams {
let params = RecoveryParams {
validator_authority_keys: session_info.discovery_keys.clone(),
validators: session_info.validators.clone(),
threshold: recovery_threshold(session_info.validators.len())?,
......@@ -759,28 +767,26 @@ where
let phase = backing_group
.and_then(|g| session_info.validator_groups.get(g.0 as usize))
.map(|group| {
InteractionPhase::RequestFromBackers(RequestFromBackersPhase::new(group.clone()))
})
.map(|group| Source::RequestFromBackers(RequestFromBackers::new(group.clone())))
.unwrap_or_else(|| {
InteractionPhase::RequestChunks(RequestChunksPhase::new(params.validators.len() as _))
Source::RequestChunks(RequestChunksFromValidators::new(params.validators.len() as _))
});
let interaction = Interaction { sender: ctx.sender().clone(), params, phase };
let recovery_task = RecoveryTask { sender: ctx.sender().clone(), params, source: phase };
let (remote, remote_handle) = interaction.run().remote_handle();
let (remote, remote_handle) = recovery_task.run().remote_handle();
state.interactions.push(InteractionHandle {
state.ongoing_recoveries.push(RecoveryHandle {
candidate_hash,
remote: remote_handle,
awaiting: vec![response_sender],
});
if let Err(e) = ctx.spawn("recovery interaction", Box::pin(remote)) {
if let Err(e) = ctx.spawn("recovery task", Box::pin(remote)) {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Failed to spawn a recovery interaction task",
"Failed to spawn a recovery task",
);
}
......@@ -817,7 +823,9 @@ where
return Ok(())
}
if let Some(i) = state.interactions.iter_mut().find(|i| i.candidate_hash == candidate_hash) {
if let Some(i) =