// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The availability distribution
//!
//! Transforms `AvailableData` into erasure chunks, which are distributed to peers
//! who are interested in the relevant candidates.
//! Gossip messages received from other peers are verified and gossiped to interested
//! peers. Verified in this context means that the erasure chunk's contained
//! merkle proof is checked.
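//!
//! As a rough sketch, verifying a received chunk amounts to recomputing the
//! merkle branch hash from the chunk's proof and comparing it against the hash
//! of the chunk data (here `descriptor` and `chunk` are placeholders for a
//! candidate descriptor and a received erasure chunk):
//!
//! ```ignore
//! let anticipated_hash = branch_hash(
//! 	&descriptor.erasure_root,
//! 	&chunk.proof,
//! 	chunk.index as usize,
//! )?;
//! let valid = anticipated_hash == BlakeTwo256::hash(&chunk.chunk);
//! ```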
#![deny(unused_crate_dependencies, unused_qualifications)]
use parity_scale_codec::{Decode, Encode};
use futures::{channel::oneshot, FutureExt, TryFutureExt};
use sp_core::crypto::Public;
use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
use polkadot_erasure_coding::branch_hash;
use polkadot_node_network_protocol::{
v1 as protocol_v1, NetworkBridgeEvent, PeerId, ReputationChange as Rep, View,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_primitives::v1::{
	BlakeTwo256, CommittedCandidateReceipt, CoreState, ErasureChunk, Hash, HashT, Id as ParaId,
SessionIndex, ValidatorId, ValidatorIndex, PARACHAIN_KEY_TYPE_ID, CandidateHash,
CandidateDescriptor,
};
use polkadot_subsystem::messages::{
	AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage,
	NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest,
};
use polkadot_subsystem::{
	jaeger, errors::{ChainApiError, RuntimeApiError},
	ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem,
	SubsystemContext, SubsystemError,
};
use std::collections::{HashMap, HashSet};
use std::collections::hash_map::Entry;
use std::iter;
use thiserror::Error;
#[cfg(test)]
mod tests;
const LOG_TARGET: &'static str = "availability_distribution";
#[derive(Debug, Error)]
enum Error {
#[error("Response channel to obtain PendingAvailability failed")]
QueryPendingAvailabilityResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain PendingAvailability failed")]
QueryPendingAvailability(#[source] RuntimeApiError),
#[error("Response channel to obtain StoreChunk failed")]
StoreChunkResponseChannel(#[source] oneshot::Canceled),
#[error("Response channel to obtain QueryChunk failed")]
QueryChunkResponseChannel(#[source] oneshot::Canceled),
#[error("Response channel to obtain QueryAncestors failed")]
QueryAncestorsResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain QueryAncestors failed")]
QueryAncestors(#[source] ChainApiError),
#[error("Response channel to obtain QuerySession failed")]
QuerySessionResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain QuerySession failed")]
QuerySession(#[source] RuntimeApiError),
#[error("Response channel to obtain QueryValidators failed")]
QueryValidatorsResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain QueryValidators failed")]
QueryValidators(#[source] RuntimeApiError),
#[error("Response channel to obtain AvailabilityCores failed")]
AvailabilityCoresResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain AvailabilityCores failed")]
AvailabilityCores(#[source] RuntimeApiError),
#[error("Response channel to obtain AvailabilityCores failed")]
QueryAvailabilityResponseChannel(#[source] oneshot::Canceled),
#[error("Receive channel closed")]
IncomingMessageChannel(#[source] SubsystemError),
}
type Result<T> = std::result::Result<T, Error>;
const COST_MERKLE_PROOF_INVALID: Rep = Rep::new(-100, "Merkle proof was invalid");
const COST_NOT_A_LIVE_CANDIDATE: Rep = Rep::new(-51, "Candidate is not live");
const COST_PEER_DUPLICATE_MESSAGE: Rep = Rep::new(-500, "Peer sent identical messages");
const BENEFIT_VALID_MESSAGE_FIRST: Rep = Rep::new(15, "Valid message with new information");
const BENEFIT_VALID_MESSAGE: Rep = Rep::new(10, "Valid message");
/// An erasure chunk bundled with the candidate it belongs to, distributed
/// to other peers as a gossip message.
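///
/// A minimal sketch of how such a message reaches the wire format (using the
/// `From` impl below; `gossip_message` is a placeholder value):
///
/// ```ignore
/// let wire: protocol_v1::AvailabilityDistributionMessage = gossip_message.into();
/// ```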
#[derive(Encode, Decode, Debug, Clone, PartialEq, Eq, Hash)]
pub struct AvailabilityGossipMessage {
/// Anchor hash of the candidate the `ErasureChunk` is associated with.
pub candidate_hash: CandidateHash,
/// The erasure chunk, an encoded piece of the `AvailableData`.
pub erasure_chunk: ErasureChunk,
}
impl From<AvailabilityGossipMessage> for protocol_v1::AvailabilityDistributionMessage {
fn from(message: AvailabilityGossipMessage) -> Self {
Self::Chunk(message.candidate_hash, message.erasure_chunk)
}
}
/// Data used to track information about peers and the relay parents the
/// overseer ordered us to work on.
#[derive(Default, Clone, Debug)]
struct ProtocolState {
/// Track all active peers and their views
/// to determine what is relevant to them.
peer_views: HashMap<PeerId, View>,
/// Our own view.
view: View,
/// Caches a mapping of relay parents or ancestors to live candidate receipts.
/// Allows fast intersection of live candidates with views and consecutive unioning.
/// Maps relay parent / ancestor -> candidate receipts.
receipts: HashMap<Hash, HashSet<CandidateHash>>,
/// Track things needed to start and stop work on a particular relay parent.
per_relay_parent: HashMap<Hash, PerRelayParent>,
/// Track data that is specific to a candidate.
per_candidate: HashMap<CandidateHash, PerCandidate>,
}
#[derive(Debug, Clone, Default)]
struct PerCandidate {
	/// The set of known erasure chunks for this candidate, in the form of messages to be
	/// gossiped / distributed if the peer view wants that.
	/// This is _across_ peers and not specific to a particular one.
	/// erasure chunk index -> gossip message
	message_vault: HashMap<u32, AvailabilityGossipMessage>,
/// Track received erasure chunk indices per peer.
received_messages: HashMap<PeerId, HashSet<ValidatorIndex>>,
/// Track sent erasure chunk indices per peer.
sent_messages: HashMap<PeerId, HashSet<ValidatorIndex>>,
/// The set of validators.
validators: Vec<ValidatorId>,
/// If this node is a validator, note the index in the validator set.
validator_index: Option<ValidatorIndex>,
/// The descriptor of this candidate.
descriptor: CandidateDescriptor,
/// The set of relay chain blocks this appears to be live in.
live_in: HashSet<Hash>,
}

impl PerCandidate {
/// Returns `true` iff the given `validator_index` is required by the given `peer`.
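	///
	/// A minimal sketch (`peer` and `chunk_index` are placeholder values):
	///
	/// ```ignore
	/// if per_candidate.message_required_by_peer(&peer, &chunk_index) {
	/// 	// the chunk was neither received from nor sent to `peer` yet
	/// }
	/// ```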
fn message_required_by_peer(&self, peer: &PeerId, validator_index: &ValidatorIndex) -> bool {
self.received_messages.get(peer).map(|v| !v.contains(validator_index)).unwrap_or(true)
&& self.sent_messages.get(peer).map(|v| !v.contains(validator_index)).unwrap_or(true)
}
}
#[derive(Debug, Clone, Default)]
struct PerRelayParent {
/// Set of `K` ancestors for this relay parent.
ancestors: Vec<Hash>,
/// Live candidates, according to this relay parent.
live_candidates: HashSet<CandidateHash>,
}

impl ProtocolState {
	/// Unionize all live candidate hashes of the given relay parents and their recent
/// ancestors.
///
	/// Ignores all non-existent relay parents, so this can be used directly with a peer's view.
/// Returns a set of candidate hashes.
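	///
	/// A minimal usage sketch, mirroring how incoming messages are checked for
	/// relevance against our own view:
	///
	/// ```ignore
	/// let live_candidates = state.cached_live_candidates_unioned(state.view.heads.iter());
	/// ```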
#[tracing::instrument(level = "trace", skip(relay_parents), fields(subsystem = LOG_TARGET))]
fn cached_live_candidates_unioned<'a>(
&'a self,
relay_parents: impl IntoIterator<Item = &'a Hash> + 'a,
) -> HashSet<CandidateHash> {
		relay_parents
			.into_iter()
			.filter_map(|r| self.per_relay_parent.get(r))
			.map(|per_relay_parent| per_relay_parent.live_candidates.iter().cloned())
			.flatten()
			.collect()
	}
#[tracing::instrument(level = "trace", skip(candidates), fields(subsystem = LOG_TARGET))]
fn add_relay_parent(
&mut self,
relay_parent: Hash,
validators: Vec<ValidatorId>,
validator_index: Option<ValidatorIndex>,
candidates: HashMap<CandidateHash, FetchedLiveCandidate>,
ancestors: Vec<Hash>,
) {
let candidate_hashes: Vec<_> = candidates.keys().cloned().collect();
		// register the relation of relay_parent to candidate.
for (receipt_hash, fetched) in candidates {
let per_candidate = self.per_candidate.entry(receipt_hash).or_default();
// Cached candidates already have entries and thus don't need this
// information to be set.
if let FetchedLiveCandidate::Fresh(descriptor) = fetched {
per_candidate.validator_index = validator_index.clone();
per_candidate.validators = validators.clone();
per_candidate.descriptor = descriptor;
}
			per_candidate.live_in.insert(relay_parent);
		}

		let per_relay_parent = self.per_relay_parent.entry(relay_parent).or_default();
per_relay_parent.ancestors = ancestors;
		per_relay_parent.live_candidates.extend(candidate_hashes);
	}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn remove_relay_parent(&mut self, relay_parent: &Hash) {
if let Some(per_relay_parent) = self.per_relay_parent.remove(relay_parent) {
for candidate_hash in per_relay_parent.live_candidates {
// Prune the candidate if this was the last member of our view
// to consider it live (including its ancestors).
if let Entry::Occupied(mut occ) = self.per_candidate.entry(candidate_hash) {
occ.get_mut().live_in.remove(relay_parent);
if occ.get().live_in.is_empty() {
						occ.remove();
					}
				}
			}
		}
	}

	/// Removes all entries from receipts which aren't referenced in the ancestry of
	/// one of our live relay-chain heads.
fn clean_up_receipts_cache(&mut self) {
let extended_view: HashSet<_> = self.per_relay_parent.iter()
.map(|(r_hash, v)| v.ancestors.iter().cloned().chain(std::iter::once(*r_hash)))
.flatten()
.collect();
self.receipts.retain(|ancestor_hash, _| extended_view.contains(ancestor_hash));
}
}
/// Deal with network bridge updates and track what needs to be tracked,
/// depending on the message type received.
#[tracing::instrument(level = "trace", skip(ctx, keystore, metrics), fields(subsystem = LOG_TARGET))]
async fn handle_network_msg<Context>(
ctx: &mut Context,
keystore: &SyncCryptoStorePtr,
	state: &mut ProtocolState,
	metrics: &Metrics,
	bridge_message: NetworkBridgeEvent<protocol_v1::AvailabilityDistributionMessage>,
) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
match bridge_message {
NetworkBridgeEvent::PeerConnected(peerid, _role) => {
// insert if none already present
state.peer_views.entry(peerid).or_default();
}
NetworkBridgeEvent::PeerDisconnected(peerid) => {
// get rid of superfluous data
state.peer_views.remove(&peerid);
}
NetworkBridgeEvent::PeerViewChange(peerid, view) => {
handle_peer_view_change(ctx, state, peerid, view, metrics).await;
}
NetworkBridgeEvent::OurViewChange(view) => {
			handle_our_view_change(ctx, keystore, state, view, metrics).await?;
		}
		NetworkBridgeEvent::PeerMessage(remote, msg) => {
let gossiped_availability = match msg {
protocol_v1::AvailabilityDistributionMessage::Chunk(candidate_hash, chunk) => {
AvailabilityGossipMessage {
candidate_hash,
erasure_chunk: chunk,
}
				}
			};
let mut _span = jaeger::hash_span(&gossiped_availability.candidate_hash.0, "availability-message-received");
process_incoming_peer_message(ctx, state, remote, gossiped_availability, metrics)
.await?;
}
}
Ok(())
}
/// Handle the changes necessary when our view changes.
#[tracing::instrument(level = "trace", skip(ctx, keystore, metrics), fields(subsystem = LOG_TARGET))]
async fn handle_our_view_change<Context>(
ctx: &mut Context,
keystore: &SyncCryptoStorePtr,
state: &mut ProtocolState,
	view: View,
	metrics: &Metrics,
) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let _timer = metrics.time_handle_our_view_change();
let old_view = std::mem::replace(&mut state.view, view);
// needed due to borrow rules
let view = state.view.clone();
// add all the relay parents and fill the cache
for added in view.difference(&old_view) {
let validators = query_validators(ctx, *added).await?;
let validator_index = obtain_our_validator_index(&validators, keystore.clone()).await;
let (candidates, ancestors)
= query_live_candidates(ctx, &mut state.receipts, *added).await?;
state.add_relay_parent(
*added,
validators,
validator_index,
candidates,
ancestors,
		);
	}

for candidate_hash in state.cached_live_candidates_unioned(view.difference(&old_view)) {
// If we are not a validator for this candidate, let's skip it.
if state.per_candidate.entry(candidate_hash).or_default().validator_index.is_none() {
continue
}
		// check if the availability data is present in the store
if !query_data_availability(ctx, candidate_hash).await? {
continue;
}
// obtain interested peers in the candidate hash
let peers: Vec<PeerId> = state
.peer_views
.clone()
.into_iter()
.filter(|(_peer, view)| {
// collect all direct interests of a peer w/o ancestors
state
.cached_live_candidates_unioned(view.heads.iter())
.contains(&candidate_hash)
})
.map(|(peer, _view)| peer.clone())
.collect();
let per_candidate = state.per_candidate.entry(candidate_hash).or_default();
let validator_count = per_candidate.validators.len();
// distribute all erasure messages to interested peers
for chunk_index in 0u32..(validator_count as u32) {
let message = if let Some(message) = per_candidate.message_vault.get(&chunk_index) {
tracing::trace!(
target: LOG_TARGET,
%chunk_index,
?candidate_hash,
"Retrieved chunk from message vault",
);
message.clone()
} else if let Some(erasure_chunk) = query_chunk(ctx, candidate_hash, chunk_index as ValidatorIndex).await? {
tracing::trace!(
target: LOG_TARGET,
%chunk_index,
?candidate_hash,
"Retrieved chunk from availability storage",
);
AvailabilityGossipMessage {
candidate_hash,
erasure_chunk,
}
			} else {
				tracing::error!(
target: LOG_TARGET,
%chunk_index,
?candidate_hash,
"Availability store reported that we have the availability data, but we could not retrieve a chunk of it!",
);
				continue;
			};

			debug_assert_eq!(message.erasure_chunk.index, chunk_index);

			let peers = peers
				.iter()
				.filter(|peer| per_candidate.message_required_by_peer(peer, &chunk_index))
				.cloned()
				.collect::<Vec<_>>();

			send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await;
}
}
// cleanup the removed relay parents and their states
old_view.difference(&view).for_each(|r| state.remove_relay_parent(r));
state.clean_up_receipts_cache();
	Ok(())
}
#[tracing::instrument(level = "trace", skip(ctx, metrics, message_iter), fields(subsystem = LOG_TARGET))]
async fn send_tracked_gossip_messages_to_peers<Context>(
ctx: &mut Context,
	per_candidate: &mut PerCandidate,
	metrics: &Metrics,
	peers: Vec<PeerId>,
	message_iter: impl IntoIterator<Item = AvailabilityGossipMessage>,
)
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
for message in message_iter {
for peer in peers.iter() {
per_candidate
.sent_messages
.entry(peer.clone())
.or_default()
.insert(message.erasure_chunk.index);
}
per_candidate
.message_vault
.insert(message.erasure_chunk.index, message.clone());
if !peers.is_empty() {
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
protocol_v1::ValidationProtocol::AvailabilityDistribution(message.into()),
).into()).await;
metrics.on_chunk_distributed();
}
}
}
/// Send a peer the messages that became relevant through the difference
/// between its previous and current view and were not sent to it before.
#[tracing::instrument(level = "trace", skip(ctx, metrics), fields(subsystem = LOG_TARGET))]
async fn handle_peer_view_change<Context>(
ctx: &mut Context,
state: &mut ProtocolState,
origin: PeerId,
	view: View,
	metrics: &Metrics,
)
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let current = state.peer_views.entry(origin.clone()).or_default();
let added: Vec<Hash> = view.difference(&*current).cloned().collect();
*current = view;
// only contains the intersection of what we are interested and
// the union of all relay parent's candidates.
let added_candidates = state.cached_live_candidates_unioned(added.iter());
// Send all messages we've seen before and the peer is now interested in.
for candidate_hash in added_candidates {
let per_candidate = state.per_candidate.entry(candidate_hash).or_default();
// obtain the relevant chunk indices not sent yet
let messages = ((0 as ValidatorIndex)..(per_candidate.validators.len() as ValidatorIndex))
.into_iter()
.filter_map(|erasure_chunk_index: ValidatorIndex| {
// try to pick up the message from the message vault
// so we send as much as we have
per_candidate
.message_vault
.get(&erasure_chunk_index)
.filter(|_| per_candidate.message_required_by_peer(&origin, &erasure_chunk_index))
})
.cloned()
.collect::<HashSet<_>>();
send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, vec![origin.clone()], messages).await;
}
}
/// Obtain the index of the first validator in the given set for which this node
/// holds a signing key.
/// Returns the index within the validator set as `ValidatorIndex`, if there exists one,
/// otherwise `None` is returned.
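///
/// A minimal usage sketch, mirroring the call in `handle_our_view_change`
/// (`relay_parent` is a placeholder):
///
/// ```ignore
/// let validators = query_validators(ctx, relay_parent).await?;
/// // `None` if this node is not among the validators of this session.
/// let validator_index = obtain_our_validator_index(&validators, keystore.clone()).await;
/// ```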
async fn obtain_our_validator_index(
	validators: &[ValidatorId],
	keystore: SyncCryptoStorePtr,
) -> Option<ValidatorIndex> {
for (idx, validator) in validators.iter().enumerate() {
if CryptoStore::has_keys(
&*keystore,
&[(validator.to_raw_vec(), PARACHAIN_KEY_TYPE_ID)],
)
.await
{
return Some(idx as ValidatorIndex);
		}
	}

	None
}
/// Handle an incoming message from a peer.
#[tracing::instrument(level = "trace", skip(ctx, metrics), fields(subsystem = LOG_TARGET))]
async fn process_incoming_peer_message<Context>(
ctx: &mut Context,
state: &mut ProtocolState,
origin: PeerId,
	message: AvailabilityGossipMessage,
	metrics: &Metrics,
) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let _timer = metrics.time_process_incoming_peer_message();
// obtain the set of candidates we are interested in based on our current view
let live_candidates = state.cached_live_candidates_unioned(state.view.heads.iter());
// check if the candidate is of interest
let descriptor = if live_candidates.contains(&message.candidate_hash) {
state.per_candidate
.get(&message.candidate_hash)
.expect("All live candidates are contained in per_candidate; qed")
.descriptor
			.clone()
	} else {
		tracing::trace!(
			target: LOG_TARGET,
			candidate_hash = ?message.candidate_hash,
			peer = %origin,
			"Peer sent a candidate that is not live",
		);
		modify_reputation(ctx, origin, COST_NOT_A_LIVE_CANDIDATE).await;
		return Ok(());
	};
// check the merkle proof against the erasure root in the candidate descriptor.
let anticipated_hash = match branch_hash(
&descriptor.erasure_root,
&message.erasure_chunk.proof,
message.erasure_chunk.index as usize,
) {
Ok(hash) => hash,
Err(e) => {
tracing::trace!(
target: LOG_TARGET,
candidate_hash = ?message.candidate_hash,
peer = %origin,
error = ?e,
"Failed to calculate chunk merkle proof",
);
modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
return Ok(());
},
};
let erasure_chunk_hash = BlakeTwo256::hash(&message.erasure_chunk.chunk);
if anticipated_hash != erasure_chunk_hash {
tracing::trace!(
target: LOG_TARGET,
candidate_hash = ?message.candidate_hash,
peer = %origin,
"Peer send chunk with invalid merkle proof",
);
modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
		return Ok(());
	}

let erasure_chunk_index = &message.erasure_chunk.index;
let per_candidate = state.per_candidate.entry(message.candidate_hash).or_default();
// check if this particular erasure chunk was already sent by that peer before
{
let received_set = per_candidate
.received_messages
.entry(origin.clone())
.or_default();
if !received_set.insert(*erasure_chunk_index) {
modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await;
return Ok(());
}
}
// insert into known messages and change reputation
if per_candidate
.message_vault
		.insert(*erasure_chunk_index, message.clone())
		.is_some()
	{
		modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await;
	} else {
		modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await;

		// save the chunk if it is meant for our validator index
if Some(*erasure_chunk_index) == per_candidate.validator_index {
if store_chunk(
ctx,
message.candidate_hash,
descriptor.relay_parent,
message.erasure_chunk.index,
message.erasure_chunk.clone(),
).await?.is_err() {
tracing::warn!(
target: LOG_TARGET,
"Failed to store erasure chunk to availability store"
);
}
}
	}
	// condense the peers to those with an interest in the candidate
let peers = state
.peer_views
.clone()
.into_iter()
.filter(|(_, view)| {
			// the peer's view must contain the candidate hash too
state
.cached_live_candidates_unioned(view.heads.iter())
.contains(&message.candidate_hash)
})
		.map(|(peer, _)| peer)
.collect::<Vec<_>>();
let per_candidate = state.per_candidate.entry(message.candidate_hash).or_default();
let peers = peers
.into_iter()
.filter(|peer| per_candidate.message_required_by_peer(peer, erasure_chunk_index))
.collect::<Vec<_>>();
// gossip that message to interested peers
	send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await;

	Ok(())
}
/// The availability distribution subsystem.
pub struct AvailabilityDistributionSubsystem {
/// Pointer to a keystore, which is required for determining this node's validator index.
keystore: SyncCryptoStorePtr,
/// Prometheus metrics.
metrics: Metrics,
}
impl AvailabilityDistributionSubsystem {
/// Number of ancestors to keep around for the relay-chain heads.
const K: usize = 3;
/// Create a new instance of the availability distribution.
	pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
		Self { keystore, metrics }
	}
/// Start processing work as passed on from the Overseer.
async fn run<Context>(self, ctx: Context) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let mut state = ProtocolState::default();
self.run_inner(ctx, &mut state).await
}
/// Start processing work.
#[tracing::instrument(skip(self, ctx), fields(subsystem = LOG_TARGET))]
async fn run_inner<Context>(self, mut ctx: Context, state: &mut ProtocolState) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
// work: process incoming messages from the overseer.
loop {
let message = ctx
.recv()
.await
.map_err(|e| Error::IncomingMessageChannel(e))?;
match message {
FromOverseer::Communication {
msg: AvailabilityDistributionMessage::NetworkBridgeUpdateV1(event),
} => {
if let Err(e) = handle_network_msg(
&mut ctx,
&self.keystore.clone(),
					state,
					&self.metrics,
					event,
				).await {
					tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Failed to handle incoming network messages",
);
}
}
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: _,
deactivated: _,
})) => {
// handled at view change
}
FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {}
FromOverseer::Signal(OverseerSignal::Conclude) => {
return Ok(());
}
}
}
}
}
impl<Context> Subsystem<Context> for AvailabilityDistributionSubsystem
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage> + Sync + Send,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = self
.run(ctx)
.map_err(|e| SubsystemError::with_origin("availability-distribution", e))
.boxed();
SpawnedSubsystem {
name: "availability-distribution-subsystem",
/// Metadata about a candidate that is part of the live_candidates set.
///
/// Those which were not present in a cache are "fresh" and have their candidate descriptor attached. This
/// information is propagated to the higher level where it can be used to create data entries. Cached candidates
/// already have entries associated with them, and thus don't need this metadata to be fetched.
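///
/// A minimal sketch of how the two cases are told apart, following the usage in
/// `add_relay_parent`:
///
/// ```ignore
/// match fetched {
/// 	// fresh candidates carry the descriptor needed to initialize their entry
/// 	FetchedLiveCandidate::Fresh(descriptor) => per_candidate.descriptor = descriptor,
/// 	// cached candidates already have an entry; nothing to initialize
/// 	FetchedLiveCandidate::Cached => {},
/// }
/// ```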
#[derive(Debug)]
enum FetchedLiveCandidate {
Cached,
Fresh(CandidateDescriptor),
}
/// Obtain all live candidates for all given `relay_blocks`.
///
/// This returns a set of all candidate hashes pending availability within the state
/// of the explicitly referenced relay heads.
///
/// This also queries the provided `receipts` cache before reaching into the
/// runtime and updates it with the information learned.
#[tracing::instrument(level = "trace", skip(ctx, relay_blocks, receipts), fields(subsystem = LOG_TARGET))]
async fn query_pending_availability_at<Context>(
	ctx: &mut Context,
	relay_blocks: impl IntoIterator<Item = Hash>,
receipts: &mut HashMap<Hash, HashSet<CandidateHash>>,
) -> Result<HashMap<CandidateHash, FetchedLiveCandidate>>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let mut live_candidates = HashMap::new();
// fetch and fill out cache for each of these
for relay_parent in relay_blocks {
let receipts_for = match receipts.entry(relay_parent) {
Entry::Occupied(e) => {
live_candidates.extend(
e.get().iter().cloned().map(|c| (c, FetchedLiveCandidate::Cached))
);
continue
},
e => e.or_default(),
};
for para in query_para_ids(ctx, relay_parent).await? {
if let Some(ccr) = query_pending_availability(ctx, relay_parent, para).await? {
let receipt_hash = ccr.hash();
let descriptor = ccr.descriptor().clone();
// unfortunately we have no good way of telling the candidate was
// cached until now. But we don't clobber a `Cached` entry if there
// is one already.
live_candidates.entry(receipt_hash)
.or_insert(FetchedLiveCandidate::Fresh(descriptor));
				receipts_for.insert(receipt_hash);
			}
		}
	}

	Ok(live_candidates)
}
/// Obtain all live candidates under a particular relay head. This implicitly includes
/// `K` ancestors of the head, such that the candidates pending availability in all of
/// the states of the head and the ancestors are unioned together to produce the
/// return type of this function. Each candidate hash is paired with information
/// about where it was fetched from.
///
/// This also updates all `receipts` cached by the protocol state and returns a list
/// of up to `K` ancestors of the relay-parent.
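///
/// A minimal usage sketch, mirroring the call in `handle_our_view_change`:
///
/// ```ignore
/// let (candidates, ancestors) =
/// 	query_live_candidates(ctx, &mut state.receipts, relay_parent).await?;
/// ```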
#[tracing::instrument(level = "trace", skip(ctx, receipts), fields(subsystem = LOG_TARGET))]
async fn query_live_candidates<Context>(
ctx: &mut Context,
receipts: &mut HashMap<Hash, HashSet<CandidateHash>>,
relay_parent: Hash,
) -> Result<(HashMap<CandidateHash, FetchedLiveCandidate>, Vec<Hash>)>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	// fetch up to `K` ancestors of the relay parent that are within the same session
let ancestors = query_up_to_k_ancestors_in_same_session(
ctx,
relay_parent,
AvailabilityDistributionSubsystem::K,
)
.await?;
// query the ones that were not present in the receipts cache and add them
// to it.
let live_candidates = query_pending_availability_at(
ctx,
ancestors.iter().cloned().chain(std::iter::once(relay_parent)),
receipts,
).await?;
	Ok((live_candidates, ancestors))
}
/// Query all para IDs that are occupied under a given relay-parent.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_para_ids<Context>(ctx: &mut Context, relay_parent: Hash) -> Result<Vec<ParaId>>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::AvailabilityCores(tx),
	)))
	.await;

let all_para_ids = rx
.await
.map_err(|e| Error::AvailabilityCoresResponseChannel(e))?
.map_err(|e| Error::AvailabilityCores(e))?;
let occupied_para_ids = all_para_ids
.into_iter()
.filter_map(|core_state| {
if let CoreState::Occupied(occupied) = core_state {
Some(occupied.para_id)
} else {
None
}
})
.collect();
Ok(occupied_para_ids)
}
/// Modify the reputation of a peer based on its behavior.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn modify_reputation<Context>(ctx: &mut Context, peer: PeerId, rep: Rep)
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
tracing::trace!(
target: LOG_TARGET,
rep = ?rep,
peer_id = ?peer,
"Reputation change for peer",
);
ctx.send_message(AllMessages::NetworkBridge(
		NetworkBridgeMessage::ReportPeer(peer, rep),
	)).await;
}
/// Query whether the availability store holds the available data for a particular candidate hash.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_data_availability<Context>(ctx: &mut Context, candidate_hash: CandidateHash) -> Result<bool>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryDataAvailability(candidate_hash, tx),
	)).await;

	rx.await.map_err(|e| Error::QueryAvailabilityResponseChannel(e))
}
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_chunk<Context>(
ctx: &mut Context,
candidate_hash: CandidateHash,
validator_index: ValidatorIndex,
) -> Result<Option<ErasureChunk>>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx),
	)).await;

	rx.await.map_err(|e| Error::QueryChunkResponseChannel(e))
}
#[tracing::instrument(level = "trace", skip(ctx, erasure_chunk), fields(subsystem = LOG_TARGET))]
async fn store_chunk<Context>(
ctx: &mut Context,
	candidate_hash: CandidateHash,
	relay_parent: Hash,
	validator_index: ValidatorIndex,
erasure_chunk: ErasureChunk,
) -> Result<std::result::Result<(), ()>>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::StoreChunk {
candidate_hash,
relay_parent,
validator_index,
chunk: erasure_chunk,
tx,
}
)).await;
rx.await.map_err(|e| Error::StoreChunkResponseChannel(e))
}
/// Query the candidate pending availability for a particular para.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_pending_availability<Context>(
ctx: &mut Context,
relay_parent: Hash,
para: ParaId,
) -> Result<Option<CommittedCandidateReceipt>>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
		RuntimeApiRequest::CandidatePendingAvailability(para, tx),
	)))
	.await;

rx.await
.map_err(|e| Error::QueryPendingAvailabilityResponseChannel(e))?
.map_err(|e| Error::QueryPendingAvailability(e))
}
/// Query the validator set.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_validators<Context>(
ctx: &mut Context,
relay_parent: Hash,
) -> Result<Vec<ValidatorId>>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let (tx, rx) = oneshot::channel();
let query_validators = AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::Validators(tx),
));
	ctx.send_message(query_validators).await;

rx.await
.map_err(|e| Error::QueryValidatorsResponseChannel(e))?
		.map_err(|e| Error::QueryValidators(e))
}