Newer
Older
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_data_availability<Context>(ctx: &mut Context, candidate_hash: CandidateHash) -> Result<bool>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryDataAvailability(candidate_hash, tx),
rx.await.map_err(|e| Error::QueryAvailabilityResponseChannel(e))
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_chunk<Context>(
ctx: &mut Context,
candidate_hash: CandidateHash,
validator_index: ValidatorIndex,
) -> Result<Option<ErasureChunk>>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx),
rx.await.map_err(|e| Error::QueryChunkResponseChannel(e))
#[tracing::instrument(level = "trace", skip(ctx, erasure_chunk), fields(subsystem = LOG_TARGET))]
async fn store_chunk<Context>(
ctx: &mut Context,
candidate_hash: CandidateHash,
validator_index: ValidatorIndex,
erasure_chunk: ErasureChunk,
) -> Result<std::result::Result<(), ()>>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::StoreChunk {
candidate_hash,
relay_parent,
validator_index,
chunk: erasure_chunk,
tx,
}
)).await;
rx.await.map_err(|e| Error::StoreChunkResponseChannel(e))
}
/// Query the validator set.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_validators<Context>(
ctx: &mut Context,
relay_parent: Hash,
) -> Result<Vec<ValidatorId>>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let (tx, rx) = oneshot::channel();
let query_validators = AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::Validators(tx),
));
ctx.send_message(query_validators)
rx.await
.map_err(|e| Error::QueryValidatorsResponseChannel(e))?
.map_err(|e| Error::QueryValidators(e))
}
/// Query the hash of the `K` ancestors
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_k_ancestors<Context>(
ctx: &mut Context,
relay_parent: Hash,
k: usize,
) -> Result<Vec<Hash>>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let (tx, rx) = oneshot::channel();
let query_ancestors = AllMessages::ChainApi(ChainApiMessage::Ancestors {
hash: relay_parent,
k,
response_channel: tx,
});
ctx.send_message(query_ancestors)
rx.await
.map_err(|e| Error::QueryAncestorsResponseChannel(e))?
.map_err(|e| Error::QueryAncestors(e))
}
/// Query the session index of a relay parent
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_session_index_for_child<Context>(
ctx: &mut Context,
relay_parent: Hash,
) -> Result<SessionIndex>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let (tx, rx) = oneshot::channel();
let query_session_idx_for_child = AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::SessionIndexForChild(tx),
));
ctx.send_message(query_session_idx_for_child)
rx.await
.map_err(|e| Error::QuerySessionResponseChannel(e))?
.map_err(|e| Error::QuerySession(e))
}
/// Queries up to k ancestors with the constraints of equiv session
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_up_to_k_ancestors_in_same_session<Context>(
ctx: &mut Context,
relay_parent: Hash,
k: usize,
) -> Result<Vec<Hash>>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
// k + 1 since we always query the child's session index
// ordering is [parent, grandparent, greatgrandparent, greatgreatgrandparent, ...]
let ancestors = query_k_ancestors(ctx, relay_parent, k + 1).await?;
let desired_session = query_session_index_for_child(ctx, relay_parent).await?;
// we would only need `ancestors.len() - 1`, but the one extra could avoid a re-alloc
// if the consumer wants to push the `relay_parent` onto it too and does not hurt otherwise
let mut acc = Vec::with_capacity(ancestors.len());
// iterate from youngest to oldest
let mut iter = ancestors.into_iter().peekable();
while let Some((ancestor, ancestor_parent)) = iter.next().and_then(|a| iter.peek().map(|ap| (a, ap))) {
if query_session_index_for_child(ctx, *ancestor_parent).await? != desired_session {
acc.push(ancestor);
}
debug_assert!(acc.len() <= k);
Ok(acc)
}
#[derive(Clone)]
struct MetricsInner {
gossipped_availability_chunks: prometheus::Counter<prometheus::U64>,
handle_our_view_change: prometheus::Histogram,
process_incoming_peer_message: prometheus::Histogram,
}
/// Availability Distribution metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
fn on_chunk_distributed(&self) {
if let Some(metrics) = &self.0 {
metrics.gossipped_availability_chunks.inc();
}
}
/// Provide a timer for `handle_our_view_change` which observes on drop.
fn time_handle_our_view_change(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.handle_our_view_change.start_timer())
}
/// Provide a timer for `process_incoming_peer_message` which observes on drop.
fn time_process_incoming_peer_message(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.process_incoming_peer_message.start_timer())
}
}
impl metrics::Metrics for Metrics {
fn try_register(
registry: &prometheus::Registry,
) -> std::result::Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
gossipped_availability_chunks: prometheus::register(
prometheus::Counter::new(
"parachain_gossipped_availability_chunks_total",
"Number of availability chunks gossipped to other peers.",
handle_our_view_change: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_availability_distribution_handle_our_view_change",
"Time spent within `availability_distribution::handle_our_view_change`",
)
)?,
registry,
)?,
process_incoming_peer_message: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_availability_distribution_process_incoming_peer_message",
"Time spent within `availability_distribution::process_incoming_peer_message`",
)
)?,
registry,
)?,