// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! PoV Distribution Subsystem of Polkadot.
//!
//! This is a gossip implementation of code that is responsible for distributing PoVs
//! among validators.
#![deny(unused_crate_dependencies)]
#![warn(missing_docs)]
use polkadot_primitives::v1::{
Hash, PoV, CandidateDescriptor, ValidatorId, Id as ParaId, CoreIndex, CoreState,
};
use polkadot_subsystem::{
ActiveLeavesUpdate, OverseerSignal, SubsystemContext, SubsystemResult, SubsystemError, Subsystem,
FromOverseer, SpawnedSubsystem,
messages::{
PoVDistributionMessage, AllMessages, NetworkBridgeMessage,
},
};
use polkadot_node_subsystem_util::{
validator_discovery,
request_validators_ctx,
request_validator_groups_ctx,
request_availability_cores_ctx,
metrics::{self, prometheus},
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, ReputationChange as Rep, NetworkBridgeEvent, PeerId, View,
};
use futures::prelude::*;
use futures::channel::oneshot;
use std::collections::{hash_map::{Entry, HashMap}, HashSet};
use std::sync::Arc;
mod error;
#[cfg(test)]
mod tests;
/// Penalty: the peer issued so many PoV requests that it looks like flooding.
const COST_APPARENT_FLOOD: Rep = Rep::new(-500, "Peer appears to be flooding us with PoV requests");
/// Penalty: the peer sent a PoV we never signalled we were awaiting.
const COST_UNEXPECTED_POV: Rep = Rep::new(-500, "Peer sent us an unexpected PoV");
/// Penalty: the peer claims to await a PoV at a relay-parent not in its advertised view.
const COST_AWAITED_NOT_IN_VIEW: Rep
	= Rep::new(-100, "Peer claims to be awaiting something outside of its view");
/// Reward: the peer was the first to supply a PoV we were awaiting.
const BENEFIT_FRESH_POV: Rep = Rep::new(25, "Peer supplied us with an awaited PoV");
/// Reward (smaller): the peer supplied an awaited PoV, but another peer beat it to it.
const BENEFIT_LATE_POV: Rep = Rep::new(10, "Peer supplied us with an awaited PoV, \
	but was not the first to do so");
