Newer
Older
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The availability distribution
//!
//! Transforms `AvailableData` into erasure chunks, which are distributed to peers
//! who are interested in the relevant candidates.
//! Gossip messages received from other peers are verified and gossiped to interested
//! peers. "Verified" in this context means that the Merkle proof contained in an
//! erasure chunk is checked.
#![deny(unused_crate_dependencies, unused_qualifications)]
use parity_scale_codec::{Decode, Encode};
use futures::{channel::oneshot, FutureExt, TryFutureExt};
use sp_core::crypto::Public;
use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
use polkadot_erasure_coding::branch_hash;
use polkadot_node_network_protocol::{
v1 as protocol_v1, NetworkBridgeEvent, PeerId, ReputationChange as Rep, View,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
BlakeTwo256, CommittedCandidateReceipt, CoreState, ErasureChunk, Hash, HashT, Id as ParaId,
SessionIndex, ValidatorId, ValidatorIndex, PARACHAIN_KEY_TYPE_ID, CandidateHash,
};
use polkadot_subsystem::messages::{
AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage,
NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest,
};
use polkadot_subsystem::{
errors::{ChainApiError, RuntimeApiError},
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem,
SubsystemContext, SubsystemError,
};
use std::collections::{HashMap, HashSet};
use std::iter;
Bastian Köcher
committed
#[cfg(test)]
mod tests;
/// Log target used by all tracing/log output of this subsystem.
// `&str` in a const is already `'static`; the explicit lifetime is redundant
// (clippy: `redundant_static_lifetimes`).
const LOG_TARGET: &str = "availability_distribution";
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#[error("Response channel to obtain PendingAvailability failed")]
QueryPendingAvailabilityResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain PendingAvailability failed")]
QueryPendingAvailability(#[source] RuntimeApiError),
#[error("Response channel to obtain StoreChunk failed")]
StoreChunkResponseChannel(#[source] oneshot::Canceled),
#[error("Response channel to obtain QueryChunk failed")]
QueryChunkResponseChannel(#[source] oneshot::Canceled),
#[error("Response channel to obtain QueryAncestors failed")]
QueryAncestorsResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain QueryAncestors failed")]
QueryAncestors(#[source] ChainApiError),
#[error("Response channel to obtain QuerySession failed")]
QuerySessionResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain QuerySession failed")]
QuerySession(#[source] RuntimeApiError),
#[error("Response channel to obtain QueryValidators failed")]
QueryValidatorsResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain QueryValidators failed")]
QueryValidators(#[source] RuntimeApiError),
#[error("Response channel to obtain AvailabilityCores failed")]
AvailabilityCoresResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain AvailabilityCores failed")]
AvailabilityCores(#[source] RuntimeApiError),
#[error("Response channel to obtain AvailabilityCores failed")]
QueryAvailabilityResponseChannel(#[source] oneshot::Canceled),
#[error("Receive channel closed")]
IncomingMessageChannel(#[source] SubsystemError),
}
/// Convenience alias: fallible operations in this subsystem use the local `Error` type.
type Result<T> = std::result::Result<T, Error>;
/// Reputation cost: the erasure chunk's Merkle proof did not verify.
const COST_MERKLE_PROOF_INVALID: Rep = Rep::new(-100, "Merkle proof was invalid");
/// Reputation cost: the peer gossiped a chunk for a candidate that is not live in our view.
const COST_NOT_A_LIVE_CANDIDATE: Rep = Rep::new(-51, "Candidate is not live");
/// Reputation cost: the peer sent the exact same message more than once.
const COST_PEER_DUPLICATE_MESSAGE: Rep = Rep::new(-500, "Peer sent identical messages");
/// Reputation benefit: first peer to deliver a valid, previously unseen chunk.
const BENEFIT_VALID_MESSAGE_FIRST: Rep = Rep::new(15, "Valid message with new information");
/// Reputation benefit: a valid but already-known message.
const BENEFIT_VALID_MESSAGE: Rep = Rep::new(10, "Valid message");
/// An erasure chunk of a candidate's `AvailableData`, gossiped to other peers.
///
/// NOTE(review): a previous doc comment described this as a "checked signed
/// availability bitfield", which does not match the fields below — this type
/// carries an erasure chunk, not a bitfield.
#[derive(Encode, Decode, Debug, Clone, PartialEq, Eq, Hash)]
pub struct AvailabilityGossipMessage {
	/// Anchor hash of the candidate the `ErasureChunk` is associated to.
	pub candidate_hash: CandidateHash,
	/// The erasure chunk: one encoded piece of the candidate's `AvailableData`.
	pub erasure_chunk: ErasureChunk,
}
/// Data used to track information of peers and relay parents the
/// overseer ordered us to work on.
#[derive(Default, Clone, Debug)]
struct ProtocolState {
	/// Track all active peers and their views
	/// to determine what is relevant to them.
	peer_views: HashMap<PeerId, View>,

	/// Our own view.
	view: View,

	/// Caches a mapping of relay parents or ancestor to live candidate receipts.
	/// Allows fast intersection of live candidates with views and consecutive unioning.
	/// Maps relay parent / ancestor -> live candidate receipts + its hash.
	receipts: HashMap<Hash, HashSet<(CandidateHash, CommittedCandidateReceipt)>>,

	/// Allow reverse caching of view checks.
	/// Maps candidate hash -> relay parent for extracting meta information from `PerRelayParent`.
	/// Note that the presence of this is not sufficient to determine if deletion is OK, i.e.
	/// two histories could cover this (the same candidate may be reachable from more than
	/// one relay parent in view).
	reverse: HashMap<CandidateHash, Hash>,

	/// Keeps track of which candidate receipts are required due to ancestors of which relay parents
	/// of our view.
	/// Maps ancestor -> relay parents in view.
	ancestry: HashMap<Hash, HashSet<Hash>>,

	/// Track things needed to start and stop work on a particular relay parent.
	per_relay_parent: HashMap<Hash, PerRelayParent>,

	/// Track data that is specific to a candidate.
	per_candidate: HashMap<CandidateHash, PerCandidate>,
}
/// Data tracked per candidate.
#[derive(Debug, Clone, Default)]
struct PerCandidate {
	/// Known erasure chunks for this candidate, in the form of messages to be
	/// gossiped / distributed if a peer's view wants them.
	/// This is _across_ peers and not specific to a particular one.
	/// Keyed by erasure chunk index -> gossip message.
	/// NOTE(review): this struct is already per-candidate, so the key is just the
	/// chunk index (`u32`), not "candidate hash + chunk index" as an older comment said.
	message_vault: HashMap<u32, AvailabilityGossipMessage>,

	/// Track `(candidate hash, validator index)` pairs received from each peer,
	/// to detect duplicates and avoid echoing messages back.
	received_messages: HashMap<PeerId, HashSet<(CandidateHash, ValidatorIndex)>>,

	/// Track `(candidate hash, validator index)` pairs already sent to each peer,
	/// to avoid sending the same chunk twice.
	sent_messages: HashMap<PeerId, HashSet<(CandidateHash, ValidatorIndex)>>,

	/// The set of validators (for the session this candidate belongs to).
	validators: Vec<ValidatorId>,

	/// If this node is a validator, note the index in the validator set.
	validator_index: Option<ValidatorIndex>,
}
Bastian Köcher
committed
impl PerCandidate {
/// Returns `true` iff the given `message` is required by the given `peer`.
fn message_required_by_peer(&self, peer: &PeerId, message: &(CandidateHash, ValidatorIndex)) -> bool {
self.received_messages.get(peer).map(|v| !v.contains(message)).unwrap_or(true)
&& self.sent_messages.get(peer).map(|v| !v.contains(message)).unwrap_or(true)
}
}
/// Data tracked per relay parent the overseer ordered us to work on.
#[derive(Debug, Clone, Default)]
struct PerRelayParent {
	/// Set of `K` ancestors for this relay parent.
	ancestors: Vec<Hash>,
}
impl ProtocolState {
/// Collects the ancestors of the given relay parents, including the relay parents themselves.
#[tracing::instrument(level = "trace", skip(relay_parents), fields(subsystem = LOG_TARGET))]
fn extend_with_ancestors<'a>(
&'a self,
relay_parents: impl IntoIterator<Item = &'a Hash> + 'a,
) -> HashSet<Hash> {
relay_parents
.into_iter()
.map(|relay_parent| {
self.per_relay_parent
.get(relay_parent)
.into_iter()
.map(|per_relay_parent| per_relay_parent.ancestors.iter().cloned())
.flatten()
Loading full blame...