diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index f794f0f23ed0455ef666d6677705c44e9a3ba5f1..b9c9089bc677cfcb7ec3b2df30d8d748f408d454 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -167,9 +167,11 @@ fn note_hash( /// knowledge that a peer has about goings-on in a relay parent. #[derive(Default)] struct PeerRelayParentKnowledge { - /// candidates that the peer is aware of. This indicates that we can + /// candidates that the peer is aware of because we sent statements to it. This indicates that we can /// send other statements pertaining to that candidate. - known_candidates: HashSet<CandidateHash>, + sent_candidates: HashSet<CandidateHash>, + /// candidates that peer is aware of, because we received statements from it. + received_candidates: HashSet<CandidateHash>, /// fingerprints of all statements a peer should be aware of: those that /// were sent to the peer by us. sent_statements: HashSet<(CompactStatement, ValidatorIndex)>, @@ -205,7 +207,7 @@ impl PeerRelayParentKnowledge { .or_default() .note_local(h.clone()); - self.known_candidates.insert(h.clone()) + self.sent_candidates.insert(h.clone()) }, CompactStatement::Valid(_) => { false @@ -231,7 +233,7 @@ impl PeerRelayParentKnowledge { CompactStatement::Valid(ref h) => { // The peer can only accept Valid and Invalid statements for which it is aware // of the corresponding candidate. - self.known_candidates.contains(h) + self.is_known_candidate(h) } CompactStatement::Seconded(_) => { true @@ -280,7 +282,7 @@ impl PeerRelayParentKnowledge { h } CompactStatement::Valid(ref h) => { - if !self.known_candidates.contains(&h) { + if !self.is_known_candidate(&h) { return Err(COST_UNEXPECTED_STATEMENT); } @@ -301,7 +303,7 @@ impl PeerRelayParentKnowledge { } self.received_statements.insert(fingerprint.clone()); - Ok(self.known_candidates.insert(candidate_hash.clone())) + Ok(self.received_candidates.insert(candidate_hash.clone())) } /// This method does the same checks as `receive` without modifying the internal state. @@ -330,7 +332,7 @@ impl PeerRelayParentKnowledge { h } CompactStatement::Valid(ref h) => { - if !self.known_candidates.contains(&h) { + if !self.is_known_candidate(&h) { return Err(COST_UNEXPECTED_STATEMENT); } @@ -348,6 +350,12 @@ impl PeerRelayParentKnowledge { Ok(()) } } + + /// Check for candidates that the peer is aware of. This indicates that we can + /// send other statements pertaining to that candidate. + fn is_known_candidate(&self, candidate: &CandidateHash) -> bool { + self.sent_candidates.contains(candidate) || self.received_candidates.contains(candidate) + } } struct PeerData { @@ -1527,6 +1535,7 @@ impl StatementDistribution { .await?, Message::Responder(result) => self.handle_responder_message( + &peers, &mut active_heads, result.ok_or(SubsystemError::Context( "Failed to read from responder receiver (stream finished)" @@ -1545,15 +1554,30 @@ impl StatementDistribution { /// Handle messages from responder background task. async fn handle_responder_message( &self, + peers: &HashMap<PeerId, PeerData>, active_heads: &mut HashMap<Hash, ActiveHeadData>, message: ResponderMessage, ) -> SubsystemResult<bool> { match message { ResponderMessage::GetData { + requesting_peer, relay_parent, candidate_hash, tx, } => { + if !requesting_peer_knows_about_candidate( + peers, + &requesting_peer, + &relay_parent, + &candidate_hash + ) { + tracing::warn!( + target: LOG_TARGET, + "Peer requested candidate, although we never announced it to that peer." + ); + return Ok(false) + } + let active_head = match active_heads.get(&relay_parent) { Some(head) => head, None => return Ok(false), @@ -1844,6 +1868,36 @@ impl StatementDistribution { } } +/// Check whether a peer knows about a candidate from us. +/// +/// If not, it is deemed illegal for it to request corresponding data from us. +fn requesting_peer_knows_about_candidate( + peers: &HashMap<PeerId, PeerData>, + requesting_peer: &PeerId, + relay_parent: &Hash, + candidate_hash: &CandidateHash, +) -> bool { + requesting_peer_knows_about_candidate_inner( + peers, + requesting_peer, + relay_parent, + candidate_hash, + ).is_some() +} + +/// Helper function for `requesting_peer_knows_about_statement`. +fn requesting_peer_knows_about_candidate_inner( + peers: &HashMap<PeerId, PeerData>, + requesting_peer: &PeerId, + relay_parent: &Hash, + candidate_hash: &CandidateHash, +) -> Option<()> { + let peer_data = peers.get(requesting_peer)?; + let knowledge = peer_data.view_knowledge.get(relay_parent)?; + knowledge.sent_candidates.get(&candidate_hash)?; + Some(()) +} + #[derive(Clone)] struct MetricsInner { statements_distributed: prometheus::Counter<prometheus::U64>, @@ -2146,7 +2200,7 @@ mod tests { // Sending an un-pinned statement should not work and should have no effect. assert!(!knowledge.can_send(&(CompactStatement::Valid(hash_a), ValidatorIndex(0)))); - assert!(!knowledge.known_candidates.contains(&hash_a)); + assert!(!knowledge.is_known_candidate(&hash_a)); assert!(knowledge.sent_statements.is_empty()); assert!(knowledge.received_statements.is_empty()); assert!(knowledge.seconded_counts.is_empty()); @@ -2155,7 +2209,7 @@ mod tests { // Make the peer aware of the candidate. assert_eq!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0))), true); assert_eq!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(1))), false); - assert!(knowledge.known_candidates.contains(&hash_a)); + assert!(knowledge.is_known_candidate(&hash_a)); assert_eq!(knowledge.sent_statements.len(), 2); assert!(knowledge.received_statements.is_empty()); assert_eq!(knowledge.seconded_counts.len(), 2); @@ -2163,7 +2217,7 @@ mod tests { // And now it should accept the dependent message. assert_eq!(knowledge.send(&(CompactStatement::Valid(hash_a), ValidatorIndex(0))), false); - assert!(knowledge.known_candidates.contains(&hash_a)); + assert!(knowledge.is_known_candidate(&hash_a)); assert_eq!(knowledge.sent_statements.len(), 3); assert!(knowledge.received_statements.is_empty()); assert_eq!(knowledge.seconded_counts.len(), 2); @@ -2208,7 +2262,7 @@ mod tests { Ok(false), ); - assert!(knowledge.known_candidates.contains(&hash_a)); + assert!(knowledge.is_known_candidate(&hash_a)); assert_eq!(*knowledge.received_message_count.get(&hash_a).unwrap(), 2); assert!(knowledge.check_can_receive(&(CompactStatement::Valid(hash_a), ValidatorIndex(2)), 3).is_ok()); @@ -2394,7 +2448,7 @@ mod tests { let c_knowledge = peer_data.view_knowledge.get(&hash_c).unwrap(); - assert!(c_knowledge.known_candidates.contains(&candidate_hash)); + assert!(c_knowledge.is_known_candidate(&candidate_hash)); assert!(c_knowledge.sent_statements.contains( &(CompactStatement::Seconded(candidate_hash), ValidatorIndex(0)) )); @@ -3073,14 +3127,14 @@ mod tests { // Now that it has the candidate it should answer requests accordingly (even after a // failed request): - // Failing request first: + // Failing request first (wrong relay parent hash): let (pending_response, response_rx) = oneshot::channel(); let inner_req = StatementFetchingRequest { relay_parent: hash_b, candidate_hash: metadata.candidate_hash, }; let req = sc_network::config::IncomingRequest { - peer: peer_a, + peer: peer_b, payload: inner_req.encode(), pending_response, }; @@ -3090,7 +3144,8 @@ mod tests { Err(()) => {} ); - // Now the working one: + // Another failing request (peer_a never received a statement from us, so it is not + // allowed to request the data): let (pending_response, response_rx) = oneshot::channel(); let inner_req = StatementFetchingRequest { relay_parent: metadata.relay_parent, @@ -3102,7 +3157,23 @@ mod tests { pending_response, }; tx_reqs.send(req).await.unwrap(); + assert_matches!( + response_rx.await.unwrap().result, + Err(()) => {} + ); + // And now the succeding request from peer_b: + let (pending_response, response_rx) = oneshot::channel(); + let inner_req = StatementFetchingRequest { + relay_parent: metadata.relay_parent, + candidate_hash: metadata.candidate_hash, + }; + let req = sc_network::config::IncomingRequest { + peer: peer_b, + payload: inner_req.encode(), + pending_response, + }; + tx_reqs.send(req).await.unwrap(); let StatementFetchingResponse::Statement(committed) = Decode::decode(&mut response_rx.await.unwrap().result.unwrap().as_ref()).unwrap(); assert_eq!(committed, candidate); diff --git a/polkadot/node/network/statement-distribution/src/responder.rs b/polkadot/node/network/statement-distribution/src/responder.rs index a5a0a15460adc6b366e50e9ae51fd9d4a9c6d7bc..4f572446abc20fd5944f8f1c09516c19b4620b02 100644 --- a/polkadot/node/network/statement-distribution/src/responder.rs +++ b/polkadot/node/network/statement-distribution/src/responder.rs @@ -19,13 +19,13 @@ use futures::{SinkExt, StreamExt, channel::{mpsc, oneshot}, stream::FuturesUnord use parity_scale_codec::Decode; use polkadot_node_network_protocol::{ - UnifiedReputationChange as Rep, + PeerId, UnifiedReputationChange as Rep, request_response::{ IncomingRequest, MAX_PARALLEL_STATEMENT_REQUESTS, request::OutgoingResponse, v1::{ StatementFetchingRequest, StatementFetchingResponse - } - } + }, + }, }; use polkadot_primitives::v1::{CandidateHash, CommittedCandidateReceipt, Hash}; @@ -37,6 +37,7 @@ const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request") pub enum ResponderMessage { /// Get an update of availble peers to try for fetching a given statement. GetData { + requesting_peer: PeerId, relay_parent: Hash, candidate_hash: CandidateHash, tx: oneshot::Sender<CommittedCandidateReceipt> @@ -112,6 +113,7 @@ pub async fn respond( let (tx, rx) = oneshot::channel(); if let Err(err) = sender.feed( ResponderMessage::GetData { + requesting_peer: peer, relay_parent: req.payload.relay_parent, candidate_hash: req.payload.candidate_hash, tx, @@ -131,7 +133,7 @@ pub async fn respond( ?err, "Requested data not found." ); - Err(()) + Err(()) } Ok(v) => Ok(StatementFetchingResponse::Statement(v)), }; @@ -147,7 +149,7 @@ pub async fn respond( target: LOG_TARGET, "Sending response failed" ); - } + } } }