// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The Statement Distribution Subsystem.
//!
//! This is responsible for distributing signed statements about candidate
//! validity amongst validators.
#![deny(unused_crate_dependencies)]
#![warn(missing_docs)]
use polkadot_subsystem::{
Subsystem, SubsystemResult, SubsystemContext, SpawnedSubsystem,
ActiveLeavesUpdate, FromOverseer, OverseerSignal,
messages::{
AllMessages, NetworkBridgeMessage, StatementDistributionMessage, CandidateBackingMessage,
RuntimeApiMessage, RuntimeApiRequest,
},
};
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
};
use node_primitives::SignedFullStatement;
use polkadot_primitives::v1::{
Hash, CompactStatement, ValidatorIndex, ValidatorId, SigningContext, ValidatorSignature, CandidateHash,
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, View, PeerId, ReputationChange as Rep, NetworkBridgeEvent,
};
use futures::prelude::*;
use futures::channel::{mpsc, oneshot};
use indexmap::IndexSet;
use std::collections::{HashMap, HashSet};
// Reputation changes reported to the network bridge via `report_peer`.

/// Cost applied when a peer sends a statement it should not have sent: one outside our view,
/// outside the relay-parents we track for the peer, about a candidate the peer is not known to
/// be aware of, or a `Seconded` statement beyond the per-validator limit.
const COST_UNEXPECTED_STATEMENT: Rep = Rep::new(-100, "Unexpected Statement");
/// Cost applied when the statement's signature check fails.
const COST_INVALID_SIGNATURE: Rep = Rep::new(-500, "Invalid Statement Signature");
/// Cost applied when a peer sends us a statement it has already sent us.
const COST_DUPLICATE_STATEMENT: Rep = Rep::new(-250, "Statement sent more than once by peer");
/// Cost applied when a peer exceeds the per-candidate message limit.
const COST_APPARENT_FLOOD: Rep = Rep::new(-1000, "Peer appears to be flooding us with statements");
/// Benefit applied when a peer sends a valid statement that we already had stored.
const BENEFIT_VALID_STATEMENT: Rep = Rep::new(5, "Peer provided a valid statement");
/// Benefit applied when a peer sends a valid statement that we had not stored yet.
const BENEFIT_VALID_STATEMENT_FIRST: Rep = Rep::new(
25,
"Peer was the first to provide a valid statement",
);
/// The maximum amount of candidates each validator is allowed to second at any relay-parent.
/// Short for "Validator Candidate Threshold".
///
/// This is the amount of candidates we keep per validator at any relay-parent.
/// Typically we will only keep 1, but when a validator equivocates we will need to track 2.
const VC_THRESHOLD: usize = 2;
/// Name used as the `target` of log events and as the `subsystem` field of tracing spans
/// emitted by this file.
const LOG_TARGET: &str = "statement_distribution";
/// The statement distribution subsystem.
///
/// The struct itself only carries the metrics handle; peer data, our view and the active heads
/// are kept as local state inside `run`.
pub struct StatementDistribution {
// Prometheus metrics
metrics: Metrics,
}
impl Subsystem for StatementDistribution
where C: SubsystemContext
{
fn start(self, ctx: C) -> SpawnedSubsystem {
// Swallow error because failure is fatal to the node and we log with more precision
// within `run`.
SpawnedSubsystem {
name: "statement-distribution-subsystem",
future: self.run(ctx).boxed(),
}
}
}
impl StatementDistribution {
/// Create a new Statement Distribution Subsystem
pub fn new(metrics: Metrics) -> StatementDistribution {
StatementDistribution {
metrics,
}
}
}
/// Tracks our impression of a single peer's view of the candidates a validator has seconded
/// for a given relay-parent.
///
/// It is expected to receive at most `VC_THRESHOLD` from us and be aware of at most `VC_THRESHOLD`
/// via other means.
#[derive(Default)]
struct VcPerPeerTracker {
/// Candidate hashes recorded through `note_local`, i.e. candidates we sent to the peer.
local_observed: arrayvec::ArrayVec<[CandidateHash; VC_THRESHOLD]>,
/// Candidate hashes recorded through `note_remote`, i.e. candidates the peer sent to us.
remote_observed: arrayvec::ArrayVec<[CandidateHash; VC_THRESHOLD]>,
}
impl VcPerPeerTracker {
    /// Note that the remote should now be aware that a validator has seconded a given candidate
    /// (by hash) based on a message that we have sent it from our local pool.
    ///
    /// If the local slots are already full with other hashes, the hash is dropped and a warning
    /// is logged: we should never try to send more than `VC_THRESHOLD` candidates per validator.
    fn note_local(&mut self, h: CandidateHash) {
        if !note_hash(&mut self.local_observed, h) {
            tracing::warn!(
                target: LOG_TARGET,
                "Statement distribution is erroneously attempting to distribute more \
                than {} candidate(s) per validator index. Ignoring",
                VC_THRESHOLD,
            );
        }
    }

