// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Message types for the overseer and subsystems.
//! These messages are intended to define the protocol by which different subsystems communicate with each
//! other and signals that they receive from an overseer to coordinate their work.
//! This is intended for use with the `polkadot-overseer` crate.
//! Subsystems' APIs are defined separately from their implementation, leading to easier mocking.

use futures::channel::{mpsc, oneshot};
use thiserror::Error;

pub use sc_network::IfDisconnected;

use polkadot_node_network_protocol::{
	PeerId, UnifiedReputationChange, peer_set::PeerSet,
		Requests, request::IncomingRequest, v1 as req_res_v1
	v1 as protocol_v1,
use polkadot_node_primitives::{
	CollationGenerationConfig, SignedFullStatement, ValidationResult,
	approval::{BlockApprovalMeta, IndirectAssignmentCert, IndirectSignedApprovalVote},
	BabeEpoch, AvailableData, PoV, ErasureChunk
use polkadot_primitives::v1::{
	AuthorityDiscoveryId, BackedCandidate, BlockNumber, SessionInfo,
	Header as BlockHeader, CandidateDescriptor, CandidateEvent, CandidateReceipt,
	CollatorId, CommittedCandidateReceipt, CoreState,
	GroupRotationInfo, Hash, Id as ParaId, OccupiedCoreAssumption,
	PersistedValidationData, SessionIndex, SignedAvailabilityBitfield,
	ValidationCode, ValidatorId, CandidateHash,
	ValidatorIndex, ValidatorSignature, InboundDownwardMessage, InboundHrmpMessage,
	CandidateIndex, GroupIndex, MultiDisputeStatementSet, SignedAvailabilityBitfields,
use polkadot_statement_table::v1::Misbehavior;
use polkadot_procmacro_subsystem_dispatch_gen::subsystem_dispatch_gen;
use std::{sync::Arc, collections::btree_map::BTreeMap};

/// Network events as transmitted to other subsystems, wrapped in their message types.
pub mod network_bridge_event;
pub use network_bridge_event::NetworkBridgeEvent;

/// Subsystem messages where each message is always bound to a relay parent.
pub trait BoundToRelayParent {
	/// Returns the relay parent this message is bound to.
	fn relay_parent(&self) -> Hash;
/// Messages received by the Candidate Selection subsystem.
pub enum CandidateSelectionMessage {
	/// A candidate collation can be fetched from a collator and should be considered for seconding.
	Collation(Hash, ParaId, CollatorId),
	/// We recommended a particular candidate to be seconded, but it was invalid; penalize the collator.
	/// The hash is the relay parent.
	Invalid(Hash, CandidateReceipt),
	/// The candidate we recommended to be seconded was validated successfully.
	/// The hash is the relay parent.
	Seconded(Hash, SignedFullStatement),
impl BoundToRelayParent for CandidateSelectionMessage {
	fn relay_parent(&self) -> Hash {
		match self {
			Self::Collation(hash, ..) => *hash,
			Self::Invalid(hash, _) => *hash,
			Self::Seconded(hash, _) => *hash,
impl Default for CandidateSelectionMessage {
	fn default() -> Self {
		CandidateSelectionMessage::Invalid(Default::default(), Default::default())

/// Messages received by the Candidate Backing subsystem.
pub enum CandidateBackingMessage {
	/// Requests a set of backable candidates that could be backed in a child of the given
	/// relay-parent, referenced by its hash.
	GetBackedCandidates(Hash, Vec<CandidateHash>, oneshot::Sender<Vec<BackedCandidate>>),
	/// Note that the Candidate Backing subsystem should second the given candidate in the context of the
	/// given relay-parent (ref. by hash). This candidate must be validated.
	Second(Hash, CandidateReceipt, PoV),
	/// Note a validator's statement about a particular candidate. Disagreements about validity must be escalated
	/// to a broader check by Misbehavior Arbitration. Agreements are simply tallied until a quorum is reached.
	Statement(Hash, SignedFullStatement),
impl BoundToRelayParent for CandidateBackingMessage {
	fn relay_parent(&self) -> Hash {
		match self {
			Self::GetBackedCandidates(hash, _, _) => *hash,
			Self::Second(hash, _, _) => *hash,
			Self::Statement(hash, _) => *hash,
/// Blanket error for validation failing for internal reasons.
///
/// Wraps a free-form `String` describing the failure; the error message renders it
/// with its `Debug` formatting.
#[derive(Debug, Error)]
#[error("Validation failed with {0:?}")]
pub struct ValidationFailed(pub String);
/// Messages received by the Validation subsystem.
/// ## Validation Requests
/// Validation requests made to the subsystem should return an error only on internal error.
/// Otherwise, they should return either `Ok(ValidationResult::Valid(_))`
/// or `Ok(ValidationResult::Invalid)`.
pub enum CandidateValidationMessage {
	/// Validate a candidate with provided parameters using relay-chain state.
	/// This will implicitly attempt to gather the `PersistedValidationData` and `ValidationCode`
	/// from the runtime API of the chain, based on the `relay_parent`
	/// of the `CandidateDescriptor`.
	/// This will also perform checking of validation outputs against the acceptance criteria.
	/// If there is no state available which can provide this data or the core for
	/// the para is not free at the relay-parent, an error is returned.
		oneshot::Sender<Result<ValidationResult, ValidationFailed>>,
	/// Validate a candidate with provided, exhaustive parameters for validation.
	/// Explicitly provide the `PersistedValidationData` and `ValidationCode` so this can do full
	/// validation without needing to access the state of the relay-chain.
	/// This request doesn't involve acceptance criteria checking, therefore only useful for the
	/// cases where the validity of the candidate is established. This is the case for the typical
	/// use-case: secondary checkers would use this request relying on the full prior checks
	/// performed by the relay-chain.
		oneshot::Sender<Result<ValidationResult, ValidationFailed>>,
impl CandidateValidationMessage {
	/// If the current variant contains the relay parent hash, return it.
	pub fn relay_parent(&self) -> Option<Hash> {
		match self {
			Self::ValidateFromChainState(_, _, _) => None,
			Self::ValidateFromExhaustive(_, _, _, _, _) => None,
/// Messages received by the Collator Protocol subsystem.
pub enum CollatorProtocolMessage {
	/// Signal to the collator protocol that it should connect to validators with the expectation
	/// of collating on the given para. This is only expected to be called once, early on, if at all,
	/// and only by the Collation Generation subsystem. As such, it will overwrite the value of
	/// the previous signal.
	/// This should be sent before any `DistributeCollation` message.
	/// Provide a collation to distribute to validators with an optional result sender.
	/// The result sender should be informed when at least one parachain validator seconded the collation. It is also
	/// completely okay to just drop the sender.
	DistributeCollation(CandidateReceipt, PoV, Option<oneshot::Sender<SignedFullStatement>>),
	/// Fetch a collation under the given relay-parent for the given ParaId.
	FetchCollation(Hash, CollatorId, ParaId, oneshot::Sender<(CandidateReceipt, PoV)>),
	/// Report a collator as having provided an invalid collation. This should lead to disconnect
	/// and blacklist of the collator.
	/// Note a collator as having provided a good collation.
	/// Notify a collator that its collation was seconded.
	NotifyCollationSeconded(CollatorId, Hash, SignedFullStatement),
	/// Get a network bridge update.
	/// Incoming network request for a collation.

/// Messages received by the network bridge subsystem.
pub enum NetworkBridgeMessage {
	/// Report a peer for their actions.
	ReportPeer(PeerId, UnifiedReputationChange),
	/// Disconnect a peer from the given peer-set without affecting their reputation.
	DisconnectPeer(PeerId, PeerSet),

	/// Send a message to one or more peers on the validation peer-set.
	SendValidationMessage(Vec<PeerId>, protocol_v1::ValidationProtocol),

	/// Send a message to one or more peers on the collation peer-set.
	SendCollationMessage(Vec<PeerId>, protocol_v1::CollationProtocol),

	/// Send a batch of validation messages.
	/// NOTE: Messages will be processed in order (at least statement distribution relies on this).
	SendValidationMessages(Vec<(Vec<PeerId>, protocol_v1::ValidationProtocol)>),

	/// Send a batch of collation messages.
	/// NOTE: Messages will be processed in order.
	SendCollationMessages(Vec<(Vec<PeerId>, protocol_v1::CollationProtocol)>),

	/// Send requests via substrate request/response.
	/// Second parameter, tells what to do if we are not yet connected to the peer.
	SendRequests(Vec<Requests>, IfDisconnected),
	/// Connect to peers who represent the given `validator_ids`.
	/// Also ask the network to stay connected to these peers at least
	/// until a new request is issued.
	/// Because it overrides the previous request, it must be ensured
	/// that `validator_ids` include all peers the subsystems
	/// are interested in (per `PeerSet`).
	/// A caller can learn about validator connections by listening to the
	/// `PeerConnected` events from the network bridge.
	ConnectToValidators {
		/// Ids of the validators to connect to.
		validator_ids: Vec<AuthorityDiscoveryId>,
		/// The underlying protocol to use for this request.
		peer_set: PeerSet,
impl NetworkBridgeMessage {
	/// If the current variant contains the relay parent hash, return it.
	pub fn relay_parent(&self) -> Option<Hash> {
		match self {
			Self::ReportPeer(_, _) => None,
			Self::DisconnectPeer(_, _) => None,
			Self::SendValidationMessage(_, _) => None,
			Self::SendCollationMessage(_, _) => None,
			Self::SendValidationMessages(_) => None,
			Self::SendCollationMessages(_) => None,
			Self::ConnectToValidators { .. } => None,
			Self::SendRequests { .. } => None,
/// Availability Distribution Message.
pub enum AvailabilityDistributionMessage {
	/// Incoming network request for an availability chunk.
	/// Incoming network request for a seconded PoV.
	/// Instruct availability distribution to fetch a remote PoV.
	/// NOTE: The result of this fetch is not yet locally validated and could be bogus.
	FetchPoV {
		/// The relay parent giving the necessary context.
		relay_parent: Hash,
		/// Validator to fetch the PoV from.
		from_validator: ValidatorIndex,
		/// Candidate hash to fetch the PoV for.
		candidate_hash: CandidateHash,
		/// Expected hash of the PoV, a PoV not matching this hash will be rejected.
		pov_hash: Hash,
		/// Sender for getting back the result of this fetch.
		/// The sender will be canceled if the fetching failed for some reason.
		tx: oneshot::Sender<PoV>,
/// Availability Recovery Message.
pub enum AvailabilityRecoveryMessage {
	/// Recover available data from validators on the network.
		Option<GroupIndex>, // Optional backing group to request from first.
		oneshot::Sender<Result<AvailableData, crate::errors::RecoveryError>>,
	/// Incoming network request for available data.
/// Bitfield distribution message.
pub enum BitfieldDistributionMessage {
	/// Distribute a bitfield via gossip to other validators.
	DistributeBitfield(Hash, SignedAvailabilityBitfield),

	/// Event from the network bridge.
impl BitfieldDistributionMessage {
	/// If the current variant contains the relay parent hash, return it.
	pub fn relay_parent(&self) -> Option<Hash> {
		match self {
			Self::DistributeBitfield(hash, _) => Some(*hash),
			Self::NetworkBridgeUpdateV1(_) => None,
/// Bitfield signing message.
///
/// Currently non-instantiable: the enum declares no variants, so no value of this
/// type can be constructed.
pub enum BitfieldSigningMessage {}

impl BoundToRelayParent for BitfieldSigningMessage {
	fn relay_parent(&self) -> Hash {
		match *self {}
/// Availability store subsystem message.
pub enum AvailabilityStoreMessage {
	/// Query a `AvailableData` from the AV store.
	QueryAvailableData(CandidateHash, oneshot::Sender<Option<AvailableData>>),
	/// Query whether a `AvailableData` exists within the AV Store.
	/// This is useful in cases when existence
	/// matters, but we don't want to necessarily pass around multiple
	/// megabytes of data to get a single bit of information.
	QueryDataAvailability(CandidateHash, oneshot::Sender<bool>),
	/// Query an `ErasureChunk` from the AV store by the candidate hash and validator index.
	QueryChunk(CandidateHash, ValidatorIndex, oneshot::Sender<Option<ErasureChunk>>),
	/// Query all chunks that we have for the given candidate hash.
	QueryAllChunks(CandidateHash, oneshot::Sender<Vec<ErasureChunk>>),

	/// Query whether an `ErasureChunk` exists within the AV Store.
	/// This is useful in cases like bitfield signing, when existence
	/// matters, but we don't want to necessarily pass around large
	/// quantities of data to get a single bit of information.
	QueryChunkAvailability(CandidateHash, ValidatorIndex, oneshot::Sender<bool>),
	/// Store an `ErasureChunk` in the AV store.
	/// Return `Ok(())` if the store operation succeeded, `Err(())` if it failed.
	StoreChunk {
		/// A hash of the candidate this chunk belongs to.
		candidate_hash: CandidateHash,
		/// The chunk itself.
		chunk: ErasureChunk,
		/// Sending side of the channel to send result to.
		tx: oneshot::Sender<Result<(), ()>>,

	/// Store a `AvailableData` in the AV store.
	/// If `ValidatorIndex` is present store corresponding chunk also.
	/// Return `Ok(())` if the store operation succeeded, `Err(())` if it failed.
	StoreAvailableData(CandidateHash, Option<ValidatorIndex>, u32, AvailableData, oneshot::Sender<Result<(), ()>>),
impl AvailabilityStoreMessage {
	/// In fact, none of the AvailabilityStore messages assume a particular relay parent.
	pub fn relay_parent(&self) -> Option<Hash> {
		match self {
			_ => None,
/// A response channel for the result of a chain API request.
///
/// The sending half of a oneshot channel carrying either the requested `T` or a
/// `ChainApiError`.
pub type ChainApiResponseChannel<T> = oneshot::Sender<Result<T, crate::errors::ChainApiError>>;

/// Chain API request subsystem message.
pub enum ChainApiMessage {
	/// Request the block number by hash.
	/// Returns `None` if a block with the given hash is not present in the db.
	BlockNumber(Hash, ChainApiResponseChannel<Option<BlockNumber>>),
	/// Request the block header by hash.
	/// Returns `None` if a block with the given hash is not present in the db.
	BlockHeader(Hash, ChainApiResponseChannel<Option<BlockHeader>>),
	/// Request the finalized block hash by number.
	/// Returns `None` if a block with the given number is not present in the db.
	/// Note: the caller must ensure the block is finalized.
	FinalizedBlockHash(BlockNumber, ChainApiResponseChannel<Option<Hash>>),
	/// Request the last finalized block number.
	/// This request always succeeds.
	/// Request the `k` ancestors block hashes of a block with the given hash.
	/// The response channel may return a `Vec` of size up to `k`
	/// filled with ancestors hashes with the following order:
	/// `parent`, `grandparent`, ...
	Ancestors {
		/// The hash of the block in question.
		hash: Hash,
		/// The number of ancestors to request.
		k: usize,
		/// The response channel.
		response_channel: ChainApiResponseChannel<Vec<Hash>>,

impl ChainApiMessage {
	/// If the current variant contains the relay parent hash, return it.
	pub fn relay_parent(&self) -> Option<Hash> {

/// A sender for the result of a runtime API request.
///
/// The sending half of a oneshot channel carrying either the requested `T` or a
/// `RuntimeApiError`.
pub type RuntimeApiSender<T> = oneshot::Sender<Result<T, crate::errors::RuntimeApiError>>;
/// A request to the Runtime API subsystem.
pub enum RuntimeApiRequest {
	/// Get the next, current and some previous authority discovery set deduplicated.
	/// Get the current validator set.
	/// Get the validator groups and group rotation info.
	ValidatorGroups(RuntimeApiSender<(Vec<Vec<ValidatorIndex>>, GroupRotationInfo)>),
	/// Get information on all availability cores.
	/// Get the persisted validation data for a particular para, taking the given
	/// `OccupiedCoreAssumption`, which will inform on how the validation data should be computed
	/// if the para currently occupies a core.
	/// Sends back `true` if the validation outputs pass all acceptance criteria checks.
	/// Get the session index that a child of the block will have.
	/// Get the validation code for a para, taking the given `OccupiedCoreAssumption`, which
	/// will inform on how the validation data should be computed if the para currently
	/// occupies a core.
	/// Fetch the historical validation code used by a para for candidates executed in the
	/// context of a given block height in the current chain.
	/// `context_height` may be no greater than the height of the block in whose
	/// state the runtime API is executed. Otherwise `None` is returned.
	/// Get the candidate pending availability for a particular parachain by parachain / core index.
	CandidatePendingAvailability(ParaId, RuntimeApiSender<Option<CommittedCandidateReceipt>>),
	/// Get all events concerning candidates (backing, inclusion, time-out) in the parent of
	/// the block in whose state this request is executed.
	/// Get the session info for the given session, if stored.
	SessionInfo(SessionIndex, RuntimeApiSender<Option<SessionInfo>>),
	/// Get all the pending inbound messages in the downward message queue for a para.
	/// Get the contents of all channels addressed to the given recipient. Channels that have no
	/// messages in them are also included.
		RuntimeApiSender<BTreeMap<ParaId, Vec<InboundHrmpMessage<BlockNumber>>>>,
	/// Get information about the BABE epoch the block was included in.

/// A message to the Runtime API subsystem.
pub enum RuntimeApiMessage {
	/// Make a request of the runtime API against the post-state of the given relay-parent.
	Request(Hash, RuntimeApiRequest),

impl RuntimeApiMessage {
	/// If the current variant contains the relay parent hash, return it.
	pub fn relay_parent(&self) -> Option<Hash> {
		match self {
			Self::Request(hash, _) => Some(*hash),

/// Statement distribution message.
pub enum StatementDistributionMessage {
	/// We have originated a signed statement in the context of
	/// given relay-parent hash and it should be distributed to other validators.
	Share(Hash, SignedFullStatement),
	/// Event from the network bridge.
	/// Get receiver for receiving incoming network requests for statement fetching.
/// This data becomes intrinsics or extrinsics which should be included in a future relay chain block.
// It needs to be cloneable because multiple potential block authors can request copies.
#[derive(Debug, Clone)]
pub enum ProvisionableData {
	/// This bitfield indicates the availability of various candidate blocks.
	Bitfield(Hash, SignedAvailabilityBitfield),
	/// The Candidate Backing subsystem believes that this candidate is valid, pending availability.
	/// Misbehavior reports are self-contained proofs of validator misbehavior.
	MisbehaviorReport(Hash, ValidatorIndex, Misbehavior),
	/// Disputes trigger a broad dispute resolution process.
	Dispute(Hash, ValidatorSignature),
/// Inherent data returned by the provisioner
#[derive(Debug, Clone)]
pub struct ProvisionerInherentData {
	/// Signed bitfields.
	pub bitfields: SignedAvailabilityBitfields,
	/// Backed candidates.
	pub backed_candidates: Vec<BackedCandidate>,
	/// Dispute statement sets.
	pub disputes: MultiDisputeStatementSet,
/// Message to the Provisioner.
/// In all cases, the Hash is that of the relay parent.
pub enum ProvisionerMessage {
	/// This message allows external subsystems to request the set of bitfields and backed candidates
	/// associated with a particular potential block hash.
	/// This is expected to be used by a proposer, to inject that information into the InherentData
	/// where it can be assembled into the ParaInherent.
	RequestInherentData(Hash, oneshot::Sender<ProvisionerInherentData>),
	/// This data should become part of a relay chain block
	ProvisionableData(Hash, ProvisionableData),
impl BoundToRelayParent for ProvisionerMessage {
	fn relay_parent(&self) -> Hash {
		match self {
			Self::RequestInherentData(hash, _) => *hash,
			Self::ProvisionableData(hash, _) => *hash,
/// Message to the Collation Generation subsystem.
pub enum CollationGenerationMessage {
	/// Initialize the collation generation subsystem

impl CollationGenerationMessage {
	/// If the current variant contains the relay parent hash, return it.
	pub fn relay_parent(&self) -> Option<Hash> {

/// The result type of [`ApprovalVotingMessage::CheckAndImportAssignment`] request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AssignmentCheckResult {
	/// The vote was accepted and should be propagated onwards.
	/// The vote was valid but duplicate and should not be propagated onwards.
	/// The vote was valid but too far in the future to accept right now.
	/// The vote was bad and should be ignored, reporting the peer who propagated it.

/// The result type of [`ApprovalVotingMessage::CheckAndImportApproval`] request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApprovalCheckResult {
	/// The vote was accepted and should be propagated onwards.
	/// The vote was bad and should be ignored, reporting the peer who propagated it.

/// Message to the Approval Voting subsystem.
pub enum ApprovalVotingMessage {
	/// Check if the assignment is valid and can be accepted by our view of the protocol.
	/// Should not be sent unless the block hash is known.
	/// Check if the approval vote is valid and can be accepted by our view of the
	/// protocol.
	/// Should not be sent unless the block hash within the indirect vote is known.
	/// Returns the highest possible ancestor hash of the provided block hash which is
	/// acceptable to vote on finality for.
	/// The `BlockNumber` provided is the number of the block's ancestor which is the
	/// earliest possible vote.
	/// It can also return the same block hash, if that is acceptable to vote upon.
	/// Return `None` if the input hash is unrecognized.
	ApprovedAncestor(Hash, BlockNumber, oneshot::Sender<Option<(Hash, BlockNumber)>>),

/// Message to the Approval Distribution subsystem.
pub enum ApprovalDistributionMessage {
	/// Notify the `ApprovalDistribution` subsystem about new blocks
	/// and the candidates contained within them.
	/// Distribute an assignment cert from the local validator. The cert is assumed
	/// to be valid, relevant, and for the given relay-parent and validator index.
	DistributeAssignment(IndirectAssignmentCert, CandidateIndex),
	/// Distribute an approval vote for the local validator. The approval vote is assumed to be
	/// valid, relevant, and the corresponding approval already issued.
	/// If not, the subsystem is free to drop the message.
	/// An update from the network bridge.

/// Message to the Gossip Support subsystem.
pub enum GossipSupportMessage {

/// A message type tying together all message types that are used across Subsystems.
#[derive(Debug, derive_more::From)]
	/// Message for the validation subsystem.
	/// Message for the candidate backing subsystem.
	/// Message for the candidate selection subsystem.
	/// Message for the Chain API subsystem.
	/// Message for the Collator Protocol subsystem.
	/// Message for the statement distribution subsystem.
	/// Message for the availability distribution subsystem.
	/// Message for the availability recovery subsystem.
	/// Message for the bitfield distribution subsystem.
	/// Message for the bitfield signing subsystem.
	/// Message for the Provisioner subsystem.
	/// Message for the Runtime API subsystem.
	/// Message for the availability store subsystem.
	/// Message for the network bridge subsystem.
	/// Message for the Collation Generation subsystem.
	/// Message for the Approval Voting subsystem.
	/// Message for the Approval Distribution subsystem.
	/// Message for the Gossip Support subsystem.
impl From<IncomingRequest<req_res_v1::PoVFetchingRequest>> for AvailabilityDistributionMessage {
	fn from(req: IncomingRequest<req_res_v1::PoVFetchingRequest>) -> Self {
impl From<IncomingRequest<req_res_v1::ChunkFetchingRequest>> for AvailabilityDistributionMessage {
	fn from(req: IncomingRequest<req_res_v1::ChunkFetchingRequest>) -> Self {
impl From<IncomingRequest<req_res_v1::CollationFetchingRequest>> for CollatorProtocolMessage {
	fn from(req: IncomingRequest<req_res_v1::CollationFetchingRequest>) -> Self {

impl From<IncomingRequest<req_res_v1::PoVFetchingRequest>> for AllMessages {
	fn from(req: IncomingRequest<req_res_v1::PoVFetchingRequest>) -> Self {
impl From<IncomingRequest<req_res_v1::ChunkFetchingRequest>> for AllMessages {
	fn from(req: IncomingRequest<req_res_v1::ChunkFetchingRequest>) -> Self {
impl From<IncomingRequest<req_res_v1::CollationFetchingRequest>> for AllMessages {
	fn from(req: IncomingRequest<req_res_v1::CollationFetchingRequest>) -> Self {
impl From<IncomingRequest<req_res_v1::AvailableDataFetchingRequest>> for AllMessages {
	fn from(req: IncomingRequest<req_res_v1::AvailableDataFetchingRequest>) -> Self {