Newer
Older
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The Statement Distribution Subsystem.
//!
//! This is responsible for distributing signed statements about candidate
//! validity amongst validators.
#![deny(unused_crate_dependencies)]
#![warn(missing_docs)]
use polkadot_subsystem::{
Subsystem, SubsystemResult, SubsystemContext, SpawnedSubsystem,
ActiveLeavesUpdate, FromOverseer, OverseerSignal,
messages::{
AllMessages, NetworkBridgeMessage, StatementDistributionMessage, CandidateBackingMessage,
RuntimeApiMessage, RuntimeApiRequest,
},
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
use node_primitives::SignedFullStatement;
Hash, CompactStatement, ValidatorIndex, ValidatorId, SigningContext, ValidatorSignature, CandidateHash,
use polkadot_node_network_protocol::{
v1 as protocol_v1, View, PeerId, ReputationChange as Rep, NetworkBridgeEvent,
};
use futures::prelude::*;
use futures::channel::{mpsc, oneshot};
use indexmap::IndexSet;
use std::collections::{HashMap, HashSet};
const COST_UNEXPECTED_STATEMENT: Rep = Rep::new(-100, "Unexpected Statement");
const COST_INVALID_SIGNATURE: Rep = Rep::new(-500, "Invalid Statement Signature");
const COST_DUPLICATE_STATEMENT: Rep = Rep::new(-250, "Statement sent more than once by peer");
const COST_APPARENT_FLOOD: Rep = Rep::new(-1000, "Peer appears to be flooding us with statements");
const BENEFIT_VALID_STATEMENT: Rep = Rep::new(5, "Peer provided a valid statement");
const BENEFIT_VALID_STATEMENT_FIRST: Rep = Rep::new(
25,
"Peer was the first to provide a valid statement",
);
/// The maximum amount of candidates each validator is allowed to second at any relay-parent.
/// Short for "Validator Candidate Threshold".
///
/// This is the amount of candidates we keep per validator at any relay-parent.
/// Typically we will only keep 1, but when a validator equivocates we will need to track 2.
const VC_THRESHOLD: usize = 2;
const LOG_TARGET: &str = "statement_distribution";
/// The statement distribution subsystem.
pub struct StatementDistribution {
// Prometheus metrics
metrics: Metrics,
}
impl<C> Subsystem<C> for StatementDistribution
where C: SubsystemContext<Message=StatementDistributionMessage>
{
fn start(self, ctx: C) -> SpawnedSubsystem {
// Swallow error because failure is fatal to the node and we log with more precision
// within `run`.
SpawnedSubsystem {
name: "statement-distribution-subsystem",
future: self.run(ctx).boxed(),
impl StatementDistribution {
/// Create a new Statement Distribution Subsystem
pub fn new(metrics: Metrics) -> StatementDistribution {
StatementDistribution {
metrics,
}
}
}
/// Tracks our impression of a single peer's view of the candidates a validator has seconded
/// for a given relay-parent.
///
/// It is expected to receive at most `VC_THRESHOLD` from us and be aware of at most `VC_THRESHOLD`
/// via other means.
#[derive(Default)]
struct VcPerPeerTracker {
local_observed: arrayvec::ArrayVec<[CandidateHash; VC_THRESHOLD]>,
remote_observed: arrayvec::ArrayVec<[CandidateHash; VC_THRESHOLD]>,
}
impl VcPerPeerTracker {
/// Note that the remote should now be aware that a validator has seconded a given candidate (by hash)
/// based on a message that we have sent it from our local pool.
fn note_local(&mut self, h: CandidateHash) {
if !note_hash(&mut self.local_observed, h) {
tracing::warn!("Statement distribution is erroneously attempting to distribute more \
than {} candidate(s) per validator index. Ignoring", VC_THRESHOLD);
}
}
/// Note that the remote should now be aware that a validator has seconded a given candidate (by hash)
/// based on a message that it has sent us.
///
/// Returns `true` if the peer was allowed to send us such a message, `false` otherwise.
fn note_remote(&mut self, h: CandidateHash) -> bool {
note_hash(&mut self.remote_observed, h)
}
}
fn note_hash(
observed: &mut arrayvec::ArrayVec<[CandidateHash; VC_THRESHOLD]>,
h: CandidateHash,
) -> bool {
if observed.contains(&h) { return true; }
}
/// knowledge that a peer has about goings-on in a relay parent.
#[derive(Default)]
struct PeerRelayParentKnowledge {
/// candidates that the peer is aware of. This indicates that we can
/// send other statements pertaining to that candidate.
known_candidates: HashSet<CandidateHash>,
/// fingerprints of all statements a peer should be aware of: those that
/// were sent to the peer by us.
sent_statements: HashSet<(CompactStatement, ValidatorIndex)>,
/// fingerprints of all statements a peer should be aware of: those that
/// were sent to us by the peer.
received_statements: HashSet<(CompactStatement, ValidatorIndex)>,
/// How many candidates this peer is aware of for each given validator index.
seconded_counts: HashMap<ValidatorIndex, VcPerPeerTracker>,
/// How many statements we've received for each candidate that we're aware of.
received_message_count: HashMap<CandidateHash, usize>,
}
impl PeerRelayParentKnowledge {
/// Attempt to update our view of the peer's knowledge with this statement's fingerprint based
/// on something that we would like to send to the peer.
///
/// This returns `None` if the peer cannot accept this statement, without altering internal
/// state.
///
/// If the peer can accept the statement, this returns `Some` and updates the internal state.
/// Once the knowledge has incorporated a statement, it cannot be incorporated again.
///
/// This returns `Some(true)` if this is the first time the peer has become aware of a
/// candidate with the given hash.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
fn send(&mut self, fingerprint: &(CompactStatement, ValidatorIndex)) -> Option<bool> {
let already_known = self.sent_statements.contains(fingerprint)
|| self.received_statements.contains(fingerprint);
if already_known {
return None;
}
let new_known = match fingerprint.0 {
CompactStatement::Candidate(ref h) => {
self.seconded_counts.entry(fingerprint.1)
.or_default()
.note_local(h.clone());
self.known_candidates.insert(h.clone())
},
CompactStatement::Valid(ref h) | CompactStatement::Invalid(ref h) => {
// The peer can only accept Valid and Invalid statements for which it is aware
// of the corresponding candidate.
if !self.known_candidates.contains(h) {
return None;
}
false
}
};
self.sent_statements.insert(fingerprint.clone());
Some(new_known)
}
/// Attempt to update our view of the peer's knowledge with this statement's fingerprint based on
Loading full blame...