// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! [`ApprovalDistributionSubsystem`] implementation.
//!
//! https://w3f.github.io/parachain-implementers-guide/node/approval/approval-distribution.html
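//!
//! In short: this subsystem gossips approval assignments and approval votes
//! between validators, tracking which messages each peer already knows so
//! that nothing is sent twice, and adjusting peer reputations for duplicate,
//! out-of-view or invalid messages.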
#![warn(missing_docs)]
#[cfg(test)]
mod tests;
use std::collections::{BTreeMap, HashMap, HashSet, hash_map};
use futures::{channel::oneshot, FutureExt as _};
use polkadot_primitives::v1::{
Hash, BlockNumber, ValidatorIndex, ValidatorSignature, CandidateIndex,
};
use polkadot_node_primitives::{
approval::{AssignmentCert, BlockApprovalMeta, IndirectSignedApprovalVote, IndirectAssignmentCert},
};
use polkadot_node_subsystem::{
messages::{
AllMessages, ApprovalDistributionMessage, ApprovalVotingMessage, NetworkBridgeMessage,
AssignmentCheckResult, ApprovalCheckResult, NetworkBridgeEvent,
},
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_node_network_protocol::{
PeerId, View, v1 as protocol_v1, UnifiedReputationChange as Rep,
};

const LOG_TARGET: &str = "parachain::approval-distribution";
const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("Peer sent an out-of-view assignment or approval");
const COST_DUPLICATE_MESSAGE: Rep = Rep::CostMinorRepeated("Peer sent identical messages");
const COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE: Rep = Rep::CostMinor("The vote was valid but too far in the future");
const COST_INVALID_MESSAGE: Rep = Rep::CostMajor("The vote was bad");
const BENEFIT_VALID_MESSAGE: Rep = Rep::BenefitMinor("Peer sent a valid message");
const BENEFIT_VALID_MESSAGE_FIRST: Rep = Rep::BenefitMinorFirst("Valid message with new information");
/// The Approval Distribution subsystem.
pub struct ApprovalDistribution {
metrics: Metrics,
}
/// The [`State`] struct is responsible for tracking the overall state of the subsystem.
///
/// It tracks metadata about our view of the unfinalized chain,
/// which assignments and approvals we have seen, and our peers' views.
#[derive(Default)]
struct State {
/// These two fields are used in conjunction to construct a view over the unfinalized chain.
blocks_by_number: BTreeMap<BlockNumber, Vec<Hash>>,
blocks: HashMap<Hash, BlockEntry>,
/// Our view updates to our peers can race with `NewBlocks` updates. We store messages received
/// against the directly mentioned blocks in our view in this map until `NewBlocks` is received.
///
/// As long as the parent is already in the `blocks` map and `NewBlocks` messages aren't delayed
/// by more than a block length, this strategy will work well for mitigating the race. This is
/// also a race that occurs typically on local networks.
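/// (For example, a peer may forward us an assignment for a block that has just
/// entered our view before our own `NewBlocks` signal for it arrives.)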
pending_known: HashMap<Hash, Vec<(PeerId, PendingMessage)>>,
/// Peer view data is partially stored here, and partially inline within the [`BlockEntry`]s
peer_views: HashMap<PeerId, View>,
}
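/// A compact identity of an assignment or approval message, used to
/// deduplicate incoming messages and to track per-peer knowledge.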
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
enum MessageFingerprint {
Assignment(Hash, CandidateIndex, ValidatorIndex),
Approval(Hash, CandidateIndex, ValidatorIndex),
}
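/// The set of message fingerprints that we (or a peer) are known to have seen.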
#[derive(Debug, Clone, Default)]
struct Knowledge {
known_messages: HashSet<MessageFingerprint>,
}
/// Information about blocks in our current view as well as whether peers know of them.
struct BlockEntry {
/// Peers who we know are aware of this block and thus, the candidates within it.
/// This maps to their knowledge of messages.
known_by: HashMap<PeerId, Knowledge>,
/// The number of the block.
number: BlockNumber,
/// The parent hash of the block.
parent_hash: Hash,
/// Our knowledge of messages.
knowledge: Knowledge,
/// A votes entry for each candidate indexed by [`CandidateIndex`].
candidates: Vec<CandidateEntry>,
}
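/// Per-validator progress on a candidate: assigned, or assigned and approved.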
#[derive(Debug)]
enum ApprovalState {
Assigned(AssignmentCert),
Approved(AssignmentCert, ValidatorSignature),
}
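/// Whether a message originated from our own approval-voting subsystem.
/// Local messages are still propagated past the depth threshold in
/// `send_gossip_messages_to_peer`; remote ones are not.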
#[derive(Debug, Clone, Copy)]
enum LocalSource {
Yes,
No,
}
type BlockDepth = usize;
/// Information about candidates in the context of a particular block they are included in.
/// In other words, multiple `CandidateEntry`s may exist for the same candidate,
/// if it is included by multiple blocks - this is likely the case when there are forks.
#[derive(Debug, Default)]
struct CandidateEntry {
approvals: HashMap<ValidatorIndex, (ApprovalState, LocalSource)>,
}
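/// Where a message came from: a remote peer or our own approval-voting
/// subsystem.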
#[derive(Debug, Clone)]
enum MessageSource {
Peer(PeerId),
Local,
}
impl MessageSource {
fn peer_id(&self) -> Option<PeerId> {
match self {
Self::Peer(id) => Some(id.clone()),
Self::Local => None,
}
}
fn as_local_source(&self) -> LocalSource {
match self {
Self::Local => LocalSource::Yes,
_ => LocalSource::No,
}
}
}

enum PendingMessage {
Assignment(IndirectAssignmentCert, CandidateIndex),
Approval(IndirectSignedApprovalVote),
}
impl State {
async fn handle_network_msg(
&mut self,
ctx: &mut impl SubsystemContext<Message = ApprovalDistributionMessage>,
metrics: &Metrics,
event: NetworkBridgeEvent<protocol_v1::ApprovalDistributionMessage>,
) {
match event {
NetworkBridgeEvent::PeerConnected(peer_id, role) => {
// insert a blank view if none already present
tracing::trace!(
target: LOG_TARGET,
?peer_id,
?role,
"Peer connected",
);
self.peer_views.entry(peer_id).or_default();
}
NetworkBridgeEvent::PeerDisconnected(peer_id) => {
tracing::trace!(
target: LOG_TARGET,
?peer_id,
"Peer disconnected",
);
self.peer_views.remove(&peer_id);
self.blocks.iter_mut().for_each(|(_hash, entry)| {
entry.known_by.remove(&peer_id);
})
}
NetworkBridgeEvent::PeerViewChange(peer_id, view) => {
self.handle_peer_view_change(ctx, metrics, peer_id, view).await;
}
NetworkBridgeEvent::OurViewChange(view) => {
tracing::trace!(
target: LOG_TARGET,
?view,
"Own view change",
);
for head in view.iter() {
if !self.blocks.contains_key(head) {
self.pending_known.entry(*head).or_default();
}
}
self.pending_known.retain(|h, _| view.contains(h));
}
NetworkBridgeEvent::PeerMessage(peer_id, msg) => {
self.process_incoming_peer_message(ctx, metrics, peer_id, msg).await;
}
}
}
async fn handle_new_blocks(
&mut self,
ctx: &mut impl SubsystemContext<Message = ApprovalDistributionMessage>,
metrics: &Metrics,
metas: Vec<BlockApprovalMeta>,
) {
let mut new_hashes = HashSet::new();
for meta in &metas {
match self.blocks.entry(meta.hash.clone()) {
hash_map::Entry::Vacant(entry) => {
let candidates_count = meta.candidates.len();
let mut candidates = Vec::with_capacity(candidates_count);
candidates.resize_with(candidates_count, Default::default);
entry.insert(BlockEntry {
known_by: HashMap::new(),
number: meta.number,
parent_hash: meta.parent_hash.clone(),
knowledge: Knowledge::default(),
candidates,
});
new_hashes.insert(meta.hash.clone());
}
_ => continue,
}
self.blocks_by_number.entry(meta.number).or_default().push(meta.hash);
}
tracing::debug!(
target: LOG_TARGET,
"Got new blocks {:?}",
metas.iter().map(|m| (m.hash, m.number)).collect::<Vec<_>>(),
);
{
let pending_now_known = self.pending_known.keys()
.filter(|k| self.blocks.contains_key(k))
.copied()
.collect::<Vec<_>>();
let to_import = pending_now_known.into_iter()
.filter_map(|k| self.pending_known.remove(&k))
.flatten()
.collect::<Vec<_>>();
if !to_import.is_empty() {
tracing::debug!(
target: LOG_TARGET,
num = to_import.len(),
"Processing pending assignment/approvals",
);
let _timer = metrics.time_import_pending_now_known();
for (peer_id, message) in to_import {
match message {
PendingMessage::Assignment(assignment, claimed_index) => {
self.import_and_circulate_assignment(
ctx,
metrics,
MessageSource::Peer(peer_id),
assignment,
claimed_index,
).await;
}
PendingMessage::Approval(approval_vote) => {
self.import_and_circulate_approval(
ctx,
metrics,
MessageSource::Peer(peer_id),
approval_vote,
).await;
}
}
}
}
}
for (peer_id, view) in self.peer_views.iter() {
let intersection = view.iter().filter(|h| new_hashes.contains(h));
let view_intersection = View::new(
intersection.cloned(),
view.finalized_number,
);
Self::unify_with_peer(
ctx,
metrics,
&mut self.blocks,
peer_id.clone(),
view_intersection,
).await;
}
}
async fn process_incoming_peer_message(
&mut self,
ctx: &mut impl SubsystemContext<Message = ApprovalDistributionMessage>,
metrics: &Metrics,
peer_id: PeerId,
msg: protocol_v1::ApprovalDistributionMessage,
) {
match msg {
protocol_v1::ApprovalDistributionMessage::Assignments(assignments) => {
tracing::trace!(
target: LOG_TARGET,
peer_id = %peer_id,
num = assignments.len(),
"Processing assignments from a peer",
);
for (assignment, claimed_index) in assignments.into_iter() {
if let Some(pending) = self.pending_known.get_mut(&assignment.block_hash) {
pending.push((
peer_id.clone(),
PendingMessage::Assignment(assignment, claimed_index),
));
continue;
}
self.import_and_circulate_assignment(
ctx,
metrics,
MessageSource::Peer(peer_id.clone()),
assignment,
claimed_index,
).await;
}
}
protocol_v1::ApprovalDistributionMessage::Approvals(approvals) => {
tracing::trace!(
target: LOG_TARGET,
peer_id = %peer_id,
num = approvals.len(),
"Processing approvals from a peer",
);
for approval_vote in approvals.into_iter() {
if let Some(pending) = self.pending_known.get_mut(&approval_vote.block_hash) {
pending.push((
peer_id.clone(),
PendingMessage::Approval(approval_vote),
));
continue;
}
self.import_and_circulate_approval(
ctx,
metrics,
MessageSource::Peer(peer_id.clone()),
approval_vote,
).await;
}
}
}
}
async fn handle_peer_view_change(
&mut self,
ctx: &mut impl SubsystemContext<Message = ApprovalDistributionMessage>,
metrics: &Metrics,
peer_id: PeerId,
view: View,
) {
tracing::trace!(
target: LOG_TARGET,
?view,
"Peer view change",
);
Self::unify_with_peer(
ctx,
metrics,
&mut self.blocks,
peer_id.clone(),
view.clone(),
).await;
let finalized_number = view.finalized_number;
let old_view = self.peer_views.insert(peer_id.clone(), view);
let old_finalized_number = old_view.map(|v| v.finalized_number).unwrap_or(0);
// we want to prune the peer from `known_by` of every block up to (and
// including) view.finalized_number
let blocks = &mut self.blocks;
// the `BTreeMap::range` is constrained by stored keys
// so the loop won't take ages if the new finalized_number skyrockets
// but we need to make sure the range is not empty, otherwise it will panic.
// It shouldn't be empty; the network bridge makes sure of this.
let range = old_finalized_number..=finalized_number;
if !range.is_empty() {
self.blocks_by_number
.range(range)
.map(|(_number, hashes)| hashes)
.flatten()
.for_each(|hash| {
if let Some(entry) = blocks.get_mut(hash) {
entry.known_by.remove(&peer_id);
}
});
}
}
fn handle_block_finalized(
&mut self,
finalized_number: BlockNumber,
) {
// we want to prune every block up to (including) finalized_number
// why +1 here?
// split_off returns everything after the given key, including the key
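// e.g. if `finalized_number` is 9, `split_off(&10)` detaches the entries for
// blocks 10 and above (the ones to keep) and leaves 9 and below in place,
// hence the swap below.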
let split_point = finalized_number.saturating_add(1);
let mut old_blocks = self.blocks_by_number.split_off(&split_point);
// after split_off old_blocks actually contains new blocks, we need to swap
std::mem::swap(&mut self.blocks_by_number, &mut old_blocks);
// now that we pruned `self.blocks_by_number`, let's clean up `self.blocks` too
old_blocks.values()
.flatten()
.for_each(|h| {
self.blocks.remove(h);
});
}
async fn import_and_circulate_assignment(
&mut self,
ctx: &mut impl SubsystemContext<Message = ApprovalDistributionMessage>,
metrics: &Metrics,
source: MessageSource,
assignment: IndirectAssignmentCert,
claimed_candidate_index: CandidateIndex,
) {
let block_hash = assignment.block_hash.clone();
let validator_index = assignment.validator;
let entry = match self.blocks.get_mut(&block_hash) {
Some(entry) => entry,
None => {
if let Some(peer_id) = source.peer_id() {
tracing::debug!(
target: LOG_TARGET,
?peer_id,
?block_hash,
?validator_index,
"Unexpected assignment",
);
modify_reputation(ctx, peer_id, COST_UNEXPECTED_MESSAGE).await;
}
return;
}
};
// compute a fingerprint of the assignment
let fingerprint = MessageFingerprint::Assignment(
block_hash,
claimed_candidate_index,
validator_index,
);
if let Some(peer_id) = source.peer_id() {
// check if our knowledge of the peer already contains this assignment
match entry.known_by.entry(peer_id.clone()) {
hash_map::Entry::Occupied(knowledge) => {
if knowledge.get().known_messages.contains(&fingerprint) {
tracing::debug!(
target: LOG_TARGET,
?peer_id,
?fingerprint,
"Duplicate assignment",
);
modify_reputation(ctx, peer_id, COST_DUPLICATE_MESSAGE).await;
return;
}
}
hash_map::Entry::Vacant(_) => {
modify_reputation(ctx, peer_id.clone(), COST_UNEXPECTED_MESSAGE).await;
}
}
// if the assignment is known to be valid, reward the peer
if entry.knowledge.known_messages.contains(&fingerprint) {
modify_reputation(ctx, peer_id.clone(), BENEFIT_VALID_MESSAGE).await;
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
tracing::trace!(
target: LOG_TARGET,
?peer_id,
?fingerprint,
"Known assignment",
);
peer_knowledge.known_messages.insert(fingerprint.clone());
}
return;
}
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportAssignment(
assignment.clone(),
claimed_candidate_index,
tx,
))).await;
let timer = metrics.time_awaiting_approval_voting();
let result = match rx.await {
Ok(result) => result,
Err(_) => {
tracing::debug!(
target: LOG_TARGET,
"The approval voting subsystem is down",
);
return;
}
};
drop(timer);
tracing::trace!(
target: LOG_TARGET,
?source,
?fingerprint,
?result,
"Checked assignment",
);
match result {
AssignmentCheckResult::Accepted => {
modify_reputation(ctx, peer_id.clone(), BENEFIT_VALID_MESSAGE_FIRST).await;
entry.knowledge.known_messages.insert(fingerprint.clone());
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
peer_knowledge.known_messages.insert(fingerprint.clone());
}
}
AssignmentCheckResult::AcceptedDuplicate => {
// "duplicate" assignments aren't necessarily equal.
// There is more than one way each validator can be assigned to each core.
// cf. https://github.com/paritytech/polkadot/pull/2160#discussion_r557628699
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
peer_knowledge.known_messages.insert(fingerprint);
}
return;
}
AssignmentCheckResult::TooFarInFuture => {
modify_reputation(ctx, peer_id, COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE).await;
return;
}
AssignmentCheckResult::Bad => {
modify_reputation(ctx, peer_id, COST_INVALID_MESSAGE).await;
return;
}
}
} else {
if !entry.knowledge.known_messages.insert(fingerprint.clone()) {
// if we already imported an assignment, there is no need to distribute it again
tracing::warn!(
target: LOG_TARGET,
?fingerprint,
"Importing locally an already known assignment",
);
return;
} else {
tracing::debug!(
target: LOG_TARGET,
?fingerprint,
"Importing locally a new assignment",
);
}
}
let local_source = source.as_local_source();
// Invariant: none of the peers except for the `source` know about the assignment.
metrics.on_assignment_imported();
match entry.candidates.get_mut(claimed_candidate_index as usize) {
Some(candidate_entry) => {
// set the approval state for validator_index to Assigned
// unless the approval state is set already
candidate_entry.approvals
.entry(validator_index)
.or_insert_with(|| (ApprovalState::Assigned(assignment.cert.clone()), local_source));
}
None => {
tracing::warn!(
target: LOG_TARGET,
hash = ?block_hash,
?claimed_candidate_index,
"Expected a candidate entry on import_and_circulate_assignment",
);
}
}
// Dispatch an ApprovalDistributionV1Message::Assignment(assignment, candidate_index)
// to all peers in the BlockEntry's known_by set who know about the block,
// excluding the peer in the source, if source has kind MessageSource::Peer.
let maybe_peer_id = source.peer_id();
let peers = entry
.known_by
.keys()
.cloned()
.filter(|key| maybe_peer_id.as_ref().map_or(true, |id| id != key))
.collect::<Vec<_>>();
let assignments = vec![(assignment, claimed_candidate_index)];
// Add the fingerprint of the assignment to the knowledge of each peer.
for peer in peers.iter() {
// we already filtered peers above, so this should always be Some
if let Some(entry) = entry.known_by.get_mut(peer) {
entry.known_messages.insert(fingerprint.clone());
}
}
if !peers.is_empty() {
tracing::trace!(
target: LOG_TARGET,
?block_hash,
?claimed_candidate_index,
?local_source,
num_peers = peers.len(),
"Sending an assignment to peers",
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
peers,
protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Assignments(assignments)
),
).into()).await;
}
}
async fn import_and_circulate_approval(
&mut self,
ctx: &mut impl SubsystemContext<Message = ApprovalDistributionMessage>,
metrics: &Metrics,
source: MessageSource,
vote: IndirectSignedApprovalVote,
) {
let block_hash = vote.block_hash.clone();
let validator_index = vote.validator;
let candidate_index = vote.candidate_index;
let entry = match self.blocks.get_mut(&block_hash) {
Some(entry) if entry.candidates.get(candidate_index as usize).is_some() => entry,
_ => {
if let Some(peer_id) = source.peer_id() {
modify_reputation(ctx, peer_id, COST_UNEXPECTED_MESSAGE).await;
}
return;
}
};
// compute a fingerprint of the approval
let fingerprint = MessageFingerprint::Approval(
block_hash.clone(),
candidate_index,
validator_index,
);
if let Some(peer_id) = source.peer_id() {
let assignment_fingerprint = MessageFingerprint::Assignment(
block_hash.clone(),
candidate_index,
validator_index,
);
if !entry.knowledge.known_messages.contains(&assignment_fingerprint) {
tracing::debug!(
target: LOG_TARGET,
?peer_id,
?fingerprint,
"Unknown approval assignment",
);
modify_reputation(ctx, peer_id, COST_UNEXPECTED_MESSAGE).await;
return;
}
// check if our knowledge of the peer already contains this approval
match entry.known_by.entry(peer_id.clone()) {
hash_map::Entry::Occupied(knowledge) => {
if knowledge.get().known_messages.contains(&fingerprint) {
tracing::debug!(
target: LOG_TARGET,
?peer_id,
?fingerprint,
"Duplicate approval",
);
modify_reputation(ctx, peer_id, COST_DUPLICATE_MESSAGE).await;
return;
}
}
hash_map::Entry::Vacant(_) => {
tracing::debug!(
target: LOG_TARGET,
?peer_id,
?fingerprint,
"Approval from a peer is out of view",
);
modify_reputation(ctx, peer_id.clone(), COST_UNEXPECTED_MESSAGE).await;
}
}
// if the approval is known to be valid, reward the peer
if entry.knowledge.known_messages.contains(&fingerprint) {
tracing::trace!(
target: LOG_TARGET,
?peer_id,
?fingerprint,
"Known approval",
);
modify_reputation(ctx, peer_id.clone(), BENEFIT_VALID_MESSAGE).await;
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
peer_knowledge.known_messages.insert(fingerprint.clone());
}
return;
}
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportApproval(
vote.clone(),
tx,
))).await;
let timer = metrics.time_awaiting_approval_voting();
let result = match rx.await {
Ok(result) => result,
Err(_) => {
tracing::debug!(
target: LOG_TARGET,
"The approval voting subsystem is down",
);
return;
}
};
tracing::trace!(
target: LOG_TARGET,
?peer_id,
?fingerprint,
?result,
"Checked approval",
);
match result {
ApprovalCheckResult::Accepted => {
modify_reputation(ctx, peer_id.clone(), BENEFIT_VALID_MESSAGE_FIRST).await;
entry.knowledge.known_messages.insert(fingerprint.clone());
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
peer_knowledge.known_messages.insert(fingerprint.clone());
}
}
ApprovalCheckResult::Bad => {
modify_reputation(ctx, peer_id, COST_INVALID_MESSAGE).await;
tracing::info!(
target: LOG_TARGET,
"Got a bad approval from peer",
);
return;
}
}
} else {
if !entry.knowledge.known_messages.insert(fingerprint.clone()) {
// if we already imported an approval, there is no need to distribute it again
tracing::warn!(
target: LOG_TARGET,
?fingerprint,
"Importing locally an already known approval",
);
return;
} else {
tracing::debug!(
target: LOG_TARGET,
?fingerprint,
"Importing locally a new approval",
);
}
}
let local_source = source.as_local_source();
// Invariant: none of the peers except for the `source` know about the approval.
metrics.on_approval_imported();
match entry.candidates.get_mut(candidate_index as usize) {
Some(candidate_entry) => {
// set the approval state for validator_index to Approved
// it should be in assigned state already
match candidate_entry.approvals.remove(&validator_index) {
Some((ApprovalState::Assigned(cert), _local)) => {
candidate_entry.approvals.insert(
validator_index,
(ApprovalState::Approved(cert, vote.signature.clone()), local_source),
);
}
_ => {
tracing::warn!(
target: LOG_TARGET,
hash = ?block_hash,
?candidate_index,
"Expected a candidate entry with `ApprovalState::Assigned`",
);
}
}
}
None => {
tracing::warn!(
target: LOG_TARGET,
hash = ?block_hash,
?candidate_index,
"Expected a candidate entry on import_and_circulate_approval",
);
}
}
// Dispatch an ApprovalDistributionV1Message::Approval(vote)
// to all peers in the BlockEntry's known_by set who know about the block,
// excluding the peer in the source, if source has kind MessageSource::Peer.
let maybe_peer_id = source.peer_id();
let peers = entry
.known_by
.keys()
.cloned()
.filter(|key| maybe_peer_id.as_ref().map_or(true, |id| id != key))
.collect::<Vec<_>>();
// Add the fingerprint of the approval to the knowledge of each peer.
for peer in peers.iter() {
// we already filtered peers above, so this should always be Some
if let Some(entry) = entry.known_by.get_mut(peer) {
entry.known_messages.insert(fingerprint.clone());
}
}
let approvals = vec![vote];
if !peers.is_empty() {
tracing::trace!(
target: LOG_TARGET,
?block_hash,
?candidate_index,
?local_source,
num_peers = peers.len(),
"Sending an approval to peers",
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
peers,
protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Approvals(approvals)
),
).into()).await;
}
}
async fn unify_with_peer(
ctx: &mut impl SubsystemContext<Message = ApprovalDistributionMessage>,
metrics: &Metrics,
entries: &mut HashMap<Hash, BlockEntry>,
peer_id: PeerId,
view: View,
) {
metrics.on_unify_with_peer();
let _timer = metrics.time_unify_with_peer();
let mut to_send: Vec<(BlockDepth, Hash)> = Vec::new();
let view_finalized_number = view.finalized_number;
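// For each head in the peer's view (step 1), walk back through the ancestry:
// stop at the first block we don't know about or that is at or below the
// peer's finalized height (step 2), and at the first block the peer already
// knows (step 3); otherwise record the peer's new knowledge of the block and
// yield it (step 4), then continue from its parent (step 5). `enumerate`
// below tags each yielded block with its `BlockDepth`, the distance from the
// head.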
for head in view.into_iter() {
let mut block = head;
let interesting_blocks = std::iter::from_fn(|| {
// step 2.
let entry = match entries.get_mut(&block) {
Some(entry) if entry.number > view_finalized_number => entry,
_ => return None,
};
let interesting_block = match entry.known_by.entry(peer_id.clone()) {
// step 3.
hash_map::Entry::Occupied(_) => return None,
// step 4.
hash_map::Entry::Vacant(vacant) => {
vacant.insert(entry.knowledge.clone());
block
}
};
// step 5.
block = entry.parent_hash.clone();
Some(interesting_block)
});
to_send.extend(interesting_blocks.enumerate());
}
// step 6.
// send all assignments and approvals for all candidates in those blocks to the peer
Self::send_gossip_messages_to_peer(
entries,
ctx,
peer_id,
to_send
).await;
}
async fn send_gossip_messages_to_peer(
entries: &HashMap<Hash, BlockEntry>,
ctx: &mut impl SubsystemContext<Message = ApprovalDistributionMessage>,
peer_id: PeerId,
blocks: Vec<(BlockDepth, Hash)>,
) {
// we will only propagate local assignment/approvals after a certain depth
const DEPTH_THRESHOLD: usize = 5;
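// i.e. for blocks at depth >= DEPTH_THRESHOLD from the peer's view head we
// forward only messages we produced ourselves (`LocalSource::Yes`); see the
// filter in the loop below.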
let mut assignments = Vec::new();
let mut approvals = Vec::new();
let num_blocks = blocks.len();
for (depth, block) in blocks.into_iter() {
let entry = match entries.get(&block) {
Some(entry) => entry,
None => continue, // should be unreachable
};
tracing::trace!(
target: LOG_TARGET,
"Sending all assignments and approvals in block {} to peer {}",
block,
peer_id,
);
for (candidate_index, candidate_entry) in entry.candidates.iter().enumerate() {
let candidate_index = candidate_index as u32;
for (validator_index, (approval_state, is_local)) in candidate_entry.approvals.iter() {
if depth >= DEPTH_THRESHOLD && !matches!(is_local, LocalSource::Yes) {
continue;
}
match approval_state {
ApprovalState::Assigned(cert) => {
assignments.push((
IndirectAssignmentCert {
block_hash: block.clone(),
validator: validator_index.clone(),
cert: cert.clone(),
},
candidate_index.clone(),
));
}
ApprovalState::Approved(assignment_cert, signature) => {
assignments.push((
IndirectAssignmentCert {
block_hash: block.clone(),
validator: validator_index.clone(),
cert: assignment_cert.clone(),
},
candidate_index.clone(),
));
approvals.push(IndirectSignedApprovalVote {
block_hash: block.clone(),
validator: validator_index.clone(),
candidate_index: candidate_index.clone(),
signature: signature.clone(),
});
}
}
}
}
}
if !assignments.is_empty() {
tracing::trace!(
target: LOG_TARGET,
num = assignments.len(),
?num_blocks,
?peer_id,
"Sending assignments to a peer",
);
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
vec![peer_id.clone()],
protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Assignments(assignments)
),
).into()).await;
}
if !approvals.is_empty() {
tracing::trace!(
target: LOG_TARGET,
num = approvals.len(),
?num_blocks,
?peer_id,
"Sending approvals to a peer",