    /// Note that the remote should now be aware that a validator has seconded a given candidate
    /// (by hash) based on a message that it has sent us.
    ///
    /// Returns `true` if the peer was allowed to send us such a message, `false` otherwise.
    fn note_remote(&mut self, h: CandidateHash) -> bool {
        note_hash(&mut self.remote_observed, h)
    }
}
/// Record `h` in `observed`.
///
/// Returns `true` when the hash is already present or could be appended, and `false` when the
/// fixed-capacity vector is full of other hashes.
fn note_hash(
    observed: &mut arrayvec::ArrayVec<[CandidateHash; VC_THRESHOLD]>,
    h: CandidateHash,
) -> bool {
    // Short-circuits: the push is only attempted for hashes that are not yet recorded.
    observed.contains(&h) || observed.try_push(h).is_ok()
}
/// knowledge that a peer has about goings-on in a relay parent.
#[derive(Default)]
struct PeerRelayParentKnowledge {
/// candidates that the peer is aware of. This indicates that we can
/// send other statements pertaining to that candidate.
known_candidates: HashSet,
/// fingerprints of all statements a peer should be aware of: those that
/// were sent to the peer by us.
sent_statements: HashSet<(CompactStatement, ValidatorIndex)>,
/// fingerprints of all statements a peer should be aware of: those that
/// were sent to us by the peer.
received_statements: HashSet<(CompactStatement, ValidatorIndex)>,
/// How many candidates this peer is aware of for each given validator index.
seconded_counts: HashMap,
/// How many statements we've received for each candidate that we're aware of.
received_message_count: HashMap,
}
impl PeerRelayParentKnowledge {
/// Attempt to update our view of the peer's knowledge with this statement's fingerprint based
/// on something that we would like to send to the peer.
///
/// This returns `None` if the peer cannot accept this statement, without altering internal
/// state.
///
/// If the peer can accept the statement, this returns `Some` and updates the internal state.
/// Once the knowledge has incorporated a statement, it cannot be incorporated again.
///
/// This returns `Some(true)` if this is the first time the peer has become aware of a
/// candidate with the given hash.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn send(&mut self, fingerprint: &(CompactStatement, ValidatorIndex)) -> Option {
let already_known = self.sent_statements.contains(fingerprint)
|| self.received_statements.contains(fingerprint);
if already_known {
return None;
}
let new_known = match fingerprint.0 {
CompactStatement::Candidate(ref h) => {
self.seconded_counts.entry(fingerprint.1)
.or_default()
.note_local(h.clone());
self.known_candidates.insert(h.clone())
},
CompactStatement::Valid(ref h) | CompactStatement::Invalid(ref h) => {
// The peer can only accept Valid and Invalid statements for which it is aware
// of the corresponding candidate.
if !self.known_candidates.contains(h) {
return None;
}
false
}
};
self.sent_statements.insert(fingerprint.clone());
Some(new_known)
}
/// Attempt to update our view of the peer's knowledge with this statement's fingerprint based on
/// a message we are receiving from the peer.
///
/// Provide the maximum message count that we can receive per candidate. In practice we should
/// not receive more statements for any one candidate than there are members in the group assigned
/// to that para, but this maximum needs to be lenient to account for equivocations that may be
/// cross-group. As such, a maximum of 2 * n_validators is recommended.
///
/// This returns an error if the peer should not have sent us this message according to protocol
/// rules for flood protection.
///
/// If this returns `Ok`, the internal state has been altered. After `receive`ing a new
/// candidate, we are then cleared to send the peer further statements about that candidate.
///
/// This returns `Ok(true)` if this is the first time the peer has become aware of a
/// candidate with given hash.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn receive(
&mut self,
fingerprint: &(CompactStatement, ValidatorIndex),
max_message_count: usize,
) -> Result {
// We don't check `sent_statements` because a statement could be in-flight from both
// sides at the same time.
if self.received_statements.contains(fingerprint) {
return Err(COST_DUPLICATE_STATEMENT);
}
let candidate_hash = match fingerprint.0 {
CompactStatement::Candidate(ref h) => {
let allowed_remote = self.seconded_counts.entry(fingerprint.1)
.or_insert_with(Default::default)
.note_remote(h.clone());
if !allowed_remote {
return Err(COST_UNEXPECTED_STATEMENT);
}
h
}
CompactStatement::Valid(ref h)| CompactStatement::Invalid(ref h) => {
if !self.known_candidates.contains(&h) {
return Err(COST_UNEXPECTED_STATEMENT);
}
h
}
};
{
let received_per_candidate = self.received_message_count
.entry(*candidate_hash)
.or_insert(0);
if *received_per_candidate >= max_message_count {
return Err(COST_APPARENT_FLOOD);
}
*received_per_candidate += 1;
}
self.received_statements.insert(fingerprint.clone());
Ok(self.known_candidates.insert(candidate_hash.clone()))
}
}
struct PeerData {
view: View,
view_knowledge: HashMap,
}
impl PeerData {
/// Attempt to update our view of the peer's knowledge with this statement's fingerprint based
/// on something that we would like to send to the peer.
///
/// This returns `None` if the peer cannot accept this statement, without altering internal
/// state.
///
/// If the peer can accept the statement, this returns `Some` and updates the internal state.
/// Once the knowledge has incorporated a statement, it cannot be incorporated again.
///
/// This returns `Some(true)` if this is the first time the peer has become aware of a
/// candidate with the given hash.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn send(
&mut self,
relay_parent: &Hash,
fingerprint: &(CompactStatement, ValidatorIndex),
) -> Option {
self.view_knowledge.get_mut(relay_parent).map_or(None, |k| k.send(fingerprint))
}
/// Attempt to update our view of the peer's knowledge with this statement's fingerprint based on
/// a message we are receiving from the peer.
///
/// Provide the maximum message count that we can receive per candidate. In practice we should
/// not receive more statements for any one candidate than there are members in the group assigned
/// to that para, but this maximum needs to be lenient to account for equivocations that may be
/// cross-group. As such, a maximum of 2 * n_validators is recommended.
///
/// This returns an error if the peer should not have sent us this message according to protocol
/// rules for flood protection.
///
/// If this returns `Ok`, the internal state has been altered. After `receive`ing a new
/// candidate, we are then cleared to send the peer further statements about that candidate.
///
/// This returns `Ok(true)` if this is the first time the peer has become aware of a
/// candidate with given hash.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn receive(
&mut self,
relay_parent: &Hash,
fingerprint: &(CompactStatement, ValidatorIndex),
max_message_count: usize,
) -> Result {
self.view_knowledge
.get_mut(relay_parent)
.ok_or(COST_UNEXPECTED_STATEMENT)?
.receive(fingerprint, max_message_count)
}
}
// A statement stored while a relay chain head is active.
//
// Hashing, equality and `Borrow` are all implemented in terms of `comparator`.
#[derive(Debug)]
struct StoredStatement {
// Key used for hashing and equality of stored statements.
comparator: StoredStatementComparator,
// The full signed statement.
statement: SignedFullStatement,
}
// A value used for comparison of stored statements to each other.
//
// The compact version of the statement, the validator index, and the signature of the validator
// is enough to differentiate between all types of equivocations, as long as the signature is
// actually checked to be valid. The same statement with 2 signatures and 2 statements with
// different (or same) signatures will all be correctly judged to be unequal with this comparator.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
struct StoredStatementComparator {
// Compact form of the statement payload.
compact: CompactStatement,
// Index of the validator that signed the statement.
validator_index: ValidatorIndex,
// The validator's signature over the statement.
signature: ValidatorSignature,
}
impl StoredStatement {
fn compact(&self) -> &CompactStatement {
&self.comparator.compact
}
fn fingerprint(&self) -> (CompactStatement, ValidatorIndex) {
(self.comparator.compact.clone(), self.statement.validator_index())
}
}
impl std::borrow::Borrow for StoredStatement {
fn borrow(&self) -> &StoredStatementComparator {
&self.comparator
}
}
impl std::hash::Hash for StoredStatement {
fn hash(&self, state: &mut H) {
self.comparator.hash(state)
}
}
// Two stored statements are equal exactly when their comparators are equal.
impl std::cmp::PartialEq for StoredStatement {
    fn eq(&self, other: &Self) -> bool {
        self.comparator == other.comparator
    }
}

