// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The availability distribution
//!
//! Transforms `AvailableData` into erasure chunks, which are distributed to peers
//! who are interested in the relevant candidates.
//! Gossip messages received from other peers are verified and gossiped to interested
//! peers. Verified in this context means that the Merkle proof contained in the
//! erasure chunk is checked.
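//!
//! A rough sketch of that check, shown here for illustration only (the helper name is
//! hypothetical and not part of this module's API): recompute the chunk's leaf hash and
//! compare it to the hash obtained by walking the branch proof from the erasure root.
//!
//! ```ignore
//! use polkadot_erasure_coding::branch_hash;
//! use polkadot_primitives::v1::{BlakeTwo256, ErasureChunk, Hash, HashT};
//!
//! /// Returns `true` if `chunk` is consistent with the given erasure `root`.
//! fn chunk_proof_is_valid(root: &Hash, chunk: &ErasureChunk) -> bool {
//!     match branch_hash(root, &chunk.proof, chunk.index as usize) {
//!         // The branch proof must hash up to the leaf hash of the chunk data.
//!         Ok(anticipated_hash) => anticipated_hash == BlakeTwo256::hash(&chunk.chunk),
//!         Err(_) => false,
//!     }
//! }
//! ```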

use codec::{Decode, Encode};
use futures::{channel::oneshot, FutureExt};

use sp_core::crypto::Public;
use sp_keystore::{CryptoStore, SyncCryptoStorePtr};

use log::{trace, warn};
use polkadot_erasure_coding::branch_hash;
use polkadot_primitives::v1::{
	PARACHAIN_KEY_TYPE_ID,
	BlakeTwo256, CommittedCandidateReceipt, CoreState, ErasureChunk,
	Hash, HashT, Id as ParaId,
	ValidatorId, ValidatorIndex, SessionIndex,
};
use polkadot_subsystem::messages::{
	AllMessages, AvailabilityDistributionMessage, NetworkBridgeMessage, RuntimeApiMessage,
	RuntimeApiRequest, AvailabilityStoreMessage, ChainApiMessage,
};
use polkadot_subsystem::{
	errors::{ChainApiError, RuntimeApiError},
	ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem,
	SubsystemContext, SubsystemError,
};
use polkadot_node_subsystem_util::{
	metrics::{self, prometheus},
};
use polkadot_node_network_protocol::{
	v1 as protocol_v1, View, ReputationChange as Rep, PeerId,
	NetworkBridgeEvent,
};
use std::collections::{HashMap, HashSet};
use std::io;
use std::iter;

const TARGET: &str = "avad";

#[derive(Debug, derive_more::From)]
enum Error {
	#[from]
	Erasure(polkadot_erasure_coding::Error),
	#[from]
	Io(io::Error),
	#[from]
	Oneshot(oneshot::Canceled),
	#[from]
	Subsystem(SubsystemError),
	#[from]
	RuntimeApi(RuntimeApiError),
	#[from]
	ChainApi(ChainApiError),
}

type Result<T> = std::result::Result<T, Error>;

const COST_MERKLE_PROOF_INVALID: Rep = Rep::new(-100, "Merkle proof was invalid");
const COST_NOT_A_LIVE_CANDIDATE: Rep = Rep::new(-51, "Candidate is not live");
const COST_PEER_DUPLICATE_MESSAGE: Rep = Rep::new(-500, "Peer sent identical messages");
const BENEFIT_VALID_MESSAGE_FIRST: Rep = Rep::new(15, "Valid message with new information");
const BENEFIT_VALID_MESSAGE: Rep = Rep::new(10, "Valid message");

/// An erasure chunk, bundled with the hash of the candidate it belongs to,
/// distributed to other peers.
#[derive(Encode, Decode, Debug, Clone, PartialEq, Eq, Hash)]
pub struct AvailabilityGossipMessage {
	/// Anchor hash of the candidate the `ErasureChunk` is associated with.
	pub candidate_hash: Hash,
	/// The erasure chunk, an encoded piece of information that is part of the `AvailableData`.
	pub erasure_chunk: ErasureChunk,
}
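
// The sketch below illustrates how gossip messages of this shape could be derived from a
// piece of `AvailableData`: erasure-code the data into one chunk per validator and attach
// the Merkle branch proof for each chunk. This is an illustrative helper (hence the
// `_sketch` suffix), not necessarily the exact routine used elsewhere in this subsystem;
// it assumes `polkadot_erasure_coding::{obtain_chunks_v1, branches}` and the v1
// `AvailableData` type.
#[allow(dead_code)]
fn derive_gossip_messages_sketch(
	n_validators: usize,
	candidate_hash: Hash,
	available_data: &polkadot_primitives::v1::AvailableData,
) -> Result<Vec<AvailabilityGossipMessage>> {
	// One erasure chunk per validator.
	let chunks = polkadot_erasure_coding::obtain_chunks_v1(n_validators, available_data)?;
	// Compute the Merkle branch (proof) of every chunk against the erasure root.
	let branches = polkadot_erasure_coding::branches(chunks.as_ref());
	Ok(branches
		.enumerate()
		.map(|(index, (proof, chunk))| AvailabilityGossipMessage {
			candidate_hash,
			erasure_chunk: ErasureChunk {
				chunk: chunk.to_vec(),
				index: index as u32,
				proof,
			},
		})
		.collect())
}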

/// Data used to track information about peers and the relay parents the
/// overseer ordered us to work on.
#[derive(Default, Clone, Debug)]
struct ProtocolState {
	/// Track all active peers and their views
	/// to determine what is relevant to them.
	peer_views: HashMap<PeerId, View>,

	/// Our own view.
	view: View,

	/// Caches a mapping of relay parents or ancestors to live candidate receipts.
	/// Allows fast intersection of live candidates with views and consecutive unioning.
	/// Maps relay parent / ancestor -> set of (candidate hash, live candidate receipt).
	receipts: HashMap<Hash, HashSet<(Hash, CommittedCandidateReceipt)>>,

	/// Allows reverse caching of view checks.
	/// Maps candidate hash -> relay parent for extracting meta information from `PerRelayParent`.
	/// Note that the presence of an entry here is not sufficient to determine if deletion is OK,
	/// i.e. the same candidate could still be covered by the history of another relay parent.
	reverse: HashMap<Hash, Hash>,

	/// Keeps track of which candidate receipts are required due to ancestors of
	/// relay parents in our view.
	/// Maps ancestor -> relay parents in view.
	ancestry: HashMap<Hash, HashSet<Hash>>,

	/// Track things needed to start and stop work on a particular relay parent.
	per_relay_parent: HashMap<Hash, PerRelayParent>,

	/// Track data that is specific to a candidate.
	per_candidate: HashMap<Hash, PerCandidate>,
}

#[derive(Debug, Clone, Default)]
struct PerCandidate {
	/// The known erasure chunks for this candidate, in the form of messages to be
	/// gossiped / distributed if a peer's view requires it.
	/// This is shared _across_ peers and not specific to a particular one.
	/// Maps erasure chunk index -> gossip message.
	message_vault: HashMap<u32, AvailabilityGossipMessage>,

	/// Track received candidate hashes and chunk indices from peers.
	received_messages: HashMap<PeerId, HashSet<(Hash, ValidatorIndex)>>,

	/// Track the candidate hashes and erasure chunk indices already sent to each peer.
	sent_messages: HashMap<PeerId, HashSet<(Hash, ValidatorIndex)>>,

	/// The set of validators.
	validators: Vec<ValidatorId>,

	/// If this node is a validator, note the index in the validator set.
	validator_index: Option<ValidatorIndex>,
}
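
// A minimal sketch of how the two bookkeeping sets above could be used: a chunk is only
// processed if it has not been received from that peer before, and it is only sent to a
// peer once. The helper names are illustrative and not part of this module's actual API.
#[allow(dead_code)]
impl PerCandidate {
	/// Record that `peer` sent us the chunk identified by `(candidate_hash, chunk_index)`.
	/// Returns `false` if that exact message was already received from this peer, which a
	/// caller would typically punish with `COST_PEER_DUPLICATE_MESSAGE`.
	fn note_received(&mut self, peer: PeerId, candidate_hash: Hash, chunk_index: ValidatorIndex) -> bool {
		self.received_messages
			.entry(peer)
			.or_default()
			.insert((candidate_hash, chunk_index))
	}

	/// Record that the chunk identified by `(candidate_hash, chunk_index)` was sent to `peer`.
	/// Returns `false` if it was already sent, so callers can avoid gossiping duplicates.
	fn note_sent(&mut self, peer: PeerId, candidate_hash: Hash, chunk_index: ValidatorIndex) -> bool {
		self.sent_messages
			.entry(peer)
			.or_default()
			.insert((candidate_hash, chunk_index))
	}
}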

#[derive(Debug, Clone, Default)]
struct PerRelayParent {
	/// Set of `K` ancestors for this relay parent.
	ancestors: Vec<Hash>,
}
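
// A sketch of how the `ancestors` set above could be populated: ask the ChainApi subsystem
// for the `k` ancestors of a relay parent. The helper name is illustrative; it assumes the
// `ChainApiMessage::Ancestors` request and the local `Error`/`Result` types defined above.
#[allow(dead_code)]
async fn query_ancestors_sketch<Context>(
	ctx: &mut Context,
	relay_parent: Hash,
	k: usize,
) -> Result<Vec<Hash>>
where
	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
	let (tx, rx) = oneshot::channel();
	ctx.send_message(AllMessages::ChainApi(ChainApiMessage::Ancestors {
		hash: relay_parent,
		k,
		response_channel: tx,
	}))
	.await?;

	// The inner `Result` carries a potential `ChainApiError`, the outer one a cancelled channel.
	let ancestors = rx.await??;
	Ok(ancestors)
}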

impl ProtocolState {
	/// Collects the relay parents' ancestors, including the relay parents themselves.
	fn extend_with_ancestors<'a>(
		&'a self,
		relay_parents: impl IntoIterator<Item = &'a Hash> + 'a,
	) -> HashSet<Hash> {
		relay_parents
			.into_iter()
			.flat_map(|relay_parent| {
				self.per_relay_parent
					.get(relay_parent)
					.into_iter()
					.flat_map(|per_relay_parent| per_relay_parent.ancestors.iter().cloned())
					.chain(iter::once(*relay_parent))
			})
			.collect::<HashSet<Hash>>()
	}

	/// Union all cached entries for the given relay parents and their ancestors.
	/// Ignores all non-existent relay parents, so this can be used directly with a peer's view.
	/// Returns a map from candidate hash -> receipt.
	fn cached_live_candidates_unioned<'a>(
		&'a self,
		relay_parents: impl IntoIterator<Item = &'a Hash> + 'a,
	) -> HashMap<Hash, CommittedCandidateReceipt> {
		let relay_parents_and_ancestors = self.extend_with_ancestors(relay_parents);
		relay_parents_and_ancestors
			.into_iter()
			.filter_map(|relay_parent_or_ancestor| self.receipts.get(&relay_parent_or_ancestor))
			.flat_map(|receipt_set| receipt_set.iter())
			.map(|(receipt_hash, receipt)| (receipt_hash.clone(), receipt.clone()))
			.collect::<HashMap<Hash, CommittedCandidateReceipt>>()
	}

	async fn add_relay_parent<Context>(
		&mut self,
		ctx: &mut Context,
		relay_parent: Hash,
		validators: Vec<ValidatorId>,
		validator_index: Option<ValidatorIndex>,
	) -> Result<()>
	where
		Context: SubsystemContext<Message = AvailabilityDistributionMessage>,