// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The availability distribution
//!
//! Transforms `AvailableData` into erasure chunks, which are distributed to peers
//! who are interested in the relevant candidates.
//! Gossip messages received from other peers are verified and forwarded to interested
//! peers. "Verified" in this context means that the Merkle proof contained in the
//! erasure chunk is checked.
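//!
//! A rough sketch of that transformation (illustrative only; it assumes the
//! `polkadot_erasure_coding` API and is not doctested):
//!
//! ```ignore
//! // One erasure chunk per validator, each carrying a Merkle branch proof.
//! let chunks = polkadot_erasure_coding::obtain_chunks_v1(n_validators, &available_data)?;
//! let erasure_chunks: Vec<ErasureChunk> = polkadot_erasure_coding::branches(chunks.as_ref())
//! 	.enumerate()
//! 	.map(|(index, (proof, chunk))| ErasureChunk {
//! 		chunk: chunk.to_vec(),
//! 		index: index as u32,
//! 		proof,
//! 	})
//! 	.collect();
//! ```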

#![deny(unused_crate_dependencies, unused_qualifications)]

use parity_scale_codec::{Decode, Encode};
use futures::{channel::oneshot, FutureExt, TryFutureExt};
use sp_core::crypto::Public;
use sp_keystore::{CryptoStore, SyncCryptoStorePtr};

use polkadot_erasure_coding::branch_hash;
use polkadot_node_network_protocol::{
	v1 as protocol_v1, NetworkBridgeEvent, PeerId, ReputationChange as Rep, View,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_primitives::v1::{
	BlakeTwo256, CandidateDescriptor, CoreState, ErasureChunk, Hash, HashT,
	SessionIndex, ValidatorId, ValidatorIndex, PARACHAIN_KEY_TYPE_ID, CandidateHash,
};
use polkadot_subsystem::messages::{
	AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage,
	NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest,
};
use polkadot_subsystem::{
	errors::{ChainApiError, RuntimeApiError},
	jaeger, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem,
	SubsystemContext, SubsystemError,
};
use std::collections::{HashMap, HashSet};
use std::collections::hash_map::Entry;
use std::iter;
use thiserror::Error;

const LOG_TARGET: &str = "availability_distribution";

#[derive(Debug, Error)]
enum Error {
	#[error("Response channel to obtain StoreChunk failed")]
	StoreChunkResponseChannel(#[source] oneshot::Canceled),

	#[error("Response channel to obtain QueryChunk failed")]
	QueryChunkResponseChannel(#[source] oneshot::Canceled),

	#[error("Response channel to obtain QueryAncestors failed")]
	QueryAncestorsResponseChannel(#[source] oneshot::Canceled),
	#[error("RuntimeAPI to obtain QueryAncestors failed")]
	QueryAncestors(#[source] ChainApiError),

	#[error("Response channel to obtain QuerySession failed")]
	QuerySessionResponseChannel(#[source] oneshot::Canceled),
	#[error("RuntimeAPI to obtain QuerySession failed")]
	QuerySession(#[source] RuntimeApiError),

	#[error("Response channel to obtain QueryValidators failed")]
	QueryValidatorsResponseChannel(#[source] oneshot::Canceled),
	#[error("RuntimeAPI to obtain QueryValidators failed")]
	QueryValidators(#[source] RuntimeApiError),

	#[error("Response channel to obtain AvailabilityCores failed")]
	AvailabilityCoresResponseChannel(#[source] oneshot::Canceled),
	#[error("RuntimeAPI to obtain AvailabilityCores failed")]
	AvailabilityCores(#[source] RuntimeApiError),

	#[error("Response channel to obtain AvailabilityCores failed")]
	QueryAvailabilityResponseChannel(#[source] oneshot::Canceled),

	#[error("Receive channel closed")]
	IncomingMessageChannel(#[source] SubsystemError),
}

type Result<T> = std::result::Result<T, Error>;

const COST_MERKLE_PROOF_INVALID: Rep = Rep::new(-100, "Merkle proof was invalid");
const COST_NOT_A_LIVE_CANDIDATE: Rep = Rep::new(-51, "Candidate is not live");
const COST_PEER_DUPLICATE_MESSAGE: Rep = Rep::new(-500, "Peer sent identical messages");
const BENEFIT_VALID_MESSAGE_FIRST: Rep = Rep::new(15, "Valid message with new information");
const BENEFIT_VALID_MESSAGE: Rep = Rep::new(10, "Valid message");
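
// A minimal sketch of the check behind `COST_MERKLE_PROOF_INVALID`; the helper
// name is hypothetical and not part of the subsystem's API. A chunk is only
// accepted if its Merkle branch proof hashes back to the candidate's erasure root.
#[allow(dead_code)]
fn erasure_chunk_proof_is_valid(erasure_root: &Hash, chunk: &ErasureChunk) -> bool {
	// Recompute the leaf hash the branch proof anticipates for this chunk index...
	let anticipated_hash = match branch_hash(erasure_root, &chunk.proof, chunk.index as usize) {
		Ok(hash) => hash,
		Err(_) => return false,
	};
	// ...and compare it against the hash of the chunk data itself.
	anticipated_hash == BlakeTwo256::hash(&chunk.chunk)
}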

/// An erasure chunk, bound to the candidate it belongs to, that is distributed
/// to other peers.
#[derive(Encode, Decode, Debug, Clone, PartialEq, Eq, Hash)]
pub struct AvailabilityGossipMessage {
	/// Hash of the candidate the `ErasureChunk` is associated with.
	pub candidate_hash: CandidateHash,
	/// The erasure chunk, an encoded piece of the `AvailableData`.
	pub erasure_chunk: ErasureChunk,
}

impl From<AvailabilityGossipMessage> for protocol_v1::AvailabilityDistributionMessage {
	fn from(message: AvailabilityGossipMessage) -> Self {
		Self::Chunk(message.candidate_hash, message.erasure_chunk)
	}
}
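
// Illustrative only: once converted via the `From` impl above, a gossip message is
// wrapped into the versioned validation protocol and handed to the network bridge.
// This free function and its parameters are hypothetical, for exposition.
#[allow(dead_code)]
fn send_chunk_to_peers(peers: Vec<PeerId>, message: AvailabilityGossipMessage) -> AllMessages {
	AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
		peers,
		protocol_v1::ValidationProtocol::AvailabilityDistribution(message.into()),
	))
}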

/// Data used to track information of peers and relay parents the
/// overseer ordered us to work on.
#[derive(Debug, Default)]
struct ProtocolState {
	/// Track all active peers and their views
	/// to determine what is relevant to them.
	peer_views: HashMap<PeerId, View>,

	/// Our own view.
	view: View,

	/// Caches a mapping of relay parents and their ancestors to live candidate hashes.
	/// Allows fast intersection of live candidates with views and subsequent unioning.
	/// Maps relay parent / ancestor -> candidate hashes.
	live_under: HashMap<Hash, HashSet<CandidateHash>>,

	/// Track things needed to start and stop work on a particular relay parent.
	per_relay_parent: HashMap<Hash, PerRelayParent>,

	/// Track data that is specific to a candidate.
	per_candidate: HashMap<CandidateHash, PerCandidate>,
}

#[derive(Debug)]
struct PerCandidate {
	/// The set of erasure chunks known for this candidate, kept in the form of
	/// gossip messages to be distributed to peers whose view requires them.
	/// This is _across_ peers and not specific to a particular one.
	/// erasure chunk index -> gossip message
	message_vault: HashMap<u32, AvailabilityGossipMessage>,

	/// Track received erasure chunk indices per peer.
	received_messages: HashMap<PeerId, HashSet<ValidatorIndex>>,
	/// Track sent erasure chunk indices per peer.
	sent_messages: HashMap<PeerId, HashSet<ValidatorIndex>>,

	/// The set of validators.
	validators: Vec<ValidatorId>,

	/// If this node is a validator, note the index in the validator set.
	validator_index: Option<ValidatorIndex>,

	/// The descriptor of this candidate.
	descriptor: CandidateDescriptor,

	/// The set of relay chain blocks this appears to be live in.
	live_in: HashSet<Hash>,

	/// A Jaeger span relating to this candidate.
	span: jaeger::JaegerSpan,
}

impl PerCandidate {
	/// Returns `true` iff the given `validator_index` is required by the given `peer`.
	fn message_required_by_peer(&self, peer: &PeerId, validator_index: ValidatorIndex) -> bool {
		self.received_messages.get(peer).map(|v| !v.contains(&validator_index)).unwrap_or(true)
			&& self.sent_messages.get(peer).map(|v| !v.contains(&validator_index)).unwrap_or(true)
	}

	/// Add a chunk to the message vault. Overwrites anything that was already present.
	fn add_message(&mut self, chunk_index: u32, message: AvailabilityGossipMessage) {
		let _ = self.message_vault.insert(chunk_index, message);
	}

	/// Clean up the span if we've got our own chunk.
	fn drop_span_after_own_availability(&mut self) {
		if let Some(validator_index) = self.validator_index {
			if self.message_vault.contains_key(&validator_index) {
				self.span = jaeger::JaegerSpan::Disabled;
			}
		}
	}
}

#[derive(Debug, Clone, Default)]
struct PerRelayParent {
	/// The `K` ancestors of this relay parent.
	ancestors: Vec<Hash>,
	/// Live candidates, according to this relay parent.