Unverified Commit 72318d75 authored by Andronik Ordian's avatar Andronik Ordian Committed by GitHub
Browse files

bitfield-dist: fix state update on gossip (#2817)

* bitfield-dist: fix state update on gossip

* fixes

* doc fixes

* oops

* 2 lines of code change
parent 7bed5fec
Pipeline #132690 failed with stages
in 15 minutes and 51 seconds
......@@ -513,7 +513,7 @@ impl State {
Some(entry) => entry,
None => {
if let Some(peer_id) = source.peer_id() {
tracing::debug!(
tracing::trace!(
target: LOG_TARGET,
?peer_id,
?block_hash,
......@@ -1008,7 +1008,7 @@ impl State {
peer_id: PeerId,
blocks: Vec<(BlockDepth, Hash)>,
) {
// we will only propagate local assignment/approvals after a certain depth
// we will propagate only local assignment/approvals after a certain depth
const DEPTH_THRESHOLD: usize = 5;
let mut assignments = Vec::new();
......
......@@ -345,12 +345,6 @@ where
// check interest in the peer in this message's relay parent
if view.contains(&message.relay_parent) {
let message_needed = job_data.message_from_validator_needed_by_peer(&peer, &validator);
// track the message as sent for this peer
job_data.message_sent_to_peer
.entry(peer.clone())
.or_default()
.insert(validator.clone());
if message_needed {
Some(peer.clone())
} else {
......@@ -362,6 +356,15 @@ where
})
.collect::<Vec<PeerId>>();
let interested_peers = util::choose_random_sqrt_subset(interested_peers, MIN_GOSSIP_PEERS);
interested_peers.iter()
.for_each(|peer|{
// track the message as sent for this peer
job_data.message_sent_to_peer
.entry(peer.clone())
.or_default()
.insert(validator.clone());
});
drop(_span);
if interested_peers.is_empty() {
......
......@@ -33,7 +33,7 @@ use polkadot_subsystem::{
};
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
MIN_GOSSIP_PEERS,
self as util, MIN_GOSSIP_PEERS,
};
use polkadot_node_primitives::{SignedFullStatement};
use polkadot_primitives::v1::{
......@@ -158,25 +158,21 @@ struct PeerRelayParentKnowledge {
}
impl PeerRelayParentKnowledge {
/// Attempt to update our view of the peer's knowledge with this statement's fingerprint based
/// Updates our view of the peer's knowledge with this statement's fingerprint based
/// on something that we would like to send to the peer.
///
/// This returns `None` if the peer cannot accept this statement, without altering internal
/// state.
/// NOTE: assumes `self.can_send` returned true before this call.
///
/// If the peer can accept the statement, this returns `Some` and updates the internal state.
/// Once the knowledge has incorporated a statement, it cannot be incorporated again.
///
/// This returns `Some(true)` if this is the first time the peer has become aware of a
/// This returns `true` if this is the first time the peer has become aware of a
/// candidate with the given hash.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn send(&mut self, fingerprint: &(CompactStatement, ValidatorIndex)) -> Option<bool> {
let already_known = self.sent_statements.contains(fingerprint)
|| self.received_statements.contains(fingerprint);
if already_known {
return None;
}
fn send(&mut self, fingerprint: &(CompactStatement, ValidatorIndex)) -> bool {
debug_assert!(
self.can_send(fingerprint),
"send is only called after `can_send` returns true; qed",
);
let new_known = match fingerprint.0 {
CompactStatement::Seconded(ref h) => {
......@@ -186,20 +182,36 @@ impl PeerRelayParentKnowledge {
self.known_candidates.insert(h.clone())
},
CompactStatement::Valid(ref h) => {
// The peer can only accept Valid and Invalid statements for which it is aware
// of the corresponding candidate.
if !self.known_candidates.contains(h) {
return None;
}
CompactStatement::Valid(_) => {
false
}
};
self.sent_statements.insert(fingerprint.clone());
Some(new_known)
new_known
}
/// This returns `true` if the peer cannot accept this statement, without altering internal
/// state, `false` otherwise.
fn can_send(&self, fingerprint: &(CompactStatement, ValidatorIndex)) -> bool {
let already_known = self.sent_statements.contains(fingerprint)
|| self.received_statements.contains(fingerprint);
if already_known {
return false;
}
match fingerprint.0 {
CompactStatement::Valid(ref h) => {
// The peer can only accept Valid and Invalid statements for which it is aware
// of the corresponding candidate.
self.known_candidates.contains(h)
}
CompactStatement::Seconded(_) => {
true
},
}
}
/// Attempt to update our view of the peer's knowledge with this statement's fingerprint based on
......@@ -274,24 +286,41 @@ struct PeerData {
}
impl PeerData {
/// Attempt to update our view of the peer's knowledge with this statement's fingerprint based
/// Updates our view of the peer's knowledge with this statement's fingerprint based
/// on something that we would like to send to the peer.
///
/// This returns `None` if the peer cannot accept this statement, without altering internal
/// state.
/// NOTE: assumes `self.can_send` returned true before this call.
///
/// If the peer can accept the statement, this returns `Some` and updates the internal state.
/// Once the knowledge has incorporated a statement, it cannot be incorporated again.
///
/// This returns `Some(true)` if this is the first time the peer has become aware of a
/// This returns `true` if this is the first time the peer has become aware of a
/// candidate with the given hash.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn send(
&mut self,
relay_parent: &Hash,
fingerprint: &(CompactStatement, ValidatorIndex),
) -> Option<bool> {
self.view_knowledge.get_mut(relay_parent).map_or(None, |k| k.send(fingerprint))
) -> bool {
debug_assert!(
self.can_send(relay_parent, fingerprint),
"send is only called after `can_send` returns true; qed",
);
self.view_knowledge
.get_mut(relay_parent)
.expect("send is only called after `can_send` returns true; qed")
.send(fingerprint)
}
/// This returns `None` if the peer cannot accept this statement, without altering internal
/// state.
fn can_send(
&self,
relay_parent: &Hash,
fingerprint: &(CompactStatement, ValidatorIndex),
) -> bool {
self.view_knowledge
.get(relay_parent)
.map_or(false, |k| k.can_send(fingerprint))
}
/// Attempt to update our view of the peer's knowledge with this statement's fingerprint based on
......@@ -623,12 +652,21 @@ async fn circulate_statement(
) -> Vec<PeerId> {
let fingerprint = stored.fingerprint();
let len_sqrt = (peers.len() as f64).sqrt() as usize;
let cap = std::cmp::max(MIN_GOSSIP_PEERS, len_sqrt);
let peers_to_send: HashMap<PeerId, bool> = peers.iter_mut().filter_map(|(peer, data)| {
data.send(&relay_parent, &fingerprint).map(|new| (peer.clone(), new))
}).take(cap).collect();
let peers_to_send: Vec<PeerId> = peers.iter().filter_map(|(peer, data)| {
if data.can_send(&relay_parent, &fingerprint) {
Some(peer.clone())
} else {
None
}
}).collect();
let peers_to_send = util::choose_random_sqrt_subset(peers_to_send, MIN_GOSSIP_PEERS);
let peers_to_send: Vec<(PeerId, bool)> = peers_to_send.into_iter()
.map(|peer_id| {
let new = peers.get_mut(&peer_id)
.expect("a subset is taken above, so it exists; qed")
.send(&relay_parent, &fingerprint);
(peer_id, new)
}).collect();
// Send all these peers the initial statement.
if !peers_to_send.is_empty() {
......@@ -638,10 +676,10 @@ async fn circulate_statement(
?peers_to_send,
?relay_parent,
statement = ?stored.statement,
"Sending statement"
"Sending statement",
);
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers_to_send.keys().cloned().collect(),
peers_to_send.iter().map(|(p, _)| p.clone()).collect(),
payload,
))).await;
}
......@@ -665,26 +703,29 @@ async fn send_statements_about(
metrics: &Metrics,
) {
for statement in active_head.statements_about(candidate_hash) {
if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() {
let payload = statement_message(
relay_parent,
statement.statement.clone(),
);
let fingerprint = statement.fingerprint();
if !peer_data.can_send(&relay_parent, &fingerprint) {
continue;
}
peer_data.send(&relay_parent, &fingerprint);
let payload = statement_message(
relay_parent,
statement.statement.clone(),
);
tracing::trace!(
target: LOG_TARGET,
?peer,
?relay_parent,
?candidate_hash,
statement = ?statement.statement,
"Sending statement"
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
)).await;
tracing::trace!(
target: LOG_TARGET,
?peer,
?relay_parent,
?candidate_hash,
statement = ?statement.statement,
"Sending statement",
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
)).await;
metrics.on_statement_distributed();
}
metrics.on_statement_distributed();
}
}
......@@ -699,25 +740,28 @@ async fn send_statements(
metrics: &Metrics,
) {
for statement in active_head.statements() {
if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() {
let payload = statement_message(
relay_parent,
statement.statement.clone(),
);
let fingerprint = statement.fingerprint();
if !peer_data.can_send(&relay_parent, &fingerprint) {
continue;
}
peer_data.send(&relay_parent, &fingerprint);
let payload = statement_message(
relay_parent,
statement.statement.clone(),
);
tracing::trace!(
target: LOG_TARGET,
?peer,
?relay_parent,
statement = ?statement.statement,
"Sending statement"
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
)).await;
tracing::trace!(
target: LOG_TARGET,
?peer,
?relay_parent,
statement = ?statement.statement,
"Sending statement"
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
)).await;
metrics.on_statement_distributed();
}
metrics.on_statement_distributed();
}
}
......@@ -1331,7 +1375,7 @@ mod tests {
let hash_a = CandidateHash([1; 32].into());
// Sending an un-pinned statement should not work and should have no effect.
assert!(knowledge.send(&(CompactStatement::Valid(hash_a), ValidatorIndex(0))).is_none());
assert!(!knowledge.can_send(&(CompactStatement::Valid(hash_a), ValidatorIndex(0))));
assert!(!knowledge.known_candidates.contains(&hash_a));
assert!(knowledge.sent_statements.is_empty());
assert!(knowledge.received_statements.is_empty());
......@@ -1339,8 +1383,8 @@ mod tests {
assert!(knowledge.received_message_count.is_empty());
// Make the peer aware of the candidate.
assert_eq!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0))), Some(true));
assert_eq!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(1))), Some(false));
assert_eq!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0))), true);
assert_eq!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(1))), false);
assert!(knowledge.known_candidates.contains(&hash_a));
assert_eq!(knowledge.sent_statements.len(), 2);
assert!(knowledge.received_statements.is_empty());
......@@ -1348,7 +1392,7 @@ mod tests {
assert!(knowledge.received_message_count.get(&hash_a).is_none());
// And now it should accept the dependent message.
assert_eq!(knowledge.send(&(CompactStatement::Valid(hash_a), ValidatorIndex(0))), Some(false));
assert_eq!(knowledge.send(&(CompactStatement::Valid(hash_a), ValidatorIndex(0))), false);
assert!(knowledge.known_candidates.contains(&hash_a));
assert_eq!(knowledge.sent_statements.len(), 3);
assert!(knowledge.received_statements.is_empty());
......@@ -1362,7 +1406,7 @@ mod tests {
let hash_a = CandidateHash([1; 32].into());
assert!(knowledge.receive(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0)), 3).unwrap());
assert!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0))).is_none());
assert!(!knowledge.can_send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0))));
}
#[test]
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment