/// Query whether the data for the given candidate is available in the availability store.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_data_availability<Context>(ctx: &mut Context, candidate_hash: CandidateHash) -> Result<bool>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let (tx, rx) = oneshot::channel();
	ctx.send_message(AllMessages::AvailabilityStore(
		AvailabilityStoreMessage::QueryDataAvailability(candidate_hash, tx),
	)).await;

	rx.await.map_err(|e| Error::QueryAvailabilityResponseChannel(e))
}
/// Query a single erasure chunk from the availability store.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_chunk<Context>(
	ctx: &mut Context,
	candidate_hash: CandidateHash,
	validator_index: ValidatorIndex,
) -> Result<Option<ErasureChunk>>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let (tx, rx) = oneshot::channel();
	ctx.send_message(AllMessages::AvailabilityStore(
		AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx),
	)).await;

	rx.await.map_err(|e| Error::QueryChunkResponseChannel(e))
}
/// Store an erasure chunk in the availability store.
#[tracing::instrument(level = "trace", skip(ctx, erasure_chunk), fields(subsystem = LOG_TARGET))]
async fn store_chunk<Context>(
	ctx: &mut Context,
	candidate_hash: CandidateHash,
	relay_parent: Hash,
	validator_index: ValidatorIndex,
	erasure_chunk: ErasureChunk,
) -> Result<std::result::Result<(), ()>>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let (tx, rx) = oneshot::channel();
	ctx.send_message(AllMessages::AvailabilityStore(
		AvailabilityStoreMessage::StoreChunk {
			candidate_hash,
			relay_parent,
			validator_index,
			chunk: erasure_chunk,
			tx,
		}
	)).await;

	rx.await.map_err(|e| Error::StoreChunkResponseChannel(e))
}

/// Query the validator set.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_validators<Context>(
	ctx: &mut Context,
	relay_parent: Hash,
) -> Result<Vec<ValidatorId>>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let (tx, rx) = oneshot::channel();
	let query_validators = AllMessages::RuntimeApi(RuntimeApiMessage::Request(
		relay_parent,
		RuntimeApiRequest::Validators(tx),
	));

	ctx.send_message(query_validators).await;

	rx.await
		.map_err(|e| Error::QueryValidatorsResponseChannel(e))?
		.map_err(|e| Error::QueryValidators(e))
}

/// Query the hashes of the `k` ancestors of a relay parent.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_k_ancestors<Context>(
	ctx: &mut Context,
	relay_parent: Hash,
	k: usize,
) -> Result<Vec<Hash>>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let (tx, rx) = oneshot::channel();
	let query_ancestors = AllMessages::ChainApi(ChainApiMessage::Ancestors {
		hash: relay_parent,
		k,
		response_channel: tx,
	});

	ctx.send_message(query_ancestors).await;

	rx.await
		.map_err(|e| Error::QueryAncestorsResponseChannel(e))?
		.map_err(|e| Error::QueryAncestors(e))
}

/// Query the session index for a child of the given relay parent.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_session_index_for_child<Context>(
	ctx: &mut Context,
	relay_parent: Hash,
) -> Result<SessionIndex>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let (tx, rx) = oneshot::channel();
	let query_session_idx_for_child = AllMessages::RuntimeApi(RuntimeApiMessage::Request(
		relay_parent,
		RuntimeApiRequest::SessionIndexForChild(tx),
	));

	ctx.send_message(query_session_idx_for_child).await;

	rx.await
		.map_err(|e| Error::QuerySessionResponseChannel(e))?
		.map_err(|e| Error::QuerySession(e))
}

/// Query up to `k` ancestors of `relay_parent` that belong to the same session as it.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_up_to_k_ancestors_in_same_session<Context>(
	ctx: &mut Context,
	relay_parent: Hash,
	k: usize,
) -> Result<Vec<Hash>>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	// k + 1 since we always query the child's session index
	// ordering is [parent, grandparent, greatgrandparent, greatgreatgrandparent, ...]
	let ancestors = query_k_ancestors(ctx, relay_parent, k + 1).await?;
	let desired_session = query_session_index_for_child(ctx, relay_parent).await?;
	// we would only need `ancestors.len() - 1`, but the one extra could avoid a re-alloc
	// if the consumer wants to push the `relay_parent` onto it too and does not hurt otherwise
	let mut acc = Vec::with_capacity(ancestors.len());

	// iterate from youngest to oldest
	let mut iter = ancestors.into_iter().peekable();

	while let Some((ancestor, ancestor_parent)) = iter.next().and_then(|a| iter.peek().map(|ap| (a, ap))) {
		if query_session_index_for_child(ctx, *ancestor_parent).await? != desired_session {
			// Stop at the first ancestor that falls outside the desired session.
			break;
		}
		acc.push(ancestor);
	}

	debug_assert!(acc.len() <= k);
	Ok(acc)
}
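
// A minimal sketch (not part of the original module) of the pairwise
// `next()`/`peek()` walk used above: plain `u32`s stand in for ancestor
// hashes and a closure stands in for `query_session_index_for_child`, so the
// session-boundary behaviour can be checked without a subsystem context.
#[cfg(test)]
mod ancestor_walk_sketch {
	#[test]
	fn stops_at_the_first_session_boundary() {
		// Ancestors ordered youngest to oldest: [parent, grandparent, ...].
		let ancestors = vec![10u32, 9, 8, 7];
		let desired_session = 5u32;
		// Hypothetical session lookup: children of blocks 8 and 9 are in
		// session 5, children of older blocks are in session 4.
		let session_for_child_of = |parent: u32| if parent >= 8 { 5u32 } else { 4 };

		let mut acc = Vec::with_capacity(ancestors.len());
		let mut iter = ancestors.into_iter().peekable();
		while let Some((ancestor, ancestor_parent)) =
			iter.next().and_then(|a| iter.peek().map(|ap| (a, *ap)))
		{
			if session_for_child_of(ancestor_parent) != desired_session {
				break;
			}
			acc.push(ancestor);
		}

		// 10 and 9 share the desired session; the walk stops before 8, and
		// the oldest entry is only ever inspected as a parent, which is why
		// `k + 1` ancestors are queried for at most `k` results.
		assert_eq!(acc, vec![10, 9]);
	}
}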

#[derive(Clone)]
struct MetricsInner {
	gossipped_availability_chunks: prometheus::Counter<prometheus::U64>,
	handle_our_view_change: prometheus::Histogram,
	process_incoming_peer_message: prometheus::Histogram,
}

/// Availability Distribution metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
	fn on_chunk_distributed(&self) {
		if let Some(metrics) = &self.0 {
			metrics.gossipped_availability_chunks.inc();
		}
	}

	/// Provide a timer for `handle_our_view_change` which observes on drop.
	fn time_handle_our_view_change(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.handle_our_view_change.start_timer())
	}

	/// Provide a timer for `process_incoming_peer_message` which observes on drop.
	fn time_process_incoming_peer_message(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.process_incoming_peer_message.start_timer())
	}
}

impl metrics::Metrics for Metrics {
	fn try_register(
		registry: &prometheus::Registry,
	) -> std::result::Result<Self, prometheus::PrometheusError> {
		let metrics = MetricsInner {
			gossipped_availability_chunks: prometheus::register(
				prometheus::Counter::new(
					"parachain_gossipped_availability_chunks_total",
					"Number of availability chunks gossipped to other peers.",
				)?,
				registry,
			)?,
			handle_our_view_change: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_availability_distribution_handle_our_view_change",
						"Time spent within `availability_distribution::handle_our_view_change`",
					)
				)?,
				registry,
			)?,
			process_incoming_peer_message: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_availability_distribution_process_incoming_peer_message",
						"Time spent within `availability_distribution::process_incoming_peer_message`",
					)
				)?,
				registry,
			)?,
		};
		Ok(Metrics(Some(metrics)))
	}
}
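
// A minimal usage sketch (not part of the original module), assuming the
// `prometheus::Registry` type used by `try_register` above exposes the usual
// `Registry::new()` constructor: register the metrics against a fresh
// registry and record one distributed chunk. With a fresh registry there can
// be no name collisions, so registration is expected to succeed.
#[cfg(test)]
mod metrics_sketch {
	use super::*;
	use metrics::Metrics as _;

	#[test]
	fn register_and_count_a_chunk() {
		let registry = prometheus::Registry::new();
		let metrics = Metrics::try_register(&registry)
			.expect("fresh registry, no name collisions");
		// Bumps `parachain_gossipped_availability_chunks_total` by one.
		metrics.on_chunk_distributed();
	}
}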