Unverified Commit 8348cc4c authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

Implement the Statement Distribution Subsystem (#1326)

* set up data types and control flow for statement distribution

* add some set-like methods to View

* implement sending to peers

* start fixing equivocation handling

* Add a section to the statement distribution subsystem on equivocations and flood protection

* fix typo and amend wording

* implement flood protection

* have peer knowledge tracker follow when peer first learns about a candidate

* send dependents after circulating

* add another TODO

* trigger send in one more place

* refactors from review

* send new statements to candidate backing

* instantiate active head data with runtime API values

* track our view changes and peer view changes

* apply a benefit to peers who send us statements we want

* remove unneeded TODO

* add some comments and improve Hash implementation

* start tests and fix `note_statement`

* test active_head seconding logic

* test that the per-peer tracking logic works

* test per-peer knowledge tracker

* test that peer view updates lead to messages being sent

* test statement circulation

* address review comments

* have view set methods return references
parent 0499212f
Pipeline #99573 passed with stages
in 21 minutes and 55 seconds
......@@ -4831,6 +4831,28 @@ dependencies = [
"westend-runtime",
]
[[package]]
name = "polkadot-statement-distribution"
version = "0.1.0"
dependencies = [
"arrayvec 0.5.1",
"assert_matches",
"futures 0.3.5",
"futures-timer 3.0.2",
"indexmap",
"log 0.4.8",
"parity-scale-codec",
"parking_lot 0.10.2",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-primitives",
"polkadot-subsystem-test-helpers",
"sp-keyring",
"sp-runtime",
"sp-staking",
"streamunordered",
]
[[package]]
name = "polkadot-statement-table"
version = "0.8.14"
......
......@@ -45,6 +45,7 @@ members = [
"node/core/proposer",
"node/network/bridge",
"node/network/statement-distribution",
"node/overseer",
"node/primitives",
"node/service",
......
[package]
name = "polkadot-statement-distribution"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
description = "Statement Distribution Subsystem"
edition = "2018"
[dependencies]
futures = "0.3.5"
log = "0.4.8"
futures-timer = "3.0.2"
streamunordered = "0.5.1"
polkadot-primitives = { path = "../../../primitives" }
node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" }
parity-scale-codec = "1.3.0"
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-staking = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
arrayvec = "0.5.1"
indexmap = "1.4.0"
[dev-dependencies]
parking_lot = "0.10.0"
subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" }
assert_matches = "1.3.0"
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The Statement Distribution Subsystem.
//!
//! This is responsible for distributing signed statements about candidate
//! validity amongst validators.
use polkadot_subsystem::{
Subsystem, SubsystemResult, SubsystemContext, SpawnedSubsystem,
FromOverseer, OverseerSignal,
};
use polkadot_subsystem::messages::{
AllMessages, NetworkBridgeMessage, NetworkBridgeEvent, StatementDistributionMessage,
PeerId, ReputationChange as Rep, CandidateBackingMessage, RuntimeApiMessage,
RuntimeApiRequest,
};
use node_primitives::{ProtocolId, View, SignedFullStatement};
use polkadot_primitives::Hash;
use polkadot_primitives::parachain::{
CompactStatement, ValidatorIndex, ValidatorId, SigningContext, ValidatorSignature,
};
use parity_scale_codec::{Encode, Decode};
use futures::prelude::*;
use futures::channel::oneshot;
use indexmap::IndexSet;
use std::collections::{HashMap, HashSet};
const PROTOCOL_V1: ProtocolId = *b"sdn1";
const COST_UNEXPECTED_STATEMENT: Rep = Rep::new(-100, "Unexpected Statement");
const COST_INVALID_SIGNATURE: Rep = Rep::new(-500, "Invalid Statement Signature");
const COST_INVALID_MESSAGE: Rep = Rep::new(-500, "Invalid message");
const COST_DUPLICATE_STATEMENT: Rep = Rep::new(-250, "Statement sent more than once by peer");
const COST_APPARENT_FLOOD: Rep = Rep::new(-1000, "Peer appears to be flooding us with statements");
const BENEFIT_VALID_STATEMENT: Rep = Rep::new(5, "Peer provided a valid statement");
const BENEFIT_VALID_STATEMENT_FIRST: Rep = Rep::new(
25,
"Peer was the first to provide a valid statement",
);
/// The maximum amount of candidates each validator is allowed to second at any relay-parent.
/// Short for "Validator Candidate Threshold".
///
/// This is the amount of candidates we keep per validator at any relay-parent.
/// Typically we will only keep 1, but when a validator equivocates we will need to track 2.
const VC_THRESHOLD: usize = 2;
/// The statement distribution subsystem.
pub struct StatementDistribution;
impl<C> Subsystem<C> for StatementDistribution
where C: SubsystemContext<Message=StatementDistributionMessage>
{
fn start(self, ctx: C) -> SpawnedSubsystem {
// Swallow error because failure is fatal to the node and we log with more precision
// within `run`.
SpawnedSubsystem(run(ctx).map(|_| ()).boxed())
}
}
fn network_update_message(n: NetworkBridgeEvent) -> AllMessages {
AllMessages::StatementDistribution(StatementDistributionMessage::NetworkBridgeUpdate(n))
}
/// Tracks our impression of a single peer's view of the candidates a validator has seconded
/// for a given relay-parent.
///
/// It is expected to receive at most `VC_THRESHOLD` from us and be aware of at most `VC_THRESHOLD`
/// via other means.
#[derive(Default)]
struct VcPerPeerTracker {
local_observed: arrayvec::ArrayVec<[Hash; VC_THRESHOLD]>,
remote_observed: arrayvec::ArrayVec<[Hash; VC_THRESHOLD]>,
}
impl VcPerPeerTracker {
// Note that the remote should now be aware that a validator has seconded a given candidate (by hash)
// based on a message that we have sent it from our local pool.
fn note_local(&mut self, h: Hash) {
if !note_hash(&mut self.local_observed, h) {
log::warn!("Statement distribution is erroneously attempting to distribute more \
than {} candidate(s) per validator index. Ignoring", VC_THRESHOLD);
}
}
// Note that the remote should now be aware that a validator has seconded a given candidate (by hash)
// based on a message that it has sent us.
//
// Returns `true` if the peer was allowed to send us such a message, `false` otherwise.
fn note_remote(&mut self, h: Hash) -> bool {
note_hash(&mut self.remote_observed, h)
}
}
fn note_hash(
observed: &mut arrayvec::ArrayVec<[Hash; VC_THRESHOLD]>,
h: Hash,
) -> bool {
if observed.contains(&h) { return true; }
if observed.is_full() {
false
} else {
observed.try_push(h).expect("length of storage guarded above; \
only panics if length exceeds capacity; qed");
true
}
}
/// knowledge that a peer has about goings-on in a relay parent.
#[derive(Default)]
struct PeerRelayParentKnowledge {
/// candidates that the peer is aware of. This indicates that we can
/// send other statements pertaining to that candidate.
known_candidates: HashSet<Hash>,
/// fingerprints of all statements a peer should be aware of: those that
/// were sent to the peer by us.
sent_statements: HashSet<(CompactStatement, ValidatorIndex)>,
/// fingerprints of all statements a peer should be aware of: those that
/// were sent to us by the peer.
received_statements: HashSet<(CompactStatement, ValidatorIndex)>,
/// How many candidates this peer is aware of for each given validator index.
seconded_counts: HashMap<ValidatorIndex, VcPerPeerTracker>,
/// How many statements we've received for each candidate that we're aware of.
received_message_count: HashMap<Hash, usize>,
}
impl PeerRelayParentKnowledge {
/// Attempt to update our view of the peer's knowledge with this statement's fingerprint based
/// on something that we would like to send to the peer.
///
/// This returns `None` if the peer cannot accept this statement, without altering internal
/// state.
///
/// If the peer can accept the statement, this returns `Some` and updates the internal state.
/// Once the knowledge has incorporated a statement, it cannot be incorporated again.
///
/// This returns `Some(true)` if this is the first time the peer has become aware of a
/// candidate with the given hash.
fn send(&mut self, fingerprint: &(CompactStatement, ValidatorIndex)) -> Option<bool> {
let already_known = self.sent_statements.contains(fingerprint)
|| self.received_statements.contains(fingerprint);
if already_known {
return None;
}
let new_known = match fingerprint.0 {
CompactStatement::Candidate(ref h) => {
self.seconded_counts.entry(fingerprint.1)
.or_default()
.note_local(h.clone());
self.known_candidates.insert(h.clone())
},
CompactStatement::Valid(ref h) | CompactStatement::Invalid(ref h) => {
// The peer can only accept Valid and Invalid statements for which it is aware
// of the corresponding candidate.
if !self.known_candidates.contains(h) {
return None;
}
false
}
};
self.sent_statements.insert(fingerprint.clone());
Some(new_known)
}
/// Attempt to update our view of the peer's knowledge with this statement's fingerprint based on
/// a message we are receiving from the peer.
///
/// Provide the maximum message count that we can receive per candidate. In practice we should
/// not receive more statements for any one candidate than there are members in the group assigned
/// to that para, but this maximum needs to be lenient to account for equivocations that may be
/// cross-group. As such, a maximum of 2 * n_validators is recommended.
///
/// This returns an error if the peer should not have sent us this message according to protocol
/// rules for flood protection.
///
/// If this returns `Ok`, the internal state has been altered. After `receive`ing a new
/// candidate, we are then cleared to send the peer further statements about that candidate.
///
/// This returns `Ok(true)` if this is the first time the peer has become aware of a
/// candidate with given hash.
fn receive(
&mut self,
fingerprint: &(CompactStatement, ValidatorIndex),
max_message_count: usize,
) -> Result<bool, Rep> {
// We don't check `sent_statements` because a statement could be in-flight from both
// sides at the same time.
if self.received_statements.contains(fingerprint) {
return Err(COST_DUPLICATE_STATEMENT);
}
let candidate_hash = match fingerprint.0 {
CompactStatement::Candidate(ref h) => {
let allowed_remote = self.seconded_counts.entry(fingerprint.1)
.or_insert_with(Default::default)
.note_remote(h.clone());
if !allowed_remote {
return Err(COST_UNEXPECTED_STATEMENT);
}
h
}
CompactStatement::Valid(ref h)| CompactStatement::Invalid(ref h) => {
if !self.known_candidates.contains(&h) {
return Err(COST_UNEXPECTED_STATEMENT);
}
h
}
};
{
let received_per_candidate = self.received_message_count
.entry(candidate_hash.clone())
.or_insert(0);
if *received_per_candidate >= max_message_count {
return Err(COST_APPARENT_FLOOD);
}
*received_per_candidate += 1;
}
self.received_statements.insert(fingerprint.clone());
Ok(self.known_candidates.insert(candidate_hash.clone()))
}
}
struct PeerData {
view: View,
view_knowledge: HashMap<Hash, PeerRelayParentKnowledge>,
}
impl PeerData {
/// Attempt to update our view of the peer's knowledge with this statement's fingerprint based
/// on something that we would like to send to the peer.
///
/// This returns `None` if the peer cannot accept this statement, without altering internal
/// state.
///
/// If the peer can accept the statement, this returns `Some` and updates the internal state.
/// Once the knowledge has incorporated a statement, it cannot be incorporated again.
///
/// This returns `Some(true)` if this is the first time the peer has become aware of a
/// candidate with the given hash.
fn send(
&mut self,
relay_parent: &Hash,
fingerprint: &(CompactStatement, ValidatorIndex),
) -> Option<bool> {
self.view_knowledge.get_mut(relay_parent).map_or(None, |k| k.send(fingerprint))
}
/// Attempt to update our view of the peer's knowledge with this statement's fingerprint based on
/// a message we are receiving from the peer.
///
/// Provide the maximum message count that we can receive per candidate. In practice we should
/// not receive more statements for any one candidate than there are members in the group assigned
/// to that para, but this maximum needs to be lenient to account for equivocations that may be
/// cross-group. As such, a maximum of 2 * n_validators is recommended.
///
/// This returns an error if the peer should not have sent us this message according to protocol
/// rules for flood protection.
///
/// If this returns `Ok`, the internal state has been altered. After `receive`ing a new
/// candidate, we are then cleared to send the peer further statements about that candidate.
///
/// This returns `Ok(true)` if this is the first time the peer has become aware of a
/// candidate with given hash.
fn receive(
&mut self,
relay_parent: &Hash,
fingerprint: &(CompactStatement, ValidatorIndex),
max_message_count: usize,
) -> Result<bool, Rep> {
self.view_knowledge.get_mut(relay_parent).ok_or(COST_UNEXPECTED_STATEMENT)?
.receive(fingerprint, max_message_count)
}
}
// A statement stored while a relay chain head is active.
#[derive(Debug)]
struct StoredStatement {
comparator: StoredStatementComparator,
statement: SignedFullStatement,
}
// A value used for comparison of stored statements to each other.
//
// The compact version of the statement, the validator index, and the signature of the validator
// is enough to differentiate between all types of equivocations, as long as the signature is
// actually checked to be valid. The same statement with 2 signatures and 2 statements with
// different (or same) signatures wll all be correctly judged to be unequal with this comparator.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
struct StoredStatementComparator {
compact: CompactStatement,
validator_index: ValidatorIndex,
signature: ValidatorSignature,
}
impl StoredStatement {
fn compact(&self) -> &CompactStatement {
&self.comparator.compact
}
fn fingerprint(&self) -> (CompactStatement, ValidatorIndex) {
(self.comparator.compact.clone(), self.statement.validator_index())
}
}
impl std::borrow::Borrow<StoredStatementComparator> for StoredStatement {
fn borrow(&self) -> &StoredStatementComparator {
&self.comparator
}
}
impl std::hash::Hash for StoredStatement {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.comparator.hash(state)
}
}
impl std::cmp::PartialEq for StoredStatement {
fn eq(&self, other: &Self) -> bool {
&self.comparator == &other.comparator
}
}
impl std::cmp::Eq for StoredStatement {}
#[derive(Debug)]
enum NotedStatement<'a> {
NotUseful,
Fresh(&'a StoredStatement),
UsefulButKnown
}
struct ActiveHeadData {
/// All candidates we are aware of for this head, keyed by hash.
candidates: HashSet<Hash>,
/// Stored statements for circulation to peers.
///
/// These are iterable in insertion order, and `Seconded` statements are always
/// accepted before dependent statements.
statements: IndexSet<StoredStatement>,
/// The validators at this head.
validators: Vec<ValidatorId>,
/// The session index this head is at.
session_index: sp_staking::SessionIndex,
/// How many `Seconded` statements we've seen per validator.
seconded_counts: HashMap<ValidatorIndex, usize>,
}
impl ActiveHeadData {
fn new(validators: Vec<ValidatorId>, session_index: sp_staking::SessionIndex) -> Self {
ActiveHeadData {
candidates: Default::default(),
statements: Default::default(),
validators,
session_index,
seconded_counts: Default::default(),
}
}
/// Note the given statement.
///
/// If it was not already known and can be accepted, returns `NotedStatement::Fresh`,
/// with a handle to the statement.
///
/// If it can be accepted, but we already know it, returns `NotedStatement::UsefulButKnown`.
///
/// We accept up to `VC_THRESHOLD` (2 at time of writing) `Seconded` statements
/// per validator. These will be the first ones we see. The statement is assumed
/// to have been checked, including that the validator index is not out-of-bounds and
/// the signature is valid.
///
/// Any other statements or those that reference a candidate we are not aware of cannot be accepted
/// and will return `NotedStatement::NotUseful`.
fn note_statement(&mut self, statement: SignedFullStatement) -> NotedStatement {
let validator_index = statement.validator_index();
let comparator = StoredStatementComparator {
compact: statement.payload().to_compact(),
validator_index,
signature: statement.signature().clone(),
};
let stored = StoredStatement {
comparator: comparator.clone(),
statement,
};
match comparator.compact {
CompactStatement::Candidate(h) => {
let seconded_so_far = self.seconded_counts.entry(validator_index).or_insert(0);
if *seconded_so_far >= VC_THRESHOLD {
return NotedStatement::NotUseful;
}
self.candidates.insert(h);
if self.statements.insert(stored) {
*seconded_so_far += 1;
// This will always return `Some` because it was just inserted.
NotedStatement::Fresh(self.statements.get(&comparator)
.expect("Statement was just inserted; qed"))
} else {
NotedStatement::UsefulButKnown
}
}
CompactStatement::Valid(h) | CompactStatement::Invalid(h) => {
if !self.candidates.contains(&h) {
return NotedStatement::NotUseful;
}
if self.statements.insert(stored) {
// This will always return `Some` because it was just inserted.
NotedStatement::Fresh(self.statements.get(&comparator)
.expect("Statement was just inserted; qed"))
} else {
NotedStatement::UsefulButKnown
}
}
}
}
/// Get an iterator over all statements for the active head. Seconded statements come first.
fn statements(&self) -> impl Iterator<Item = &'_ StoredStatement> + '_ {
self.statements.iter()
}
/// Get an iterator over all statements for the active head that are for a particular candidate.
fn statements_about(&self, candidate_hash: Hash)
-> impl Iterator<Item = &'_ StoredStatement> + '_
{
self.statements().filter(move |s| s.compact().candidate_hash() == &candidate_hash)
}
}
/// Check a statement signature under this parent hash.
fn check_statement_signature(
head: &ActiveHeadData,
relay_parent: Hash,
statement: &SignedFullStatement,
) -> Result<(), ()> {
let signing_context = SigningContext {
session_index: head.session_index,
parent_hash: relay_parent,
};
head.validators.get(statement.validator_index() as usize)
.ok_or(())
.and_then(|v| statement.check_signature(&signing_context, v))
}
#[derive(Encode, Decode)]
enum WireMessage {
/// relay-parent, full statement.
#[codec(index = "0")]
Statement(Hash, SignedFullStatement),
}
/// Places the statement in storage if it is new, and then
/// circulates the statement to all peers who have not seen it yet, and
/// sends all statements dependent on that statement to peers who could previously not receive
/// them but now can.
async fn circulate_statement_and_dependents(
peers: &mut HashMap<PeerId, PeerData>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
relay_parent: Hash,
statement: SignedFullStatement,
) -> SubsystemResult<()> {
if let Some(active_head)= active_heads.get_mut(&relay_parent) {
// First circulate the statement directly to all peers needing it.
// The borrow of `active_head` needs to encompass only this (Rust) statement.
let outputs: Option<(Hash, Vec<PeerId>)> = {
match active_head.note_statement(statement) {
NotedStatement::Fresh(stored) => Some((
stored.compact().candidate_hash().clone(),
circulate_statement(peers, ctx, relay_parent, stored).await?,
)),
_ => None,
}
};
// Now send dependent statements to all peers needing them, if any.
if let Some((candidate_hash, peers_needing_dependents)) = outputs {
for peer in peers_needing_dependents {
if let Some(peer_data) = peers.get_mut(&peer) {
// defensive: the peer data should always be some because the iterator
// of peers is derived from the set of peers.
send_statements_about(
peer,
peer_data,
ctx,
relay_parent,
candidate_hash,
&*active_head
).await?;
}
}
}
}
Ok(())
}
/// Circulates a statement to all peers who have not seen it yet, and returns
/// an iterator over peers who need to have dependent statements sent.
async fn circulate_statement(
peers: &mut HashMap<PeerId, PeerData>,