/// Target string for all `tracing` output emitted by this subsystem.
const LOG_TARGET: &str = "pov_distribution";
/// The PoV Distribution Subsystem.
pub struct PoVDistribution {
	// Prometheus metrics handle; cloned into the per-connection state on startup.
	metrics: Metrics,
}
impl Subsystem for PoVDistribution
where C: SubsystemContext
{
fn start(self, ctx: C) -> SpawnedSubsystem {
// Swallow error because failure is fatal to the node and we log with more precision
// within `run`.
let future = self.run(ctx)
.map_err(|e| SubsystemError::with_origin("pov-distribution", e))
.boxed();
SpawnedSubsystem {
name: "pov-distribution-subsystem",
future,
}
}
}
#[derive(Default)]
struct State {
/// A state of things going on on a per-relay-parent basis.
relay_parent_state: HashMap,
/// Info on peers.
peer_state: HashMap,
/// Our own view.
our_view: View,
/// Connect to relevant groups of validators at different relay parents.
connection_requests: validator_discovery::ConnectionRequests,
/// Metrics.
metrics: Metrics,
}
struct BlockBasedState {
known: HashMap>,
/// All the PoVs we are or were fetching, coupled with channels expecting the data.
///
/// This may be an empty list, which indicates that we were once awaiting this PoV but have
/// received it already.
fetching: HashMap>>>,
n_validators: usize,
}
#[derive(Default)]
struct PeerState {
/// A set of awaited PoV-hashes for each relay-parent in the peer's view.
awaited: HashMap>,
}
fn awaiting_message(relay_parent: Hash, awaiting: Vec)
-> protocol_v1::ValidationProtocol
{
protocol_v1::ValidationProtocol::PoVDistribution(
protocol_v1::PoVDistributionMessage::Awaiting(relay_parent, awaiting)
)
}
/// Build the wire message carrying `pov` (with hash `pov_hash`) for `relay_parent`.
fn send_pov_message(relay_parent: Hash, pov_hash: Hash, pov: PoV)
	-> protocol_v1::ValidationProtocol
{
	let inner = protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov);
	protocol_v1::ValidationProtocol::PoVDistribution(inner)
}
/// Handles the signal. If successful, returns `true` if the subsystem should conclude,
/// `false` otherwise.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_signal(
state: &mut State,
ctx: &mut impl SubsystemContext,
signal: OverseerSignal,
) -> SubsystemResult {
match signal {
OverseerSignal::Conclude => Ok(true),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => {
let _timer = state.metrics.time_handle_signal();
for relay_parent in activated {
match request_validators_ctx(relay_parent.clone(), ctx).await {
Ok(vals_rx) => {
let n_validators = match vals_rx.await? {
Ok(v) => v.len(),
Err(e) => {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Error fetching validators from runtime API for active leaf",
);
// Not adding bookkeeping here might make us behave funny, but we
// shouldn't take down the node on spurious runtime API errors.
//
// and this is "behave funny" as in be bad at our job, but not in any
// slashable or security-related way.
continue;
}
};
state.relay_parent_state.insert(relay_parent, BlockBasedState {
known: HashMap::new(),
fetching: HashMap::new(),
n_validators,
});
}
Err(e) => {
// continue here also as above.
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Error fetching validators from runtime API for active leaf",
);
}
}
}
for relay_parent in deactivated {
state.connection_requests.remove(&relay_parent);
state.relay_parent_state.remove(&relay_parent);
}
Ok(false)
}
OverseerSignal::BlockFinalized(..) => Ok(false),
}
}
/// Notify peers that we are awaiting a given PoV hash.
///
/// This only notifies peers who have the relay parent in their view.
#[tracing::instrument(level = "trace", skip(peers, ctx), fields(subsystem = LOG_TARGET))]
async fn notify_all_we_are_awaiting(
peers: &mut HashMap,
ctx: &mut impl SubsystemContext,
relay_parent: Hash,
pov_hash: Hash,
) {
// We use `awaited` as a proxy for which heads are in the peer's view.
let peers_to_send: Vec<_> = peers.iter()
.filter_map(|(peer, state)| if state.awaited.contains_key(&relay_parent) {
Some(peer.clone())
} else {
None
})
.collect();
if peers_to_send.is_empty() {
return;
}
let payload = awaiting_message(relay_parent, vec![pov_hash]);
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers_to_send,
payload,
))).await;
}
/// Notify one peer about everything we're awaiting at a given relay-parent.
#[tracing::instrument(level = "trace", skip(ctx, relay_parent_state), fields(subsystem = LOG_TARGET))]
async fn notify_one_we_are_awaiting_many(
peer: &PeerId,
ctx: &mut impl SubsystemContext,
relay_parent_state: &HashMap,
relay_parent: Hash,
) {
let awaiting_hashes = relay_parent_state.get(&relay_parent).into_iter().flat_map(|s| {
// Send the peer everything we are fetching at this relay-parent
s.fetching.iter()
.filter(|(_, senders)| !senders.is_empty()) // that has not been completed already.
.map(|(pov_hash, _)| *pov_hash)
}).collect::>();
if awaiting_hashes.is_empty() {
return;
}
let payload = awaiting_message(relay_parent, awaiting_hashes);
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
vec![peer.clone()],
payload,
))).await;
}
/// Distribute a PoV to peers who are awaiting it.
#[tracing::instrument(level = "trace", skip(peers, ctx, metrics, pov), fields(subsystem = LOG_TARGET))]
async fn distribute_to_awaiting(
peers: &mut HashMap,
ctx: &mut impl SubsystemContext,
metrics: &Metrics,
relay_parent: Hash,
pov_hash: Hash,
pov: &PoV,
) {
// Send to all peers who are awaiting the PoV and have that relay-parent in their view.
//
// Also removes it from their awaiting set.
let peers_to_send: Vec<_> = peers.iter_mut()
.filter_map(|(peer, state)| state.awaited.get_mut(&relay_parent).and_then(|awaited| {
if awaited.remove(&pov_hash) {
Some(peer.clone())
} else {
None
}
}))
.collect();
if peers_to_send.is_empty() { return; }
let payload = send_pov_message(relay_parent, pov_hash, pov.clone());
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers_to_send,
payload,
))).await;
metrics.on_pov_distributed();
}
/// Get the Id of the Core that is assigned to the para being collated on if any
/// and the total number of cores.
async fn determine_core(
ctx: &mut impl SubsystemContext,
para_id: ParaId,
relay_parent: Hash,
) -> error::Result