impl std::cmp::Eq for StoredStatement {}
/// Outcome of `ActiveHeadData::note_statement`.
#[derive(Debug)]
enum NotedStatement<'a> {
/// The statement cannot be accepted: it exceeds the per-validator `Seconded` limit or refers
/// to a candidate we are not aware of.
NotUseful,
/// The statement was newly stored; carries a reference to the stored copy.
Fresh(&'a StoredStatement),
/// The statement is acceptable but was already stored.
UsefulButKnown
}
struct ActiveHeadData {
/// All candidates we are aware of for this head, keyed by hash.
candidates: HashSet,
/// Stored statements for circulation to peers.
///
/// These are iterable in insertion order, and `Seconded` statements are always
/// accepted before dependent statements.
statements: IndexSet,
/// The validators at this head.
validators: Vec,
/// The session index this head is at.
session_index: sp_staking::SessionIndex,
/// How many `Seconded` statements we've seen per validator.
seconded_counts: HashMap,
}
impl ActiveHeadData {
fn new(validators: Vec, session_index: sp_staking::SessionIndex) -> Self {
ActiveHeadData {
candidates: Default::default(),
statements: Default::default(),
validators,
session_index,
seconded_counts: Default::default(),
}
}
/// Note the given statement.
///
/// If it was not already known and can be accepted, returns `NotedStatement::Fresh`,
/// with a handle to the statement.
///
/// If it can be accepted, but we already know it, returns `NotedStatement::UsefulButKnown`.
///
/// We accept up to `VC_THRESHOLD` (2 at time of writing) `Seconded` statements
/// per validator. These will be the first ones we see. The statement is assumed
/// to have been checked, including that the validator index is not out-of-bounds and
/// the signature is valid.
///
/// Any other statements or those that reference a candidate we are not aware of cannot be accepted
/// and will return `NotedStatement::NotUseful`.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn note_statement(&mut self, statement: SignedFullStatement) -> NotedStatement {
let validator_index = statement.validator_index();
let comparator = StoredStatementComparator {
compact: statement.payload().to_compact(),
validator_index,
signature: statement.signature().clone(),
};
let stored = StoredStatement {
comparator: comparator.clone(),
statement,
};
match comparator.compact {
CompactStatement::Candidate(h) => {
let seconded_so_far = self.seconded_counts.entry(validator_index).or_insert(0);
if *seconded_so_far >= VC_THRESHOLD {
return NotedStatement::NotUseful;
}
self.candidates.insert(h);
if self.statements.insert(stored) {
*seconded_so_far += 1;
// This will always return `Some` because it was just inserted.
NotedStatement::Fresh(self.statements.get(&comparator)
.expect("Statement was just inserted; qed"))
} else {
NotedStatement::UsefulButKnown
}
}
CompactStatement::Valid(h) | CompactStatement::Invalid(h) => {
if !self.candidates.contains(&h) {
return NotedStatement::NotUseful;
}
if self.statements.insert(stored) {
// This will always return `Some` because it was just inserted.
NotedStatement::Fresh(self.statements.get(&comparator)
.expect("Statement was just inserted; qed"))
} else {
NotedStatement::UsefulButKnown
}
}
}
}
/// Get an iterator over all statements for the active head. Seconded statements come first.
fn statements(&self) -> impl Iterator- + '_ {
self.statements.iter()
}
/// Get an iterator over all statements for the active head that are for a particular candidate.
fn statements_about(&self, candidate_hash: CandidateHash)
-> impl Iterator
- + '_
{
self.statements().filter(move |s| s.compact().candidate_hash() == &candidate_hash)
}
}
/// Verify a statement's signature in the signing context of `relay_parent`.
///
/// Returns `Err(())` when the statement's validator index is not present in the head's
/// validator list, or when `check_signature` itself fails.
fn check_statement_signature(
    head: &ActiveHeadData,
    relay_parent: Hash,
    statement: &SignedFullStatement,
) -> Result<(), ()> {
    let validator = head.validators
        .get(statement.validator_index() as usize)
        .ok_or(())?;

    let signing_context = SigningContext {
        session_index: head.session_index,
        parent_hash: relay_parent,
    };

    statement.check_signature(&signing_context, validator)
}
type StatementListeners = Vec>;
/// Push a clone of `statement` to every registered listener, concurrently.
///
/// Listeners whose channel is closed afterwards are dropped from the list.
#[tracing::instrument(level = "trace", skip(listeners), fields(subsystem = LOG_TARGET))]
async fn inform_statement_listeners(
    statement: &SignedFullStatement,
    listeners: &mut StatementListeners,
) {
    let notify_all = stream::iter(listeners.iter_mut())
        .for_each_concurrent(None, |tx| async move {
            // Send errors are deliberately ignored; closed channels are pruned below.
            let _ = tx.send(statement.clone()).await;
        });
    notify_all.await;

    // Keep only the listeners that are still open.
    listeners.retain(|tx| !tx.is_closed());
}
/// Places the statement in storage if it is new, and then
/// circulates the statement to all peers who have not seen it yet, and
/// sends all statements dependent on that statement to peers who could previously not receive
/// them but now can.
#[tracing::instrument(level = "trace", skip(peers, ctx, active_heads, metrics), fields(subsystem = LOG_TARGET))]
async fn circulate_statement_and_dependents(
peers: &mut HashMap,
active_heads: &mut HashMap,
ctx: &mut impl SubsystemContext,
relay_parent: Hash,
statement: SignedFullStatement,
metrics: &Metrics,
) {
let active_head = match active_heads.get_mut(&relay_parent) {
Some(res) => res,
None => return,
};
// First circulate the statement directly to all peers needing it.
// The borrow of `active_head` needs to encompass only this (Rust) statement.
let outputs: Option<(CandidateHash, Vec)> = {
match active_head.note_statement(statement) {
NotedStatement::Fresh(stored) => Some((
*stored.compact().candidate_hash(),
circulate_statement(peers, ctx, relay_parent, stored).await,
)),
_ => None,
}
};
// Now send dependent statements to all peers needing them, if any.
if let Some((candidate_hash, peers_needing_dependents)) = outputs {
for peer in peers_needing_dependents {
if let Some(peer_data) = peers.get_mut(&peer) {
// defensive: the peer data should always be some because the iterator
// of peers is derived from the set of peers.
send_statements_about(
peer,
peer_data,
ctx,
relay_parent,
candidate_hash,
&*active_head,
metrics,
).await;
}
}
}
}
/// Wrap a signed statement for `relay_parent` into the validation-protocol wire message.
fn statement_message(relay_parent: Hash, statement: SignedFullStatement)
    -> protocol_v1::ValidationProtocol
{
    let inner = protocol_v1::StatementDistributionMessage::Statement(relay_parent, statement);
    protocol_v1::ValidationProtocol::StatementDistribution(inner)
}
/// Circulates a statement to all peers who have not seen it yet, and returns
/// an iterator over peers who need to have dependent statements sent.
#[tracing::instrument(level = "trace", skip(peers, ctx), fields(subsystem = LOG_TARGET))]
async fn circulate_statement(
peers: &mut HashMap,
ctx: &mut impl SubsystemContext,
relay_parent: Hash,
stored: &StoredStatement,
) -> Vec {
let fingerprint = stored.fingerprint();
let mut peers_to_send = HashMap::new();
for (peer, data) in peers.iter_mut() {
if let Some(new_known) = data.send(&relay_parent, &fingerprint) {
peers_to_send.insert(peer.clone(), new_known);
}
}
// Send all these peers the initial statement.
if !peers_to_send.is_empty() {
let payload = statement_message(relay_parent, stored.statement.clone());
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers_to_send.keys().cloned().collect(),
payload,
))).await;
}
peers_to_send.into_iter().filter_map(|(peer, needs_dependent)| if needs_dependent {
Some(peer)
} else {
None
}).collect()
}
/// Send a peer every stored statement about `candidate_hash` that it can accept and does not
/// already know, bumping the distributed-statements metric once per statement sent.
#[tracing::instrument(level = "trace", skip(peer_data, ctx, active_head, metrics), fields(subsystem = LOG_TARGET))]
async fn send_statements_about(
    peer: PeerId,
    peer_data: &mut PeerData,
    ctx: &mut impl SubsystemContext,
    relay_parent: Hash,
    candidate_hash: CandidateHash,
    active_head: &ActiveHeadData,
    metrics: &Metrics,
) {
    for stored in active_head.statements_about(candidate_hash) {
        // `send` records the statement as sent; `None` means the peer cannot take it.
        if peer_data.send(&relay_parent, &stored.fingerprint()).is_none() {
            continue;
        }

        let payload = statement_message(relay_parent, stored.statement.clone());
        let message = NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload);
        ctx.send_message(AllMessages::NetworkBridge(message)).await;
        metrics.on_statement_distributed();
    }
}
/// Send a peer every stored statement at `relay_parent` that it can accept and does not
/// already know, bumping the distributed-statements metric once per statement sent.
#[tracing::instrument(level = "trace", skip(peer_data, ctx, active_head, metrics), fields(subsystem = LOG_TARGET))]
async fn send_statements(
    peer: PeerId,
    peer_data: &mut PeerData,
    ctx: &mut impl SubsystemContext,
    relay_parent: Hash,
    active_head: &ActiveHeadData,
    metrics: &Metrics,
) {
    for stored in active_head.statements() {
        // `send` records the statement as sent; `None` means the peer cannot take it.
        if peer_data.send(&relay_parent, &stored.fingerprint()).is_none() {
            continue;
        }

        let payload = statement_message(relay_parent, stored.statement.clone());
        let message = NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload);
        ctx.send_message(AllMessages::NetworkBridge(message)).await;
        metrics.on_statement_distributed();
    }
}
/// Ask the network bridge to apply the reputation change `rep` to `peer`.
async fn report_peer(
    ctx: &mut impl SubsystemContext,
    peer: PeerId,
    rep: Rep,
) {
    let report = NetworkBridgeMessage::ReportPeer(peer, rep);
    ctx.send_message(AllMessages::NetworkBridge(report)).await
}
// Handle an incoming wire message. Returns a reference to a newly-stored statement
// if we were not already aware of it, along with the corresponding relay-parent.
//
// This function checks the signature and ensures the statement is compatible with our
// view.
#[tracing::instrument(level = "trace", skip(peer_data, ctx, active_heads, metrics), fields(subsystem = LOG_TARGET))]
async fn handle_incoming_message<'a>(
peer: PeerId,
peer_data: &mut PeerData,
our_view: &View,
active_heads: &'a mut HashMap,
ctx: &mut impl SubsystemContext,
message: protocol_v1::StatementDistributionMessage,
metrics: &Metrics,
statement_listeners: &mut StatementListeners,
) -> Option<(Hash, &'a StoredStatement)> {
let (relay_parent, statement) = match message {
protocol_v1::StatementDistributionMessage::Statement(r, s) => (r, s),
};
if !our_view.contains(&relay_parent) {
report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await;
return None;
}
let active_head = match active_heads.get_mut(&relay_parent) {
Some(h) => h,
None => {
// This should never be out-of-sync with our view if the view updates
// correspond to actual `StartWork` messages. So we just log and ignore.
tracing::warn!(
requested_relay_parent = %relay_parent,
"our view out-of-sync with active heads; head not found",
);
return None;
}
};
// check the signature on the statement.
if let Err(()) = check_statement_signature(&active_head, relay_parent, &statement) {
report_peer(ctx, peer, COST_INVALID_SIGNATURE).await;
return None;
}
// Ensure the statement is stored in the peer data.
//
// Note that if the peer is sending us something that is not within their view,
// it will not be kept within their log.
let fingerprint = (statement.payload().to_compact(), statement.validator_index());
let max_message_count = active_head.validators.len() * 2;
match peer_data.receive(&relay_parent, &fingerprint, max_message_count) {
Err(rep) => {
report_peer(ctx, peer, rep).await;
return None;
}
Ok(true) => {
// Send the peer all statements concerning the candidate that we have,
// since it appears to have just learned about the candidate.
send_statements_about(
peer.clone(),
peer_data,
ctx,
relay_parent,
fingerprint.0.candidate_hash().clone(),
&*active_head,
metrics,
).await;
}
Ok(false) => {}
}
inform_statement_listeners(&statement, statement_listeners).await;
// Note: `peer_data.receive` already ensures that the statement is not an unbounded equivocation
// or unpinned to a seconded candidate. So it is safe to place it into the storage.
match active_head.note_statement(statement) {
NotedStatement::NotUseful => None,
NotedStatement::UsefulButKnown => {
report_peer(ctx, peer, BENEFIT_VALID_STATEMENT).await;
None
}
NotedStatement::Fresh(statement) => {
report_peer(ctx, peer, BENEFIT_VALID_STATEMENT_FIRST).await;
Some((relay_parent, statement))
}
}
}
/// Update a peer's view. Sends all newly unlocked statements based on the previous
#[tracing::instrument(level = "trace", skip(peer_data, ctx, active_heads, metrics), fields(subsystem = LOG_TARGET))]
async fn update_peer_view_and_send_unlocked(
peer: PeerId,
peer_data: &mut PeerData,
ctx: &mut impl SubsystemContext,
active_heads: &HashMap,
new_view: View,
metrics: &Metrics,
) {
let old_view = std::mem::replace(&mut peer_data.view, new_view);
// Remove entries for all relay-parents in the old view but not the new.
for removed in old_view.difference(&peer_data.view) {
let _ = peer_data.view_knowledge.remove(removed);
}
// Add entries for all relay-parents in the new view but not the old.
// Furthermore, send all statements we have for those relay parents.
let new_view = peer_data.view.difference(&old_view).copied().collect::>();
for new in new_view.iter().copied() {
peer_data.view_knowledge.insert(new, Default::default());
if let Some(active_head) = active_heads.get(&new) {
send_statements(
peer.clone(),
peer_data,
ctx,
new,
active_head,
metrics,
).await;
}
}
}
#[tracing::instrument(level = "trace", skip(peers, active_heads, ctx, metrics), fields(subsystem = LOG_TARGET))]
async fn handle_network_update(
peers: &mut HashMap,
active_heads: &mut HashMap,
ctx: &mut impl SubsystemContext,
our_view: &mut View,
update: NetworkBridgeEvent,
metrics: &Metrics,
statement_listeners: &mut StatementListeners,
) {
match update {
NetworkBridgeEvent::PeerConnected(peer, _role) => {
peers.insert(peer, PeerData {
view: Default::default(),
view_knowledge: Default::default(),
});
}
NetworkBridgeEvent::PeerDisconnected(peer) => {
peers.remove(&peer);
}
NetworkBridgeEvent::PeerMessage(peer, message) => {
match peers.get_mut(&peer) {
Some(data) => {
let new_stored = handle_incoming_message(
peer,
data,
&*our_view,
active_heads,
ctx,
message,
metrics,
statement_listeners,
).await;
if let Some((relay_parent, new)) = new_stored {
// When we receive a new message from a peer, we forward it to the
// candidate backing subsystem.
let message = AllMessages::CandidateBacking(
CandidateBackingMessage::Statement(relay_parent, new.statement.clone())
);
ctx.send_message(message).await;
}
}
None => (),
}
}
NetworkBridgeEvent::PeerViewChange(peer, view) => {
match peers.get_mut(&peer) {
Some(data) => {
update_peer_view_and_send_unlocked(
peer,
data,
ctx,
&*active_heads,
view,
metrics,
).await
}
None => (),
}
}
NetworkBridgeEvent::OurViewChange(view) => {
let old_view = std::mem::replace(our_view, view);
active_heads.retain(|head, _| our_view.contains(head));
for new in our_view.difference(&old_view) {
if !active_heads.contains_key(&new) {
tracing::warn!(
target: LOG_TARGET,
unknown_hash = %new,
"Our network bridge view update \
inconsistent with `StartWork` messages we have received from overseer. \
Contains unknown hash.",
);
}
}
}
}
}
impl StatementDistribution {
#[tracing::instrument(skip(self, ctx), fields(subsystem = LOG_TARGET))]
async fn run(
self,
mut ctx: impl SubsystemContext,
) -> SubsystemResult<()> {
let mut peers: HashMap = HashMap::new();
let mut our_view = View::default();
let mut active_heads: HashMap = HashMap::new();
let mut statement_listeners = StatementListeners::new();
let metrics = self.metrics;
loop {
let message = ctx.recv().await?;
match message {
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. })) => {
let _timer = metrics.time_active_leaves_update();
for relay_parent in activated {
let (validators, session_index) = {
let (val_tx, val_rx) = oneshot::channel();
let (session_tx, session_rx) = oneshot::channel();
let val_message = AllMessages::RuntimeApi(
RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::Validators(val_tx),
),
);
let session_message = AllMessages::RuntimeApi(
RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::SessionIndexForChild(session_tx),
),
);
ctx.send_messages(
std::iter::once(val_message).chain(std::iter::once(session_message))
).await;
match (val_rx.await?, session_rx.await?) {
(Ok(v), Ok(s)) => (v, s),
(Err(e), _) | (_, Err(e)) => {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Failed to fetch runtime API data for active leaf",
);
// Lacking this bookkeeping might make us behave funny, although
// not in any slashable way. But we shouldn't take down the node
// on what are likely spurious runtime API errors.
continue;
}
}
};
active_heads.entry(relay_parent)
.or_insert(ActiveHeadData::new(validators, session_index));
}
}
FromOverseer::Signal(OverseerSignal::BlockFinalized(_block_hash)) => {
// do nothing
}
FromOverseer::Signal(OverseerSignal::Conclude) => break,
FromOverseer::Communication { msg } => match msg {
StatementDistributionMessage::Share(relay_parent, statement) => {
let _timer = metrics.time_share();
inform_statement_listeners(
&statement,
&mut statement_listeners,
).await;
circulate_statement_and_dependents(
&mut peers,
&mut active_heads,
&mut ctx,
relay_parent,
statement,
&metrics,
).await;
}
StatementDistributionMessage::NetworkBridgeUpdateV1(event) => {
let _timer = metrics.time_network_bridge_update_v1();
handle_network_update(
&mut peers,
&mut active_heads,
&mut ctx,
&mut our_view,
event,
&metrics,
&mut statement_listeners,
).await;
}
StatementDistributionMessage::RegisterStatementListener(tx) => {
statement_listeners.push(tx);
}
}
}
}
Ok(())
}
}
// The registered Prometheus collectors behind `Metrics`.
#[derive(Clone)]
struct MetricsInner {
// Incremented by `Metrics::on_statement_distributed`.
statements_distributed: prometheus::Counter,
// Timed via `Metrics::time_active_leaves_update`.
active_leaves_update: prometheus::Histogram,
// Timed via `Metrics::time_share`.
share: prometheus::Histogram,
// Timed via `Metrics::time_network_bridge_update_v1`.
network_bridge_update_v1: prometheus::Histogram,
}
/// Statement Distribution metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option);
impl Metrics {
fn on_statement_distributed(&self) {
if let Some(metrics) = &self.0 {
metrics.statements_distributed.inc();
}
}
/// Provide a timer for `active_leaves_update` which observes on drop.
fn time_active_leaves_update(&self) -> Option {
self.0.as_ref().map(|metrics| metrics.active_leaves_update.start_timer())
}
/// Provide a timer for `share` which observes on drop.
fn time_share(&self) -> Option {
self.0.as_ref().map(|metrics| metrics.share.start_timer())
}
/// Provide a timer for `network_bridge_update_v1` which observes on drop.
fn time_network_bridge_update_v1(&self) -> Option {
self.0.as_ref().map(|metrics| metrics.network_bridge_update_v1.start_timer())
}
}
impl metrics::Metrics for Metrics {
    /// Create and register all collectors of this subsystem with `registry`.
    ///
    /// Fails with the first error returned while constructing or registering a collector.
    fn try_register(
        registry: &prometheus::Registry,
    ) -> std::result::Result<Self, prometheus::PrometheusError> {
        let metrics = MetricsInner {
            statements_distributed: prometheus::register(
                prometheus::Counter::new(
                    "parachain_statements_distributed_total",
                    "Number of candidate validity statements distributed to other peers."
                )?,
                registry,
            )?,
            active_leaves_update: prometheus::register(
                prometheus::Histogram::with_opts(
                    prometheus::HistogramOpts::new(
                        "parachain_statement_distribution_active_leaves_update",
                        "Time spent within `statement_distribution::active_leaves_update`",
                    )
                )?,
                registry,
            )?,
            share: prometheus::register(
                prometheus::Histogram::with_opts(
                    prometheus::HistogramOpts::new(
                        "parachain_statement_distribution_share",
                        "Time spent within `statement_distribution::share`",
                    )
                )?,
                registry,
            )?,
            network_bridge_update_v1: prometheus::register(
                prometheus::Histogram::with_opts(
                    prometheus::HistogramOpts::new(
                        "parachain_statement_distribution_network_bridge_update_v1",
                        "Time spent within `statement_distribution::network_bridge_update_v1`",
                    )
                )?,
                registry,
            )?,
        };
        Ok(Metrics(Some(metrics)))
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use sp_keyring::Sr25519Keyring;
use sp_application_crypto::AppKey;
use node_primitives::Statement;
use polkadot_primitives::v1::CommittedCandidateReceipt;
use assert_matches::assert_matches;
use futures::executor::{self, block_on};
use sp_keystore::{CryptoStore, SyncCryptoStorePtr, SyncCryptoStore};
use sc_keystore::LocalKeystore;
/// An `ActiveHeadData` is expected to keep at most two `Seconded` statements per validator:
/// a third from the same validator is `NotUseful`, while another validator starts fresh.
#[test]
fn active_head_accepts_only_2_seconded_per_validator() {
    let parent_hash: Hash = [1; 32].into();
    let session_index = 1;
    let signing_context = SigningContext { parent_hash, session_index };

    // Three candidates on the same relay parent that differ only in their para id.
    let mut candidate_a = CommittedCandidateReceipt::default();
    candidate_a.descriptor.relay_parent = parent_hash;
    candidate_a.descriptor.para_id = 1.into();

    let mut candidate_b = CommittedCandidateReceipt::default();
    candidate_b.descriptor.relay_parent = parent_hash;
    candidate_b.descriptor.para_id = 2.into();

    let mut candidate_c = CommittedCandidateReceipt::default();
    candidate_c.descriptor.relay_parent = parent_hash;
    candidate_c.descriptor.para_id = 3.into();

    let validators = vec![
        Sr25519Keyring::Alice.public().into(),
        Sr25519Keyring::Bob.public().into(),
        Sr25519Keyring::Charlie.public().into(),
    ];
    let mut head_data = ActiveHeadData::new(validators, session_index);

    let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
    let alice_public = SyncCryptoStore::sr25519_generate_new(
        &*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Alice.to_seed())
    ).unwrap();
    let bob_public = SyncCryptoStore::sr25519_generate_new(
        &*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Bob.to_seed())
    ).unwrap();
    let alice_key: ValidatorId = alice_public.into();
    let bob_key: ValidatorId = bob_public.into();

