Unverified Commit 27301a34 authored by Robert Klotzner's avatar Robert Klotzner Committed by GitHub
Browse files

Only accept requests from peers that got notified by us. (#2889)

parent 80256e98
Pipeline #134597 failed with stages
in 25 minutes and 34 seconds
......@@ -167,9 +167,11 @@ fn note_hash(
/// knowledge that a peer has about goings-on in a relay parent.
#[derive(Default)]
struct PeerRelayParentKnowledge {
/// candidates that the peer is aware of. This indicates that we can
/// candidates that the peer is aware of because we sent statements to it. This indicates that we can
/// send other statements pertaining to that candidate.
known_candidates: HashSet<CandidateHash>,
sent_candidates: HashSet<CandidateHash>,
/// candidates that peer is aware of, because we received statements from it.
received_candidates: HashSet<CandidateHash>,
/// fingerprints of all statements a peer should be aware of: those that
/// were sent to the peer by us.
sent_statements: HashSet<(CompactStatement, ValidatorIndex)>,
......@@ -205,7 +207,7 @@ impl PeerRelayParentKnowledge {
.or_default()
.note_local(h.clone());
self.known_candidates.insert(h.clone())
self.sent_candidates.insert(h.clone())
},
CompactStatement::Valid(_) => {
false
......@@ -231,7 +233,7 @@ impl PeerRelayParentKnowledge {
CompactStatement::Valid(ref h) => {
// The peer can only accept Valid and Invalid statements for which it is aware
// of the corresponding candidate.
self.known_candidates.contains(h)
self.is_known_candidate(h)
}
CompactStatement::Seconded(_) => {
true
......@@ -280,7 +282,7 @@ impl PeerRelayParentKnowledge {
h
}
CompactStatement::Valid(ref h) => {
if !self.known_candidates.contains(&h) {
if !self.is_known_candidate(&h) {
return Err(COST_UNEXPECTED_STATEMENT);
}
......@@ -301,7 +303,7 @@ impl PeerRelayParentKnowledge {
}
self.received_statements.insert(fingerprint.clone());
Ok(self.known_candidates.insert(candidate_hash.clone()))
Ok(self.received_candidates.insert(candidate_hash.clone()))
}
/// This method does the same checks as `receive` without modifying the internal state.
......@@ -330,7 +332,7 @@ impl PeerRelayParentKnowledge {
h
}
CompactStatement::Valid(ref h) => {
if !self.known_candidates.contains(&h) {
if !self.is_known_candidate(&h) {
return Err(COST_UNEXPECTED_STATEMENT);
}
......@@ -348,6 +350,12 @@ impl PeerRelayParentKnowledge {
Ok(())
}
}
/// Check for candidates that the peer is aware of. This indicates that we can
/// send other statements pertaining to that candidate.
fn is_known_candidate(&self, candidate: &CandidateHash) -> bool {
self.sent_candidates.contains(candidate) || self.received_candidates.contains(candidate)
}
}
struct PeerData {
......@@ -1527,6 +1535,7 @@ impl StatementDistribution {
.await?,
Message::Responder(result) =>
self.handle_responder_message(
&peers,
&mut active_heads,
result.ok_or(SubsystemError::Context(
"Failed to read from responder receiver (stream finished)"
......@@ -1545,15 +1554,30 @@ impl StatementDistribution {
/// Handle messages from responder background task.
async fn handle_responder_message(
&self,
peers: &HashMap<PeerId, PeerData>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
message: ResponderMessage,
) -> SubsystemResult<bool> {
match message {
ResponderMessage::GetData {
requesting_peer,
relay_parent,
candidate_hash,
tx,
} => {
if !requesting_peer_knows_about_candidate(
peers,
&requesting_peer,
&relay_parent,
&candidate_hash
) {
tracing::warn!(
target: LOG_TARGET,
"Peer requested candidate, although we never announced it to that peer."
);
return Ok(false)
}
let active_head = match active_heads.get(&relay_parent) {
Some(head) => head,
None => return Ok(false),
......@@ -1844,6 +1868,36 @@ impl StatementDistribution {
}
}
/// Check whether a peer knows about a candidate from us.
///
/// If not, it is deemed illegal for it to request corresponding data from us.
fn requesting_peer_knows_about_candidate(
peers: &HashMap<PeerId, PeerData>,
requesting_peer: &PeerId,
relay_parent: &Hash,
candidate_hash: &CandidateHash,
) -> bool {
requesting_peer_knows_about_candidate_inner(
peers,
requesting_peer,
relay_parent,
candidate_hash,
).is_some()
}
/// Helper function for `requesting_peer_knows_about_statement`.
fn requesting_peer_knows_about_candidate_inner(
peers: &HashMap<PeerId, PeerData>,
requesting_peer: &PeerId,
relay_parent: &Hash,
candidate_hash: &CandidateHash,
) -> Option<()> {
let peer_data = peers.get(requesting_peer)?;
let knowledge = peer_data.view_knowledge.get(relay_parent)?;
knowledge.sent_candidates.get(&candidate_hash)?;
Some(())
}
#[derive(Clone)]
struct MetricsInner {
statements_distributed: prometheus::Counter<prometheus::U64>,
......@@ -2146,7 +2200,7 @@ mod tests {
// Sending an un-pinned statement should not work and should have no effect.
assert!(!knowledge.can_send(&(CompactStatement::Valid(hash_a), ValidatorIndex(0))));
assert!(!knowledge.known_candidates.contains(&hash_a));
assert!(!knowledge.is_known_candidate(&hash_a));
assert!(knowledge.sent_statements.is_empty());
assert!(knowledge.received_statements.is_empty());
assert!(knowledge.seconded_counts.is_empty());
......@@ -2155,7 +2209,7 @@ mod tests {
// Make the peer aware of the candidate.
assert_eq!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0))), true);
assert_eq!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(1))), false);
assert!(knowledge.known_candidates.contains(&hash_a));
assert!(knowledge.is_known_candidate(&hash_a));
assert_eq!(knowledge.sent_statements.len(), 2);
assert!(knowledge.received_statements.is_empty());
assert_eq!(knowledge.seconded_counts.len(), 2);
......@@ -2163,7 +2217,7 @@ mod tests {
// And now it should accept the dependent message.
assert_eq!(knowledge.send(&(CompactStatement::Valid(hash_a), ValidatorIndex(0))), false);
assert!(knowledge.known_candidates.contains(&hash_a));
assert!(knowledge.is_known_candidate(&hash_a));
assert_eq!(knowledge.sent_statements.len(), 3);
assert!(knowledge.received_statements.is_empty());
assert_eq!(knowledge.seconded_counts.len(), 2);
......@@ -2208,7 +2262,7 @@ mod tests {
Ok(false),
);
assert!(knowledge.known_candidates.contains(&hash_a));
assert!(knowledge.is_known_candidate(&hash_a));
assert_eq!(*knowledge.received_message_count.get(&hash_a).unwrap(), 2);
assert!(knowledge.check_can_receive(&(CompactStatement::Valid(hash_a), ValidatorIndex(2)), 3).is_ok());
......@@ -2394,7 +2448,7 @@ mod tests {
let c_knowledge = peer_data.view_knowledge.get(&hash_c).unwrap();
assert!(c_knowledge.known_candidates.contains(&candidate_hash));
assert!(c_knowledge.is_known_candidate(&candidate_hash));
assert!(c_knowledge.sent_statements.contains(
&(CompactStatement::Seconded(candidate_hash), ValidatorIndex(0))
));
......@@ -3073,14 +3127,14 @@ mod tests {
// Now that it has the candidate it should answer requests accordingly (even after a
// failed request):
// Failing request first:
// Failing request first (wrong relay parent hash):
let (pending_response, response_rx) = oneshot::channel();
let inner_req = StatementFetchingRequest {
relay_parent: hash_b,
candidate_hash: metadata.candidate_hash,
};
let req = sc_network::config::IncomingRequest {
peer: peer_a,
peer: peer_b,
payload: inner_req.encode(),
pending_response,
};
......@@ -3090,7 +3144,8 @@ mod tests {
Err(()) => {}
);
// Now the working one:
// Another failing request (peer_a never received a statement from us, so it is not
// allowed to request the data):
let (pending_response, response_rx) = oneshot::channel();
let inner_req = StatementFetchingRequest {
relay_parent: metadata.relay_parent,
......@@ -3102,7 +3157,23 @@ mod tests {
pending_response,
};
tx_reqs.send(req).await.unwrap();
assert_matches!(
response_rx.await.unwrap().result,
Err(()) => {}
);
// And now the succeding request from peer_b:
let (pending_response, response_rx) = oneshot::channel();
let inner_req = StatementFetchingRequest {
relay_parent: metadata.relay_parent,
candidate_hash: metadata.candidate_hash,
};
let req = sc_network::config::IncomingRequest {
peer: peer_b,
payload: inner_req.encode(),
pending_response,
};
tx_reqs.send(req).await.unwrap();
let StatementFetchingResponse::Statement(committed) =
Decode::decode(&mut response_rx.await.unwrap().result.unwrap().as_ref()).unwrap();
assert_eq!(committed, candidate);
......
......@@ -19,13 +19,13 @@ use futures::{SinkExt, StreamExt, channel::{mpsc, oneshot}, stream::FuturesUnord
use parity_scale_codec::Decode;
use polkadot_node_network_protocol::{
UnifiedReputationChange as Rep,
PeerId, UnifiedReputationChange as Rep,
request_response::{
IncomingRequest, MAX_PARALLEL_STATEMENT_REQUESTS, request::OutgoingResponse,
v1::{
StatementFetchingRequest, StatementFetchingResponse
}
}
},
},
};
use polkadot_primitives::v1::{CandidateHash, CommittedCandidateReceipt, Hash};
......@@ -37,6 +37,7 @@ const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request")
pub enum ResponderMessage {
/// Get an update of availble peers to try for fetching a given statement.
GetData {
requesting_peer: PeerId,
relay_parent: Hash,
candidate_hash: CandidateHash,
tx: oneshot::Sender<CommittedCandidateReceipt>
......@@ -112,6 +113,7 @@ pub async fn respond(
let (tx, rx) = oneshot::channel();
if let Err(err) = sender.feed(
ResponderMessage::GetData {
requesting_peer: peer,
relay_parent: req.payload.relay_parent,
candidate_hash: req.payload.candidate_hash,
tx,
......@@ -131,7 +133,7 @@ pub async fn respond(
?err,
"Requested data not found."
);
Err(())
Err(())
}
Ok(v) => Ok(StatementFetchingResponse::Statement(v)),
};
......@@ -147,7 +149,7 @@ pub async fn respond(
target: LOG_TARGET,
"Sending response failed"
);
}
}
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment