lib.rs 38.6 KiB
Newer Older
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The availability distribution
//!
//! Transforms `AvailableData` into erasure chunks, which are distributed to peers
//! who are interested in the relevant candidates.
//! Gossip messages received from other peers are verified and gossiped to interested
//! peers. Verified in this context means, the erasure chunks contained merkle proof
//! is checked.

#![deny(unused_crate_dependencies, unused_qualifications)]

use parity_scale_codec::{Decode, Encode};
use futures::{channel::oneshot, FutureExt, TryFutureExt};
use sp_core::crypto::Public;
use sp_keystore::{CryptoStore, SyncCryptoStorePtr};

use polkadot_erasure_coding::branch_hash;
use polkadot_node_network_protocol::{
	v1 as protocol_v1, NetworkBridgeEvent, PeerId, ReputationChange as Rep, View,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_primitives::v1::{
	BlakeTwo256, CoreState, ErasureChunk, Hash, HashT,
	SessionIndex, ValidatorId, ValidatorIndex, PARACHAIN_KEY_TYPE_ID, CandidateHash,
};
use polkadot_subsystem::messages::{
	AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage,
	NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest,
};
use polkadot_subsystem::{
	errors::{ChainApiError, RuntimeApiError},
	ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem,
	SubsystemContext, SubsystemError,
};
use std::collections::{HashMap, HashSet};
use std::collections::hash_map::Entry;
use std::iter;
use thiserror::Error;
const LOG_TARGET: &'static str = "availability_distribution";
#[derive(Debug, Error)]
enum Error {
	#[error("Response channel to obtain StoreChunk failed")]
	StoreChunkResponseChannel(#[source] oneshot::Canceled),

	#[error("Response channel to obtain QueryChunk failed")]
	QueryChunkResponseChannel(#[source] oneshot::Canceled),

	#[error("Response channel to obtain QueryAncestors failed")]
	QueryAncestorsResponseChannel(#[source] oneshot::Canceled),
	#[error("RuntimeAPI to obtain QueryAncestors failed")]
	QueryAncestors(#[source] ChainApiError),

	#[error("Response channel to obtain QuerySession failed")]
	QuerySessionResponseChannel(#[source] oneshot::Canceled),
	#[error("RuntimeAPI to obtain QuerySession failed")]
	QuerySession(#[source] RuntimeApiError),

	#[error("Response channel to obtain QueryValidators failed")]
	QueryValidatorsResponseChannel(#[source] oneshot::Canceled),
	#[error("RuntimeAPI to obtain QueryValidators failed")]
	QueryValidators(#[source] RuntimeApiError),

	#[error("Response channel to obtain AvailabilityCores failed")]
	AvailabilityCoresResponseChannel(#[source] oneshot::Canceled),
	#[error("RuntimeAPI to obtain AvailabilityCores failed")]
	AvailabilityCores(#[source] RuntimeApiError),

	#[error("Response channel to obtain AvailabilityCores failed")]
	QueryAvailabilityResponseChannel(#[source] oneshot::Canceled),

	#[error("Receive channel closed")]
	IncomingMessageChannel(#[source] SubsystemError),
}

type Result<T> = std::result::Result<T, Error>;

const COST_MERKLE_PROOF_INVALID: Rep = Rep::new(-100, "Merkle proof was invalid");
const COST_NOT_A_LIVE_CANDIDATE: Rep = Rep::new(-51, "Candidate is not live");
const COST_PEER_DUPLICATE_MESSAGE: Rep = Rep::new(-500, "Peer sent identical messages");
const BENEFIT_VALID_MESSAGE_FIRST: Rep = Rep::new(15, "Valid message with new information");
const BENEFIT_VALID_MESSAGE: Rep = Rep::new(10, "Valid message");

/// Checked signed availability bitfield that is distributed
/// to other peers.
#[derive(Encode, Decode, Debug, Clone, PartialEq, Eq, Hash)]
pub struct AvailabilityGossipMessage {
	/// Anchor hash of the candidate the `ErasureChunk` is associated to.
	pub candidate_hash: CandidateHash,
	/// The erasure chunk, a encoded information part of `AvailabilityData`.
	pub erasure_chunk: ErasureChunk,
}

impl From<AvailabilityGossipMessage> for protocol_v1::AvailabilityDistributionMessage {
	fn from(message: AvailabilityGossipMessage) -> Self {
		Self::Chunk(message.candidate_hash, message.erasure_chunk)
	}
}

/// Data used to track information of peers and relay parents the
/// overseer ordered us to work on.
#[derive(Debug, Default)]
struct ProtocolState {
	/// Track all active peers and their views
	/// to determine what is relevant to them.
	peer_views: HashMap<PeerId, View>,

	/// Our own view.
	view: View,

	/// Caches a mapping of relay parents or ancestor to live candidate hashes.
	/// Allows fast intersection of live candidates with views and consecutive unioning.
	/// Maps relay parent / ancestor -> candidate hashes.
	live_under: HashMap<Hash, HashSet<CandidateHash>>,

	/// Track things needed to start and stop work on a particular relay parent.
	per_relay_parent: HashMap<Hash, PerRelayParent>,

	/// Track data that is specific to a candidate.
	per_candidate: HashMap<CandidateHash, PerCandidate>,
#[derive(Debug)]
struct PerCandidate {
	/// A Candidate and a set of known erasure chunks in form of messages to be gossiped / distributed if the peer view wants that.
	/// This is _across_ peers and not specific to a particular one.
	/// candidate hash + erasure chunk index -> gossip message
	message_vault: HashMap<u32, AvailabilityGossipMessage>,

	/// Track received erasure chunk indices per peer.
	received_messages: HashMap<PeerId, HashSet<ValidatorIndex>>,
	/// Track sent erasure chunk indices per peer.
	sent_messages: HashMap<PeerId, HashSet<ValidatorIndex>>,

	/// The set of validators.
	validators: Vec<ValidatorId>,

	/// If this node is a validator, note the index in the validator set.
	validator_index: Option<ValidatorIndex>,

	/// The descriptor of this candidate.
	descriptor: CandidateDescriptor,

	/// The set of relay chain blocks this appears to be live in.
	live_in: HashSet<Hash>,

	/// A Jaeger span relating to this candidate.
	span: jaeger::JaegerSpan,
	/// Returns `true` iff the given `validator_index` is required by the given `peer`.
	fn message_required_by_peer(&self, peer: &PeerId, validator_index: ValidatorIndex) -> bool {
		self.received_messages.get(peer).map(|v| !v.contains(&validator_index)).unwrap_or(true)
			&& self.sent_messages.get(peer).map(|v| !v.contains(&validator_index)).unwrap_or(true)
	}

	/// Add a chunk to the message vault. Overwrites anything that was already present.
	fn add_message(&mut self, chunk_index: u32, message: AvailabilityGossipMessage) {
		let _ = self.message_vault.insert(chunk_index, message);
	}

	/// Clean up the span if we've got our own chunk.
	fn drop_span_after_own_availability(&mut self) {
		if let Some(validator_index) = self.validator_index {
			if self.message_vault.contains_key(&validator_index) {
				self.span = jaeger::JaegerSpan::Disabled;
			}
		}
#[derive(Debug, Clone, Default)]
struct PerRelayParent {
	/// Set of `K` ancestors for this relay parent.
	ancestors: Vec<Hash>,
	/// Live candidates, according to this relay parent.
	live_candidates: HashSet<CandidateHash>,
}

impl ProtocolState {
	/// Unionize all live candidate hashes of the given relay parents and their recent
	/// ancestors.
	///
	/// Ignores all non existent relay parents, so this can be used directly with a peers view.
	/// Returns a set of candidate hashes.
	#[tracing::instrument(level = "trace", skip(relay_parents), fields(subsystem = LOG_TARGET))]
	fn cached_live_candidates_unioned<'a>(
		&'a self,
		relay_parents: impl IntoIterator<Item = &'a Hash> + 'a,
		cached_live_candidates_unioned(
			&self.per_relay_parent,
			relay_parents
		)
	#[tracing::instrument(level = "trace", skip(candidates), fields(subsystem = LOG_TARGET))]
	fn add_relay_parent(
		&mut self,
		relay_parent: Hash,
		validators: Vec<ValidatorId>,
		validator_index: Option<ValidatorIndex>,
		candidates: HashMap<CandidateHash, FetchedLiveCandidate>,
		ancestors: Vec<Hash>,
	) {
		let per_relay_parent = self.per_relay_parent.entry(relay_parent).or_default();
		per_relay_parent.ancestors = ancestors;
		per_relay_parent.live_candidates.extend(candidates.keys().cloned());

		// register the relation of relay_parent to candidate..
		for (receipt_hash, fetched) in candidates {
			let candidate_entry = match self.per_candidate.entry(receipt_hash) {
				Entry::Occupied(e) => e.into_mut(),
				Entry::Vacant(e) => {
					if let FetchedLiveCandidate::Fresh(descriptor) = fetched {
						e.insert(PerCandidate {
							message_vault: HashMap::new(),
							received_messages: HashMap::new(),
							sent_messages: HashMap::new(),
							validators: validators.clone(),
							validator_index,
							descriptor,
							live_in: HashSet::new(),
							span: if validator_index.is_some() {
								jaeger::candidate_hash_span(&receipt_hash, "pending-availability")
							} else {
								jaeger::JaegerSpan::Disabled
							},
						})
					} else {
						tracing::warn!(target: LOG_TARGET, "No `per_candidate` but not fresh. logic error");
						continue;
					}
				}
			};

			candidate_entry.live_in.insert(relay_parent);
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	fn remove_relay_parent(&mut self, relay_parent: &Hash) {
		if let Some(per_relay_parent) = self.per_relay_parent.remove(relay_parent) {
			for candidate_hash in per_relay_parent.live_candidates {
				// Prune the candidate if this was the last member of our view
				// to consider it live (including its ancestors).
				if let Entry::Occupied(mut occ) = self.per_candidate.entry(candidate_hash) {
					occ.get_mut().live_in.remove(relay_parent);
					if occ.get().live_in.is_empty() {
						occ.remove();
	// Removes all entries from live_under which aren't referenced in the ancestry of
	// one of our live relay-chain heads.
	fn clean_up_live_under_cache(&mut self) {
		let extended_view: HashSet<_> = self.per_relay_parent.iter()
			.map(|(r_hash, v)| v.ancestors.iter().cloned().chain(iter::once(*r_hash)))
		self.live_under.retain(|ancestor_hash, _| extended_view.contains(ancestor_hash));
fn cached_live_candidates_unioned<'a>(
	per_relay_parent: &'a HashMap<Hash, PerRelayParent>,
	relay_parents: impl IntoIterator<Item = &'a Hash> + 'a,
) -> HashSet<CandidateHash> {
	relay_parents
		.into_iter()
		.filter_map(|r| per_relay_parent.get(r))
		.map(|per_relay_parent| per_relay_parent.live_candidates.iter().cloned())
		.flatten()
		.collect()
}

/// Deal with network bridge updates and track what needs to be tracked
/// which depends on the message type received.
#[tracing::instrument(level = "trace", skip(ctx, keystore, metrics), fields(subsystem = LOG_TARGET))]
async fn handle_network_msg<Context>(
	ctx: &mut Context,
	state: &mut ProtocolState,
	metrics: &Metrics,
	bridge_message: NetworkBridgeEvent<protocol_v1::AvailabilityDistributionMessage>,
) -> Result<()>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	match bridge_message {
		NetworkBridgeEvent::PeerConnected(peerid, _role) => {
			// insert if none already present
			state.peer_views.entry(peerid).or_default();
		}
		NetworkBridgeEvent::PeerDisconnected(peerid) => {
			// get rid of superfluous data
			state.peer_views.remove(&peerid);
		}
		NetworkBridgeEvent::PeerViewChange(peerid, view) => {
			handle_peer_view_change(ctx, state, peerid, view, metrics).await;
		}
		NetworkBridgeEvent::OurViewChange(view) => {
			handle_our_view_change(ctx, keystore, state, view, metrics).await?;
		NetworkBridgeEvent::PeerMessage(remote, msg) => {
			let gossiped_availability = match msg {
				protocol_v1::AvailabilityDistributionMessage::Chunk(candidate_hash, chunk) => {
					AvailabilityGossipMessage {
						candidate_hash,
						erasure_chunk: chunk,
					}
				}
			process_incoming_peer_message(ctx, state, remote, gossiped_availability, metrics)
				.await?;
		}
	}
	Ok(())
}

/// Handle the changes necessary when our view changes.
#[tracing::instrument(level = "trace", skip(ctx, keystore, metrics), fields(subsystem = LOG_TARGET))]
async fn handle_our_view_change<Context>(
	ctx: &mut Context,
	state: &mut ProtocolState,
	view: View,
	metrics: &Metrics,
) -> Result<()>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let _timer = metrics.time_handle_our_view_change();

	let old_view = std::mem::replace(&mut state.view, view);

	// needed due to borrow rules
	let view = state.view.clone();

	// add all the relay parents and fill the cache
	for added in view.difference(&old_view) {
		let validators = query_validators(ctx, *added).await?;
		let validator_index = obtain_our_validator_index(&validators, keystore.clone()).await;
			= query_live_candidates(ctx, &mut state.live_under, *added).await?;

		state.add_relay_parent(
			*added,
			validators,
			validator_index,
			candidates,
			ancestors,
		);
	}

	// handle all candidates
	let mut messages_out = Vec::new();
	for candidate_hash in state.cached_live_candidates_unioned(view.difference(&old_view)) {
		// If we are not a validator for this candidate, let's skip it.
		match state.per_candidate.get(&candidate_hash) {
			None => continue,
			Some(c) if c.validator_index.is_none() => continue,
			Some(_) => {},
		};

		// check if the availability is present in the store exists
		if !query_data_availability(ctx, candidate_hash).await? {
			continue;
		}

		// obtain interested peers in the candidate hash
		let peers: Vec<PeerId> = state
			.peer_views
			.clone()
			.into_iter()
			.filter(|(_peer, view)| {
				// collect all direct interests of a peer w/o ancestors
				state
					.cached_live_candidates_unioned(view.heads.iter())
			})
			.map(|(peer, _view)| peer.clone())
			.collect();

		let per_candidate = state.per_candidate.get_mut(&candidate_hash)
			.expect("existence checked above; qed");

		let validator_count = per_candidate.validators.len();

		// distribute all erasure messages to interested peers
		for chunk_index in 0u32..(validator_count as u32) {
			let _span = {
				let mut span = per_candidate.span.child("load-and-distribute");
				span.add_string_tag("chunk-index", &format!("{}", chunk_index));
				span
			};
			let message = if let Some(message) = per_candidate.message_vault.get(&chunk_index) {
				tracing::trace!(
					target: LOG_TARGET,
					%chunk_index,
					?candidate_hash,
					"Retrieved chunk from message vault",
				);
				message.clone()
			} else if let Some(erasure_chunk) = query_chunk(ctx, candidate_hash, chunk_index as ValidatorIndex).await? {
				tracing::trace!(
					target: LOG_TARGET,
					%chunk_index,
					?candidate_hash,
					"Retrieved chunk from availability storage",
				);


				let msg = AvailabilityGossipMessage {
				};

				per_candidate.add_message(chunk_index, msg.clone());

				msg
				tracing::error!(
					target: LOG_TARGET,
					%chunk_index,
					?candidate_hash,
					"Availability store reported that we have the availability data, but we could not retrieve a chunk of it!",
				);
			debug_assert_eq!(message.erasure_chunk.index, chunk_index);

			let peers = peers
				.iter()
				.filter(|peer| per_candidate.message_required_by_peer(peer, chunk_index))
				.collect::<Vec<_>>();

			add_tracked_messages_to_batch(&mut messages_out, per_candidate, metrics, peers, iter::once(message));

		// traces are better if we wait until the loop is done to drop.
		per_candidate.drop_span_after_own_availability();
	// send all batched messages out.
	send_batch_to_network(ctx, messages_out).await;

	// cleanup the removed relay parents and their states
	old_view.difference(&view).for_each(|r| state.remove_relay_parent(r));
	state.clean_up_live_under_cache();
// After this function is invoked, the state reflects the messages as having been sent to a peer.
#[tracing::instrument(level = "trace", skip(batch, metrics, message_iter), fields(subsystem = LOG_TARGET))]
fn add_tracked_messages_to_batch(
	batch: &mut Vec<(Vec<PeerId>, protocol_v1::ValidationProtocol)>,
	per_candidate: &mut PerCandidate,
	metrics: &Metrics,
	peers: Vec<PeerId>,
	message_iter: impl IntoIterator<Item = AvailabilityGossipMessage>,
	for message in message_iter {
		for peer in peers.iter() {
			per_candidate
				.sent_messages
				.entry(peer.clone())
				.or_default()
				.insert(message.erasure_chunk.index);
			batch.push((
				peers.clone(),
				protocol_v1::ValidationProtocol::AvailabilityDistribution(message.into()),
async fn send_batch_to_network(
	ctx: &mut impl SubsystemContext,
	batch: Vec<(Vec<PeerId>, protocol_v1::ValidationProtocol)>,
) {
	if !batch.is_empty() {
		let _ = ctx.send_message(NetworkBridgeMessage::SendValidationMessages(batch).into()).await;
// Send the difference between two views which were not sent
// to that particular peer.
#[tracing::instrument(level = "trace", skip(ctx, metrics), fields(subsystem = LOG_TARGET))]
async fn handle_peer_view_change<Context>(
	ctx: &mut Context,
	state: &mut ProtocolState,
	origin: PeerId,
	view: View,
	metrics: &Metrics,
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let current = state.peer_views.entry(origin.clone()).or_default();

	let added: Vec<Hash> = view.difference(&*current).cloned().collect();

	*current = view;

	// only contains the intersection of what we are interested and
	// the union of all relay parent's candidates.
	let added_candidates = state.cached_live_candidates_unioned(added.iter());

	// Send all messages we've seen before and the peer is now interested in.
	let mut batch = Vec::new();
	for candidate_hash in added_candidates {
		let per_candidate = match state.per_candidate.get_mut(&candidate_hash) {
			Some(p) => p,
			None => continue,
		};

		// obtain the relevant chunk indices not sent yet
		let messages = ((0 as ValidatorIndex)..(per_candidate.validators.len() as ValidatorIndex))
			.into_iter()
			.filter_map(|erasure_chunk_index: ValidatorIndex| {
				// try to pick up the message from the message vault
				// so we send as much as we have
				per_candidate
					.message_vault
					.get(&erasure_chunk_index)
					.filter(|_| per_candidate.message_required_by_peer(&origin, erasure_chunk_index))
			})
			.cloned()
			.collect::<HashSet<_>>();

		add_tracked_messages_to_batch(&mut batch, per_candidate, metrics, vec![origin.clone()], messages);

	send_batch_to_network(ctx, batch).await;
}

/// Obtain the first key which has a signing key.
/// Returns the index within the validator set as `ValidatorIndex`, if there exists one,
/// otherwise, `None` is returned.
async fn obtain_our_validator_index(
	validators: &[ValidatorId],
) -> Option<ValidatorIndex> {
	for (idx, validator) in validators.iter().enumerate() {
		if CryptoStore::has_keys(
			&*keystore,
			&[(validator.to_raw_vec(), PARACHAIN_KEY_TYPE_ID)],
		)
		.await
		{
			return Some(idx as ValidatorIndex);
}

/// Handle an incoming message from a peer.
#[tracing::instrument(level = "trace", skip(ctx, metrics), fields(subsystem = LOG_TARGET))]
async fn process_incoming_peer_message<Context>(
	ctx: &mut Context,
	state: &mut ProtocolState,
	origin: PeerId,
	message: AvailabilityGossipMessage,
	metrics: &Metrics,
) -> Result<()>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let _timer = metrics.time_process_incoming_peer_message();

	// obtain the set of candidates we are interested in based on our current view
	let live_candidates = state.cached_live_candidates_unioned(state.view.heads.iter());

	// check if the candidate is of interest
	let candidate_entry = if live_candidates.contains(&message.candidate_hash) {
			.get_mut(&message.candidate_hash)
			.expect("All live candidates are contained in per_candidate; qed")
		tracing::trace!(
			target: LOG_TARGET,
			candidate_hash = ?message.candidate_hash,
			peer = %origin,
			"Peer send not live candidate",
		);
		modify_reputation(ctx, origin, COST_NOT_A_LIVE_CANDIDATE).await;
	// Handle a duplicate before doing expensive checks.
	if let Some(existing) = candidate_entry.message_vault.get(&message.erasure_chunk.index) {
		let span = candidate_entry.span.child("handle-duplicate");
		// check if this particular erasure chunk was already sent by that peer before
		{
			let _span = span.child("check-entry");
			let received_set = candidate_entry
				.received_messages
				.entry(origin.clone())
				.or_default();

			if !received_set.insert(message.erasure_chunk.index) {
				modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await;
				return Ok(());
			}
		}

		// check that the message content matches what we have already before rewarding
		// the peer.
		{
			let _span = span.child("check-accurate");
			if existing == &message {
				modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await;
			} else {
				modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
			}
		}

		return Ok(());
	}

	let span = {
		let mut span = candidate_entry.span.child("process-new-chunk");
		span.add_string_tag("peer-id", &origin.to_base58());
		span
	};

	// check the merkle proof against the erasure root in the candidate descriptor.
	let anticipated_hash = {
		let _span = span.child("check-merkle-root");
		match branch_hash(
			&candidate_entry.descriptor.erasure_root,
			&message.erasure_chunk.proof,
			message.erasure_chunk.index as usize,
		) {
			Ok(hash) => hash,
			Err(e) => {
				tracing::trace!(
					target: LOG_TARGET,
					candidate_hash = ?message.candidate_hash,
					peer = %origin,
					error = ?e,
					"Failed to calculate chunk merkle proof",
				);
				modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
				return Ok(());
			},
		}
	};

	{
		let _span = span.child("check-chunk-hash");
		let erasure_chunk_hash = BlakeTwo256::hash(&message.erasure_chunk.chunk);
		if anticipated_hash != erasure_chunk_hash {
			tracing::trace!(
				target: LOG_TARGET,
				candidate_hash = ?message.candidate_hash,
				peer = %origin,
				"Peer sent chunk with invalid merkle proof",
			);
			modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
			return Ok(());
		// insert into known messages and change reputation. we've guaranteed
		// above that the message vault doesn't contain any message under this
		// chunk index already.
		candidate_entry
				.received_messages
				.entry(origin.clone())
				.or_default()
				.insert(message.erasure_chunk.index);

		modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await;

		// save the chunk for our index
		if Some(message.erasure_chunk.index) == candidate_entry.validator_index {
			let _span = span.child("store-our-chunk");
			if store_chunk(
				ctx,
				message.candidate_hash,
				candidate_entry.descriptor.relay_parent,
				message.erasure_chunk.index,
				message.erasure_chunk.clone(),
			).await?.is_err() {
				tracing::warn!(
					target: LOG_TARGET,
					"Failed to store erasure chunk to availability store"
				);
		candidate_entry.add_message(message.erasure_chunk.index, message.clone());
		candidate_entry.drop_span_after_own_availability();
	// condense the peers to the peers with interest on the candidate
	let peers = {
		let _span = span.child("determine-recipient-peers");
		let per_relay_parent = &state.per_relay_parent;

		state
			.peer_views
			.clone()
			.into_iter()
			.filter(|(_, view)| {
				// peers view must contain the candidate hash too
				cached_live_candidates_unioned(
					per_relay_parent,
					view.heads.iter(),
				).contains(&message.candidate_hash)
			})
			.map(|(peer, _)| -> PeerId { peer.clone() })
			.filter(|peer| candidate_entry.message_required_by_peer(peer, message.erasure_chunk.index))
			.collect::<Vec<_>>()
	};
	drop(span);
	// gossip that message to interested peers
	{
		let mut batch = Vec::new();
		add_tracked_messages_to_batch(&mut batch, candidate_entry, metrics, peers, iter::once(message));
		send_batch_to_network(ctx, batch).await;
	}
}

/// The bitfield distribution subsystem.
pub struct AvailabilityDistributionSubsystem {
	/// Pointer to a keystore, which is required for determining this nodes validator index.
	/// Prometheus metrics.
	metrics: Metrics,
}

impl AvailabilityDistributionSubsystem {
	/// Number of ancestors to keep around for the relay-chain heads.
	const K: usize = 3;

	/// Create a new instance of the availability distribution.
	pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
		Self { keystore, metrics }
	}

	/// Start processing work as passed on from the Overseer.
	async fn run<Context>(self, ctx: Context) -> Result<()>
	where
		Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
	{
		let mut state = ProtocolState {
			peer_views: HashMap::new(),
			view: Default::default(),
			live_under: HashMap::new(),
			per_relay_parent: HashMap::new(),
			per_candidate: HashMap::new(),
		};

		self.run_inner(ctx, &mut state).await
	}

	/// Start processing work.
	#[tracing::instrument(skip(self, ctx), fields(subsystem = LOG_TARGET))]
	async fn run_inner<Context>(self, mut ctx: Context, state: &mut ProtocolState) -> Result<()>
	where
		Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
	{
		// work: process incoming messages from the overseer.
		loop {
			let message = ctx
				.recv()
				.await
				.map_err(|e| Error::IncomingMessageChannel(e))?;
			match message {
				FromOverseer::Communication {
					msg: AvailabilityDistributionMessage::NetworkBridgeUpdateV1(event),
				} => {
					if let Err(e) = handle_network_msg(
						&mut ctx,
						&self.metrics,
						event,
						tracing::warn!(
							target: LOG_TARGET,
							err = ?e,
							"Failed to handle incoming network messages",
						);
					}
				}
				FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
					activated: _,
					deactivated: _,
				})) => {
					// handled at view change
				}
				FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {}
				FromOverseer::Signal(OverseerSignal::Conclude) => {
					return Ok(());
				}
			}
		}
	}
}

impl<Context> Subsystem<Context> for AvailabilityDistributionSubsystem
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage> + Sync + Send,
{
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		let future = self
			.run(ctx)
			.map_err(|e| SubsystemError::with_origin("availability-distribution", e))
			.boxed();

		SpawnedSubsystem {
			name: "availability-distribution-subsystem",
// Metadata about a candidate that is part of the live_candidates set.
//
// Those which were not present in a cache are "fresh" and have their candidate descriptor attached. This
// information is propagated to the higher level where it can be used to create data entries. Cached candidates
// already have entries associated with them, and thus don't need this metadata to be fetched.
#[derive(Debug)]
enum FetchedLiveCandidate {
	Cached,
	Fresh(CandidateDescriptor),
}

/// Obtain all live candidates for all given `relay_blocks`.
///
/// This returns a set of all candidate hashes pending availability within the state
/// of the explicitly referenced relay heads.
///
/// This also queries the provided `live_under` cache before reaching into the
/// runtime and updates it with the information learned.
#[tracing::instrument(level = "trace", skip(ctx, relay_blocks, live_under), fields(subsystem = LOG_TARGET))]
async fn query_pending_availability_at<Context>(
	ctx: &mut Context,
	relay_blocks: impl IntoIterator<Item = Hash>,
	live_under: &mut HashMap<Hash, HashSet<CandidateHash>>,
) -> Result<HashMap<CandidateHash, FetchedLiveCandidate>>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let mut live_candidates = HashMap::new();

	// fetch and fill out cache for each of these
	for relay_parent in relay_blocks {
		let receipts_for = match live_under.entry(relay_parent) {
			Entry::Occupied(e) => {
				live_candidates.extend(
					e.get().iter().cloned().map(|c| (c, FetchedLiveCandidate::Cached))
				);
				continue
			},
			e => e.or_default(),
		};
		for (receipt_hash, descriptor) in query_pending_availability(ctx, relay_parent).await? {
			// unfortunately we have no good way of telling the candidate was
			// cached until now. But we don't clobber a `Cached` entry if there
			// is one already.
			live_candidates.entry(receipt_hash).or_insert(FetchedLiveCandidate::Fresh(descriptor));
			receipts_for.insert(receipt_hash);
	Ok(live_candidates)
}

/// Obtain all live candidates under a particular relay head. This implicitly includes
/// `K` ancestors of the head, such that the candidates pending availability in all of
/// the states of the head and the ancestors are unioned together to produce the
/// return type of this function. Each candidate hash is paired with information about
/// from where it was fetched.
/// This also updates all `live_under` cached by the protocol state and returns a list
/// of up to `K` ancestors of the relay-parent.
#[tracing::instrument(level = "trace", skip(ctx, live_under), fields(subsystem = LOG_TARGET))]
async fn query_live_candidates<Context>(
	ctx: &mut Context,
	live_under: &mut HashMap<Hash, HashSet<CandidateHash>>,
	relay_parent: Hash,
) -> Result<(HashMap<CandidateHash, FetchedLiveCandidate>, Vec<Hash>)>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	// register one of relay parents (not the ancestors)
	let ancestors = query_up_to_k_ancestors_in_same_session(
		ctx,
		relay_parent,
		AvailabilityDistributionSubsystem::K,
	)
	.await?;

	// query the ones that were not present in the live_under cache and add them
	// to it.
	let live_candidates = query_pending_availability_at(
		ctx,
		ancestors.iter().cloned().chain(iter::once(relay_parent)),
		live_under,
/// Query all hashes and descriptors of candidates pending availability at a particular block.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_pending_availability<Context>(ctx: &mut Context, relay_parent: Hash)
	-> Result<Vec<(CandidateHash, CandidateDescriptor)>>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let (tx, rx) = oneshot::channel();
	ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
		relay_parent,
		RuntimeApiRequest::AvailabilityCores(tx),
	)))
	let cores: Vec<_> = rx
		.await
		.map_err(|e| Error::AvailabilityCoresResponseChannel(e))?
		.map_err(|e| Error::AvailabilityCores(e))?;
	Ok(cores.into_iter()
		.filter_map(|core_state| if let CoreState::Occupied(occupied) = core_state {
			Some((occupied.candidate_hash, occupied.candidate_descriptor))
		} else {
			None
}

/// Modify the reputation of a peer based on its behavior.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn modify_reputation<Context>(ctx: &mut Context, peer: PeerId, rep: Rep)
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	tracing::trace!(
		target: LOG_TARGET,
		rep = ?rep,
		peer_id = ?peer,
		"Reputation change for peer",
	);
	ctx.send_message(AllMessages::NetworkBridge(
		NetworkBridgeMessage::ReportPeer(peer, rep),
}

/// Query the proof of validity for a particular candidate hash.