    // Signs a `Seconded` statement for `candidate` as the validator at `validator_index`.
    let sign_seconded = |
        candidate: &CommittedCandidateReceipt,
        validator_index: ValidatorIndex,
        key: &ValidatorId,
    | {
        block_on(SignedFullStatement::sign(
            &keystore,
            Statement::Seconded(candidate.clone()),
            &signing_context,
            validator_index,
            key,
        )).expect("should be signed")
    };

    // note A
    let a_seconded_val_0 = sign_seconded(&candidate_a, 0, &alice_key);
    let noted = head_data.note_statement(a_seconded_val_0.clone());
    assert_matches!(noted, NotedStatement::Fresh(_));

    // note A (duplicate)
    let noted = head_data.note_statement(a_seconded_val_0);
    assert_matches!(noted, NotedStatement::UsefulButKnown);

    // note B
    let b_seconded_val_0 = sign_seconded(&candidate_b, 0, &alice_key);
    let noted = head_data.note_statement(b_seconded_val_0);
    assert_matches!(noted, NotedStatement::Fresh(_));

    // note C (beyond 2 - ignored)
    let c_seconded_val_0 = sign_seconded(&candidate_c, 0, &alice_key);
    let noted = head_data.note_statement(c_seconded_val_0);
    assert_matches!(noted, NotedStatement::NotUseful);

