// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The availability distribution
//!
//! Transforms `AvailableData` into erasure chunks, which are distributed to peers
//! who are interested in the relevant candidates.
//! Gossip messages received from other peers are verified and gossiped to interested
//! peers. Verified in this context means that the Merkle proof contained in the
//! erasure chunk is checked.
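//!
//! Conceptually, that verification boils down to the following check. This is an
//! illustrative sketch only; `candidate` and `chunk` stand in for the candidate receipt
//! and erasure chunk handled in `process_incoming_peer_message` below:
//!
//! ```ignore
//! // Recompute the leaf hash that the Merkle proof carried by the chunk commits to..
//! let anticipated_hash = branch_hash(
//! 	&candidate.commitments.erasure_root,
//! 	&chunk.proof,
//! 	chunk.index as usize,
//! )?;
//! // ..and require that it matches the hash of the chunk data itself.
//! let is_valid = anticipated_hash == BlakeTwo256::hash(&chunk.chunk);
//! ```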

use codec::{Decode, Encode};
use futures::{channel::oneshot, FutureExt};

use keystore::KeyStorePtr;
use sp_core::{
	crypto::Public,
	traits::BareCryptoStore,
};
use sc_keystore as keystore;

use log::{trace, warn};
use polkadot_erasure_coding::branch_hash;
use polkadot_primitives::v1::{
	PARACHAIN_KEY_TYPE_ID,
	BlakeTwo256, CommittedCandidateReceipt, CoreState, ErasureChunk,
	Hash as Hash, HashT, Id as ParaId,
	ValidatorId, ValidatorIndex, SessionIndex,
};
use polkadot_subsystem::messages::{
	AllMessages, AvailabilityDistributionMessage, NetworkBridgeMessage, RuntimeApiMessage,
	RuntimeApiRequest, AvailabilityStoreMessage, ChainApiMessage,
};
use polkadot_subsystem::{
	errors::{ChainApiError, RuntimeApiError},
	ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem,
	SubsystemContext, SubsystemError,
};
use polkadot_node_network_protocol::{
	v1 as protocol_v1, View, ReputationChange as Rep, PeerId,
	NetworkBridgeEvent,
};
use std::collections::{HashMap, HashSet};
use std::io;
use std::iter;

const TARGET: &'static str = "avad";

#[derive(Debug, derive_more::From)]
enum Error {
	#[from]
	Erasure(polkadot_erasure_coding::Error),
	#[from]
	Io(io::Error),
	#[from]
	Oneshot(oneshot::Canceled),
	#[from]
	Subsystem(SubsystemError),
	#[from]
	RuntimeApi(RuntimeApiError),
	#[from]
	ChainApi(ChainApiError),
}

type Result<T> = std::result::Result<T, Error>;

const COST_MERKLE_PROOF_INVALID: Rep = Rep::new(-100, "Merkle proof was invalid");
const COST_NOT_A_LIVE_CANDIDATE: Rep = Rep::new(-51, "Candidate is not live");
const COST_PEER_DUPLICATE_MESSAGE: Rep = Rep::new(-500, "Peer sent identical messages");
const BENEFIT_VALID_MESSAGE_FIRST: Rep = Rep::new(15, "Valid message with new information");
const BENEFIT_VALID_MESSAGE: Rep = Rep::new(10, "Valid message");

/// An erasure chunk gossip message that is distributed
/// to other peers.
#[derive(Encode, Decode, Debug, Clone, PartialEq, Eq, Hash)]
pub struct AvailabilityGossipMessage {
	/// The hash of the candidate this `ErasureChunk` is associated with.
	pub candidate_hash: Hash,
	/// The erasure chunk, an erasure-encoded piece of the `AvailableData`.
	pub erasure_chunk: ErasureChunk,
}

/// Data used to track information of peers and relay parents the
/// overseer ordered us to work on.
#[derive(Default, Clone, Debug)]
struct ProtocolState {
	/// Track all active peers and their views
	/// to determine what is relevant to them.
	peer_views: HashMap<PeerId, View>,

	/// Our own view.
	view: View,

	/// Caches a mapping of relay parents or ancestors to live candidate receipts.
	/// Allows fast intersection of live candidates with views and consecutive unioning.
	/// Maps relay parent / ancestor -> live candidate receipts + their hashes.
	receipts: HashMap<Hash, HashSet<(Hash, CommittedCandidateReceipt)>>,

	/// Allow reverse caching of view checks.
	/// Maps candidate hash -> relay parent for extracting meta information from `PerRelayParent`.
	/// Note that the presence of this is not sufficient to determine if deletion is OK, i.e.
	/// two histories could cover this.
	reverse: HashMap<Hash, Hash>,

	/// Keeps track of which relay parents in our view still require a given ancestor, and
	/// hence that ancestor's candidate receipts.
	/// Maps ancestor -> relay parents in view
	ancestry: HashMap<Hash, HashSet<Hash>>,

	/// Track things needed to start and stop work on a particular relay parent.
	per_relay_parent: HashMap<Hash, PerRelayParent>,

	/// Track data that is specific to a candidate.
	per_candidate: HashMap<Hash, PerCandidate>,
}

#[derive(Debug, Clone, Default)]
struct PerCandidate {
	/// The set of known erasure chunks for this candidate, in the form of messages to be
	/// gossiped / distributed if a peer's view warrants it.
	/// This is _across_ peers and not specific to a particular one.
	/// erasure chunk index -> gossip message
	message_vault: HashMap<u32, AvailabilityGossipMessage>,

	/// Track the candidate hashes and chunk indices received from each peer.
	received_messages: HashMap<PeerId, HashSet<(Hash, ValidatorIndex)>>,

	/// Track the candidate hashes and erasure chunk indices already sent to each peer.
	sent_messages: HashMap<PeerId, HashSet<(Hash, ValidatorIndex)>>,

	/// The set of validators.
	validators: Vec<ValidatorId>,

	/// If this node is a validator, note the index in the validator set.
	validator_index: Option<ValidatorIndex>,
}

#[derive(Debug, Clone, Default)]
struct PerRelayParent {
	/// Set of `K` ancestors for this relay parent.
	ancestors: Vec<Hash>,
}

impl ProtocolState {
	/// Collects the ancestors of the given relay parents, including the relay parents themselves.
	fn extend_with_ancestors<'a>(
		&'a self,
		relay_parents: impl IntoIterator<Item = &'a Hash> + 'a,
	) -> HashSet<Hash> {
		relay_parents
			.into_iter()
			.map(|relay_parent| {
				self.per_relay_parent
					.get(relay_parent)
					.into_iter()
					.map(|per_relay_parent| per_relay_parent.ancestors.iter().cloned())
					.flatten()
					.chain(iter::once(*relay_parent))
			})
			.flatten()
			.collect::<HashSet<Hash>>()
	}

	/// Union all cached entries for the given relay parents and their ancestors.
	/// Ignores all non-existent relay parents, so this can be used directly with a peer's view.
	/// Returns a map from candidate hash -> receipt
	fn cached_live_candidates_unioned<'a>(
		&'a self,
		relay_parents: impl IntoIterator<Item = &'a Hash> + 'a,
	) -> HashMap<Hash, CommittedCandidateReceipt> {
		let relay_parents_and_ancestors = self.extend_with_ancestors(relay_parents);
		relay_parents_and_ancestors
			.into_iter()
			.filter_map(|relay_parent_or_ancestor| self.receipts.get(&relay_parent_or_ancestor))
			.map(|receipt_set| receipt_set.into_iter())
			.flatten()
			.map(|(receipt_hash, receipt)| (receipt_hash.clone(), receipt.clone()))
			.collect::<HashMap<Hash, CommittedCandidateReceipt>>()
	}

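	/// Start tracking a new relay parent: query its live candidates (including those of up
	/// to `K` ancestors), cache the receipts and record the ancestry relations.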
	async fn add_relay_parent<Context>(
		&mut self,
		ctx: &mut Context,
		relay_parent: Hash,
		validators: Vec<ValidatorId>,
		validator_index: Option<ValidatorIndex>,
	) -> Result<()>
	where
		Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
	{
		let candidates =
			query_live_candidates(ctx, self, std::iter::once(relay_parent)).await?;

		// register the relation of relay_parent to candidate..
		// ..and the reverse association.
		for (relay_parent_or_ancestor, (receipt_hash, receipt)) in candidates.clone() {
			self
				.reverse
				.insert(receipt_hash.clone(), relay_parent_or_ancestor.clone());
			let per_candidate = self.per_candidate.entry(receipt_hash.clone())
				.or_default();
			per_candidate.validator_index = validator_index.clone();
			per_candidate.validators = validators.clone();

			self
				.receipts
				.entry(relay_parent_or_ancestor)
				.or_default()
				.insert((receipt_hash, receipt));
		}

		// collect the ancestors again from the hash map
		let ancestors = candidates
			.iter()
			.filter_map(|(ancestor_or_relay_parent, _receipt)| {
				if ancestor_or_relay_parent == &relay_parent {
					None
				} else {
					Some(*ancestor_or_relay_parent)
				}
			})
			.collect::<Vec<Hash>>();

		// mark all the ancestors as "needed" by this newly added relay parent
		for ancestor in ancestors.iter() {
			self.ancestry
				.entry(ancestor.clone())
				.or_default()
				.insert(relay_parent);
		}

		self
			.per_relay_parent
			.entry(relay_parent)
			.or_default()
			.ancestors = ancestors;

		Ok(())
	}

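	/// Stop tracking a relay parent, pruning cached receipts and ancestry entries that are
	/// no longer referenced by any other relay parent in view.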
	fn remove_relay_parent(&mut self, relay_parent: &Hash) -> Result<()> {
		// we might be an ancestor of some other relay_parent
		if let Some(ref mut descendants) = self.ancestry.get_mut(relay_parent) {
			// if we were the last user, and it is
			// not explicitly set to be worked on by the overseer
			if descendants.is_empty() {
				// remove from the ancestry index
				self.ancestry.remove(relay_parent);
				// and also remove the actual receipt
				self.receipts.remove(relay_parent);
				self.per_candidate.remove(relay_parent);
			}
		}
		if let Some(per_relay_parent) = self.per_relay_parent.remove(relay_parent) {
			// remove all "references" from the hash maps and sets for all ancestors
			for ancestor in per_relay_parent.ancestors {
				// one of our descendants might be an ancestor of some other relay_parent
				if let Some(ref mut descendants) = self.ancestry.get_mut(&ancestor) {
					// we do not need this descendant anymore
					descendants.remove(relay_parent);
					// if we were the last user, and it is
					// not explicitly set to be worked on by the overseer
					if descendants.is_empty() && !self.per_relay_parent.contains_key(&ancestor) {
						// remove from the ancestry index
						self.ancestry.remove(&ancestor);
						// and also remove the actual receipt
						self.receipts.remove(&ancestor);
						self.per_candidate.remove(&ancestor);
					}
				}
			}
		}
		Ok(())
	}
}

/// Deal with network bridge updates and track what needs to be tracked
/// which depends on the message type received.
async fn handle_network_msg<Context>(
	ctx: &mut Context,
	keystore: KeyStorePtr,
	state: &mut ProtocolState,
	bridge_message: NetworkBridgeEvent<protocol_v1::AvailabilityDistributionMessage>,
) -> Result<()>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	match bridge_message {
		NetworkBridgeEvent::PeerConnected(peerid, _role) => {
			// insert if none already present
			state.peer_views.entry(peerid).or_default();
		}
		NetworkBridgeEvent::PeerDisconnected(peerid) => {
			// get rid of superfluous data
			state.peer_views.remove(&peerid);
		}
		NetworkBridgeEvent::PeerViewChange(peerid, view) => {
			handle_peer_view_change(ctx, state, peerid, view).await?;
		}
		NetworkBridgeEvent::OurViewChange(view) => {
			handle_our_view_change(ctx, keystore, state, view).await?;
		}
		NetworkBridgeEvent::PeerMessage(remote, msg) => {
			let gossiped_availability = match msg {
				protocol_v1::AvailabilityDistributionMessage::Chunk(candidate_hash, chunk) =>
					AvailabilityGossipMessage { candidate_hash, erasure_chunk: chunk }
			};

			process_incoming_peer_message(ctx, state, remote, gossiped_availability).await?;
		}
	}
	Ok(())
}


/// Handle the changes necessary when our view changes.
async fn handle_our_view_change<Context>(
	ctx: &mut Context,
	keystore: KeyStorePtr,
	state: &mut ProtocolState,
	view: View,
) -> Result<()>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let old_view = std::mem::replace(&mut (state.view), view);

	// needed due to borrow rules
	let view = state.view.clone();
	let added = view.difference(&old_view).collect::<Vec<&'_ Hash>>();

	// add all the relay parents and fill the cache
	for added in added.iter() {
		let added = **added;
		let validators = query_validators(ctx, added).await?;
		let validator_index = obtain_our_validator_index(
			&validators,
			keystore.clone(),
		);
		state.add_relay_parent(ctx, added, validators, validator_index).await?;
	}

	// handle all candidates
	for (candidate_hash, _receipt) in state.cached_live_candidates_unioned(added) {
		let per_candidate = state
			.per_candidate
			.entry(candidate_hash)
			.or_default();

		// ensure this node is a validator, otherwise skip this candidate
		if per_candidate.validator_index.is_none() {
			continue;
		};

		// check if the availability data is present in the store
		if !query_data_availability(ctx, candidate_hash).await? {
			continue;
		}

		let validator_count = per_candidate.validators.len();

		// obtain the peers interested in this candidate
		let peers: Vec<PeerId> = state
			.peer_views
			.clone()
			.into_iter()
			.filter(|(_peer, view)| {
				// collect all direct interests of a peer w/o ancestors
				state
					.cached_live_candidates_unioned(view.0.iter())
					.contains_key(&candidate_hash)
			})
			.map(|(peer, _view)| peer.clone())
			.collect();

		// distribute all erasure messages to interested peers
		for chunk_index in 0u32..(validator_count as u32) {

			// only the peers which did not receive this particular erasure chunk
			let per_candidate = state
				.per_candidate
				.entry(candidate_hash)
				.or_default();

			// obtain the chunk from the cache; if it is not there,
			// fall back to querying the availability store
			let message_id = (candidate_hash, chunk_index);
			let erasure_chunk = if let Some(message) = per_candidate.message_vault.get(&chunk_index) {
				message.erasure_chunk.clone()
			} else if let Some(erasure_chunk) = query_chunk(ctx, candidate_hash, chunk_index as ValidatorIndex).await? {
				erasure_chunk
			} else {
				continue;
			};

			debug_assert_eq!(erasure_chunk.index, chunk_index);

			let peers = peers
				.iter()
				.filter(|peer| {
					// only pick those which were not sent before
					!per_candidate
						.sent_messages
						.get(*peer)
						.filter(|set| {
							set.contains(&message_id)
						})
						.is_some()
				})
				.map(|peer| peer.clone())
				.collect::<Vec<_>>();
			let message = AvailabilityGossipMessage {
				candidate_hash,
				erasure_chunk,
			};

			send_tracked_gossip_message_to_peers(ctx, per_candidate, peers, message).await?;
		}
	}

	// cleanup the removed relay parents and their states
	let removed = old_view.difference(&view).collect::<Vec<_>>();
	for removed in removed {
		state.remove_relay_parent(&removed)?;
	}
	Ok(())
}

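/// Send a single gossip message to a set of peers, tracking it as sent.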
#[inline(always)]
async fn send_tracked_gossip_message_to_peers<Context>(
	ctx: &mut Context,
	per_candidate: &mut PerCandidate,
	peers: Vec<PeerId>,
	message: AvailabilityGossipMessage,
) -> Result<()>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	send_tracked_gossip_messages_to_peers(ctx, per_candidate, peers, iter::once(message)).await
}

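/// Send a set of gossip messages to a single peer, tracking them as sent.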
#[inline(always)]
async fn send_tracked_gossip_messages_to_peer<Context>(
	ctx: &mut Context,
	per_candidate: &mut PerCandidate,
	peer: PeerId,
	message_iter: impl IntoIterator<Item = AvailabilityGossipMessage>,
) -> Result<()>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	send_tracked_gossip_messages_to_peers(ctx, per_candidate, vec![peer], message_iter).await
}

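/// Send the given gossip messages to the given peers, recording each
/// (candidate hash, chunk index) pair as sent and caching each message in the message vault.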
async fn send_tracked_gossip_messages_to_peers<Context>(
	ctx: &mut Context,
	per_candidate: &mut PerCandidate,
	peers: Vec<PeerId>,
	message_iter: impl IntoIterator<Item = AvailabilityGossipMessage>,
) -> Result<()>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	if peers.is_empty() {
		return Ok(())
	}
	for message in message_iter {
		for peer in peers.iter() {
			let message_id = (message.candidate_hash, message.erasure_chunk.index);
			per_candidate
				.sent_messages
				.entry(peer.clone())
				.or_default()
				.insert(message_id);
		}

		per_candidate
			.message_vault
			.insert(message.erasure_chunk.index, message.clone());

		let wire_message = protocol_v1::AvailabilityDistributionMessage::Chunk(
			message.candidate_hash,
			message.erasure_chunk,
		);

		ctx.send_message(AllMessages::NetworkBridge(
			NetworkBridgeMessage::SendValidationMessage(
				peers.clone(),
				protocol_v1::ValidationProtocol::AvailabilityDistribution(wire_message),
			),
		))
		.await
		.map_err::<Error, _>(Into::into)?;
	}

	Ok(())
}

// Send to the given peer all messages relevant to its newly added view heads
// which have not been sent to that particular peer before.
async fn handle_peer_view_change<Context>(
	ctx: &mut Context,
	state: &mut ProtocolState,
	origin: PeerId,
	view: View,
) -> Result<()>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let current = state.peer_views.entry(origin.clone()).or_default();

	let added: Vec<Hash> = view.difference(&*current).cloned().collect();

	*current = view;

	// only contains the intersection of what we are interested in
	// and the union of all the added relay parents' candidates.
	let added_candidates = state.cached_live_candidates_unioned(added.iter());

	// Send all messages we've seen before and the peer is now interested
	// in to that peer.

	for (candidate_hash, _receipt) in added_candidates {
		let per_candidate = state.per_candidate.entry(candidate_hash).or_default();

		// obtain the relevant chunk indices not sent yet
		let messages = ((0 as ValidatorIndex)
			..(per_candidate.validators.len() as ValidatorIndex))
			.into_iter()
			.filter_map(|erasure_chunk_index: ValidatorIndex| {
				let message_id = (candidate_hash, erasure_chunk_index);

				// try to pick up the message from the message vault
				// so we send as much as we have
				per_candidate
					.message_vault
					.get(&erasure_chunk_index)
					.filter(|_| {
						// check if that erasure chunk was already sent before
						if let Some(sent_set) = per_candidate.sent_messages.get(&origin) {
							if sent_set.contains(&message_id) {
								return false;
							}
						}
						true
					})
			})
			.cloned()
			.collect::<HashSet<_>>();

		send_tracked_gossip_messages_to_peer(ctx, per_candidate, origin.clone(), messages).await?;
	}
	Ok(())
}

/// Obtain the index of the first validator in the set for which the keystore holds a signing key.
/// Returns the index within the validator set as `ValidatorIndex` if there is one,
/// otherwise `None` is returned.
fn obtain_our_validator_index(
	validators: &[ValidatorId],
	keystore: KeyStorePtr,
) -> Option<ValidatorIndex> {
	let keystore = keystore.read();
	validators.iter().enumerate().find_map(|(idx, validator)| {
		if keystore.has_keys(&[(validator.to_raw_vec(), PARACHAIN_KEY_TYPE_ID)]) {
			Some(idx as ValidatorIndex)
		} else {
			None
		}
	})
}

/// Handle an incoming message from a peer.
async fn process_incoming_peer_message<Context>(
	ctx: &mut Context,
	state: &mut ProtocolState,
	origin: PeerId,
	message: AvailabilityGossipMessage,
) -> Result<()>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	// obtain the set of candidates we are interested in based on our current view
	let live_candidates = state.cached_live_candidates_unioned(state.view.0.iter());

	// check if the candidate is of interest
	let live_candidate = if let Some(live_candidate) = live_candidates.get(&message.candidate_hash)
	{
		live_candidate
	} else {
		return modify_reputation(ctx, origin, COST_NOT_A_LIVE_CANDIDATE).await;
	};

	// check the merkle proof
	let root = &live_candidate.commitments.erasure_root;
	let anticipated_hash = if let Ok(hash) = branch_hash(
		root,
		&message.erasure_chunk.proof,
		message.erasure_chunk.index as usize,
	) {
		hash
	} else {
		return modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
	};

	let erasure_chunk_hash = BlakeTwo256::hash(&message.erasure_chunk.chunk);
	if anticipated_hash != erasure_chunk_hash {
		return modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
	}

	// an internal unique identifier of this message
	let message_id = (message.candidate_hash, message.erasure_chunk.index);

	{
		let per_candidate = state.per_candidate.entry(message_id.0.clone()).or_default();

		// check if this particular erasure chunk was already received from that peer before
		{
			let received_set = per_candidate
				.received_messages
				.entry(origin.clone())
				.or_default();
			if received_set.contains(&message_id) {
				return modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await;
			} else {
				received_set.insert(message_id.clone());
			}
		}

		// insert into known messages and change reputation
		if per_candidate
			.message_vault
			.insert(message_id.1, message.clone())
			.is_some()
		{
			modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await?;
		} else {
			modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await?;

			// save the chunk for our index
			if let Some(validator_index) = per_candidate.validator_index {
				if message.erasure_chunk.index == validator_index {
					if let Err(_e) = store_chunk(
						ctx,
						message.candidate_hash.clone(),
						message.erasure_chunk.index,
						message.erasure_chunk.clone(),
					).await? {
						warn!(target: TARGET, "Failed to store erasure chunk to availability store");
					}
				}
			}
		};
	}
	// restrict to the peers which are interested in this candidate
	let peers = state
		.peer_views
		.clone()
		.into_iter()
		.filter(|(_peer, view)| {
			// the peer's view must contain the candidate hash too
			state
				.cached_live_candidates_unioned(view.0.iter())
				.contains_key(&message_id.0)
		})
		.map(|(peer, _)| -> PeerId { peer.clone() })
		.collect::<Vec<_>>();

	let per_candidate = state.per_candidate.entry(message_id.0.clone()).or_default();

	let peers = peers
		.into_iter()
		.filter(|peer| {
			// avoid sending duplicate messages: only keep peers
			// which have not been sent this particular chunk yet
			!per_candidate
				.sent_messages
				.get(peer)
				.map(|sent| sent.contains(&message_id))
				.unwrap_or(false)
		})
		.collect::<Vec<_>>();

	// gossip that message to interested peers
	send_tracked_gossip_message_to_peers(ctx, per_candidate, peers, message).await
}

/// The availability distribution subsystem.
pub struct AvailabilityDistributionSubsystem {
	/// Pointer to a keystore, which is required for determining this node's validator index.
	keystore: KeyStorePtr,
}

impl AvailabilityDistributionSubsystem {
	/// Number of ancestors to keep around for the relay-chain heads.
	const K: usize = 3;

	/// Create a new instance of the availability distribution.
	pub fn new(keystore: KeyStorePtr) -> Self {
		Self { keystore }
	}

	/// Start processing work as passed on from the Overseer.
	async fn run<Context>(self, mut ctx: Context) -> Result<()>
	where
		Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
	{
		// work: process incoming messages from the overseer.
		let mut state = ProtocolState::default();
		loop {
			let message = ctx.recv().await.map_err::<Error, _>(Into::into)?;
			match message {
				FromOverseer::Communication {
					msg: AvailabilityDistributionMessage::NetworkBridgeUpdateV1(event),
				} => {
					if let Err(e) = handle_network_msg(
						&mut ctx,
						self.keystore.clone(),
						&mut state,
						event
					).await {
						warn!(
							target: TARGET,
							"Failed to handle incomming network messages: {:?}", e
						);
					}
				}
				FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
					activated: _,
					deactivated: _,
				})) => {
					// handled at view change
				}
				FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {}
				FromOverseer::Signal(OverseerSignal::Conclude) => {
					return Ok(());
				}
			}
		}
	}
}

impl<Context> Subsystem<Context> for AvailabilityDistributionSubsystem
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage> + Sync + Send,
{
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		SpawnedSubsystem {
			name: "availability-distribution-subsystem",
			future: Box::pin(async move { self.run(ctx).await }.map(|_| ())),
		}
	}
}

/// Obtain all live candidates based on an iterator of relay heads.
async fn query_live_candidates_without_ancestors<Context>(
	ctx: &mut Context,
	relay_parents: impl IntoIterator<Item = Hash>,
) -> Result<HashSet<CommittedCandidateReceipt>>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let iter = relay_parents.into_iter();
	let hint = iter.size_hint();

	let mut live_candidates = HashSet::with_capacity(hint.1.unwrap_or(hint.0));
	for relay_parent in iter {
		let paras = query_para_ids(ctx, relay_parent).await?;
		for para in paras {
			if let Some(ccr) = query_pending_availability(ctx, relay_parent, para).await? {
				live_candidates.insert(ccr);
			}
		}
	}
	Ok(live_candidates)
}

/// Obtain all live candidates based on an iterator of relay heads, including up to `K`
/// ancestors of each relay parent.
///
/// Returns a map from relay parent (or ancestor) hash to the live candidate receipt and its hash.
async fn query_live_candidates<Context>(
	ctx: &mut Context,
	state: &mut ProtocolState,
	relay_parents: impl IntoIterator<Item = Hash>,
) -> Result<HashMap<Hash, (Hash, CommittedCandidateReceipt)>>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let iter = relay_parents.into_iter();
	let hint = iter.size_hint();

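	// each relay parent contributes itself plus up to `K` ancestors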
	let capacity = hint.1.unwrap_or(hint.0) * (1 + AvailabilityDistributionSubsystem::K);
	let mut live_candidates =
		HashMap::<Hash, (Hash, CommittedCandidateReceipt)>::with_capacity(capacity);

	for relay_parent in iter {

		// obtain up to `K` ancestors of this relay parent (restricted to the same session)
		let mut ancestors = query_up_to_k_ancestors_in_same_session(
			ctx,
			relay_parent,
			AvailabilityDistributionSubsystem::K,
		)
		.await?;

		ancestors.push(relay_parent);


		// ancestors might overlap, so check the cache too
		let unknown = ancestors
			.into_iter()
			.filter(|relay_parent_or_ancestor| {
				// use the ones which we pulled before
				// but keep the unknown relay parents
				state
					.receipts
					.get(relay_parent_or_ancestor)
					.and_then(|receipts| {
						// directly extend the live_candidates with the cached value
						live_candidates.extend(receipts.into_iter().map(
							|(receipt_hash, receipt)| {
								(
									relay_parent,
									(receipt_hash.clone(), receipt.clone()),
								)
							},
						));
						Some(())
					})
					.is_none()
			})
			.collect::<Vec<_>>();

		// query the ones that were not present in the receipts cache
		let receipts = query_live_candidates_without_ancestors(ctx, unknown.clone()).await?;
		live_candidates.extend(
			unknown.into_iter().zip(
				receipts
					.into_iter()
					.map(|receipt| (receipt.hash(), receipt)),
			),
		);
	}
	Ok(live_candidates)
}

/// Query the para IDs of the availability cores that are currently occupied.
async fn query_para_ids<Context>(ctx: &mut Context, relay_parent: Hash) -> Result<Vec<ParaId>>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let (tx, rx) = oneshot::channel();
	ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
		relay_parent,
		RuntimeApiRequest::AvailabilityCores(tx),
	)))
	.await
	.map_err::<Error, _>(Into::into)?;

	let all_para_ids: Vec<_> = rx
		.await??;

	let occupied_para_ids = all_para_ids
		.into_iter()
		.filter_map(|core_state| {
			if let CoreState::Occupied(occupied) = core_state {
				Some(occupied.para_id)
			} else {
				None
			}
		})
		.collect();
	Ok(occupied_para_ids)
}

/// Modify the reputation of a peer based on its behavior.
async fn modify_reputation<Context>(ctx: &mut Context, peer: PeerId, rep: Rep) -> Result<()>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	trace!(
		target: TARGET,
		"Reputation change of {:?} for peer {:?}",
		rep,
		peer
	);
	ctx.send_message(AllMessages::NetworkBridge(
		NetworkBridgeMessage::ReportPeer(peer, rep),
	))
	.await
	.map_err::<Error, _>(Into::into)
}

/// Query whether the availability data for a particular candidate hash is present in the store.
async fn query_data_availability<Context>(
	ctx: &mut Context,
	candidate_hash: Hash,
) -> Result<bool>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let (tx, rx) = oneshot::channel();
	ctx.send_message(AllMessages::AvailabilityStore(
		AvailabilityStoreMessage::QueryDataAvailability(candidate_hash, tx),
	))
	.await?;
	rx.await.map_err::<Error, _>(Into::into)
}


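/// Query a single erasure chunk for a candidate, identified by validator index, from the availability store.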
async fn query_chunk<Context>(
	ctx: &mut Context,
	candidate_hash: Hash,
	validator_index: ValidatorIndex,
) -> Result<Option<ErasureChunk>>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let (tx, rx) = oneshot::channel();
	ctx.send_message(AllMessages::AvailabilityStore(
			AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx),
		))
		.await?;
	rx.await.map_err::<Error, _>(Into::into)
}


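/// Store an erasure chunk in the availability store. An inner `Err(())` means the store failed to persist the chunk.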
async fn store_chunk<Context>(
	ctx: &mut Context,
	candidate_hash: Hash,
	validator_index: ValidatorIndex,
	erasure_chunk: ErasureChunk,
) -> Result<std::result::Result<(), ()>>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let (tx, rx) = oneshot::channel();
	ctx.send_message(AllMessages::AvailabilityStore(
		AvailabilityStoreMessage::StoreChunk(candidate_hash, validator_index, erasure_chunk, tx),
	)).await?;
	rx.await.map_err::<Error, _>(Into::into)
}

/// Query the candidate pending availability, if any, for a particular para.
async fn query_pending_availability<Context>(
	ctx: &mut Context,
	relay_parent: Hash,
	para: ParaId,
) -> Result<Option<CommittedCandidateReceipt>>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let (tx, rx) = oneshot::channel();
	ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
			relay_parent,
			RuntimeApiRequest::CandidatePendingAvailability(para, tx),
		)))
		.await?;
	rx.await?
		.map_err::<Error, _>(Into::into)
}

/// Query the validator set.
async fn query_validators<Context>(
	ctx: &mut Context,
	relay_parent: Hash,
) -> Result<Vec<ValidatorId>>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let (tx, rx) = oneshot::channel();
	let query_validators = AllMessages::RuntimeApi(RuntimeApiMessage::Request(
		relay_parent,
		RuntimeApiRequest::Validators(tx),
	));

	ctx.send_message(query_validators)
		.await?;
	rx.await?
		.map_err::<Error, _>(Into::into)
}

/// Query the hashes of the `k` ancestors of the given relay parent.
async fn query_k_ancestors<Context>(
	ctx: &mut Context,
	relay_parent: Hash,
	k: usize,
) -> Result<Vec<Hash>>