// Copyright 2020 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Polkadot is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . //! The availability distribution //! //! Transforms `AvailableData` into erasure chunks, which are distributed to peers //! who are interested in the relevant candidates. //! Gossip messages received from other peers are verified and gossiped to interested //! peers. Verified in this context means, the erasure chunks contained merkle proof //! is checked. use codec::{Decode, Encode}; use futures::{channel::oneshot, FutureExt}; use keystore::KeyStorePtr; use sp_core::{ crypto::Public, traits::BareCryptoStore, }; use sc_keystore as keystore; use log::{trace, warn}; use polkadot_erasure_coding::branch_hash; use polkadot_primitives::v1::{ PARACHAIN_KEY_TYPE_ID, BlakeTwo256, CommittedCandidateReceipt, CoreState, ErasureChunk, Hash as Hash, HashT, Id as ParaId, ValidatorId, ValidatorIndex, SessionIndex, }; use polkadot_subsystem::messages::{ AllMessages, AvailabilityDistributionMessage, NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest, AvailabilityStoreMessage, ChainApiMessage, }; use polkadot_subsystem::{ errors::{ChainApiError, RuntimeApiError}, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, }; use polkadot_node_subsystem_util::{ metrics::{self, prometheus}, }; use polkadot_node_network_protocol::{ v1 as protocol_v1, View, ReputationChange as Rep, PeerId, NetworkBridgeEvent, }; use std::collections::{HashMap, HashSet}; use std::io; use std::iter; const TARGET: &'static str = "avad"; #[derive(Debug, derive_more::From)] enum Error { #[from] Erasure(polkadot_erasure_coding::Error), #[from] Io(io::Error), #[from] Oneshot(oneshot::Canceled), #[from] Subsystem(SubsystemError), #[from] RuntimeApi(RuntimeApiError), #[from] ChainApi(ChainApiError), } type Result = std::result::Result; const COST_MERKLE_PROOF_INVALID: Rep = Rep::new(-100, "Merkle proof was invalid"); const COST_NOT_A_LIVE_CANDIDATE: Rep = Rep::new(-51, "Candidate is not live"); const COST_PEER_DUPLICATE_MESSAGE: Rep = Rep::new(-500, "Peer sent identical messages"); const BENEFIT_VALID_MESSAGE_FIRST: Rep = Rep::new(15, "Valid message with new information"); const BENEFIT_VALID_MESSAGE: Rep = Rep::new(10, "Valid message"); /// Checked signed availability bitfield that is distributed /// to other peers. #[derive(Encode, Decode, Debug, Clone, PartialEq, Eq, Hash)] pub struct AvailabilityGossipMessage { /// Anchor hash of the candidate the `ErasureChunk` is associated to. pub candidate_hash: Hash, /// The erasure chunk, a encoded information part of `AvailabilityData`. pub erasure_chunk: ErasureChunk, } /// Data used to track information of peers and relay parents the /// overseer ordered us to work on. #[derive(Default, Clone, Debug)] struct ProtocolState { /// Track all active peers and their views /// to determine what is relevant to them. peer_views: HashMap, /// Our own view. view: View, /// Caches a mapping of relay parents or ancestor to live candidate receipts. /// Allows fast intersection of live candidates with views and consecutive unioning. /// Maps relay parent / ancestor -> live candidate receipts + its hash. receipts: HashMap>, /// Allow reverse caching of view checks. /// Maps candidate hash -> relay parent for extracting meta information from `PerRelayParent`. /// Note that the presence of this is not sufficient to determine if deletion is OK, i.e. /// two histories could cover this. reverse: HashMap, /// Keeps track of which candidate receipts are required due to ancestors of which relay parents /// of our view. /// Maps ancestor -> relay parents in view ancestry: HashMap>, /// Track things needed to start and stop work on a particular relay parent. per_relay_parent: HashMap, /// Track data that is specific to a candidate. per_candidate: HashMap, } #[derive(Debug, Clone, Default)] struct PerCandidate { /// A Candidate and a set of known erasure chunks in form of messages to be gossiped / distributed if the peer view wants that. /// This is _across_ peers and not specific to a particular one. /// candidate hash + erasure chunk index -> gossip message message_vault: HashMap, /// Track received candidate hashes and chunk indices from peers. received_messages: HashMap>, /// Track already sent candidate hashes and the erasure chunk index to the peers. sent_messages: HashMap>, /// The set of validators. validators: Vec, /// If this node is a validator, note the index in the validator set. validator_index: Option, } #[derive(Debug, Clone, Default)] struct PerRelayParent { /// Set of `K` ancestors for this relay parent. ancestors: Vec, } impl ProtocolState { /// Collects the relay_parents ancestors including the relay parents themselfes. fn extend_with_ancestors<'a>( &'a self, relay_parents: impl IntoIterator + 'a, ) -> HashSet { relay_parents .into_iter() .map(|relay_parent| { self.per_relay_parent .get(relay_parent) .into_iter() .map(|per_relay_parent| per_relay_parent.ancestors.iter().cloned()) .flatten() .chain(iter::once(*relay_parent)) }) .flatten() .collect::>() } /// Unionize all cached entries for the given relay parents and its ancestors. /// Ignores all non existent relay parents, so this can be used directly with a peers view. /// Returns a map from candidate hash -> receipt fn cached_live_candidates_unioned<'a>( &'a self, relay_parents: impl IntoIterator + 'a, ) -> HashMap { let relay_parents_and_ancestors = self.extend_with_ancestors(relay_parents); relay_parents_and_ancestors .into_iter() .filter_map(|relay_parent_or_ancestor| self.receipts.get(&relay_parent_or_ancestor)) .map(|receipt_set| receipt_set.into_iter()) .flatten() .map(|(receipt_hash, receipt)| (receipt_hash.clone(), receipt.clone())) .collect::>() } async fn add_relay_parent( &mut self, ctx: &mut Context, relay_parent: Hash, validators: Vec, validator_index: Option, ) -> Result<()> where Context: SubsystemContext, { let candidates = query_live_candidates(ctx, self, std::iter::once(relay_parent)).await?; // register the relation of relay_parent to candidate.. // ..and the reverse association. for (relay_parent_or_ancestor, (receipt_hash, receipt)) in candidates.clone() { self .reverse .insert(receipt_hash.clone(), relay_parent_or_ancestor.clone()); let per_candidate = self.per_candidate.entry(receipt_hash.clone()) .or_default(); per_candidate.validator_index = validator_index.clone(); per_candidate.validators = validators.clone(); self .receipts .entry(relay_parent_or_ancestor) .or_default() .insert((receipt_hash, receipt)); } // collect the ancestors again from the hash map let ancestors = candidates .iter() .filter_map(|(ancestor_or_relay_parent, _receipt)| { if ancestor_or_relay_parent == &relay_parent { None } else { Some(*ancestor_or_relay_parent) } }) .collect::>(); // mark all the ancestors as "needed" by this newly added relay parent for ancestor in ancestors.iter() { self.ancestry .entry(ancestor.clone()) .or_default() .insert(relay_parent); } self .per_relay_parent .entry(relay_parent) .or_default() .ancestors = ancestors; Ok(()) } fn remove_relay_parent(&mut self, relay_parent: &Hash) -> Result<()> { // we might be ancestor of some other relay_parent if let Some(ref mut descendants) = self.ancestry.get_mut(relay_parent) { // if we were the last user, and it is // not explicitly set to be worked on by the overseer if descendants.is_empty() { // remove from the ancestry index self.ancestry.remove(relay_parent); // and also remove the actual receipt self.receipts.remove(relay_parent); self.per_candidate.remove(relay_parent); } } if let Some(per_relay_parent) = self.per_relay_parent.remove(relay_parent) { // remove all "references" from the hash maps and sets for all ancestors for ancestor in per_relay_parent.ancestors { // one of our decendants might be ancestor of some other relay_parent if let Some(ref mut descendants) = self.ancestry.get_mut(&ancestor) { // we do not need this descendant anymore descendants.remove(&relay_parent); // if we were the last user, and it is // not explicitly set to be worked on by the overseer if descendants.is_empty() && !self.per_relay_parent.contains_key(&ancestor) { // remove from the ancestry index self.ancestry.remove(&ancestor); // and also remove the actual receipt self.receipts.remove(&ancestor); self.per_candidate.remove(&ancestor); } } } } Ok(()) } } /// Deal with network bridge updates and track what needs to be tracked /// which depends on the message type received. async fn handle_network_msg( ctx: &mut Context, keystore: KeyStorePtr, state: &mut ProtocolState, metrics: &Metrics, bridge_message: NetworkBridgeEvent, ) -> Result<()> where Context: SubsystemContext, { match bridge_message { NetworkBridgeEvent::PeerConnected(peerid, _role) => { // insert if none already present state.peer_views.entry(peerid).or_default(); } NetworkBridgeEvent::PeerDisconnected(peerid) => { // get rid of superfluous data state.peer_views.remove(&peerid); } NetworkBridgeEvent::PeerViewChange(peerid, view) => { handle_peer_view_change(ctx, state, peerid, view, metrics).await?; } NetworkBridgeEvent::OurViewChange(view) => { handle_our_view_change(ctx, keystore, state, view, metrics).await?; } NetworkBridgeEvent::PeerMessage(remote, msg) => { let gossiped_availability = match msg { protocol_v1::AvailabilityDistributionMessage::Chunk(candidate_hash, chunk) => AvailabilityGossipMessage { candidate_hash, erasure_chunk: chunk } }; process_incoming_peer_message(ctx, state, remote, gossiped_availability, metrics).await?; } } Ok(()) } /// Handle the changes necessary when our view changes. async fn handle_our_view_change( ctx: &mut Context, keystore: KeyStorePtr, state: &mut ProtocolState, view: View, metrics: &Metrics, ) -> Result<()> where Context: SubsystemContext, { let old_view = std::mem::replace(&mut (state.view), view); // needed due to borrow rules let view = state.view.clone(); let added = view.difference(&old_view).collect::>(); // add all the relay parents and fill the cache for added in added.iter() { let added = **added; let validators = query_validators(ctx, added).await?; let validator_index = obtain_our_validator_index( &validators, keystore.clone(), ); state.add_relay_parent(ctx, added, validators, validator_index).await?; } // handle all candidates for (candidate_hash, _receipt) in state.cached_live_candidates_unioned(added) { let per_candidate = state .per_candidate .entry(candidate_hash) .or_default(); // assure the node has the validator role if per_candidate.validator_index.is_none() { continue; }; // check if the availability is present in the store exists if !query_data_availability(ctx, candidate_hash).await? { continue; } let validator_count = per_candidate.validators.len(); // obtain interested peers in the candidate hash let peers: Vec = state .peer_views .clone() .into_iter() .filter(|(_peer, view)| { // collect all direct interests of a peer w/o ancestors state .cached_live_candidates_unioned(view.0.iter()) .contains_key(&candidate_hash) }) .map(|(peer, _view)| peer.clone()) .collect(); // distribute all erasure messages to interested peers for chunk_index in 0u32..(validator_count as u32) { // only the peers which did not receive this particular erasure chunk let per_candidate = state .per_candidate .entry(candidate_hash) .or_default(); // obtain the chunks from the cache, if not fallback // and query the availability store let message_id = (candidate_hash, chunk_index); let erasure_chunk = if let Some(message) = per_candidate.message_vault.get(&chunk_index) { message.erasure_chunk.clone() } else if let Some(erasure_chunk) = query_chunk(ctx, candidate_hash, chunk_index as ValidatorIndex).await? { erasure_chunk } else { continue; }; debug_assert_eq!(erasure_chunk.index, chunk_index); let peers = peers .iter() .filter(|peer| { // only pick those which were not sent before !per_candidate .sent_messages .get(*peer) .filter(|set| { set.contains(&message_id) }) .is_some() }) .map(|peer| peer.clone()) .collect::>(); let message = AvailabilityGossipMessage { candidate_hash, erasure_chunk, }; send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await?; } } // cleanup the removed relay parents and their states let removed = old_view.difference(&view).collect::>(); for removed in removed { state.remove_relay_parent(&removed)?; } Ok(()) } #[inline(always)] async fn send_tracked_gossip_message_to_peers( ctx: &mut Context, per_candidate: &mut PerCandidate, metrics: &Metrics, peers: Vec, message: AvailabilityGossipMessage, ) -> Result<()> where Context: SubsystemContext, { send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await } #[inline(always)] async fn send_tracked_gossip_messages_to_peer( ctx: &mut Context, per_candidate: &mut PerCandidate, metrics: &Metrics, peer: PeerId, message_iter: impl IntoIterator, ) -> Result<()> where Context: SubsystemContext, { send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, vec![peer], message_iter).await } async fn send_tracked_gossip_messages_to_peers( ctx: &mut Context, per_candidate: &mut PerCandidate, metrics: &Metrics, peers: Vec, message_iter: impl IntoIterator, ) -> Result<()> where Context: SubsystemContext, { if peers.is_empty() { return Ok(()) } for message in message_iter { for peer in peers.iter() { let message_id = (message.candidate_hash, message.erasure_chunk.index); per_candidate .sent_messages .entry(peer.clone()) .or_default() .insert(message_id); } per_candidate .message_vault .insert(message.erasure_chunk.index, message.clone()); let wire_message = protocol_v1::AvailabilityDistributionMessage::Chunk( message.candidate_hash, message.erasure_chunk, ); ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::SendValidationMessage( peers.clone(), protocol_v1::ValidationProtocol::AvailabilityDistribution(wire_message), ), )) .await .map_err::(Into::into)?; metrics.on_chunk_distributed(); } Ok(()) } // Send the difference between two views which were not sent // to that particular peer. async fn handle_peer_view_change( ctx: &mut Context, state: &mut ProtocolState, origin: PeerId, view: View, metrics: &Metrics, ) -> Result<()> where Context: SubsystemContext, { let current = state.peer_views.entry(origin.clone()).or_default(); let added: Vec = view.difference(&*current).cloned().collect(); *current = view; // only contains the intersection of what we are interested and // the union of all relay parent's candidates. let added_candidates = state.cached_live_candidates_unioned(added.iter()); // Send all messages we've seen before and the peer is now interested // in to that peer. for (candidate_hash, _receipt) in added_candidates { let per_candidate = state.per_candidate.entry(candidate_hash).or_default(); // obtain the relevant chunk indices not sent yet let messages = ((0 as ValidatorIndex) ..(per_candidate.validators.len() as ValidatorIndex)) .into_iter() .filter_map(|erasure_chunk_index: ValidatorIndex| { let message_id = (candidate_hash, erasure_chunk_index); // try to pick up the message from the message vault // so we send as much as we have per_candidate .message_vault .get(&erasure_chunk_index) .filter(|_| { // check if that erasure chunk was already sent before if let Some(sent_set) = per_candidate.sent_messages.get(&origin) { if sent_set.contains(&message_id) { return false; } } true }) }) .cloned() .collect::>(); send_tracked_gossip_messages_to_peer(ctx, per_candidate, metrics, origin.clone(), messages).await?; } Ok(()) } /// Obtain the first key which has a signing key. /// Returns the index within the validator set as `ValidatorIndex`, if there exists one, /// otherwise, `None` is returned. fn obtain_our_validator_index( validators: &[ValidatorId], keystore: KeyStorePtr, ) -> Option { let keystore = keystore.read(); validators.iter().enumerate().find_map(|(idx, validator)| { if keystore.has_keys(&[(validator.to_raw_vec(), PARACHAIN_KEY_TYPE_ID)]) { Some(idx as ValidatorIndex) } else { None } }) } /// Handle an incoming message from a peer. async fn process_incoming_peer_message( ctx: &mut Context, state: &mut ProtocolState, origin: PeerId, message: AvailabilityGossipMessage, metrics: &Metrics, ) -> Result<()> where Context: SubsystemContext, { // obtain the set of candidates we are interested in based on our current view let live_candidates = state.cached_live_candidates_unioned(state.view.0.iter()); // check if the candidate is of interest let live_candidate = if let Some(live_candidate) = live_candidates.get(&message.candidate_hash) { live_candidate } else { return modify_reputation(ctx, origin, COST_NOT_A_LIVE_CANDIDATE).await; }; // check the merkle proof let root = &live_candidate.commitments.erasure_root; let anticipated_hash = if let Ok(hash) = branch_hash( root, &message.erasure_chunk.proof, message.erasure_chunk.index as usize, ) { hash } else { return modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await; }; let erasure_chunk_hash = BlakeTwo256::hash(&message.erasure_chunk.chunk); if anticipated_hash != erasure_chunk_hash { return modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await; } // an internal unique identifier of this message let message_id = (message.candidate_hash, message.erasure_chunk.index); { let per_candidate = state.per_candidate.entry(message_id.0.clone()).or_default(); // check if this particular erasure chunk was already sent by that peer before { let received_set = per_candidate .received_messages .entry(origin.clone()) .or_default(); if received_set.contains(&message_id) { return modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await; } else { received_set.insert(message_id.clone()); } } // insert into known messages and change reputation if per_candidate .message_vault .insert(message_id.1, message.clone()) .is_some() { modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await?; } else { modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await?; // save the chunk for our index if let Some(validator_index) = per_candidate.validator_index { if message.erasure_chunk.index == validator_index { if let Err(_e) = store_chunk( ctx, message.candidate_hash.clone(), message.erasure_chunk.index, message.erasure_chunk.clone(), ).await? { warn!(target: TARGET, "Failed to store erasure chunk to availability store"); } } } }; } // condense the peers to the peers with interest on the candidate let peers = state .peer_views .clone() .into_iter() .filter(|(_peer, view)| { // peers view must contain the candidate hash too state .cached_live_candidates_unioned(view.0.iter()) .contains_key(&message_id.0) }) .map(|(peer, _)| -> PeerId { peer.clone() }) .collect::>(); let per_candidate = state.per_candidate.entry(message_id.0.clone()).or_default(); let peers = peers .into_iter() .filter(|peer| { let peer: PeerId = peer.clone(); // avoid sending duplicate messages per_candidate .sent_messages .entry(peer) .or_default() .contains(&message_id) }) .collect::>(); // gossip that message to interested peers send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await } /// The bitfield distribution subsystem. pub struct AvailabilityDistributionSubsystem { /// Pointer to a keystore, which is required for determining this nodes validator index. keystore: KeyStorePtr, /// Prometheus metrics. metrics: Metrics, } impl AvailabilityDistributionSubsystem { /// Number of ancestors to keep around for the relay-chain heads. const K: usize = 3; /// Create a new instance of the availability distribution. pub fn new(keystore: KeyStorePtr, metrics: Metrics) -> Self { Self { keystore, metrics } } /// Start processing work as passed on from the Overseer. async fn run(self, mut ctx: Context) -> Result<()> where Context: SubsystemContext, { // work: process incoming messages from the overseer. let mut state = ProtocolState::default(); loop { let message = ctx.recv().await.map_err::(Into::into)?; match message { FromOverseer::Communication { msg: AvailabilityDistributionMessage::NetworkBridgeUpdateV1(event), } => { if let Err(e) = handle_network_msg( &mut ctx, self.keystore.clone(), &mut state, &self.metrics, event, ).await { warn!( target: TARGET, "Failed to handle incomming network messages: {:?}", e ); } } FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated: _, deactivated: _, })) => { // handled at view change } FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {} FromOverseer::Signal(OverseerSignal::Conclude) => { return Ok(()); } } } } } impl Subsystem for AvailabilityDistributionSubsystem where Context: SubsystemContext + Sync + Send, { fn start(self, ctx: Context) -> SpawnedSubsystem { SpawnedSubsystem { name: "availability-distribution-subsystem", future: Box::pin(async move { self.run(ctx) }.map(|_| ())), } } } /// Obtain all live candidates based on an iterator of relay heads. async fn query_live_candidates_without_ancestors( ctx: &mut Context, relay_parents: impl IntoIterator, ) -> Result> where Context: SubsystemContext, { let iter = relay_parents.into_iter(); let hint = iter.size_hint(); let mut live_candidates = HashSet::with_capacity(hint.1.unwrap_or(hint.0)); for relay_parent in iter { let paras = query_para_ids(ctx, relay_parent).await?; for para in paras { if let Some(ccr) = query_pending_availability(ctx, relay_parent, para).await? { live_candidates.insert(ccr); } } } Ok(live_candidates) } /// Obtain all live candidates based on an iterator or relay heads including `k` ancestors. /// /// Relay parent. async fn query_live_candidates( ctx: &mut Context, state: &mut ProtocolState, relay_parents: impl IntoIterator, ) -> Result> where Context: SubsystemContext, { let iter = relay_parents.into_iter(); let hint = iter.size_hint(); let capacity = hint.1.unwrap_or(hint.0) * (1 + AvailabilityDistributionSubsystem::K); let mut live_candidates = HashMap::::with_capacity(capacity); for relay_parent in iter { // register one of relay parents (not the ancestors) let mut ancestors = query_up_to_k_ancestors_in_same_session( ctx, relay_parent, AvailabilityDistributionSubsystem::K, ) .await?; ancestors.push(relay_parent); // ancestors might overlap, so check the cache too let unknown = ancestors .into_iter() .filter(|relay_parent_or_ancestor| { // use the ones which we pulled before // but keep the unknown relay parents state .receipts .get(relay_parent_or_ancestor) .and_then(|receipts| { // directly extend the live_candidates with the cached value live_candidates.extend(receipts.into_iter().map( |(receipt_hash, receipt)| { ( relay_parent, (receipt_hash.clone(), receipt.clone()), ) }, )); Some(()) }) .is_none() }) .collect::>(); // query the ones that were not present in the receipts cache let receipts = query_live_candidates_without_ancestors(ctx, unknown.clone()).await?; live_candidates.extend( unknown.into_iter().zip( receipts .into_iter() .map(|receipt| (receipt.hash(), receipt)), ), ); } Ok(live_candidates) } /// Query all para IDs. async fn query_para_ids(ctx: &mut Context, relay_parent: Hash) -> Result> where Context: SubsystemContext, { let (tx, rx) = oneshot::channel(); ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request( relay_parent, RuntimeApiRequest::AvailabilityCores(tx), ))) .await .map_err::(Into::into)?; let all_para_ids: Vec<_> = rx .await??; let occupied_para_ids = all_para_ids .into_iter() .filter_map(|core_state| { if let CoreState::Occupied(occupied) = core_state { Some(occupied.para_id) } else { None } }) .collect(); Ok(occupied_para_ids) } /// Modify the reputation of a peer based on its behavior. async fn modify_reputation(ctx: &mut Context, peer: PeerId, rep: Rep) -> Result<()> where Context: SubsystemContext, { trace!( target: TARGET, "Reputation change of {:?} for peer {:?}", rep, peer ); ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::ReportPeer(peer, rep), )) .await .map_err::(Into::into) } /// Query the proof of validity for a particular candidate hash. async fn query_data_availability( ctx: &mut Context, candidate_hash: Hash, ) -> Result where Context: SubsystemContext, { let (tx, rx) = oneshot::channel(); ctx.send_message(AllMessages::AvailabilityStore( AvailabilityStoreMessage::QueryDataAvailability(candidate_hash, tx), )) .await?; rx.await.map_err::(Into::into) } async fn query_chunk( ctx: &mut Context, candidate_hash: Hash, validator_index: ValidatorIndex, ) -> Result> where Context: SubsystemContext, { let (tx, rx) = oneshot::channel(); ctx.send_message(AllMessages::AvailabilityStore( AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx), )) .await?; rx.await.map_err::(Into::into) } async fn store_chunk( ctx: &mut Context, candidate_hash: Hash, validator_index: ValidatorIndex, erasure_chunk: ErasureChunk, ) -> Result> where Context: SubsystemContext, { let (tx, rx) = oneshot::channel(); ctx.send_message(AllMessages::AvailabilityStore( AvailabilityStoreMessage::StoreChunk(candidate_hash, validator_index, erasure_chunk, tx), )).await?; rx.await.map_err::(Into::into) } /// Request the head data for a particular para. async fn query_pending_availability( ctx: &mut Context, relay_parent: Hash, para: ParaId, ) -> Result> where Context: SubsystemContext, { let (tx, rx) = oneshot::channel(); ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request( relay_parent, RuntimeApiRequest::CandidatePendingAvailability(para, tx), ))) .await?; rx.await? .map_err::(Into::into) } /// Query the validator set. async fn query_validators( ctx: &mut Context, relay_parent: Hash, ) -> Result> where Context: SubsystemContext, { let (tx, rx) = oneshot::channel(); let query_validators = AllMessages::RuntimeApi(RuntimeApiMessage::Request( relay_parent, RuntimeApiRequest::Validators(tx), )); ctx.send_message(query_validators) .await?; rx.await? .map_err::(Into::into) } /// Query the hash of the `K` ancestors async fn query_k_ancestors( ctx: &mut Context, relay_parent: Hash, k: usize, ) -> Result> where Context: SubsystemContext, { let (tx, rx) = oneshot::channel(); let query_ancestors = AllMessages::ChainApi(ChainApiMessage::Ancestors { hash: relay_parent, k, response_channel: tx, }); ctx.send_message(query_ancestors) .await?; rx.await? .map_err::(Into::into) } /// Query the session index of a relay parent async fn query_session_index_for_child( ctx: &mut Context, relay_parent: Hash, ) -> Result where Context: SubsystemContext, { let (tx, rx) = oneshot::channel(); let query_session_idx_for_child = AllMessages::RuntimeApi(RuntimeApiMessage::Request( relay_parent, RuntimeApiRequest::SessionIndexForChild(tx), )); ctx.send_message(query_session_idx_for_child) .await?; rx.await? .map_err::(Into::into) } /// Queries up to k ancestors with the constraints of equiv session async fn query_up_to_k_ancestors_in_same_session( ctx: &mut Context, relay_parent: Hash, k: usize, ) -> Result> where Context: SubsystemContext, { // k + 1 since we always query the child's session index // ordering is [parent, grandparent, greatgrandparent, greatgreatgrandparent, ...] let ancestors = query_k_ancestors(ctx, relay_parent, k + 1).await?; let desired_session = query_session_index_for_child(ctx, relay_parent).await?; // we would only need `ancestors.len() - 1`, but the one extra could avoid a re-alloc // if the consumer wants to push the `relay_parent` onto it too and does not hurt otherwise let mut acc = Vec::with_capacity(ancestors.len()); // iterate from youngest to oldest let mut iter = ancestors.into_iter().peekable(); while let Some(ancestor) = iter.next() { if let Some(ancestor_parent) = iter.peek() { let session = query_session_index_for_child(ctx, *ancestor_parent).await?; if session != desired_session { break; } acc.push(ancestor); } else { // either ended up at genesis or the blocks were // already pruned break; } } debug_assert!(acc.len() <= k); Ok(acc) } #[derive(Clone)] struct MetricsInner { gossipped_availability_chunks: prometheus::Counter, } /// Availability Distribution metrics. #[derive(Default, Clone)] pub struct Metrics(Option); impl Metrics { fn on_chunk_distributed(&self) { if let Some(metrics) = &self.0 { metrics.gossipped_availability_chunks.inc(); } } } impl metrics::Metrics for Metrics { fn try_register(registry: &prometheus::Registry) -> std::result::Result { let metrics = MetricsInner { gossipped_availability_chunks: prometheus::register( prometheus::Counter::new( "parachain_gossipped_availability_chunks_total", "Number of availability chunks gossipped to other peers." )?, registry, )?, }; Ok(Metrics(Some(metrics))) } } #[cfg(test)] mod tests;