    // note B (new validator)
    let b_seconded_val_1 = sign_seconded(&candidate_b, 1, &bob_key);
    let noted = head_data.note_statement(b_seconded_val_1);
    assert_matches!(noted, NotedStatement::Fresh(_));

    // note C (new validator)
    let c_seconded_val_1 = sign_seconded(&candidate_c, 1, &bob_key);
    let noted = head_data.note_statement(c_seconded_val_1);
    assert_matches!(noted, NotedStatement::Fresh(_));
}
/// Locally noted candidates must land in `local_observed` only, never in `remote_observed`.
#[test]
fn note_local_works() {
    let hashes = [
        CandidateHash([1; 32].into()),
        CandidateHash([2; 32].into()),
    ];

    let mut tracker = VcPerPeerTracker::default();
    for hash in hashes.iter() {
        tracker.note_local(*hash);
    }

    for hash in hashes.iter() {
        assert!(tracker.local_observed.contains(hash));
    }
    for hash in hashes.iter() {
        assert!(!tracker.remote_observed.contains(hash));
    }
}
/// The first two remotely noted candidates are accepted and the third is refused; none of
/// them may show up in `local_observed`.
#[test]
fn note_remote_works() {
    let first = CandidateHash([1; 32].into());
    let second = CandidateHash([2; 32].into());
    let third = CandidateHash([3; 32].into());

    let mut tracker = VcPerPeerTracker::default();

    let first_accepted = tracker.note_remote(first);
    assert!(first_accepted);
    let second_accepted = tracker.note_remote(second);
    assert!(second_accepted);
    let third_accepted = tracker.note_remote(third);
    assert!(!third_accepted);

    // Only the accepted candidates are recorded as remotely observed.
    assert!(tracker.remote_observed.contains(&first));
    assert!(tracker.remote_observed.contains(&second));
    assert!(!tracker.remote_observed.contains(&third));

    for hash in [first, second, third].iter() {
        assert!(!tracker.local_observed.contains(hash));
    }
}
/// Exercises `PeerRelayParentKnowledge::send`: a `Valid` statement can only be sent once the
/// peer has been made aware of the candidate through a `Candidate` statement.
#[test]
fn per_peer_relay_parent_knowledge_send() {
    let mut knowledge = PeerRelayParentKnowledge::default();
    let hash_a = CandidateHash([1; 32].into());

    let valid_from_0 = (CompactStatement::Valid(hash_a), 0);
    let candidate_from_0 = (CompactStatement::Candidate(hash_a), 0);
    let candidate_from_1 = (CompactStatement::Candidate(hash_a), 1);

    // Sending an un-pinned statement should not work and should have no effect.
    assert_eq!(knowledge.send(&valid_from_0), None);
    assert!(!knowledge.known_candidates.contains(&hash_a));
    assert!(knowledge.sent_statements.is_empty());
    assert!(knowledge.received_statements.is_empty());
    assert!(knowledge.seconded_counts.is_empty());
    assert!(knowledge.received_message_count.is_empty());

    // Make the peer aware of the candidate.
    let first_send = knowledge.send(&candidate_from_0);
    assert_eq!(first_send, Some(true));
    let second_send = knowledge.send(&candidate_from_1);
    assert_eq!(second_send, Some(false));
    assert!(knowledge.known_candidates.contains(&hash_a));
    assert_eq!(knowledge.sent_statements.len(), 2);
    assert!(knowledge.received_statements.is_empty());
    assert_eq!(knowledge.seconded_counts.len(), 2);
    assert!(!knowledge.received_message_count.contains_key(&hash_a));

    // And now it should accept the dependent message.
    let dependent_send = knowledge.send(&valid_from_0);
    assert_eq!(dependent_send, Some(false));
    assert!(knowledge.known_candidates.contains(&hash_a));
    assert_eq!(knowledge.sent_statements.len(), 3);
    assert!(knowledge.received_statements.is_empty());
    assert_eq!(knowledge.seconded_counts.len(), 2);
    assert!(!knowledge.received_message_count.contains_key(&hash_a));
}
/// A statement that was already received from the peer must not be sent back to it.
#[test]
fn cant_send_after_receiving() {
    let hash_a = CandidateHash([1; 32].into());
    let fingerprint = (CompactStatement::Candidate(hash_a), 0);

    let mut knowledge = PeerRelayParentKnowledge::default();

    let received = knowledge.receive(&fingerprint, 3);
    assert_eq!(received, Ok(true));

    let sent = knowledge.send(&fingerprint);
    assert_eq!(sent, None);
}
/// Exercises `PeerRelayParentKnowledge::receive`: unexpected statements, the per-candidate
/// flood limit, the seconding limit and duplicate detection.
#[test]
fn per_peer_relay_parent_knowledge_receive() {
    // Message limit handed to every `receive` call in this test.
    let max_message_count = 3;

    let mut knowledge = PeerRelayParentKnowledge::default();
    let hash_a = CandidateHash([1; 32].into());

    // A `Valid` statement for a candidate that was never announced is unexpected.
    let valid_a_from_0 = (CompactStatement::Valid(hash_a), 0);
    assert_eq!(
        knowledge.receive(&valid_a_from_0, max_message_count),
        Err(COST_UNEXPECTED_STATEMENT),
    );

    let candidate_a_from_0 = (CompactStatement::Candidate(hash_a), 0);
    assert_eq!(knowledge.receive(&candidate_a_from_0, max_message_count), Ok(true));

    // Push statements up to the flood limit.
    let valid_a_from_1 = (CompactStatement::Valid(hash_a), 1);
    assert_eq!(knowledge.receive(&valid_a_from_1, max_message_count), Ok(false));
    assert!(knowledge.known_candidates.contains(&hash_a));
    assert_eq!(knowledge.received_message_count.get(&hash_a), Some(&2));

    let valid_a_from_2 = (CompactStatement::Valid(hash_a), 2);
    assert_eq!(knowledge.receive(&valid_a_from_2, max_message_count), Ok(false));
    assert_eq!(knowledge.received_message_count.get(&hash_a), Some(&3));

    let valid_a_from_7 = (CompactStatement::Valid(hash_a), 7);
    assert_eq!(
        knowledge.receive(&valid_a_from_7, max_message_count),
        Err(COST_APPARENT_FLOOD),
    );
    assert_eq!(knowledge.received_message_count.get(&hash_a), Some(&3));
    assert_eq!(knowledge.received_statements.len(), 3); // number of prior `Ok`s.

    // Now make sure that the seconding limit is respected.
    let hash_b = CandidateHash([2; 32].into());
    let hash_c = CandidateHash([3; 32].into());
    let candidate_b_from_0 = (CompactStatement::Candidate(hash_b), 0);
    let candidate_c_from_0 = (CompactStatement::Candidate(hash_c), 0);

    assert_eq!(knowledge.receive(&candidate_b_from_0, max_message_count), Ok(true));
    assert_eq!(
        knowledge.receive(&candidate_c_from_0, max_message_count),
        Err(COST_UNEXPECTED_STATEMENT),
    );

    // Last, make sure that already-known statements are disregarded.
    assert_eq!(
        knowledge.receive(&valid_a_from_2, max_message_count),
        Err(COST_DUPLICATE_STATEMENT),
    );
    assert_eq!(
        knowledge.receive(&candidate_b_from_0, max_message_count),
        Err(COST_DUPLICATE_STATEMENT),
    );
}
/// When a peer's view changes from `[a, b]` to `[b, c]`, knowledge about `a` is dropped and
/// every statement stored for the new head `c` is sent to that peer.
#[test]
fn peer_view_update_sends_messages() {
    let hash_a: Hash = [1; 32].into();
    let hash_b: Hash = [2; 32].into();
    let hash_c: Hash = [3; 32].into();

    let session_index = 1;
    let signing_context = SigningContext { parent_hash: hash_c, session_index };

    let mut candidate = CommittedCandidateReceipt::default();
    candidate.descriptor.relay_parent = hash_c;
    candidate.descriptor.para_id = 1.into();
    let candidate_hash = candidate.hash();

    let validators = vec![
        Sr25519Keyring::Alice.public().into(),
        Sr25519Keyring::Bob.public().into(),
        Sr25519Keyring::Charlie.public().into(),
    ];

    let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
    let alice_public = SyncCryptoStore::sr25519_generate_new(
        &*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Alice.to_seed())
    ).unwrap();
    let bob_public = SyncCryptoStore::sr25519_generate_new(
        &*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Bob.to_seed())
    ).unwrap();
    let charlie_public = SyncCryptoStore::sr25519_generate_new(
        &*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Charlie.to_seed())
    ).unwrap();

    // Head data for `hash_c`: one `Seconded` plus two `Valid` statements, each from a
    // different validator. Every one of them must be noted as fresh.
    let mut new_head_data = ActiveHeadData::new(validators, session_index);
    let to_note: Vec<(Statement, ValidatorIndex, ValidatorId)> = vec![
        (Statement::Seconded(candidate.clone()), 0, alice_public.into()),
        (Statement::Valid(candidate_hash), 1, bob_public.into()),
        (Statement::Valid(candidate_hash), 2, charlie_public.into()),
    ];
    for (payload, validator_index, key) in to_note {
        let signed = block_on(SignedFullStatement::sign(
            &keystore,
            payload,
            &signing_context,
            validator_index,
            &key,
        )).expect("should be signed");
        let noted = new_head_data.note_statement(signed);
        assert_matches!(noted, NotedStatement::Fresh(_));
    }

    let mut active_heads = HashMap::new();
    active_heads.insert(hash_c, new_head_data);

    // The peer starts out with the old view `[a, b]` and empty knowledge for both heads.
    let new_view = View(vec![hash_b, hash_c]);
    let mut peer_data = PeerData {
        view: View(vec![hash_a, hash_b]),
        view_knowledge: vec![hash_a, hash_b]
            .into_iter()
            .map(|head| (head, Default::default()))
            .collect(),
    };

    let pool = sp_core::testing::TaskExecutor::new();
    let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
    let peer = PeerId::random();

    executor::block_on(async move {
        update_peer_view_and_send_unlocked(
            peer.clone(),
            &mut peer_data,
            &mut ctx,
            &active_heads,
            new_view.clone(),
            &Default::default(),
        ).await;

        assert_eq!(peer_data.view, new_view);
        assert!(!peer_data.view_knowledge.contains_key(&hash_a));
        assert!(peer_data.view_knowledge.contains_key(&hash_b));

        let c_knowledge = peer_data.view_knowledge.get(&hash_c).unwrap();
        assert!(c_knowledge.known_candidates.contains(&candidate_hash));

        let candidate_from_0 = (CompactStatement::Candidate(candidate_hash), 0);
        assert!(c_knowledge.sent_statements.contains(&candidate_from_0));
        let valid_from_1 = (CompactStatement::Valid(candidate_hash), 1);
        assert!(c_knowledge.sent_statements.contains(&valid_from_1));
        let valid_from_2 = (CompactStatement::Valid(candidate_hash), 2);
        assert!(c_knowledge.sent_statements.contains(&valid_from_2));

        // now see if we got the 3 messages from the active head data.
        let active_head = active_heads.get(&hash_c).unwrap();

        // semi-fragile because hashmap iterator ordering is undefined, but in practice
        // it will not change between runs of the program.
        for stored in active_head.statements_about(candidate_hash) {
            let message = handle.recv().await;
            let expected_to = vec![peer.clone()];
            let expected_payload = statement_message(hash_c, stored.statement.clone());

            assert_matches!(
                message,
                AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
                    to,
                    payload,
                )) => {
                    assert_eq!(to, expected_to);
                    assert_eq!(payload, expected_payload)
                }
            )
        }
    });
}
/// A statement for relay parent `b` must be circulated to exactly those peers whose view
/// contains `b`, in a single network-bridge message.
#[test]
fn circulated_statement_goes_to_all_peers_with_view() {
    let hash_a: Hash = [1; 32].into();
    let hash_b: Hash = [2; 32].into();
    let hash_c: Hash = [3; 32].into();

    let session_index = 1;

    let mut candidate = CommittedCandidateReceipt::default();
    candidate.descriptor.relay_parent = hash_b;
    candidate.descriptor.para_id = 1.into();

    let peer_a = PeerId::random();
    let peer_b = PeerId::random();
    let peer_c = PeerId::random();

    // Builds peer data with default (empty) knowledge for every head in `view`.
    let peer_data_from_view = |view: View| {
        let view_knowledge = view.0.iter().map(|head| (*head, Default::default())).collect();
        PeerData { view, view_knowledge }
    };

    // Only peers B and C have `hash_b` in their view.
    let mut peer_data: HashMap<_, _> = vec![
        (peer_a.clone(), peer_data_from_view(View(vec![hash_a]))),
        (peer_b.clone(), peer_data_from_view(View(vec![hash_a, hash_b]))),
        (peer_c.clone(), peer_data_from_view(View(vec![hash_b, hash_c]))),
    ].into_iter().collect();

    let pool = sp_core::testing::TaskExecutor::new();
    let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);

    executor::block_on(async move {
        let signing_context = SigningContext { parent_hash: hash_b, session_index };

        let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
        let alice_public = CryptoStore::sr25519_generate_new(
            &*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Alice.to_seed())
        ).await.unwrap();

        let signed = SignedFullStatement::sign(
            &keystore,
            Statement::Seconded(candidate),
            &signing_context,
            0,
            &alice_public.into(),
        ).await.expect("should be signed");

        let comparator = StoredStatementComparator {
            compact: signed.payload().to_compact(),
            validator_index: 0,
            signature: signed.signature().clone()
        };
        let statement = StoredStatement { comparator, statement: signed };

        let needs_dependents = circulate_statement(
            &mut peer_data,
            &mut ctx,
            hash_b,
            &statement,
        ).await;

        assert_eq!(needs_dependents.len(), 2);
        assert!(needs_dependents.contains(&peer_b));
        assert!(needs_dependents.contains(&peer_c));

        // Both recipients must now have the statement recorded as sent for `hash_b`.
        let fingerprint = (statement.compact().clone(), 0);
        let b_knowledge = peer_data.get(&peer_b).unwrap().view_knowledge.get(&hash_b).unwrap();
        assert!(b_knowledge.sent_statements.contains(&fingerprint));
        let c_knowledge = peer_data.get(&peer_c).unwrap().view_knowledge.get(&hash_b).unwrap();
        assert!(c_knowledge.sent_statements.contains(&fingerprint));

        let message = handle.recv().await;
        assert_matches!(
            message,
            AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
                to,
                payload,
            )) => {
                assert_eq!(to.len(), 2);
                assert!(to.contains(&peer_b));
                assert!(to.contains(&peer_c));
                assert_eq!(
                    payload,
                    statement_message(hash_b, statement.statement.clone()),
                );
            }
        )
    });
}
}