// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The availability distribution
//!
//! Transforms `AvailableData` into erasure chunks, which are distributed to peers
//! who are interested in the relevant candidates.
//! Gossip messages received from other peers are verified and gossiped to interested
//! peers. Verified in this context means that the Merkle proof contained in the
//! erasure chunk is checked.
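//!
//! A received chunk is only accepted if its Merkle proof checks out against the
//! candidate's `erasure_root`. A minimal sketch of that check (illustrative only,
//! assuming `chunk` is an `ErasureChunk` and `erasure_root` is taken from the
//! candidate commitments):
//!
//! ```ignore
//! let anticipated_hash = branch_hash(&erasure_root, &chunk.proof, chunk.index as usize)?;
//! let actual_hash = BlakeTwo256::hash(&chunk.chunk);
//! let is_valid = anticipated_hash == actual_hash;
//! ```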
#![deny(unused_crate_dependencies, unused_qualifications)]
use parity_scale_codec::{Decode, Encode};
use futures::{channel::oneshot, FutureExt, TryFutureExt};
use sp_core::crypto::Public;
use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
use log::{trace, warn};
use polkadot_erasure_coding::branch_hash;
use polkadot_node_network_protocol::{
v1 as protocol_v1, NetworkBridgeEvent, PeerId, ReputationChange as Rep, View,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_primitives::v1::{
BlakeTwo256, CommittedCandidateReceipt, CoreState, ErasureChunk, Hash, HashT, Id as ParaId,
SessionIndex, ValidatorId, ValidatorIndex, PARACHAIN_KEY_TYPE_ID, CandidateHash,
};
use polkadot_subsystem::messages::{
AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage,
NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest,
};
use polkadot_subsystem::{
errors::{ChainApiError, RuntimeApiError},
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem,
SubsystemContext, SubsystemError,
};
use std::collections::{HashMap, HashSet};
use std::iter;
use thiserror::Error;
const TARGET: &'static str = "avad";
#[derive(Debug, Error)]
enum Error {
#[error("Sending PendingAvailability query failed")]
QueryPendingAvailabilitySendQuery(#[source] SubsystemError),
#[error("Response channel to obtain PendingAvailability failed")]
QueryPendingAvailabilityResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain PendingAvailability failed")]
QueryPendingAvailability(#[source] RuntimeApiError),
#[error("Sending StoreChunk query failed")]
StoreChunkSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain StoreChunk failed")]
StoreChunkResponseChannel(#[source] oneshot::Canceled),
#[error("Sending QueryChunk query failed")]
QueryChunkSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain QueryChunk failed")]
QueryChunkResponseChannel(#[source] oneshot::Canceled),
#[error("Sending QueryAncestors query failed")]
QueryAncestorsSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain QueryAncestors failed")]
QueryAncestorsResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain QueryAncestors failed")]
QueryAncestors(#[source] ChainApiError),
#[error("Sending QuerySession query failed")]
QuerySessionSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain QuerySession failed")]
QuerySessionResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain QuerySession failed")]
QuerySession(#[source] RuntimeApiError),
#[error("Sending QueryValidators query failed")]
QueryValidatorsSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain QueryValidators failed")]
QueryValidatorsResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain QueryValidators failed")]
QueryValidators(#[source] RuntimeApiError),
#[error("Sending AvailabilityCores query failed")]
AvailabilityCoresSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain AvailabilityCores failed")]
AvailabilityCoresResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain AvailabilityCores failed")]
AvailabilityCores(#[source] RuntimeApiError),
#[error("Sending AvailabilityCores query failed")]
QueryAvailabilitySendQuery(#[source] SubsystemError),
#[error("Response channel to obtain AvailabilityCores failed")]
QueryAvailabilityResponseChannel(#[source] oneshot::Canceled),
#[error("Sending out a peer report message")]
ReportPeerMessageSend(#[source] SubsystemError),
#[error("Sending a gossip message")]
TrackedGossipMessage(#[source] SubsystemError),
#[error("Receive channel closed")]
IncomingMessageChannel(#[source] SubsystemError),
}
type Result<T> = std::result::Result<T, Error>;
const COST_MERKLE_PROOF_INVALID: Rep = Rep::new(-100, "Merkle proof was invalid");
const COST_NOT_A_LIVE_CANDIDATE: Rep = Rep::new(-51, "Candidate is not live");
const COST_PEER_DUPLICATE_MESSAGE: Rep = Rep::new(-500, "Peer sent identical messages");
const BENEFIT_VALID_MESSAGE_FIRST: Rep = Rep::new(15, "Valid message with new information");
const BENEFIT_VALID_MESSAGE: Rep = Rep::new(10, "Valid message");
/// An erasure chunk of a candidate, together with the candidate hash, that is
/// gossiped to other peers interested in that candidate.
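///
/// On the wire this corresponds to `protocol_v1::AvailabilityDistributionMessage::Chunk`.
/// A minimal sketch of the mapping (illustrative only, assuming `message` is an
/// `AvailabilityGossipMessage`):
///
/// ```ignore
/// let wire_message = protocol_v1::AvailabilityDistributionMessage::Chunk(
///     message.candidate_hash,
///     message.erasure_chunk,
/// );
/// ```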
#[derive(Encode, Decode, Debug, Clone, PartialEq, Eq, Hash)]
pub struct AvailabilityGossipMessage {
/// Anchor hash of the candidate the `ErasureChunk` is associated to.
pub candidate_hash: CandidateHash,
/// The erasure chunk, an erasure-encoded piece of the `AvailableData`.
pub erasure_chunk: ErasureChunk,
}
/// Data used to track information of peers and relay parents the
/// overseer ordered us to work on.
#[derive(Default, Clone, Debug)]
struct ProtocolState {
/// Track all active peers and their views
/// to determine what is relevant to them.
peer_views: HashMap<PeerId, View>,
/// Our own view.
view: View,
/// Caches a mapping of relay parents or ancestor to live candidate receipts.
/// Allows fast intersection of live candidates with views and consecutive unioning.
/// Maps relay parent / ancestor -> live candidate receipts + its hash.
receipts: HashMap<Hash, HashSet<(CandidateHash, CommittedCandidateReceipt)>>,
/// Allow reverse caching of view checks.
/// Maps candidate hash -> relay parent for extracting meta information from `PerRelayParent`.
/// Note that the presence of this is not sufficient to determine if deletion is OK, i.e.
/// two histories could cover this.
reverse: HashMap<CandidateHash, Hash>,
/// Keeps track of which candidate receipts are required due to ancestors of which relay parents
/// of our view.
/// Maps ancestor -> relay parents in view
ancestry: HashMap<Hash, HashSet<Hash>>,
/// Track things needed to start and stop work on a particular relay parent.
per_relay_parent: HashMap<Hash, PerRelayParent>,
/// Track data that is specific to a candidate.
per_candidate: HashMap<CandidateHash, PerCandidate>,
}
#[derive(Debug, Clone, Default)]
struct PerCandidate {
/// The set of erasure chunks known for this candidate, stored as gossip messages
/// to be distributed whenever a peer's view requires them.
/// This is _across_ peers and not specific to a particular one.
/// erasure chunk index -> gossip message
message_vault: HashMap<u32, AvailabilityGossipMessage>,
/// Track received candidate hashes and validator indices from peers.
received_messages: HashMap<PeerId, HashSet<(CandidateHash, ValidatorIndex)>>,
/// Track the candidate hashes and erasure chunk indices that were already sent to each peer.
sent_messages: HashMap<PeerId, HashSet<(CandidateHash, ValidatorIndex)>>,
/// The set of validators.
validators: Vec<ValidatorId>,
/// If this node is a validator, note the index in the validator set.
validator_index: Option<ValidatorIndex>,
}
#[derive(Debug, Clone, Default)]
struct PerRelayParent {
/// Set of `K` ancestors for this relay parent.
ancestors: Vec<Hash>,
}
impl ProtocolState {
/// Collects the ancestors of the given relay parents, including the relay parents themselves.
fn extend_with_ancestors<'a>(
&'a self,
relay_parents: impl IntoIterator<Item = &'a Hash> + 'a,
) -> HashSet<Hash> {
relay_parents
.into_iter()
.map(|relay_parent| {
self.per_relay_parent
.get(relay_parent)
.into_iter()
.map(|per_relay_parent| per_relay_parent.ancestors.iter().cloned())
.flatten()
.chain(iter::once(*relay_parent))
})
.flatten()
.collect::<HashSet<Hash>>()
}
/// Union all cached entries for the given relay parents and their ancestors.
/// Ignores all non-existent relay parents, so this can be used directly with a peer's view.
/// Returns a map from candidate hash -> receipt
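///
/// A typical call unions over our own view (illustrative sketch only):
///
/// ```ignore
/// let live_candidates = state.cached_live_candidates_unioned(state.view.0.iter());
/// ```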
fn cached_live_candidates_unioned<'a>(
&'a self,
relay_parents: impl IntoIterator<Item = &'a Hash> + 'a,
) -> HashMap<CandidateHash, CommittedCandidateReceipt> {
let relay_parents_and_ancestors = self.extend_with_ancestors(relay_parents);
relay_parents_and_ancestors
.into_iter()
.filter_map(|relay_parent_or_ancestor| self.receipts.get(&relay_parent_or_ancestor))
.map(|receipt_set| receipt_set.into_iter())
.flatten()
.map(|(receipt_hash, receipt)| (receipt_hash.clone(), receipt.clone()))
.collect()
}
async fn add_relay_parent<Context>(
&mut self,
ctx: &mut Context,
relay_parent: Hash,
validators: Vec<ValidatorId>,
validator_index: Option<ValidatorIndex>,
) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let candidates = query_live_candidates(ctx, self, std::iter::once(relay_parent)).await?;
// register the relation of relay_parent to candidate..
// ..and the reverse association.
for (relay_parent_or_ancestor, (receipt_hash, receipt)) in candidates.clone() {
self.reverse
.insert(receipt_hash.clone(), relay_parent_or_ancestor.clone());
let per_candidate = self.per_candidate.entry(receipt_hash.clone()).or_default();
per_candidate.validator_index = validator_index.clone();
per_candidate.validators = validators.clone();
self.receipts
.entry(relay_parent_or_ancestor)
.or_default()
.insert((receipt_hash, receipt));
}
// collect the ancestors again from the hash map
let ancestors = candidates
.iter()
.filter_map(|(ancestor_or_relay_parent, _receipt)| {
if ancestor_or_relay_parent == &relay_parent {
None
} else {
Some(*ancestor_or_relay_parent)
}
})
.collect::<Vec<Hash>>();
// mark all the ancestors as "needed" by this newly added relay parent
for ancestor in ancestors.iter() {
self.ancestry
.entry(ancestor.clone())
.or_default()
.insert(relay_parent);
}
self.per_relay_parent
.entry(relay_parent)
.or_default()
.ancestors = ancestors;
Ok(())
}
fn remove_relay_parent(&mut self, relay_parent: &Hash) -> Result<()> {
// we might be an ancestor of some other relay_parent
if let Some(ref mut descendants) = self.ancestry.get_mut(relay_parent) {
// if we were the last user, and it is
// not explicitly set to be worked on by the overseer
if descendants.is_empty() {
// remove from the ancestry index
self.ancestry.remove(relay_parent);
// and also remove the actual receipt
if let Some(candidates) = self.receipts.remove(relay_parent) {
candidates.into_iter().for_each(|c| { self.per_candidate.remove(&c.0); });
}
}
}
if let Some(per_relay_parent) = self.per_relay_parent.remove(relay_parent) {
// remove all "references" from the hash maps and sets for all ancestors
for ancestor in per_relay_parent.ancestors {
// one of our descendants might be an ancestor of some other relay_parent
if let Some(ref mut descendants) = self.ancestry.get_mut(&ancestor) {
// we do not need this descendant anymore
descendants.remove(&relay_parent);
// if we were the last user, and it is
// not explicitly set to be worked on by the overseer
if descendants.is_empty() && !self.per_relay_parent.contains_key(&ancestor) {
// remove from the ancestry index
self.ancestry.remove(&ancestor);
// and also remove the actual receipt
if let Some(candidates) = self.receipts.remove(&ancestor) {
candidates.into_iter().for_each(|c| { self.per_candidate.remove(&c.0); });
}
}
}
}
}
Ok(())
}
}
/// Deal with network bridge updates and track what needs to be tracked
/// which depends on the message type received.
async fn handle_network_msg<Context>(
ctx: &mut Context,
keystore: &SyncCryptoStorePtr,
state: &mut ProtocolState,
metrics: &Metrics,
bridge_message: NetworkBridgeEvent<protocol_v1::AvailabilityDistributionMessage>,
) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
match bridge_message {
NetworkBridgeEvent::PeerConnected(peerid, _role) => {
// insert if none already present
state.peer_views.entry(peerid).or_default();
}
NetworkBridgeEvent::PeerDisconnected(peerid) => {
// get rid of superfluous data
state.peer_views.remove(&peerid);
}
NetworkBridgeEvent::PeerViewChange(peerid, view) => {
handle_peer_view_change(ctx, state, peerid, view, metrics).await?;
}
NetworkBridgeEvent::OurViewChange(view) => {
handle_our_view_change(ctx, keystore, state, view, metrics).await?;
}
NetworkBridgeEvent::PeerMessage(remote, msg) => {
let gossiped_availability = match msg {
protocol_v1::AvailabilityDistributionMessage::Chunk(candidate_hash, chunk) => {
AvailabilityGossipMessage {
candidate_hash,
erasure_chunk: chunk,
}
}
};
process_incoming_peer_message(ctx, state, remote, gossiped_availability, metrics)
.await?;
}
}
Ok(())
}
/// Handle the changes necessary when our view changes.
async fn handle_our_view_change<Context>(
ctx: &mut Context,
keystore: &SyncCryptoStorePtr,
state: &mut ProtocolState,
view: View,
metrics: &Metrics,
) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let old_view = std::mem::replace(&mut (state.view), view);
// needed due to borrow rules
let view = state.view.clone();
let added = view.difference(&old_view).collect::<Vec<&Hash>>();
// add all the relay parents and fill the cache
for added in added.iter() {
let added = **added;
let validators = query_validators(ctx, added).await?;
let validator_index = obtain_our_validator_index(&validators, keystore.clone()).await;
state
.add_relay_parent(ctx, added, validators, validator_index)
.await?;
}
// handle all candidates
for (candidate_hash, _receipt) in state.cached_live_candidates_unioned(added) {
let per_candidate = state.per_candidate.entry(candidate_hash).or_default();
// assure the node has the validator role
if per_candidate.validator_index.is_none() {
continue;
};
// check if the availability data is present in the store
if !query_data_availability(ctx, candidate_hash).await? {
continue;
}
let validator_count = per_candidate.validators.len();
// obtain interested peers in the candidate hash
let peers: Vec<PeerId> = state
.peer_views
.clone()
.into_iter()
.filter(|(_peer, view)| {
// collect all direct interests of a peer w/o ancestors
state
.cached_live_candidates_unioned(view.0.iter())
.contains_key(&candidate_hash)
})
.map(|(peer, _view)| peer.clone())
.collect();
// distribute all erasure messages to interested peers
for chunk_index in 0u32..(validator_count as u32) {
// only the peers which did not receive this particular erasure chunk
let per_candidate = state.per_candidate.entry(candidate_hash).or_default();
// obtain the chunk from the cache, or fall back
// to querying the availability store
let message_id = (candidate_hash, chunk_index);
let erasure_chunk = if let Some(message) = per_candidate.message_vault.get(&chunk_index)
{
message.erasure_chunk.clone()
} else if let Some(erasure_chunk) =
query_chunk(ctx, candidate_hash, chunk_index as ValidatorIndex).await?
{
erasure_chunk
} else {
continue;
};
debug_assert_eq!(erasure_chunk.index, chunk_index);
let peers = peers
.iter()
.filter(|peer| {
// only pick those which were not sent before
!per_candidate
.sent_messages
.get(*peer)
.filter(|set| set.contains(&message_id))
.is_some()
})
.map(|peer| peer.clone())
.collect::<Vec<_>>();
let message = AvailabilityGossipMessage {
candidate_hash,
erasure_chunk,
};
send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message)
.await?;
}
}
// cleanup the removed relay parents and their states
let removed = old_view.difference(&view).collect::<Vec<_>>();
for removed in removed {
state.remove_relay_parent(&removed)?;
}
Ok(())
}
#[inline(always)]
async fn send_tracked_gossip_message_to_peers<Context>(
ctx: &mut Context,
per_candidate: &mut PerCandidate,
metrics: &Metrics,
peers: Vec<PeerId>,
message: AvailabilityGossipMessage,
) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message))
.await
}
#[inline(always)]
async fn send_tracked_gossip_messages_to_peer<Context>(
ctx: &mut Context,
per_candidate: &mut PerCandidate,
metrics: &Metrics,
peer: PeerId,
message_iter: impl IntoIterator<Item = AvailabilityGossipMessage>,
) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, vec![peer], message_iter)
.await
}
async fn send_tracked_gossip_messages_to_peers<Context>(
ctx: &mut Context,
per_candidate: &mut PerCandidate,
metrics: &Metrics,
peers: Vec<PeerId>,
message_iter: impl IntoIterator<Item = AvailabilityGossipMessage>,
) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
if peers.is_empty() {
return Ok(());
}
for message in message_iter {
for peer in peers.iter() {
let message_id = (message.candidate_hash, message.erasure_chunk.index);
per_candidate
.sent_messages
.entry(peer.clone())
.or_default()
.insert(message_id);
}
per_candidate
.message_vault
.insert(message.erasure_chunk.index, message.clone());
let wire_message = protocol_v1::AvailabilityDistributionMessage::Chunk(
message.candidate_hash,
message.erasure_chunk,
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
peers.clone(),
protocol_v1::ValidationProtocol::AvailabilityDistribution(wire_message),
),
))
.await
.map_err(|e| Error::TrackedGossipMessage(e))?;
metrics.on_chunk_distributed();
}
Ok(())
}
// Send the difference between two views which were not sent
// to that particular peer.
async fn handle_peer_view_change<Context>(
ctx: &mut Context,
state: &mut ProtocolState,
origin: PeerId,
view: View,
metrics: &Metrics,
) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let current = state.peer_views.entry(origin.clone()).or_default();
let added: Vec<Hash> = view.difference(&*current).cloned().collect();
*current = view;
// only contains the intersection of what we are interested in and
// the union of all relay parents' candidates.
let added_candidates = state.cached_live_candidates_unioned(added.iter());
// Send all messages we've seen before and the peer is now interested
// in to that peer.
for (candidate_hash, _receipt) in added_candidates {
let per_candidate = state.per_candidate.entry(candidate_hash).or_default();
// obtain the relevant chunk indices not sent yet
let messages = ((0 as ValidatorIndex)..(per_candidate.validators.len() as ValidatorIndex))
.into_iter()
.filter_map(|erasure_chunk_index: ValidatorIndex| {
let message_id = (candidate_hash, erasure_chunk_index);
// try to pick up the message from the message vault
// so we send as much as we have
per_candidate
.message_vault
.get(&erasure_chunk_index)
.filter(|_| {
// check if that erasure chunk was already sent before
if let Some(sent_set) = per_candidate.sent_messages.get(&origin) {
if sent_set.contains(&message_id) {
return false;
}
}
true
})
})
.cloned()
.collect::<Vec<AvailabilityGossipMessage>>();
send_tracked_gossip_messages_to_peer(ctx, per_candidate, metrics, origin.clone(), messages)
.await?;
}
Ok(())
}
/// Obtain the first key which has a signing key.
/// Returns the index within the validator set as `ValidatorIndex`, if there exists one,
/// otherwise, `None` is returned.
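///
/// Illustrative usage only, assuming `validators` was fetched via `query_validators`
/// for the relay parent in question:
///
/// ```ignore
/// let validator_index = obtain_our_validator_index(&validators, keystore.clone()).await;
/// ```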
async fn obtain_our_validator_index(
validators: &[ValidatorId],
keystore: SyncCryptoStorePtr,
) -> Option<ValidatorIndex> {
for (idx, validator) in validators.iter().enumerate() {
if CryptoStore::has_keys(
&*keystore,
&[(validator.to_raw_vec(), PARACHAIN_KEY_TYPE_ID)],
)
.await
{
return Some(idx as ValidatorIndex);
}
}
None
}
/// Handle an incoming message from a peer.
async fn process_incoming_peer_message<Context>(
ctx: &mut Context,
state: &mut ProtocolState,
origin: PeerId,
message: AvailabilityGossipMessage,
metrics: &Metrics,
) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
// obtain the set of candidates we are interested in based on our current view
let live_candidates = state.cached_live_candidates_unioned(state.view.0.iter());
// check if the candidate is of interest
let live_candidate = if let Some(live_candidate) = live_candidates.get(&message.candidate_hash) {
live_candidate
} else {
return modify_reputation(ctx, origin, COST_NOT_A_LIVE_CANDIDATE).await;
};
// check the merkle proof
let root = &live_candidate.commitments.erasure_root;
let anticipated_hash = if let Ok(hash) = branch_hash(
root,
&message.erasure_chunk.proof,
message.erasure_chunk.index as usize,
) {
hash
} else {
return modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
};
let erasure_chunk_hash = BlakeTwo256::hash(&message.erasure_chunk.chunk);
if anticipated_hash != erasure_chunk_hash {
return modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
}
// an internal unique identifier of this message
let message_id = (message.candidate_hash, message.erasure_chunk.index);
{
let per_candidate = state.per_candidate.entry(message_id.0.clone()).or_default();
// check if this particular erasure chunk was already sent by that peer before
{
let received_set = per_candidate
.received_messages
.entry(origin.clone())
.or_default();
if received_set.contains(&message_id) {
return modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await;
} else {
received_set.insert(message_id.clone());
}
}
// insert into known messages and change reputation
if per_candidate
.message_vault
.insert(message_id.1, message.clone())
.is_some()
{
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await?;
} else {
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await?;
// save the chunk for our index
if let Some(validator_index) = per_candidate.validator_index {
if message.erasure_chunk.index == validator_index {
if let Err(_e) = store_chunk(
ctx,
message.candidate_hash.clone(),
live_candidate.descriptor.relay_parent.clone(),
message.erasure_chunk.index,
message.erasure_chunk.clone(),
)
.await?
{
warn!(
target: TARGET,
"Failed to store erasure chunk to availability store"
);
}
}
}
};
}
// narrow the peer set down to those peers with an interest in the candidate
let peers = state
.peer_views
.clone()
.into_iter()
.filter(|(_peer, view)| {
// peers view must contain the candidate hash too
state
.cached_live_candidates_unioned(view.0.iter())
.contains_key(&message_id.0)
})
.map(|(peer, _)| -> PeerId { peer.clone() })
.collect::<Vec<_>>();
let per_candidate = state.per_candidate.entry(message_id.0.clone()).or_default();
let peers = peers
.into_iter()
.filter(|peer| {
let peer: PeerId = peer.clone();
// avoid sending duplicate messages: keep only peers
// that have not yet received this particular chunk
!per_candidate
.sent_messages
.entry(peer)
.or_default()
.contains(&message_id)
})
.collect::<Vec<_>>();
// gossip that message to interested peers
send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await
}
/// The availability distribution subsystem.
pub struct AvailabilityDistributionSubsystem {
/// Pointer to a keystore, which is required for determining this node's validator index.
keystore: SyncCryptoStorePtr,
/// Prometheus metrics.
metrics: Metrics,
}
impl AvailabilityDistributionSubsystem {
/// Number of ancestors to keep around for the relay-chain heads.
const K: usize = 3;
/// Create a new instance of the availability distribution.
pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
Self { keystore, metrics }
}
/// Start processing work as passed on from the Overseer.
async fn run<Context>(self, mut ctx: Context) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
// work: process incoming messages from the overseer.
let mut state = ProtocolState::default();
loop {
let message = ctx
.recv()
.await
.map_err(|e| Error::IncomingMessageChannel(e))?;
match message {
FromOverseer::Communication {
msg: AvailabilityDistributionMessage::NetworkBridgeUpdateV1(event),
} => {
if let Err(e) = handle_network_msg(
&mut ctx,
&self.keystore.clone(),
&mut state,
&self.metrics,
event,
)
.await
{
warn!(
target: TARGET,
"Failed to handle incoming network messages: {:?}", e
);
}
}
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: _,
deactivated: _,
})) => {
// handled at view change
}
FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {}
FromOverseer::Signal(OverseerSignal::Conclude) => {
return Ok(());
}
}
}
}
}
impl<Context> Subsystem<Context> for AvailabilityDistributionSubsystem
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage> + Sync + Send,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = self
.run(ctx)
.map_err(|e| SubsystemError::with_origin("availability-distribution", e))
.boxed();
SpawnedSubsystem {
name: "availability-distribution-subsystem",
future,
}
}
}
/// Obtain all live candidates based on an iterator of relay heads.
async fn query_live_candidates_without_ancestors<Context>(
ctx: &mut Context,
relay_parents: impl IntoIterator<Item = Hash>,
) -> Result<HashSet<CommittedCandidateReceipt>>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let iter = relay_parents.into_iter();
let hint = iter.size_hint();
let mut live_candidates = HashSet::with_capacity(hint.1.unwrap_or(hint.0));
for relay_parent in iter {
let paras = query_para_ids(ctx, relay_parent).await?;
for para in paras {
if let Some(ccr) = query_pending_availability(ctx, relay_parent, para).await? {
live_candidates.insert(ccr);
}
}
}
Ok(live_candidates)
}
/// Obtain all live candidates based on an iterator of relay heads, including `k` ancestors.
///
/// Returns a map from relay parent / ancestor to the live candidate receipt and its hash.
async fn query_live_candidates<Context>(
ctx: &mut Context,
state: &mut ProtocolState,
relay_parents: impl IntoIterator<Item = Hash>,
) -> Result<HashMap<Hash, (CandidateHash, CommittedCandidateReceipt)>>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let iter = relay_parents.into_iter();
let hint = iter.size_hint();
let capacity = hint.1.unwrap_or(hint.0) * (1 + AvailabilityDistributionSubsystem::K);
let mut live_candidates =
HashMap::<Hash, (CandidateHash, CommittedCandidateReceipt)>::with_capacity(capacity);
for relay_parent in iter {
// gather up to `k` ancestors of this relay parent (within the same session), plus the relay parent itself
let mut ancestors = query_up_to_k_ancestors_in_same_session(
ctx,
relay_parent,
AvailabilityDistributionSubsystem::K,
)
.await?;
ancestors.push(relay_parent);
// ancestors might overlap, so check the cache too
let unknown = ancestors
.into_iter()
.filter(|relay_parent_or_ancestor| {
// use the ones which we pulled before
// but keep the unknown relay parents
state
.receipts
.get(relay_parent_or_ancestor)
.and_then(|receipts| {
// directly extend the live_candidates with the cached value
live_candidates.extend(receipts.into_iter().map(
|(receipt_hash, receipt)| {
(relay_parent, (receipt_hash.clone(), receipt.clone()))
},
));
Some(())
})
.is_none()
})
.collect::<Vec<_>>();
// query the ones that were not present in the receipts cache
let receipts = query_live_candidates_without_ancestors(ctx, unknown.clone()).await?;
live_candidates.extend(
unknown.into_iter().zip(
receipts
.into_iter()
.map(|receipt| (receipt.hash(), receipt)),
),
);
}
Ok(live_candidates)
}
/// Query all para IDs.
async fn query_para_ids<Context>(ctx: &mut Context, relay_parent: Hash) -> Result<Vec<ParaId>>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::AvailabilityCores(tx),
)))
.await
.map_err(|e| Error::AvailabilityCoresSendQuery(e))?;
let all_para_ids: Vec<_> = rx
.await
.map_err(|e| Error::AvailabilityCoresResponseChannel(e))?
.map_err(|e| Error::AvailabilityCores(e))?;
let occupied_para_ids = all_para_ids
.into_iter()
.filter_map(|core_state| {
if let CoreState::Occupied(occupied) = core_state {
Some(occupied.para_id)
} else {
None
}
})
.collect();
Ok(occupied_para_ids)
}
/// Modify the reputation of a peer based on its behavior.
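///
/// Illustrative usage only, e.g. punishing an invalid Merkle proof:
///
/// ```ignore
/// modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await?;
/// ```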
async fn modify_reputation<Context>(ctx: &mut Context, peer: PeerId, rep: Rep) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
trace!(
target: TARGET,
"Reputation change of {:?} for peer {:?}",
rep,
peer
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep),
))
.await
.map_err(|e| Error::ReportPeerMessageSend(e))
}
/// Query whether the availability store holds the `AvailableData` for a particular candidate hash.
async fn query_data_availability<Context>(ctx: &mut Context, candidate_hash: CandidateHash) -> Result<bool>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryDataAvailability(candidate_hash, tx),
))
.await
.map_err(|e| Error::QueryAvailabilitySendQuery(e))?;
rx.await
.map_err(|e| Error::QueryAvailabilityResponseChannel(e))
}
async fn query_chunk<Context>(
ctx: &mut Context,
candidate_hash: CandidateHash,
validator_index: ValidatorIndex,
) -> Result<Option<ErasureChunk>>