Unverified Commit 65a5d185 authored by Andronik Ordian's avatar Andronik Ordian Committed by GitHub
Browse files

approval-distribution: limit the amount of assignments on unify (#2737)



* approval-distribution: limit the amount of packets on unify

* guide: fix a typo

* compilation fix

* grammar

* Update roadmap/implementers-guide/src/node/approval/approval-distribution.md

Co-authored-by: David's avatarDavid <dvdplm@gmail.com>

* more grammar

* propagate only local assignments/approvals after a certain depth

* increase the threshold

* guides update

Co-authored-by: David's avatarDavid <dvdplm@gmail.com>
parent 2bdcaaea
Pipeline #131128 failed with stages
in 21 minutes and 12 seconds
......@@ -114,12 +114,20 @@ enum ApprovalState {
Approved(AssignmentCert, ValidatorSignature),
}
#[derive(Debug, Clone, Copy)]
enum LocalSource {
Yes,
No,
}
type BlockDepth = usize;
/// Information about candidates in the context of a particular block they are included in.
/// In other words, multiple `CandidateEntry`s may exist for the same candidate,
/// if it is included by multiple blocks - this is likely the case when there are forks.
#[derive(Debug, Default)]
struct CandidateEntry {
approvals: HashMap<ValidatorIndex, ApprovalState>,
approvals: HashMap<ValidatorIndex, (ApprovalState, LocalSource)>,
}
#[derive(Debug, Clone)]
......@@ -135,6 +143,13 @@ impl MessageSource {
Self::Local => None,
}
}
fn as_local_source(&self) -> LocalSource {
match self {
Self::Local => LocalSource::Yes,
_ => LocalSource::No,
}
}
}
enum PendingMessage {
......@@ -229,8 +244,6 @@ impl State {
);
{
let _timer = metrics.time_import_pending_now_known();
let pending_now_known = self.pending_known.keys()
.filter(|k| self.blocks.contains_key(k))
.copied()
......@@ -241,6 +254,15 @@ impl State {
.flatten()
.collect::<Vec<_>>();
if !to_import.is_empty() {
tracing::debug!(
target: LOG_TARGET,
num = to_import.len(),
"Processing pending assignment/approvals",
);
let _timer = metrics.time_import_pending_now_known();
for (peer_id, message) in to_import {
match message {
PendingMessage::Assignment(assignment, claimed_index) => {
......@@ -263,6 +285,7 @@ impl State {
}
}
}
}
for (peer_id, view) in self.peer_views.iter() {
let intersection = view.iter().filter(|h| new_hashes.contains(h));
......@@ -489,6 +512,7 @@ impl State {
tx,
))).await;
let timer = metrics.time_awaiting_approval_voting();
let result = match rx.await {
Ok(result) => result,
Err(_) => {
......@@ -499,6 +523,7 @@ impl State {
return;
}
};
drop(timer);
tracing::trace!(
target: LOG_TARGET,
......@@ -551,6 +576,8 @@ impl State {
}
}
let local_source = source.as_local_source();
// Invariant: none of the peers except for the `source` know about the assignment.
metrics.on_assignment_imported();
......@@ -560,7 +587,7 @@ impl State {
// unless the approval state is set already
candidate_entry.approvals
.entry(validator_index)
.or_insert_with(|| ApprovalState::Assigned(assignment.cert.clone()));
.or_insert_with(|| (ApprovalState::Assigned(assignment.cert.clone()), local_source));
}
None => {
tracing::warn!(
......@@ -596,10 +623,11 @@ impl State {
if !peers.is_empty() {
tracing::trace!(
target: LOG_TARGET,
"Sending assignment (block={}, index={}) to {} peers",
block_hash,
claimed_candidate_index,
peers.len(),
?block_hash,
?claimed_candidate_index,
?local_source,
num_peers = peers.len(),
"Sending an assignment to peers",
);
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
......@@ -762,6 +790,8 @@ impl State {
}
}
let local_source = source.as_local_source();
// Invariant: none of the peers except for the `source` know about the approval.
metrics.on_approval_imported();
......@@ -770,10 +800,10 @@ impl State {
// set the approval state for validator_index to Approved
// it should be in assigned state already
match candidate_entry.approvals.remove(&validator_index) {
Some(ApprovalState::Assigned(cert)) => {
Some((ApprovalState::Assigned(cert), _local)) => {
candidate_entry.approvals.insert(
validator_index,
ApprovalState::Approved(cert, vote.signature.clone()),
(ApprovalState::Approved(cert, vote.signature.clone()), local_source),
);
}
_ => {
......@@ -819,10 +849,11 @@ impl State {
if !peers.is_empty() {
tracing::trace!(
target: LOG_TARGET,
"Sending approval (block={}, index={}) to {} peers",
block_hash,
candidate_index,
peers.len(),
?block_hash,
?candidate_index,
?local_source,
num_peers = peers.len(),
"Sending an approval to peers",
);
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
......@@ -843,7 +874,7 @@ impl State {
) {
metrics.on_unify_with_peer();
let _timer = metrics.time_unify_with_peer();
let mut to_send = HashSet::new();
let mut to_send: Vec<(BlockDepth, Hash)> = Vec::new();
let view_finalized_number = view.finalized_number;
for head in view.into_iter() {
......@@ -867,7 +898,7 @@ impl State {
block = entry.parent_hash.clone();
Some(interesting_block)
});
to_send.extend(interesting_blocks);
to_send.extend(interesting_blocks.enumerate());
}
// step 6.
// send all assignments and approvals for all candidates in those blocks to the peer
......@@ -883,12 +914,16 @@ impl State {
entries: &HashMap<Hash, BlockEntry>,
ctx: &mut impl SubsystemContext<Message = ApprovalDistributionMessage>,
peer_id: PeerId,
blocks: HashSet<Hash>,
blocks: Vec<(BlockDepth, Hash)>,
) {
// we will only propagate local assignment/approvals after a certain depth
const DEPTH_THRESHOLD: usize = 5;
let mut assignments = Vec::new();
let mut approvals = Vec::new();
let num_blocks = blocks.len();
for block in blocks.into_iter() {
for (depth, block) in blocks.into_iter() {
let entry = match entries.get(&block) {
Some(entry) => entry,
None => continue, // should be unreachable
......@@ -903,7 +938,10 @@ impl State {
for (candidate_index, candidate_entry) in entry.candidates.iter().enumerate() {
let candidate_index = candidate_index as u32;
for (validator_index, approval_state) in candidate_entry.approvals.iter() {
for (validator_index, (approval_state, is_local)) in candidate_entry.approvals.iter() {
if depth >= DEPTH_THRESHOLD && !matches!(is_local, LocalSource::Yes) {
continue;
}
match approval_state {
ApprovalState::Assigned(cert) => {
assignments.push((IndirectAssignmentCert {
......@@ -926,6 +964,14 @@ impl State {
}
if !assignments.is_empty() {
tracing::trace!(
target: LOG_TARGET,
num = assignments.len(),
?num_blocks,
?peer_id,
"Sending assignments to a peer",
);
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
vec![peer_id.clone()],
protocol_v1::ValidationProtocol::ApprovalDistribution(
......@@ -935,6 +981,14 @@ impl State {
}
if !approvals.is_empty() {
tracing::trace!(
target: LOG_TARGET,
num = approvals.len(),
?num_blocks,
?peer_id,
"Sending approvals to a peer",
);
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
vec![peer_id],
protocol_v1::ValidationProtocol::ApprovalDistribution(
......
......@@ -237,9 +237,11 @@ Imports an approval signature referenced by block hash and candidate index:
1. Initialize a set `fresh_blocks = {}`
For each block in the view:
2. Load the `BlockEntry` for the block. If the block is unknown, or the number is less than or equal to the view's finalized number, go to step 6.
2. Load the `BlockEntry` for the block. If the block is unknown, or the number is less than or equal to the view's finalized number go to step 6.
3. Inspect the `known_by` set of the `BlockEntry`. If the peer is already present, go to step 6.
4. Add the peer to `known_by` with a cloned version of `block_entry.knowledge`. and add the hash of the block to `fresh_blocks`.
5. Return to step 2 with the ancestor of the block.
5. Return to step 2 with the ancestor of the block, keeping track of the block depth (+1).
6. For each block in `fresh_blocks`, send all assignments and approvals for all candidates in those blocks to the peer.
6. For each block in `fresh_blocks`, send all assignments and approvals for all candidates in those blocks to the peer if the block depth threshold is not reached, otherwise, send only assignments and approvals origination with the local source.
The reason we only send our local assignments and approvals when a certain block depth is reached when unifying with a peer is to avoid DoS attacks. It also helps when a node starts with a large difference between finalized and the highest block.
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment