// Copyright 2020 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Polkadot is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . //! The Statement Distribution Subsystem. //! //! This is responsible for distributing signed statements about candidate //! validity amongst validators. use polkadot_subsystem::{ Subsystem, SubsystemResult, SubsystemContext, SpawnedSubsystem, ActiveLeavesUpdate, FromOverseer, OverseerSignal, }; use polkadot_subsystem::messages::{ AllMessages, NetworkBridgeMessage, NetworkBridgeEvent, StatementDistributionMessage, PeerId, ReputationChange as Rep, CandidateBackingMessage, RuntimeApiMessage, RuntimeApiRequest, }; use node_primitives::{ProtocolId, View, SignedFullStatement}; use polkadot_primitives::v1::{ Hash, CompactStatement, ValidatorIndex, ValidatorId, SigningContext, ValidatorSignature, }; use parity_scale_codec::{Encode, Decode}; use futures::prelude::*; use futures::channel::oneshot; use indexmap::IndexSet; use std::collections::{HashMap, HashSet}; const PROTOCOL_V1: ProtocolId = *b"sdn1"; const COST_UNEXPECTED_STATEMENT: Rep = Rep::new(-100, "Unexpected Statement"); const COST_INVALID_SIGNATURE: Rep = Rep::new(-500, "Invalid Statement Signature"); const COST_INVALID_MESSAGE: Rep = Rep::new(-500, "Invalid message"); const COST_DUPLICATE_STATEMENT: Rep = Rep::new(-250, "Statement sent more than once by peer"); const COST_APPARENT_FLOOD: Rep = Rep::new(-1000, "Peer appears to be flooding us with 
statements"); const BENEFIT_VALID_STATEMENT: Rep = Rep::new(5, "Peer provided a valid statement"); const BENEFIT_VALID_STATEMENT_FIRST: Rep = Rep::new( 25, "Peer was the first to provide a valid statement", ); /// The maximum amount of candidates each validator is allowed to second at any relay-parent. /// Short for "Validator Candidate Threshold". /// /// This is the amount of candidates we keep per validator at any relay-parent. /// Typically we will only keep 1, but when a validator equivocates we will need to track 2. const VC_THRESHOLD: usize = 2; /// The statement distribution subsystem. pub struct StatementDistribution; impl Subsystem for StatementDistribution where C: SubsystemContext { fn start(self, ctx: C) -> SpawnedSubsystem { // Swallow error because failure is fatal to the node and we log with more precision // within `run`. SpawnedSubsystem { name: "statement-distribution-subsystem", future: run(ctx).map(|_| ()).boxed(), } } } fn network_update_message(n: NetworkBridgeEvent) -> AllMessages { AllMessages::StatementDistribution(StatementDistributionMessage::NetworkBridgeUpdate(n)) } /// Tracks our impression of a single peer's view of the candidates a validator has seconded /// for a given relay-parent. /// /// It is expected to receive at most `VC_THRESHOLD` from us and be aware of at most `VC_THRESHOLD` /// via other means. #[derive(Default)] struct VcPerPeerTracker { local_observed: arrayvec::ArrayVec<[Hash; VC_THRESHOLD]>, remote_observed: arrayvec::ArrayVec<[Hash; VC_THRESHOLD]>, } impl VcPerPeerTracker { // Note that the remote should now be aware that a validator has seconded a given candidate (by hash) // based on a message that we have sent it from our local pool. fn note_local(&mut self, h: Hash) { if !note_hash(&mut self.local_observed, h) { log::warn!("Statement distribution is erroneously attempting to distribute more \ than {} candidate(s) per validator index. 
Ignoring", VC_THRESHOLD); } } // Note that the remote should now be aware that a validator has seconded a given candidate (by hash) // based on a message that it has sent us. // // Returns `true` if the peer was allowed to send us such a message, `false` otherwise. fn note_remote(&mut self, h: Hash) -> bool { note_hash(&mut self.remote_observed, h) } } fn note_hash( observed: &mut arrayvec::ArrayVec<[Hash; VC_THRESHOLD]>, h: Hash, ) -> bool { if observed.contains(&h) { return true; } if observed.is_full() { false } else { observed.try_push(h).expect("length of storage guarded above; \ only panics if length exceeds capacity; qed"); true } } /// knowledge that a peer has about goings-on in a relay parent. #[derive(Default)] struct PeerRelayParentKnowledge { /// candidates that the peer is aware of. This indicates that we can /// send other statements pertaining to that candidate. known_candidates: HashSet, /// fingerprints of all statements a peer should be aware of: those that /// were sent to the peer by us. sent_statements: HashSet<(CompactStatement, ValidatorIndex)>, /// fingerprints of all statements a peer should be aware of: those that /// were sent to us by the peer. received_statements: HashSet<(CompactStatement, ValidatorIndex)>, /// How many candidates this peer is aware of for each given validator index. seconded_counts: HashMap, /// How many statements we've received for each candidate that we're aware of. received_message_count: HashMap, } impl PeerRelayParentKnowledge { /// Attempt to update our view of the peer's knowledge with this statement's fingerprint based /// on something that we would like to send to the peer. /// /// This returns `None` if the peer cannot accept this statement, without altering internal /// state. /// /// If the peer can accept the statement, this returns `Some` and updates the internal state. /// Once the knowledge has incorporated a statement, it cannot be incorporated again. 
/// /// This returns `Some(true)` if this is the first time the peer has become aware of a /// candidate with the given hash. fn send(&mut self, fingerprint: &(CompactStatement, ValidatorIndex)) -> Option { let already_known = self.sent_statements.contains(fingerprint) || self.received_statements.contains(fingerprint); if already_known { return None; } let new_known = match fingerprint.0 { CompactStatement::Candidate(ref h) => { self.seconded_counts.entry(fingerprint.1) .or_default() .note_local(h.clone()); self.known_candidates.insert(h.clone()) }, CompactStatement::Valid(ref h) | CompactStatement::Invalid(ref h) => { // The peer can only accept Valid and Invalid statements for which it is aware // of the corresponding candidate. if !self.known_candidates.contains(h) { return None; } false } }; self.sent_statements.insert(fingerprint.clone()); Some(new_known) } /// Attempt to update our view of the peer's knowledge with this statement's fingerprint based on /// a message we are receiving from the peer. /// /// Provide the maximum message count that we can receive per candidate. In practice we should /// not receive more statements for any one candidate than there are members in the group assigned /// to that para, but this maximum needs to be lenient to account for equivocations that may be /// cross-group. As such, a maximum of 2 * n_validators is recommended. /// /// This returns an error if the peer should not have sent us this message according to protocol /// rules for flood protection. /// /// If this returns `Ok`, the internal state has been altered. After `receive`ing a new /// candidate, we are then cleared to send the peer further statements about that candidate. /// /// This returns `Ok(true)` if this is the first time the peer has become aware of a /// candidate with given hash. 
fn receive( &mut self, fingerprint: &(CompactStatement, ValidatorIndex), max_message_count: usize, ) -> Result { // We don't check `sent_statements` because a statement could be in-flight from both // sides at the same time. if self.received_statements.contains(fingerprint) { return Err(COST_DUPLICATE_STATEMENT); } let candidate_hash = match fingerprint.0 { CompactStatement::Candidate(ref h) => { let allowed_remote = self.seconded_counts.entry(fingerprint.1) .or_insert_with(Default::default) .note_remote(h.clone()); if !allowed_remote { return Err(COST_UNEXPECTED_STATEMENT); } h } CompactStatement::Valid(ref h)| CompactStatement::Invalid(ref h) => { if !self.known_candidates.contains(&h) { return Err(COST_UNEXPECTED_STATEMENT); } h } }; { let received_per_candidate = self.received_message_count .entry(candidate_hash.clone()) .or_insert(0); if *received_per_candidate >= max_message_count { return Err(COST_APPARENT_FLOOD); } *received_per_candidate += 1; } self.received_statements.insert(fingerprint.clone()); Ok(self.known_candidates.insert(candidate_hash.clone())) } } struct PeerData { view: View, view_knowledge: HashMap, } impl PeerData { /// Attempt to update our view of the peer's knowledge with this statement's fingerprint based /// on something that we would like to send to the peer. /// /// This returns `None` if the peer cannot accept this statement, without altering internal /// state. /// /// If the peer can accept the statement, this returns `Some` and updates the internal state. /// Once the knowledge has incorporated a statement, it cannot be incorporated again. /// /// This returns `Some(true)` if this is the first time the peer has become aware of a /// candidate with the given hash. 
fn send( &mut self, relay_parent: &Hash, fingerprint: &(CompactStatement, ValidatorIndex), ) -> Option { self.view_knowledge.get_mut(relay_parent).map_or(None, |k| k.send(fingerprint)) } /// Attempt to update our view of the peer's knowledge with this statement's fingerprint based on /// a message we are receiving from the peer. /// /// Provide the maximum message count that we can receive per candidate. In practice we should /// not receive more statements for any one candidate than there are members in the group assigned /// to that para, but this maximum needs to be lenient to account for equivocations that may be /// cross-group. As such, a maximum of 2 * n_validators is recommended. /// /// This returns an error if the peer should not have sent us this message according to protocol /// rules for flood protection. /// /// If this returns `Ok`, the internal state has been altered. After `receive`ing a new /// candidate, we are then cleared to send the peer further statements about that candidate. /// /// This returns `Ok(true)` if this is the first time the peer has become aware of a /// candidate with given hash. fn receive( &mut self, relay_parent: &Hash, fingerprint: &(CompactStatement, ValidatorIndex), max_message_count: usize, ) -> Result { self.view_knowledge.get_mut(relay_parent).ok_or(COST_UNEXPECTED_STATEMENT)? .receive(fingerprint, max_message_count) } } // A statement stored while a relay chain head is active. #[derive(Debug)] struct StoredStatement { comparator: StoredStatementComparator, statement: SignedFullStatement, } // A value used for comparison of stored statements to each other. // // The compact version of the statement, the validator index, and the signature of the validator // is enough to differentiate between all types of equivocations, as long as the signature is // actually checked to be valid. 
The same statement with 2 signatures and 2 statements with // different (or same) signatures wll all be correctly judged to be unequal with this comparator. #[derive(PartialEq, Eq, Hash, Clone, Debug)] struct StoredStatementComparator { compact: CompactStatement, validator_index: ValidatorIndex, signature: ValidatorSignature, } impl StoredStatement { fn compact(&self) -> &CompactStatement { &self.comparator.compact } fn fingerprint(&self) -> (CompactStatement, ValidatorIndex) { (self.comparator.compact.clone(), self.statement.validator_index()) } } impl std::borrow::Borrow for StoredStatement { fn borrow(&self) -> &StoredStatementComparator { &self.comparator } } impl std::hash::Hash for StoredStatement { fn hash(&self, state: &mut H) { self.comparator.hash(state) } } impl std::cmp::PartialEq for StoredStatement { fn eq(&self, other: &Self) -> bool { &self.comparator == &other.comparator } } impl std::cmp::Eq for StoredStatement {} #[derive(Debug)] enum NotedStatement<'a> { NotUseful, Fresh(&'a StoredStatement), UsefulButKnown } struct ActiveHeadData { /// All candidates we are aware of for this head, keyed by hash. candidates: HashSet, /// Stored statements for circulation to peers. /// /// These are iterable in insertion order, and `Seconded` statements are always /// accepted before dependent statements. statements: IndexSet, /// The validators at this head. validators: Vec, /// The session index this head is at. session_index: sp_staking::SessionIndex, /// How many `Seconded` statements we've seen per validator. seconded_counts: HashMap, } impl ActiveHeadData { fn new(validators: Vec, session_index: sp_staking::SessionIndex) -> Self { ActiveHeadData { candidates: Default::default(), statements: Default::default(), validators, session_index, seconded_counts: Default::default(), } } /// Note the given statement. /// /// If it was not already known and can be accepted, returns `NotedStatement::Fresh`, /// with a handle to the statement. 
/// /// If it can be accepted, but we already know it, returns `NotedStatement::UsefulButKnown`. /// /// We accept up to `VC_THRESHOLD` (2 at time of writing) `Seconded` statements /// per validator. These will be the first ones we see. The statement is assumed /// to have been checked, including that the validator index is not out-of-bounds and /// the signature is valid. /// /// Any other statements or those that reference a candidate we are not aware of cannot be accepted /// and will return `NotedStatement::NotUseful`. fn note_statement(&mut self, statement: SignedFullStatement) -> NotedStatement { let validator_index = statement.validator_index(); let comparator = StoredStatementComparator { compact: statement.payload().to_compact(), validator_index, signature: statement.signature().clone(), }; let stored = StoredStatement { comparator: comparator.clone(), statement, }; match comparator.compact { CompactStatement::Candidate(h) => { let seconded_so_far = self.seconded_counts.entry(validator_index).or_insert(0); if *seconded_so_far >= VC_THRESHOLD { return NotedStatement::NotUseful; } self.candidates.insert(h); if self.statements.insert(stored) { *seconded_so_far += 1; // This will always return `Some` because it was just inserted. NotedStatement::Fresh(self.statements.get(&comparator) .expect("Statement was just inserted; qed")) } else { NotedStatement::UsefulButKnown } } CompactStatement::Valid(h) | CompactStatement::Invalid(h) => { if !self.candidates.contains(&h) { return NotedStatement::NotUseful; } if self.statements.insert(stored) { // This will always return `Some` because it was just inserted. NotedStatement::Fresh(self.statements.get(&comparator) .expect("Statement was just inserted; qed")) } else { NotedStatement::UsefulButKnown } } } } /// Get an iterator over all statements for the active head. Seconded statements come first. 
fn statements(&self) -> impl Iterator + '_ { self.statements.iter() } /// Get an iterator over all statements for the active head that are for a particular candidate. fn statements_about(&self, candidate_hash: Hash) -> impl Iterator + '_ { self.statements().filter(move |s| s.compact().candidate_hash() == &candidate_hash) } } /// Check a statement signature under this parent hash. fn check_statement_signature( head: &ActiveHeadData, relay_parent: Hash, statement: &SignedFullStatement, ) -> Result<(), ()> { let signing_context = SigningContext { session_index: head.session_index, parent_hash: relay_parent, }; head.validators.get(statement.validator_index() as usize) .ok_or(()) .and_then(|v| statement.check_signature(&signing_context, v)) } #[derive(Encode, Decode)] enum WireMessage { /// relay-parent, full statement. #[codec(index = "0")] Statement(Hash, SignedFullStatement), } /// Places the statement in storage if it is new, and then /// circulates the statement to all peers who have not seen it yet, and /// sends all statements dependent on that statement to peers who could previously not receive /// them but now can. async fn circulate_statement_and_dependents( peers: &mut HashMap, active_heads: &mut HashMap, ctx: &mut impl SubsystemContext, relay_parent: Hash, statement: SignedFullStatement, ) -> SubsystemResult<()> { if let Some(active_head)= active_heads.get_mut(&relay_parent) { // First circulate the statement directly to all peers needing it. // The borrow of `active_head` needs to encompass only this (Rust) statement. let outputs: Option<(Hash, Vec)> = { match active_head.note_statement(statement) { NotedStatement::Fresh(stored) => Some(( stored.compact().candidate_hash().clone(), circulate_statement(peers, ctx, relay_parent, stored).await?, )), _ => None, } }; // Now send dependent statements to all peers needing them, if any. 
if let Some((candidate_hash, peers_needing_dependents)) = outputs { for peer in peers_needing_dependents { if let Some(peer_data) = peers.get_mut(&peer) { // defensive: the peer data should always be some because the iterator // of peers is derived from the set of peers. send_statements_about( peer, peer_data, ctx, relay_parent, candidate_hash, &*active_head ).await?; } } } } Ok(()) } /// Circulates a statement to all peers who have not seen it yet, and returns /// an iterator over peers who need to have dependent statements sent. async fn circulate_statement( peers: &mut HashMap, ctx: &mut impl SubsystemContext, relay_parent: Hash, stored: &StoredStatement, ) -> SubsystemResult> { let fingerprint = stored.fingerprint(); let mut peers_to_send = HashMap::new(); for (peer, data) in peers.iter_mut() { if let Some(new_known) = data.send(&relay_parent, &fingerprint) { peers_to_send.insert(peer.clone(), new_known); } } // Send all these peers the initial statement. if !peers_to_send.is_empty() { let payload = WireMessage::Statement(relay_parent, stored.statement.clone()).encode(); ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage( peers_to_send.keys().cloned().collect(), PROTOCOL_V1, payload, ))).await?; } Ok(peers_to_send.into_iter().filter_map(|(peer, needs_dependent)| if needs_dependent { Some(peer) } else { None }).collect()) } /// Send all statements about a given candidate hash to a peer. 
async fn send_statements_about( peer: PeerId, peer_data: &mut PeerData, ctx: &mut impl SubsystemContext, relay_parent: Hash, candidate_hash: Hash, active_head: &ActiveHeadData, ) -> SubsystemResult<()> { for statement in active_head.statements_about(candidate_hash) { if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() { let payload = WireMessage::Statement( relay_parent, statement.statement.clone(), ).encode(); ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage( vec![peer.clone()], PROTOCOL_V1, payload, ))).await?; } } Ok(()) } /// Send all statements at a given relay-parent to a peer. async fn send_statements( peer: PeerId, peer_data: &mut PeerData, ctx: &mut impl SubsystemContext, relay_parent: Hash, active_head: &ActiveHeadData ) -> SubsystemResult<()> { for statement in active_head.statements() { if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() { let payload = WireMessage::Statement( relay_parent, statement.statement.clone(), ).encode(); ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage( vec![peer.clone()], PROTOCOL_V1, payload, ))).await?; } } Ok(()) } async fn report_peer( ctx: &mut impl SubsystemContext, peer: PeerId, rep: Rep, ) -> SubsystemResult<()> { ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::ReportPeer(peer, rep) )).await } // Handle an incoming wire message. Returns a reference to a newly-stored statement // if we were not already aware of it, along with the corresponding relay-parent. // // This function checks the signature and ensures the statement is compatible with our // view. 
async fn handle_incoming_message<'a>( peer: PeerId, peer_data: &mut PeerData, our_view: &View, active_heads: &'a mut HashMap, ctx: &mut impl SubsystemContext, message: Vec, ) -> SubsystemResult> { let (relay_parent, statement) = match WireMessage::decode(&mut &message[..]) { Err(_) => return report_peer(ctx, peer, COST_INVALID_MESSAGE).await.map(|_| None), Ok(WireMessage::Statement(r, s)) => (r, s), }; if !our_view.contains(&relay_parent) { return report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await.map(|_| None); } let active_head = match active_heads.get_mut(&relay_parent) { Some(h) => h, None => { // This should never be out-of-sync with our view if the view updates // correspond to actual `StartWork` messages. So we just log and ignore. log::warn!("Our view out-of-sync with active heads. Head {} not found", relay_parent); return Ok(None); } }; // check the signature on the statement. if let Err(()) = check_statement_signature(&active_head, relay_parent, &statement) { return report_peer(ctx, peer, COST_INVALID_SIGNATURE).await.map(|_| None); } // Ensure the statement is stored in the peer data. // // Note that if the peer is sending us something that is not within their view, // it will not be kept within their log. let fingerprint = (statement.payload().to_compact(), statement.validator_index()); let max_message_count = active_head.validators.len() * 2; match peer_data.receive(&relay_parent, &fingerprint, max_message_count) { Err(rep) => { report_peer(ctx, peer, rep).await?; return Ok(None) } Ok(true) => { // Send the peer all statements concerning the candidate that we have, // since it appears to have just learned about the candidate. send_statements_about( peer.clone(), peer_data, ctx, relay_parent, fingerprint.0.candidate_hash().clone(), &*active_head, ).await? } Ok(false) => {} } // Note: `peer_data.receive` already ensures that the statement is not an unbounded equivocation // or unpinned to a seconded candidate. 
So it is safe to place it into the storage. match active_head.note_statement(statement) { NotedStatement::NotUseful => Ok(None), NotedStatement::UsefulButKnown => { report_peer(ctx, peer, BENEFIT_VALID_STATEMENT).await?; Ok(None) } NotedStatement::Fresh(statement) => { report_peer(ctx, peer, BENEFIT_VALID_STATEMENT_FIRST).await?; Ok(Some((relay_parent, statement))) } } } /// Update a peer's view. Sends all newly unlocked statements based on the previous async fn update_peer_view_and_send_unlocked( peer: PeerId, peer_data: &mut PeerData, ctx: &mut impl SubsystemContext, active_heads: &HashMap, new_view: View, ) -> SubsystemResult<()> { let old_view = std::mem::replace(&mut peer_data.view, new_view); // Remove entries for all relay-parents in the old view but not the new. for removed in old_view.difference(&peer_data.view) { let _ = peer_data.view_knowledge.remove(removed); } // Add entries for all relay-parents in the new view but not the old. // Furthermore, send all statements we have for those relay parents. 
let new_view = peer_data.view.difference(&old_view).copied().collect::>(); for new in new_view.iter().copied() { peer_data.view_knowledge.insert(new, Default::default()); if let Some(active_head) = active_heads.get(&new) { send_statements( peer.clone(), peer_data, ctx, new, active_head, ).await?; } } Ok(()) } async fn handle_network_update( peers: &mut HashMap, active_heads: &mut HashMap, ctx: &mut impl SubsystemContext, our_view: &mut View, update: NetworkBridgeEvent, ) -> SubsystemResult<()> { match update { NetworkBridgeEvent::PeerConnected(peer, _role) => { peers.insert(peer, PeerData { view: Default::default(), view_knowledge: Default::default(), }); Ok(()) } NetworkBridgeEvent::PeerDisconnected(peer) => { peers.remove(&peer); Ok(()) } NetworkBridgeEvent::PeerMessage(peer, message) => { match peers.get_mut(&peer) { Some(data) => { let new_stored = handle_incoming_message( peer, data, &*our_view, active_heads, ctx, message, ).await?; if let Some((relay_parent, new)) = new_stored { // When we receive a new message from a peer, we forward it to the // candidate backing subsystem. let message = AllMessages::CandidateBacking( CandidateBackingMessage::Statement(relay_parent, new.statement.clone()) ); ctx.send_message(message).await?; } Ok(()) } None => Ok(()), } } NetworkBridgeEvent::PeerViewChange(peer, view) => { match peers.get_mut(&peer) { Some(data) => { update_peer_view_and_send_unlocked( peer, data, ctx, &*active_heads, view, ).await } None => Ok(()), } } NetworkBridgeEvent::OurViewChange(view) => { let old_view = std::mem::replace(our_view, view); active_heads.retain(|head, _| our_view.contains(head)); for new in our_view.difference(&old_view) { if !active_heads.contains_key(&new) { log::warn!(target: "statement_distribution", "Our network bridge view update \ inconsistent with `StartWork` messages we have received from overseer. 
\ Contains unknown hash {}", new); } } Ok(()) } } } async fn run( mut ctx: impl SubsystemContext, ) -> SubsystemResult<()> { // startup: register the network protocol with the bridge. ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::RegisterEventProducer( PROTOCOL_V1, network_update_message, ))).await?; let mut peers: HashMap = HashMap::new(); let mut our_view = View::default(); let mut active_heads: HashMap = HashMap::new(); loop { let message = ctx.recv().await?; match message { FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. })) => { for relay_parent in activated { let (validators, session_index) = { let (val_tx, val_rx) = oneshot::channel(); let (session_tx, session_rx) = oneshot::channel(); let val_message = AllMessages::RuntimeApi( RuntimeApiMessage::Request( relay_parent, RuntimeApiRequest::Validators(val_tx), ), ); let session_message = AllMessages::RuntimeApi( RuntimeApiMessage::Request( relay_parent, RuntimeApiRequest::SessionIndexForChild(session_tx), ), ); ctx.send_messages( std::iter::once(val_message).chain(std::iter::once(session_message)) ).await?; match (val_rx.await?, session_rx.await?) { (Ok(v), Ok(s)) => (v, s), (Err(e), _) | (_, Err(e)) => { log::warn!( target: "statement_distribution", "Failed to fetch runtime API data for active leaf: {:?}", e, ); // Lacking this bookkeeping might make us behave funny, although // not in any slashable way. But we shouldn't take down the node // on what are likely spurious runtime API errors. 
continue; } } }; active_heads.entry(relay_parent) .or_insert(ActiveHeadData::new(validators, session_index)); } } FromOverseer::Signal(OverseerSignal::BlockFinalized(_block_hash)) => { // do nothing } FromOverseer::Signal(OverseerSignal::Conclude) => break, FromOverseer::Communication { msg } => match msg { StatementDistributionMessage::Share(relay_parent, statement) => circulate_statement_and_dependents( &mut peers, &mut active_heads, &mut ctx, relay_parent, statement, ).await?, StatementDistributionMessage::NetworkBridgeUpdate(event) => handle_network_update( &mut peers, &mut active_heads, &mut ctx, &mut our_view, event, ).await?, } } } Ok(()) } #[cfg(test)] mod tests { use super::*; use sp_keyring::Sr25519Keyring; use node_primitives::Statement; use polkadot_primitives::v1::CommittedCandidateReceipt; use assert_matches::assert_matches; use futures::executor; #[test] fn active_head_accepts_only_2_seconded_per_validator() { let validators = vec![ Sr25519Keyring::Alice.public().into(), Sr25519Keyring::Bob.public().into(), Sr25519Keyring::Charlie.public().into(), ]; let parent_hash: Hash = [1; 32].into(); let session_index = 1; let signing_context = SigningContext { parent_hash, session_index, }; let candidate_a = { let mut c = CommittedCandidateReceipt::default(); c.descriptor.relay_parent = parent_hash; c.descriptor.para_id = 1.into(); c }; let candidate_b = { let mut c = CommittedCandidateReceipt::default(); c.descriptor.relay_parent = parent_hash; c.descriptor.para_id = 2.into(); c }; let candidate_c = { let mut c = CommittedCandidateReceipt::default(); c.descriptor.relay_parent = parent_hash; c.descriptor.para_id = 3.into(); c }; let mut head_data = ActiveHeadData::new(validators, session_index); // note A let a_seconded_val_0 = SignedFullStatement::sign( Statement::Seconded(candidate_a.clone()), &signing_context, 0, &Sr25519Keyring::Alice.pair().into(), ); let noted = head_data.note_statement(a_seconded_val_0.clone()); assert_matches!(noted, 
NotedStatement::Fresh(_)); // note A (duplicate) let noted = head_data.note_statement(a_seconded_val_0); assert_matches!(noted, NotedStatement::UsefulButKnown); // note B let noted = head_data.note_statement(SignedFullStatement::sign( Statement::Seconded(candidate_b.clone()), &signing_context, 0, &Sr25519Keyring::Alice.pair().into(), )); assert_matches!(noted, NotedStatement::Fresh(_)); // note C (beyond 2 - ignored) let noted = head_data.note_statement(SignedFullStatement::sign( Statement::Seconded(candidate_c.clone()), &signing_context, 0, &Sr25519Keyring::Alice.pair().into(), )); assert_matches!(noted, NotedStatement::NotUseful); // note B (new validator) let noted = head_data.note_statement(SignedFullStatement::sign( Statement::Seconded(candidate_b.clone()), &signing_context, 1, &Sr25519Keyring::Bob.pair().into(), )); assert_matches!(noted, NotedStatement::Fresh(_)); // note C (new validator) let noted = head_data.note_statement(SignedFullStatement::sign( Statement::Seconded(candidate_c.clone()), &signing_context, 1, &Sr25519Keyring::Bob.pair().into(), )); assert_matches!(noted, NotedStatement::Fresh(_)); } #[test] fn note_local_works() { let hash_a: Hash = [1; 32].into(); let hash_b: Hash = [2; 32].into(); let mut per_peer_tracker = VcPerPeerTracker::default(); per_peer_tracker.note_local(hash_a.clone()); per_peer_tracker.note_local(hash_b.clone()); assert!(per_peer_tracker.local_observed.contains(&hash_a)); assert!(per_peer_tracker.local_observed.contains(&hash_b)); assert!(!per_peer_tracker.remote_observed.contains(&hash_a)); assert!(!per_peer_tracker.remote_observed.contains(&hash_b)); } #[test] fn note_remote_works() { let hash_a: Hash = [1; 32].into(); let hash_b: Hash = [2; 32].into(); let hash_c: Hash = [3; 32].into(); let mut per_peer_tracker = VcPerPeerTracker::default(); assert!(per_peer_tracker.note_remote(hash_a.clone())); assert!(per_peer_tracker.note_remote(hash_b.clone())); assert!(!per_peer_tracker.note_remote(hash_c.clone())); 
assert!(per_peer_tracker.remote_observed.contains(&hash_a));
        assert!(per_peer_tracker.remote_observed.contains(&hash_b));
        assert!(!per_peer_tracker.remote_observed.contains(&hash_c));

        assert!(!per_peer_tracker.local_observed.contains(&hash_a));
        assert!(!per_peer_tracker.local_observed.contains(&hash_b));
        assert!(!per_peer_tracker.local_observed.contains(&hash_c));
    }

    /// Checks the bookkeeping done by `PeerRelayParentKnowledge::send`.
    ///
    /// Each fingerprint passed to `send` is a `(CompactStatement, validator index)` pair.
    /// The test walks through three phases on a fresh, default-constructed tracker:
    ///
    /// 1. a `Valid` statement about a candidate that was never sent is refused (`None`)
    ///    and leaves every tracked collection empty;
    /// 2. two `Candidate` statements (validator indices 0 and 1) about the same hash are
    ///    accepted, returning `Some(true)` and then `Some(false)`;
    /// 3. the previously refused `Valid` statement is now accepted with `Some(false)`.
    ///
    /// Throughout, nothing is recorded on the receiving side (`received_statements`,
    /// `received_message_count`).
    #[test]
    fn per_peer_relay_parent_knowledge_send() {
        let mut knowledge = PeerRelayParentKnowledge::default();

        let hash_a: Hash = [1; 32].into();

        // Sending an un-pinned statement should not work and should have no effect.
        assert!(knowledge.send(&(CompactStatement::Valid(hash_a), 0)).is_none());
        assert!(!knowledge.known_candidates.contains(&hash_a));
        assert!(knowledge.sent_statements.is_empty());
        assert!(knowledge.received_statements.is_empty());
        assert!(knowledge.seconded_counts.is_empty());
        assert!(knowledge.received_message_count.is_empty());

        // Make the peer aware of the candidate.
        // NOTE(review): the boolean presumably flags whether the candidate was new to the
        // peer (true for the first `Candidate`, false afterwards) - confirm against `send`.
        assert_eq!(knowledge.send(&(CompactStatement::Candidate(hash_a), 0)), Some(true));
        assert_eq!(knowledge.send(&(CompactStatement::Candidate(hash_a), 1)), Some(false));
        assert!(knowledge.known_candidates.contains(&hash_a));
        assert_eq!(knowledge.sent_statements.len(), 2);
        assert!(knowledge.received_statements.is_empty());
        // One `seconded_counts` entry per validator index used above (0 and 1).
        assert_eq!(knowledge.seconded_counts.len(), 2);
        assert!(knowledge.received_message_count.get(&hash_a).is_none());

        // And now it should accept the dependent message.
        assert_eq!(knowledge.send(&(CompactStatement::Valid(hash_a), 0)), Some(false));
        assert!(knowledge.known_candidates.contains(&hash_a));
        assert_eq!(knowledge.sent_statements.len(), 3);
        assert!(knowledge.received_statements.is_empty());
        // A `Valid` statement does not add a `seconded_counts` entry.
        assert_eq!(knowledge.seconded_counts.len(), 2);
        assert!(knowledge.received_message_count.get(&hash_a).is_none());
    }

    /// Checks that a fingerprint which was already received from the peer is refused by
    /// `send`: after `receive` accepts `Candidate(hash_a)` from validator index 0, `send`
    /// with the identical fingerprint returns `None`.
    #[test]
    fn cant_send_after_receiving() {
        let mut knowledge = PeerRelayParentKnowledge::default();

        let hash_a: Hash = [1; 32].into();
        assert!(knowledge.receive(&(CompactStatement::Candidate(hash_a), 0), 3).unwrap());
        assert!(knowledge.send(&(CompactStatement::Candidate(hash_a), 0)).is_none());
    }

    /// Checks the acceptance rules and reputation costs of
    /// `PeerRelayParentKnowledge::receive`.
    ///
    /// Every call passes `3` as the second argument.
    /// NOTE(review): that argument looks like the per-candidate message limit, since the
    /// fourth statement about `hash_a` is rejected with `COST_APPARENT_FLOOD` - confirm
    /// against the signature of `receive`.
    ///
    /// Covered cases, in order:
    /// - `Valid` for an unknown candidate -> `Err(COST_UNEXPECTED_STATEMENT)`;
    /// - first `Candidate` for a hash -> `Ok(true)`, later statements about it -> `Ok(false)`;
    /// - exceeding the message limit -> `Err(COST_APPARENT_FLOOD)` with counters unchanged;
    /// - a third distinct `Candidate` from the same validator index ->
    ///   `Err(COST_UNEXPECTED_STATEMENT)` (`VC_THRESHOLD` is 2);
    /// - repeating an already accepted fingerprint -> `Err(COST_DUPLICATE_STATEMENT)`.
    #[test]
    fn per_peer_relay_parent_knowledge_receive() {
        let mut knowledge = PeerRelayParentKnowledge::default();

        let hash_a: Hash = [1; 32].into();

        assert_eq!(
            knowledge.receive(&(CompactStatement::Valid(hash_a), 0), 3),
            Err(COST_UNEXPECTED_STATEMENT),
        );

        assert_eq!(
            knowledge.receive(&(CompactStatement::Candidate(hash_a), 0), 3),
            Ok(true),
        );

        // Push statements up to the flood limit.
        assert_eq!(
            knowledge.receive(&(CompactStatement::Valid(hash_a), 1), 3),
            Ok(false),
        );

        assert!(knowledge.known_candidates.contains(&hash_a));
        assert_eq!(*knowledge.received_message_count.get(&hash_a).unwrap(), 2);

        assert_eq!(
            knowledge.receive(&(CompactStatement::Valid(hash_a), 2), 3),
            Ok(false),
        );

        assert_eq!(*knowledge.received_message_count.get(&hash_a).unwrap(), 3);

        // A fourth statement about `hash_a` is rejected as a flood; the count stays at 3.
        assert_eq!(
            knowledge.receive(&(CompactStatement::Valid(hash_a), 7), 3),
            Err(COST_APPARENT_FLOOD),
        );

        assert_eq!(*knowledge.received_message_count.get(&hash_a).unwrap(), 3);
        assert_eq!(knowledge.received_statements.len(), 3); // number of prior `Ok`s.

        // Now make sure that the seconding limit is respected.
        let hash_b: Hash = [2; 32].into();
        let hash_c: Hash = [3; 32].into();

        // Validator index 0 already seconded `hash_a`; `hash_b` is its second candidate.
        assert_eq!(
            knowledge.receive(&(CompactStatement::Candidate(hash_b), 0), 3),
            Ok(true),
        );

        // A third candidate from the same validator index is refused.
        assert_eq!(
            knowledge.receive(&(CompactStatement::Candidate(hash_c), 0), 3),
            Err(COST_UNEXPECTED_STATEMENT),
        );

        // Last, make sure that already-known statements are disregarded.
        assert_eq!(
            knowledge.receive(&(CompactStatement::Valid(hash_a), 2), 3),
            Err(COST_DUPLICATE_STATEMENT),
        );

        assert_eq!(
            knowledge.receive(&(CompactStatement::Candidate(hash_b), 0), 3),
            Err(COST_DUPLICATE_STATEMENT),
        );
    }

    /// Checks `update_peer_view_and_send_unlocked` when a peer's view moves from
    /// `[hash_a, hash_b]` to `[hash_b, hash_c]`.
    ///
    /// Setup: `active_heads` holds one entry, for `hash_c`, containing three statements
    /// about a single candidate (`Seconded` signed by Alice as validator index 0, `Valid`
    /// signed by Bob as index 1 and by Charlie as index 2).
    ///
    /// Expectations after the call:
    /// - the peer's stored view equals the new view;
    /// - knowledge for `hash_a` is gone, knowledge for `hash_b` is kept, and knowledge for
    ///   `hash_c` exists and records the candidate plus all three sent fingerprints;
    /// - one `NetworkBridgeMessage::SendMessage` addressed to the peer is received per
    ///   statement about the candidate, each carrying the encoded `WireMessage::Statement`.
    #[test]
    fn peer_view_update_sends_messages() {
        let hash_a = [1; 32].into();
        let hash_b = [2; 32].into();
        let hash_c = [3; 32].into();

        // The candidate is anchored to `hash_c`, the relay parent that only the new view has.
        let candidate = {
            let mut c = CommittedCandidateReceipt::default();
            c.descriptor.relay_parent = hash_c;
            c.descriptor.para_id = 1.into();
            c
        };
        let candidate_hash = candidate.hash();

        let old_view = View(vec![hash_a, hash_b]);
        let new_view = View(vec![hash_b, hash_c]);

        let mut active_heads = HashMap::new();
        // Position in this vector matches the validator index used when signing below.
        let validators = vec![
            Sr25519Keyring::Alice.public().into(),
            Sr25519Keyring::Bob.public().into(),
            Sr25519Keyring::Charlie.public().into(),
        ];

        let session_index = 1;
        let signing_context = SigningContext {
            parent_hash: hash_c,
            session_index,
        };

        // Populate the head data with three statements, each expected to be noted as fresh.
        let new_head_data = {
            let mut data = ActiveHeadData::new(validators, session_index);

            let noted = data.note_statement(SignedFullStatement::sign(
                Statement::Seconded(candidate.clone()),
                &signing_context,
                0,
                &Sr25519Keyring::Alice.pair().into(),
            ));

            assert_matches!(noted, NotedStatement::Fresh(_));

            let noted = data.note_statement(SignedFullStatement::sign(
                Statement::Valid(candidate_hash),
                &signing_context,
                1,
                &Sr25519Keyring::Bob.pair().into(),
            ));

            assert_matches!(noted, NotedStatement::Fresh(_));

            let noted = data.note_statement(SignedFullStatement::sign(
                Statement::Valid(candidate_hash),
                &signing_context,
                2,
                &Sr25519Keyring::Charlie.pair().into(),
            ));

            assert_matches!(noted, NotedStatement::Fresh(_));

            data
        };

        active_heads.insert(hash_c, new_head_data);

        // The peer starts with empty knowledge for both relay parents of its old view.
        let mut peer_data = PeerData {
            view: old_view,
            view_knowledge: {
                let mut k = HashMap::new();

                k.insert(hash_a, Default::default());
                k.insert(hash_b, Default::default());

                k
            },
        };

        let pool = sp_core::testing::TaskExecutor::new();
        let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
        let peer = PeerId::random();

        executor::block_on(async move {
            update_peer_view_and_send_unlocked(
                peer.clone(),
                &mut peer_data,
                &mut ctx,
                &active_heads,
                new_view.clone(),
            ).await.unwrap();

            assert_eq!(peer_data.view, new_view);

            // `hash_a` left the view, `hash_b` stayed in it.
            assert!(!peer_data.view_knowledge.contains_key(&hash_a));
            assert!(peer_data.view_knowledge.contains_key(&hash_b));

            let c_knowledge = peer_data.view_knowledge.get(&hash_c).unwrap();

            assert!(c_knowledge.known_candidates.contains(&candidate_hash));
            assert!(c_knowledge.sent_statements.contains(
                &(CompactStatement::Candidate(candidate_hash), 0)
            ));
            assert!(c_knowledge.sent_statements.contains(
                &(CompactStatement::Valid(candidate_hash), 1)
            ));
            assert!(c_knowledge.sent_statements.contains(
                &(CompactStatement::Valid(candidate_hash), 2)
            ));

            // now see if we got the 3 messages from the active head data.
            let active_head = active_heads.get(&hash_c).unwrap();

            // semi-fragile because hashmap iterator ordering is undefined, but in practice
            // it will not change between runs of the program.
            for statement in active_head.statements_about(candidate_hash) {
                let message = handle.recv().await;
                let expected_to = vec![peer.clone()];
                let expected_protocol = PROTOCOL_V1;
                let expected_payload
                    = WireMessage::Statement(hash_c, statement.statement.clone()).encode();

                assert_matches!(
                    message,
                    AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage(
                        to,
                        protocol,
                        payload,
                    )) => {
                        assert_eq!(to, expected_to);
                        assert_eq!(protocol, expected_protocol);
                        assert_eq!(payload, expected_payload)
                    }
                )
            }
        });
    }

    /// Checks that `circulate_statement` reaches exactly the peers whose view contains
    /// the statement's relay parent.
    ///
    /// Three peers are set up with views `[hash_a]`, `[hash_a, hash_b]` and
    /// `[hash_b, hash_c]`; a `Seconded` statement signed by Alice (validator index 0) is
    /// circulated for relay parent `hash_b`.
    ///
    /// Expectations:
    /// - the returned collection names exactly `peer_b` and `peer_c`;
    /// - both of those peers record the statement's fingerprint in `sent_statements`
    ///   for `hash_b`;
    /// - a single `NetworkBridgeMessage::SendMessage` is received, addressed to both
    ///   peers, using `PROTOCOL_V1` and carrying the encoded `WireMessage::Statement`.
    #[test]
    fn circulated_statement_goes_to_all_peers_with_view() {
        let hash_a = [1; 32].into();
        let hash_b = [2; 32].into();
        let hash_c = [3; 32].into();

        let candidate = {
            let mut c = CommittedCandidateReceipt::default();
            c.descriptor.relay_parent = hash_b;
            c.descriptor.para_id = 1.into();
            c
        };

        let peer_a = PeerId::random();
        let peer_b = PeerId::random();
        let peer_c = PeerId::random();

        // Only `peer_b` and `peer_c` have `hash_b` in their view.
        let peer_a_view = View(vec![hash_a]);
        let peer_b_view = View(vec![hash_a, hash_b]);
        let peer_c_view = View(vec![hash_b, hash_c]);

        let session_index = 1;

        // Builds `PeerData` with default (empty) knowledge for every relay parent in the view.
        let peer_data_from_view = |view: View| PeerData {
            view: view.clone(),
            view_knowledge: view.0.iter().map(|v| (v.clone(), Default::default())).collect(),
        };

        let mut peer_data: HashMap<_, _> = vec![
            (peer_a.clone(), peer_data_from_view(peer_a_view)),
            (peer_b.clone(), peer_data_from_view(peer_b_view)),
            (peer_c.clone(), peer_data_from_view(peer_c_view)),
        ].into_iter().collect();

        let pool = sp_core::testing::TaskExecutor::new();
        let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);

        executor::block_on(async move {
            // Hand-build the stored form of a statement signed by Alice as validator index 0.
            let statement = {
                let signing_context = SigningContext {
                    parent_hash: hash_b,
                    session_index,
                };

                let statement = SignedFullStatement::sign(
                    Statement::Seconded(candidate),
                    &signing_context,
                    0,
                    &Sr25519Keyring::Alice.pair().into(),
                );

                StoredStatement {
                    comparator: StoredStatementComparator {
                        compact: statement.payload().to_compact(),
                        validator_index: 0,
                        signature: statement.signature().clone()
                    },
                    statement,
                }
            };

            let needs_dependents = circulate_statement(
                &mut peer_data,
                &mut ctx,
                hash_b,
                &statement,
            ).await.unwrap();

            {
                assert_eq!(needs_dependents.len(), 2);
                assert!(needs_dependents.contains(&peer_b));
                assert!(needs_dependents.contains(&peer_c));
            }

            // Same `(compact statement, validator index)` shape the knowledge trackers store.
            let fingerprint = (statement.compact().clone(), 0);

            assert!(
                peer_data.get(&peer_b).unwrap()
                    .view_knowledge.get(&hash_b).unwrap()
                    .sent_statements.contains(&fingerprint),
            );

            assert!(
                peer_data.get(&peer_c).unwrap()
                    .view_knowledge.get(&hash_b).unwrap()
                    .sent_statements.contains(&fingerprint),
            );

            let message = handle.recv().await;
            assert_matches!(
                message,
                AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage(
                    to,
                    protocol,
                    payload,
                )) => {
                    // Recipient order is not checked, only membership.
                    assert_eq!(to.len(), 2);
                    assert!(to.contains(&peer_b));
                    assert!(to.contains(&peer_c));

                    assert_eq!(protocol, PROTOCOL_V1);
                    assert_eq!(
                        payload,
                        WireMessage::Statement(hash_b, statement.statement.clone()).encode(),
                    );
                }
            )
        });
    }
}