lib.rs 32.7 KiB
Newer Older
		RuntimeApiRequest::Validators(tx),
	));

	ctx.send_message(query_validators)
		.await?;
	rx.await?
		.map_err::<Error, _>(Into::into)
}

/// Query the hash of the `K` ancestors
async fn query_k_ancestors<Context>(
	ctx: &mut Context,
	relay_parent: Hash,
	k: usize,
) -> Result<Vec<Hash>>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let (tx, rx) = oneshot::channel();
	let query_ancestors = AllMessages::ChainApi(ChainApiMessage::Ancestors {
		hash: relay_parent,
		k,
		response_channel: tx,
	});

	ctx.send_message(query_ancestors)
		.await?;
	rx.await?
		.map_err::<Error, _>(Into::into)
}

/// Query the session index of a relay parent
async fn query_session_index_for_child<Context>(
	ctx: &mut Context,
	relay_parent: Hash,
) -> Result<SessionIndex>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let (tx, rx) = oneshot::channel();
	let query_session_idx_for_child = AllMessages::RuntimeApi(RuntimeApiMessage::Request(
		relay_parent,
		RuntimeApiRequest::SessionIndexForChild(tx),
	));

	ctx.send_message(query_session_idx_for_child)
		.await?;
	rx.await?
		.map_err::<Error, _>(Into::into)
}

/// Collect at most `k` ancestors of `relay_parent` that belong to the same session.
///
/// The reference session is the one reported for a child of `relay_parent`.
/// Ancestors are returned youngest first (parent, grandparent, ...). The walk
/// stops at the first ancestor whose session index differs from the reference,
/// or once there is no further ancestor to look the session up with.
///
/// # Errors
///
/// Propagates any error from the underlying ancestor or session index queries.
async fn query_up_to_k_ancestors_in_same_session<Context>(
	ctx: &mut Context,
	relay_parent: Hash,
	k: usize,
) -> Result<Vec<Hash>>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	// The session index of a block is looked up via its parent ("index for child"),
	// hence one ancestor more than `k` is requested.
	// Ordering is [parent, grandparent, greatgrandparent, ...].
	let lineage = query_k_ancestors(ctx, relay_parent, k + 1).await?;
	let target_session = query_session_index_for_child(ctx, relay_parent).await?;

	// `lineage.len() - 1` slots would suffice; the spare one lets a caller push
	// `relay_parent` as well without triggering a re-allocation.
	let mut same_session = Vec::with_capacity(lineage.len());

	// Every window is `[block, parent of block]`, walked from youngest to oldest.
	// The oldest hash never leads a window and is therefore never returned: at that
	// point we either reached genesis or the older blocks were already pruned.
	for pair in lineage.windows(2) {
		let (block, block_parent) = (pair[0], pair[1]);
		if query_session_index_for_child(ctx, block_parent).await? != target_session {
			break;
		}
		same_session.push(block);
	}

	debug_assert!(same_session.len() <= k);
	Ok(same_session)
}


/// The registered prometheus instruments held by `Metrics`.
#[derive(Clone)]
struct MetricsInner {
	/// Counter registered as `parachain_gossipped_availability_chunks_total`;
	/// incremented by `Metrics::on_chunk_distributed`.
	gossipped_availability_chunks: prometheus::Counter<prometheus::U64>,
}

/// Availability Distribution metrics.
///
/// Wraps an optional set of registered instruments. The `Default` value holds
/// `None`, in which case recording a metric does nothing.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
	/// Increment the gossipped-chunks counter by one.
	///
	/// Does nothing when no instruments are registered (inner value is `None`).
	fn on_chunk_distributed(&self) {
		if let Some(inner) = self.0.as_ref() {
			inner.gossipped_availability_chunks.inc();
		}
	}
}

impl metrics::Metrics for Metrics {
	/// Create the availability distribution instruments and register them with `registry`.
	///
	/// Returns an error if the counter cannot be created or cannot be registered.
	fn try_register(registry: &prometheus::Registry) -> std::result::Result<Self, prometheus::PrometheusError> {
		let chunk_counter = prometheus::Counter::new(
			"parachain_gossipped_availability_chunks_total",
			"Number of availability chunks gossipped to other peers."
		)?;
		let gossipped_availability_chunks = prometheus::register(chunk_counter, registry)?;

		let inner = MetricsInner { gossipped_availability_chunks };
		Ok(Metrics(Some(inner)))
	}
}

// Test module, declared out of line and compiled only for test builds.
#[cfg(test)]
mod tests;