Newer
Older
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The Statement Distribution Subsystem.
//!
//! This is responsible for distributing signed statements about candidate
//! validity amongst validators.
use polkadot_subsystem::{
Subsystem, SubsystemResult, SubsystemContext, SpawnedSubsystem,
ActiveLeavesUpdate, FromOverseer, OverseerSignal,
};
use polkadot_subsystem::messages::{
AllMessages, NetworkBridgeMessage, StatementDistributionMessage, CandidateBackingMessage,
RuntimeApiMessage, RuntimeApiRequest,
use node_primitives::SignedFullStatement;
use polkadot_primitives::v1::{
Hash, CompactStatement, ValidatorIndex, ValidatorId, SigningContext, ValidatorSignature,
use polkadot_node_network_protocol::{
v1 as protocol_v1, View, PeerId, ReputationChange as Rep, NetworkBridgeEvent,
};
use futures::prelude::*;
use futures::channel::oneshot;
use indexmap::IndexSet;
use std::collections::{HashMap, HashSet};
// Peer reputation adjustments used by this subsystem. `Rep` is the imported alias for
// `ReputationChange`; each constant pairs a signed value with a human-readable reason.

// Costs (negative values).
const COST_UNEXPECTED_STATEMENT: Rep = Rep::new(-100, "Unexpected Statement");
const COST_INVALID_SIGNATURE: Rep = Rep::new(-500, "Invalid Statement Signature");
const COST_DUPLICATE_STATEMENT: Rep = Rep::new(-250, "Statement sent more than once by peer");
const COST_APPARENT_FLOOD: Rep = Rep::new(-1000, "Peer appears to be flooding us with statements");

// Benefits (positive values).
const BENEFIT_VALID_STATEMENT: Rep = Rep::new(5, "Peer provided a valid statement");
const BENEFIT_VALID_STATEMENT_FIRST: Rep = Rep::new(
25,
"Peer was the first to provide a valid statement",
);
/// The maximum number of candidates each validator is allowed to second at any relay-parent.
/// Short for "Validator Candidate Threshold".
///
/// This is the number of candidates we keep per validator at any relay-parent.
/// Typically we will only keep 1, but when a validator equivocates we will need to track 2.
const VC_THRESHOLD: usize = 2;
/// The statement distribution subsystem.
///
/// A unit type carrying no state of its own; it is driven through its `Subsystem`
/// implementation.
pub struct StatementDistribution;
impl<C> Subsystem<C> for StatementDistribution
where C: SubsystemContext<Message=StatementDistributionMessage>
{
fn start(self, ctx: C) -> SpawnedSubsystem {
// Swallow error because failure is fatal to the node and we log with more precision
// within `run`.
SpawnedSubsystem {
name: "statement-distribution-subsystem",
future: run(ctx).map(|_| ()).boxed(),
}
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
}
}
/// Tracks our impression of a single peer's view of the candidates a validator has seconded
/// for a given relay-parent.
///
/// It is expected to receive at most `VC_THRESHOLD` from us and be aware of at most `VC_THRESHOLD`
/// via other means.
#[derive(Default)]
struct VcPerPeerTracker {
/// Candidate hashes recorded through `note_local`: ones we have sent to the peer.
/// Holds at most `VC_THRESHOLD` entries.
local_observed: arrayvec::ArrayVec<[Hash; VC_THRESHOLD]>,
/// Candidate hashes recorded through `note_remote`: ones the peer has sent to us.
/// Holds at most `VC_THRESHOLD` entries.
remote_observed: arrayvec::ArrayVec<[Hash; VC_THRESHOLD]>,
}
impl VcPerPeerTracker {
    /// Record that the peer should now know a validator seconded the candidate `h`,
    /// because we sent it a message about that candidate from our local pool.
    ///
    /// If the local slots are already full with other hashes, nothing is stored and a
    /// warning is logged instead.
    fn note_local(&mut self, h: Hash) {
        let recorded = note_hash(&mut self.local_observed, h);
        if recorded {
            return;
        }

        log::warn!("Statement distribution is erroneously attempting to distribute more \
            than {} candidate(s) per validator index. Ignoring", VC_THRESHOLD);
    }

    /// Record that the peer should now know a validator seconded the candidate `h`,
    /// because the peer itself sent us a message about that candidate.
    ///
    /// Returns `true` if the peer was allowed to send us such a message, `false` otherwise.
    fn note_remote(&mut self, h: Hash) -> bool {
        let Self { remote_observed, .. } = self;
        note_hash(remote_observed, h)
    }
}
/// Record `h` in the bounded list `observed`.
///
/// Returns `true` when `h` is present afterwards, either because it was already there
/// or because a free slot was available. Returns `false`, leaving `observed` untouched,
/// when every slot is taken by a different hash.
fn note_hash(
    observed: &mut arrayvec::ArrayVec<[Hash; VC_THRESHOLD]>,
    h: Hash,
) -> bool {
    let already_present = observed.contains(&h);
    if already_present {
        return true;
    }

    // No room left for a hash we have not seen before.
    if observed.is_full() {
        return false;
    }

    observed.try_push(h).expect("length of storage guarded above; \
        only panics if length exceeds capacity; qed");
    true
}
/// Knowledge that a peer has about goings-on in a relay parent.
#[derive(Default)]
struct PeerRelayParentKnowledge {
/// Candidates that the peer is aware of. This indicates that we can
/// send other statements pertaining to that candidate.
known_candidates: HashSet<Hash>,
/// Fingerprints of all statements a peer should be aware of: those that
/// were sent to the peer by us.
sent_statements: HashSet<(CompactStatement, ValidatorIndex)>,
/// Fingerprints of all statements a peer should be aware of: those that
/// were sent to us by the peer.
received_statements: HashSet<(CompactStatement, ValidatorIndex)>,
/// Per validator index, a tracker of the seconded candidates this peer is aware of
/// (each tracker is bounded by `VC_THRESHOLD`).
seconded_counts: HashMap<ValidatorIndex, VcPerPeerTracker>,
/// How many statements we've received for each candidate that we're aware of.
received_message_count: HashMap<Hash, usize>,
}
impl PeerRelayParentKnowledge {
/// Try to record that we are about to send the statement identified by `fingerprint`
/// to this peer.
///
/// Returns `None` when the peer cannot accept the statement: either the fingerprint has
/// already been sent to or received from the peer, or it is a `Valid`/`Invalid`
/// statement about a candidate the peer does not know. In those cases the internal
/// state is left unaltered.
///
/// Returns `Some(_)` when the statement can be sent, and updates the internal state so
/// the same fingerprint cannot be incorporated again. The value is `Some(true)` exactly
/// when this is the first time the peer becomes aware of the candidate hash.
fn send(&mut self, fingerprint: &(CompactStatement, ValidatorIndex)) -> Option<bool> {
    // A fingerprint the peer already has, from either direction, is never re-sent.
    if self.sent_statements.contains(fingerprint) {
        return None;
    }
    if self.received_statements.contains(fingerprint) {
        return None;
    }

    let (statement, validator) = fingerprint;
    let first_sighting = match statement {
        CompactStatement::Candidate(candidate_hash) => {
            let tracker = self.seconded_counts.entry(*validator).or_default();
            tracker.note_local(candidate_hash.clone());

            // `insert` reports whether the hash was newly added to the set.
            self.known_candidates.insert(candidate_hash.clone())
        }
        CompactStatement::Valid(candidate_hash)
        | CompactStatement::Invalid(candidate_hash) => {
            // The peer can only accept Valid and Invalid statements for which it is aware
            // of the corresponding candidate.
            if !self.known_candidates.contains(candidate_hash) {
                return None;
            }

            false
        }
    };

    self.sent_statements.insert(fingerprint.clone());
    Some(first_sighting)
}
/// Attempt to update our view of the peer's knowledge with this statement's fingerprint based on
/// a message we are receiving from the peer.
///
/// Provide the maximum message count that we can receive per candidate. In practice we should
/// not receive more statements for any one candidate than there are members in the group assigned
/// to that para, but this maximum needs to be lenient to account for equivocations that may be
/// cross-group. As such, a maximum of 2 * n_validators is recommended.
///
/// This returns an error if the peer should not have sent us this message according to protocol
/// rules for flood protection.
///
/// If this returns `Ok`, the internal state has been altered. After `receive`ing a new
/// candidate, we are then cleared to send the peer further statements about that candidate.
Loading full blame...