// Copyright 2020 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Polkadot is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . //! PoV Distribution Subsystem of Polkadot. //! //! This is a gossip implementation of code that is responsible for distributing PoVs //! among validators. use polkadot_primitives::v1::{Hash, PoV, CandidateDescriptor}; use polkadot_subsystem::{ ActiveLeavesUpdate, OverseerSignal, SubsystemContext, Subsystem, SubsystemResult, FromOverseer, SpawnedSubsystem, }; use polkadot_subsystem::messages::{ PoVDistributionMessage, RuntimeApiMessage, RuntimeApiRequest, AllMessages, NetworkBridgeMessage, }; use polkadot_node_network_protocol::{ v1 as protocol_v1, ReputationChange as Rep, NetworkBridgeEvent, PeerId, View, }; use futures::prelude::*; use futures::channel::oneshot; use std::collections::{hash_map::{Entry, HashMap}, HashSet}; use std::sync::Arc; const COST_APPARENT_FLOOD: Rep = Rep::new(-500, "Peer appears to be flooding us with PoV requests"); const COST_UNEXPECTED_POV: Rep = Rep::new(-500, "Peer sent us an unexpected PoV"); const COST_AWAITED_NOT_IN_VIEW: Rep = Rep::new(-100, "Peer claims to be awaiting something outside of its view"); const BENEFIT_FRESH_POV: Rep = Rep::new(25, "Peer supplied us with an awaited PoV"); const BENEFIT_LATE_POV: Rep = Rep::new(10, "Peer supplied us with an awaited PoV, \ but was not the first to do so"); /// The PoV Distribution Subsystem. 
pub struct PoVDistribution; impl Subsystem for PoVDistribution where C: SubsystemContext { fn start(self, ctx: C) -> SpawnedSubsystem { // Swallow error because failure is fatal to the node and we log with more precision // within `run`. SpawnedSubsystem { name: "pov-distribution-subsystem", future: run(ctx).map(|_| ()).boxed(), } } } struct State { relay_parent_state: HashMap, peer_state: HashMap, our_view: View, } struct BlockBasedState { known: HashMap>, /// All the PoVs we are or were fetching, coupled with channels expecting the data. /// /// This may be an empty list, which indicates that we were once awaiting this PoV but have /// received it already. fetching: HashMap>>>, n_validators: usize, } #[derive(Default)] struct PeerState { /// A set of awaited PoV-hashes for each relay-parent in the peer's view. awaited: HashMap>, } fn awaiting_message(relay_parent: Hash, awaiting: Vec) -> protocol_v1::ValidationProtocol { protocol_v1::ValidationProtocol::PoVDistribution( protocol_v1::PoVDistributionMessage::Awaiting(relay_parent, awaiting) ) } fn send_pov_message(relay_parent: Hash, pov_hash: Hash, pov: PoV) -> protocol_v1::ValidationProtocol { protocol_v1::ValidationProtocol::PoVDistribution( protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov) ) } /// Handles the signal. If successful, returns `true` if the subsystem should conclude, /// `false` otherwise. async fn handle_signal( state: &mut State, ctx: &mut impl SubsystemContext, signal: OverseerSignal, ) -> SubsystemResult { match signal { OverseerSignal::Conclude => Ok(true), OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => { for relay_parent in activated { let (vals_tx, vals_rx) = oneshot::channel(); ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request( relay_parent, RuntimeApiRequest::Validators(vals_tx), ))).await?; let n_validators = match vals_rx.await? 
{ Ok(v) => v.len(), Err(e) => { log::warn!(target: "pov_distribution", "Error fetching validators from runtime API for active leaf: {:?}", e ); // Not adding bookkeeping here might make us behave funny, but we // shouldn't take down the node on spurious runtime API errors. // // and this is "behave funny" as in be bad at our job, but not in any // slashable or security-related way. continue; } }; state.relay_parent_state.insert(relay_parent, BlockBasedState { known: HashMap::new(), fetching: HashMap::new(), n_validators: n_validators, }); } for relay_parent in deactivated { state.relay_parent_state.remove(&relay_parent); } Ok(false) } OverseerSignal::BlockFinalized(_) => Ok(false), } } /// Notify peers that we are awaiting a given PoV hash. /// /// This only notifies peers who have the relay parent in their view. async fn notify_all_we_are_awaiting( peers: &mut HashMap, ctx: &mut impl SubsystemContext, relay_parent: Hash, pov_hash: Hash, ) -> SubsystemResult<()> { // We use `awaited` as a proxy for which heads are in the peer's view. let peers_to_send: Vec<_> = peers.iter() .filter_map(|(peer, state)| if state.awaited.contains_key(&relay_parent) { Some(peer.clone()) } else { None }) .collect(); if peers_to_send.is_empty() { return Ok(()) } let payload = awaiting_message(relay_parent, vec![pov_hash]); ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( peers_to_send, payload, ))).await } /// Notify one peer about everything we're awaiting at a given relay-parent. async fn notify_one_we_are_awaiting_many( peer: &PeerId, ctx: &mut impl SubsystemContext, relay_parent_state: &HashMap, relay_parent: Hash, ) -> SubsystemResult<()> { let awaiting_hashes = relay_parent_state.get(&relay_parent).into_iter().flat_map(|s| { // Send the peer everything we are fetching at this relay-parent s.fetching.iter() .filter(|(_, senders)| !senders.is_empty()) // that has not been completed already. 
.map(|(pov_hash, _)| *pov_hash) }).collect::>(); if awaiting_hashes.is_empty() { return Ok(()) } let payload = awaiting_message(relay_parent, awaiting_hashes); ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( vec![peer.clone()], payload, ))).await } /// Distribute a PoV to peers who are awaiting it. async fn distribute_to_awaiting( peers: &mut HashMap, ctx: &mut impl SubsystemContext, relay_parent: Hash, pov_hash: Hash, pov: &PoV, ) -> SubsystemResult<()> { // Send to all peers who are awaiting the PoV and have that relay-parent in their view. // // Also removes it from their awaiting set. let peers_to_send: Vec<_> = peers.iter_mut() .filter_map(|(peer, state)| state.awaited.get_mut(&relay_parent).and_then(|awaited| { if awaited.remove(&pov_hash) { Some(peer.clone()) } else { None } })) .collect(); if peers_to_send.is_empty() { return Ok(()) } let payload = send_pov_message(relay_parent, pov_hash, pov.clone()); ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( peers_to_send, payload, ))).await } /// Handles a `FetchPoV` message. async fn handle_fetch( state: &mut State, ctx: &mut impl SubsystemContext, relay_parent: Hash, descriptor: CandidateDescriptor, response_sender: oneshot::Sender>, ) -> SubsystemResult<()> { let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) { Some(s) => s, None => return Ok(()), }; if let Some(pov) = relay_parent_state.known.get(&descriptor.pov_hash) { let _ = response_sender.send(pov.clone()); return Ok(()); } { match relay_parent_state.fetching.entry(descriptor.pov_hash) { Entry::Occupied(mut e) => { // we are already awaiting this PoV if there is an entry. 
e.get_mut().push(response_sender); return Ok(()); } Entry::Vacant(e) => { e.insert(vec![response_sender]); } } } if relay_parent_state.fetching.len() > 2 * relay_parent_state.n_validators { log::warn!("Other subsystems have requested PoV distribution to \ fetch more PoVs than reasonably expected: {}", relay_parent_state.fetching.len()); return Ok(()); } // Issue an `Awaiting` message to all peers with this in their view. notify_all_we_are_awaiting( &mut state.peer_state, ctx, relay_parent, descriptor.pov_hash ).await } /// Handles a `DistributePoV` message. async fn handle_distribute( state: &mut State, ctx: &mut impl SubsystemContext, relay_parent: Hash, descriptor: CandidateDescriptor, pov: Arc, ) -> SubsystemResult<()> { let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) { None => return Ok(()), Some(s) => s, }; if let Some(our_awaited) = relay_parent_state.fetching.get_mut(&descriptor.pov_hash) { // Drain all the senders, but keep the entry in the map around intentionally. // // It signals that we were at one point awaiting this, so we will be able to tell // why peers are sending it to us. for response_sender in our_awaited.drain(..) { let _ = response_sender.send(pov.clone()); } } relay_parent_state.known.insert(descriptor.pov_hash, pov.clone()); distribute_to_awaiting( &mut state.peer_state, ctx, relay_parent, descriptor.pov_hash, &*pov, ).await } /// Report a reputation change for a peer. async fn report_peer( ctx: &mut impl SubsystemContext, peer: PeerId, rep: Rep, ) -> SubsystemResult<()> { ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(peer, rep))).await } /// Handle a notification from a peer that they are awaiting some PoVs. 
/// Known hashes are served immediately; unknown ones are registered in the peer's awaited set,
/// subject to flood limits and view checks.
async fn handle_awaiting(
	state: &mut State,
	ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
	peer: PeerId,
	relay_parent: Hash,
	pov_hashes: Vec<Hash>,
) -> SubsystemResult<()> {
	// The relay-parent must be in our own view, or the claim is bogus.
	if !state.our_view.0.contains(&relay_parent) {
		report_peer(ctx, peer, COST_AWAITED_NOT_IN_VIEW).await?;
		return Ok(());
	}

	let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) {
		None => {
			log::warn!("PoV Distribution relay parent state out-of-sync with our view");
			return Ok(());
		}
		Some(s) => s,
	};

	// The peer must have the relay-parent in its own view as well.
	let peer_awaiting = match
		state.peer_state.get_mut(&peer).and_then(|s| s.awaited.get_mut(&relay_parent))
	{
		None => {
			report_peer(ctx, peer, COST_AWAITED_NOT_IN_VIEW).await?;
			return Ok(());
		}
		Some(a) => a,
	};

	let will_be_awaited = peer_awaiting.len() + pov_hashes.len();
	if will_be_awaited <= 2 * relay_parent_state.n_validators {
		for pov_hash in pov_hashes {
			// For all requested PoV hashes, if we have it, we complete the request immediately.
			// Otherwise, we note that the peer is awaiting the PoV.
			if let Some(pov) = relay_parent_state.known.get(&pov_hash) {
				let payload = send_pov_message(relay_parent, pov_hash, (&**pov).clone());
				ctx.send_message(AllMessages::NetworkBridge(
					NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
				)).await?;
			} else {
				peer_awaiting.insert(pov_hash);
			}
		}
	} else {
		report_peer(ctx, peer, COST_APPARENT_FLOOD).await?;
	}

	Ok(())
}

/// Handle an incoming PoV from our peer. Reports them if unexpected, rewards them if not.
///
/// Completes any requests awaiting that PoV.
async fn handle_incoming_pov(
	state: &mut State,
	ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
	peer: PeerId,
	relay_parent: Hash,
	pov_hash: Hash,
	pov: PoV,
) -> SubsystemResult<()> {
	let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) {
		None => {
			report_peer(ctx, peer, COST_UNEXPECTED_POV).await?;
			return Ok(());
		},
		Some(r) => r,
	};

	let pov = {
		// Do validity checks and complete all senders awaiting this PoV.
		let fetching = match relay_parent_state.fetching.get_mut(&pov_hash) {
			None => {
				report_peer(ctx, peer, COST_UNEXPECTED_POV).await?;
				return Ok(());
			}
			Some(f) => f,
		};

		// The advertised hash must match the actual data.
		let hash = pov.hash();
		if hash != pov_hash {
			report_peer(ctx, peer, COST_UNEXPECTED_POV).await?;
			return Ok(());
		}

		let pov = Arc::new(pov);

		if fetching.is_empty() {
			// fetching is empty whenever we were awaiting something and
			// it was completed afterwards.
			report_peer(ctx, peer.clone(), BENEFIT_LATE_POV).await?;
		} else {
			// fetching is non-empty when the peer just provided us with data we needed.
			report_peer(ctx, peer.clone(), BENEFIT_FRESH_POV).await?;
		}

		for response_sender in fetching.drain(..) {
			let _ = response_sender.send(pov.clone());
		}

		pov
	};

	// make sure we don't consider this peer as awaiting that PoV anymore.
	if let Some(peer_state) = state.peer_state.get_mut(&peer) {
		peer_state.awaited.remove(&pov_hash);
	}

	// distribute the PoV to all other peers who are awaiting it.
	distribute_to_awaiting(
		&mut state.peer_state,
		ctx,
		relay_parent,
		pov_hash,
		&*pov,
	).await
}

/// Handles a network bridge update.
async fn handle_network_update(
	state: &mut State,
	ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
	update: NetworkBridgeEvent<protocol_v1::PoVDistributionMessage>,
) -> SubsystemResult<()> {
	match update {
		NetworkBridgeEvent::PeerConnected(peer, _observed_role) => {
			state.peer_state.insert(peer, PeerState { awaited: HashMap::new() });
			Ok(())
		}
		NetworkBridgeEvent::PeerDisconnected(peer) => {
			state.peer_state.remove(&peer);
			Ok(())
		}
		NetworkBridgeEvent::PeerViewChange(peer_id, view) => {
			if let Some(peer_state) = state.peer_state.get_mut(&peer_id) {
				// prune anything not in the new view.
				peer_state.awaited.retain(|relay_parent, _| view.0.contains(&relay_parent));

				// introduce things from the new view.
				for relay_parent in view.0.iter() {
					if let Entry::Vacant(entry) = peer_state.awaited.entry(*relay_parent) {
						entry.insert(HashSet::new());

						// Notify the peer about everything we're awaiting at the new relay-parent.
notify_one_we_are_awaiting_many( &peer_id, ctx, &state.relay_parent_state, *relay_parent, ).await?; } } } Ok(()) } NetworkBridgeEvent::PeerMessage(peer, message) => { match message { protocol_v1::PoVDistributionMessage::Awaiting(relay_parent, pov_hashes) => handle_awaiting( state, ctx, peer, relay_parent, pov_hashes, ).await, protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov) => handle_incoming_pov( state, ctx, peer, relay_parent, pov_hash, pov, ).await, } } NetworkBridgeEvent::OurViewChange(view) => { state.our_view = view; Ok(()) } } } async fn run( mut ctx: impl SubsystemContext, ) -> SubsystemResult<()> { let mut state = State { relay_parent_state: HashMap::new(), peer_state: HashMap::new(), our_view: View(Vec::new()), }; loop { match ctx.recv().await? { FromOverseer::Signal(signal) => if handle_signal(&mut state, &mut ctx, signal).await? { return Ok(()); }, FromOverseer::Communication { msg } => match msg { PoVDistributionMessage::FetchPoV(relay_parent, descriptor, response_sender) => handle_fetch( &mut state, &mut ctx, relay_parent, descriptor, response_sender, ).await?, PoVDistributionMessage::DistributePoV(relay_parent, descriptor, pov) => handle_distribute( &mut state, &mut ctx, relay_parent, descriptor, pov, ).await?, PoVDistributionMessage::NetworkBridgeUpdateV1(event) => handle_network_update( &mut state, &mut ctx, event, ).await?, }, } } } #[cfg(test)] mod tests { use super::*; use futures::executor; use polkadot_primitives::v1::BlockData; use assert_matches::assert_matches; fn make_pov(data: Vec) -> PoV { PoV { block_data: BlockData(data) } } fn make_peer_state(awaited: Vec<(Hash, Vec)>) -> PeerState { PeerState { awaited: awaited.into_iter().map(|(rp, h)| (rp, h.into_iter().collect())).collect() } } #[test] fn distributes_to_those_awaiting_and_completes_local() { let hash_a: Hash = [0; 32].into(); let hash_b: Hash = [1; 32].into(); let peer_a = PeerId::random(); let peer_b = PeerId::random(); let peer_c = PeerId::random(); let 
(pov_send, pov_recv) = oneshot::channel(); let pov = make_pov(vec![1, 2, 3]); let pov_hash = pov.hash(); let mut state = State { relay_parent_state: { let mut s = HashMap::new(); let mut b = BlockBasedState { known: HashMap::new(), fetching: HashMap::new(), n_validators: 10, }; b.fetching.insert(pov_hash, vec![pov_send]); s.insert(hash_a, b); s }, peer_state: { let mut s = HashMap::new(); // peer A has hash_a in its view and is awaiting the PoV. s.insert( peer_a.clone(), make_peer_state(vec![(hash_a, vec![pov_hash])]), ); // peer B has hash_a in its view but is not awaiting. s.insert( peer_b.clone(), make_peer_state(vec![(hash_a, vec![])]), ); // peer C doesn't have hash_a in its view but is awaiting the PoV under hash_b. s.insert( peer_c.clone(), make_peer_state(vec![(hash_b, vec![pov_hash])]), ); s }, our_view: View(vec![hash_a, hash_b]), }; let pool = sp_core::testing::TaskExecutor::new(); let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); let mut descriptor = CandidateDescriptor::default(); descriptor.pov_hash = pov_hash; executor::block_on(async move { handle_distribute( &mut state, &mut ctx, hash_a, descriptor, Arc::new(pov.clone()), ).await.unwrap(); assert!(!state.peer_state[&peer_a].awaited[&hash_a].contains(&pov_hash)); assert!(state.peer_state[&peer_c].awaited[&hash_b].contains(&pov_hash)); // our local sender also completed assert_eq!(&*pov_recv.await.unwrap(), &pov); assert_matches!( handle.recv().await, AllMessages::NetworkBridge( NetworkBridgeMessage::SendValidationMessage(peers, message) ) => { assert_eq!(peers, vec![peer_a.clone()]); assert_eq!( message, send_pov_message(hash_a, pov_hash, pov.clone()), ); } ) }); } #[test] fn we_inform_peers_with_same_view_we_are_awaiting() { let hash_a: Hash = [0; 32].into(); let hash_b: Hash = [1; 32].into(); let peer_a = PeerId::random(); let peer_b = PeerId::random(); let (pov_send, _) = oneshot::channel(); let pov = make_pov(vec![1, 2, 3]); let pov_hash = 
pov.hash(); let mut state = State { relay_parent_state: { let mut s = HashMap::new(); let b = BlockBasedState { known: HashMap::new(), fetching: HashMap::new(), n_validators: 10, }; s.insert(hash_a, b); s }, peer_state: { let mut s = HashMap::new(); // peer A has hash_a in its view. s.insert( peer_a.clone(), make_peer_state(vec![(hash_a, vec![])]), ); // peer B doesn't have hash_a in its view. s.insert( peer_b.clone(), make_peer_state(vec![(hash_b, vec![])]), ); s }, our_view: View(vec![hash_a]), }; let pool = sp_core::testing::TaskExecutor::new(); let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); let mut descriptor = CandidateDescriptor::default(); descriptor.pov_hash = pov_hash; executor::block_on(async move { handle_fetch( &mut state, &mut ctx, hash_a, descriptor, pov_send, ).await.unwrap(); assert_eq!(state.relay_parent_state[&hash_a].fetching[&pov_hash].len(), 1); assert_matches!( handle.recv().await, AllMessages::NetworkBridge( NetworkBridgeMessage::SendValidationMessage(peers, message) ) => { assert_eq!(peers, vec![peer_a.clone()]); assert_eq!( message, awaiting_message(hash_a, vec![pov_hash]), ); } ) }); } #[test] fn peer_view_change_leads_to_us_informing() { let hash_a: Hash = [0; 32].into(); let hash_b: Hash = [1; 32].into(); let peer_a = PeerId::random(); let (pov_a_send, _) = oneshot::channel(); let pov_a = make_pov(vec![1, 2, 3]); let pov_a_hash = pov_a.hash(); let pov_b = make_pov(vec![4, 5, 6]); let pov_b_hash = pov_b.hash(); let mut state = State { relay_parent_state: { let mut s = HashMap::new(); let mut b = BlockBasedState { known: HashMap::new(), fetching: HashMap::new(), n_validators: 10, }; // pov_a is still being fetched, whereas the fetch of pov_b has already // completed, as implied by the empty vector. 
b.fetching.insert(pov_a_hash, vec![pov_a_send]); b.fetching.insert(pov_b_hash, vec![]); s.insert(hash_a, b); s }, peer_state: { let mut s = HashMap::new(); // peer A doesn't yet have hash_a in its view. s.insert( peer_a.clone(), make_peer_state(vec![(hash_b, vec![])]), ); s }, our_view: View(vec![hash_a]), }; let pool = sp_core::testing::TaskExecutor::new(); let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); executor::block_on(async move { handle_network_update( &mut state, &mut ctx, NetworkBridgeEvent::PeerViewChange(peer_a.clone(), View(vec![hash_a, hash_b])), ).await.unwrap(); assert_matches!( handle.recv().await, AllMessages::NetworkBridge( NetworkBridgeMessage::SendValidationMessage(peers, message) ) => { assert_eq!(peers, vec![peer_a.clone()]); assert_eq!( message, awaiting_message(hash_a, vec![pov_a_hash]), ); } ) }); } #[test] fn peer_complete_fetch_and_is_rewarded() { let hash_a: Hash = [0; 32].into(); let peer_a = PeerId::random(); let peer_b = PeerId::random(); let (pov_send, pov_recv) = oneshot::channel(); let pov = make_pov(vec![1, 2, 3]); let pov_hash = pov.hash(); let mut state = State { relay_parent_state: { let mut s = HashMap::new(); let mut b = BlockBasedState { known: HashMap::new(), fetching: HashMap::new(), n_validators: 10, }; // pov is being fetched. b.fetching.insert(pov_hash, vec![pov_send]); s.insert(hash_a, b); s }, peer_state: { let mut s = HashMap::new(); // peers A and B are functionally the same. s.insert( peer_a.clone(), make_peer_state(vec![(hash_a, vec![])]), ); s.insert( peer_b.clone(), make_peer_state(vec![(hash_a, vec![])]), ); s }, our_view: View(vec![hash_a]), }; let pool = sp_core::testing::TaskExecutor::new(); let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); executor::block_on(async move { // Peer A answers our request before peer B. 
handle_network_update( &mut state, &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), send_pov_message(hash_a, pov_hash, pov.clone()), ).focus().unwrap(), ).await.unwrap(); handle_network_update( &mut state, &mut ctx, NetworkBridgeEvent::PeerMessage( peer_b.clone(), send_pov_message(hash_a, pov_hash, pov.clone()), ).focus().unwrap(), ).await.unwrap(); assert_eq!(&*pov_recv.await.unwrap(), &pov); assert_matches!( handle.recv().await, AllMessages::NetworkBridge( NetworkBridgeMessage::ReportPeer(peer, rep) ) => { assert_eq!(peer, peer_a); assert_eq!(rep, BENEFIT_FRESH_POV); } ); assert_matches!( handle.recv().await, AllMessages::NetworkBridge( NetworkBridgeMessage::ReportPeer(peer, rep) ) => { assert_eq!(peer, peer_b); assert_eq!(rep, BENEFIT_LATE_POV); } ); }); } #[test] fn peer_punished_for_sending_bad_pov() { let hash_a: Hash = [0; 32].into(); let peer_a = PeerId::random(); let (pov_send, _) = oneshot::channel(); let pov = make_pov(vec![1, 2, 3]); let pov_hash = pov.hash(); let bad_pov = make_pov(vec![6, 6, 6]); let mut state = State { relay_parent_state: { let mut s = HashMap::new(); let mut b = BlockBasedState { known: HashMap::new(), fetching: HashMap::new(), n_validators: 10, }; // pov is being fetched. b.fetching.insert(pov_hash, vec![pov_send]); s.insert(hash_a, b); s }, peer_state: { let mut s = HashMap::new(); s.insert( peer_a.clone(), make_peer_state(vec![(hash_a, vec![])]), ); s }, our_view: View(vec![hash_a]), }; let pool = sp_core::testing::TaskExecutor::new(); let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); executor::block_on(async move { // Peer A answers our request: right relay parent, awaited hash, wrong PoV. handle_network_update( &mut state, &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), send_pov_message(hash_a, pov_hash, bad_pov.clone()), ).focus().unwrap(), ).await.unwrap(); // didn't complete our sender. 
assert_eq!(state.relay_parent_state[&hash_a].fetching[&pov_hash].len(), 1); assert_matches!( handle.recv().await, AllMessages::NetworkBridge( NetworkBridgeMessage::ReportPeer(peer, rep) ) => { assert_eq!(peer, peer_a); assert_eq!(rep, COST_UNEXPECTED_POV); } ); }); } #[test] fn peer_punished_for_sending_unexpected_pov() { let hash_a: Hash = [0; 32].into(); let peer_a = PeerId::random(); let pov = make_pov(vec![1, 2, 3]); let pov_hash = pov.hash(); let mut state = State { relay_parent_state: { let mut s = HashMap::new(); let b = BlockBasedState { known: HashMap::new(), fetching: HashMap::new(), n_validators: 10, }; s.insert(hash_a, b); s }, peer_state: { let mut s = HashMap::new(); s.insert( peer_a.clone(), make_peer_state(vec![(hash_a, vec![])]), ); s }, our_view: View(vec![hash_a]), }; let pool = sp_core::testing::TaskExecutor::new(); let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); executor::block_on(async move { // Peer A answers our request: right relay parent, awaited hash, wrong PoV. 
handle_network_update( &mut state, &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), send_pov_message(hash_a, pov_hash, pov.clone()), ).focus().unwrap(), ).await.unwrap(); assert_matches!( handle.recv().await, AllMessages::NetworkBridge( NetworkBridgeMessage::ReportPeer(peer, rep) ) => { assert_eq!(peer, peer_a); assert_eq!(rep, COST_UNEXPECTED_POV); } ); }); } #[test] fn peer_punished_for_sending_pov_out_of_our_view() { let hash_a: Hash = [0; 32].into(); let hash_b: Hash = [1; 32].into(); let peer_a = PeerId::random(); let pov = make_pov(vec![1, 2, 3]); let pov_hash = pov.hash(); let mut state = State { relay_parent_state: { let mut s = HashMap::new(); let b = BlockBasedState { known: HashMap::new(), fetching: HashMap::new(), n_validators: 10, }; s.insert(hash_a, b); s }, peer_state: { let mut s = HashMap::new(); s.insert( peer_a.clone(), make_peer_state(vec![(hash_a, vec![])]), ); s }, our_view: View(vec![hash_a]), }; let pool = sp_core::testing::TaskExecutor::new(); let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); executor::block_on(async move { // Peer A answers our request: right relay parent, awaited hash, wrong PoV. 
handle_network_update( &mut state, &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), send_pov_message(hash_b, pov_hash, pov.clone()), ).focus().unwrap(), ).await.unwrap(); assert_matches!( handle.recv().await, AllMessages::NetworkBridge( NetworkBridgeMessage::ReportPeer(peer, rep) ) => { assert_eq!(peer, peer_a); assert_eq!(rep, COST_UNEXPECTED_POV); } ); }); } #[test] fn peer_reported_for_awaiting_too_much() { let hash_a: Hash = [0; 32].into(); let peer_a = PeerId::random(); let n_validators = 10; let mut state = State { relay_parent_state: { let mut s = HashMap::new(); let b = BlockBasedState { known: HashMap::new(), fetching: HashMap::new(), n_validators, }; s.insert(hash_a, b); s }, peer_state: { let mut s = HashMap::new(); s.insert( peer_a.clone(), make_peer_state(vec![(hash_a, vec![])]), ); s }, our_view: View(vec![hash_a]), }; let pool = sp_core::testing::TaskExecutor::new(); let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); executor::block_on(async move { let max_plausibly_awaited = n_validators * 2; // The peer awaits a plausible (albeit unlikely) amount of PoVs. for i in 0..max_plausibly_awaited { let pov_hash = make_pov(vec![i as u8; 32]).hash(); handle_network_update( &mut state, &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), awaiting_message(hash_a, vec![pov_hash]), ).focus().unwrap(), ).await.unwrap(); } assert_eq!(state.peer_state[&peer_a].awaited[&hash_a].len(), max_plausibly_awaited); // The last straw: let last_pov_hash = make_pov(vec![max_plausibly_awaited as u8; 32]).hash(); handle_network_update( &mut state, &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), awaiting_message(hash_a, vec![last_pov_hash]), ).focus().unwrap(), ).await.unwrap(); // No more bookkeeping for you! 
assert_eq!(state.peer_state[&peer_a].awaited[&hash_a].len(), max_plausibly_awaited); assert_matches!( handle.recv().await, AllMessages::NetworkBridge( NetworkBridgeMessage::ReportPeer(peer, rep) ) => { assert_eq!(peer, peer_a); assert_eq!(rep, COST_APPARENT_FLOOD); } ); }); } #[test] fn peer_reported_for_awaiting_outside_their_view() { let hash_a: Hash = [0; 32].into(); let hash_b: Hash = [1; 32].into(); let peer_a = PeerId::random(); let mut state = State { relay_parent_state: { let mut s = HashMap::new(); s.insert(hash_a, BlockBasedState { known: HashMap::new(), fetching: HashMap::new(), n_validators: 10, }); s.insert(hash_b, BlockBasedState { known: HashMap::new(), fetching: HashMap::new(), n_validators: 10, }); s }, peer_state: { let mut s = HashMap::new(); // Peer has only hash A in its view. s.insert( peer_a.clone(), make_peer_state(vec![(hash_a, vec![])]), ); s }, our_view: View(vec![hash_a, hash_b]), }; let pool = sp_core::testing::TaskExecutor::new(); let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); executor::block_on(async move { let pov_hash = make_pov(vec![1, 2, 3]).hash(); // Hash B is in our view but not the peer's handle_network_update( &mut state, &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), awaiting_message(hash_b, vec![pov_hash]), ).focus().unwrap(), ).await.unwrap(); assert!(state.peer_state[&peer_a].awaited.get(&hash_b).is_none()); assert_matches!( handle.recv().await, AllMessages::NetworkBridge( NetworkBridgeMessage::ReportPeer(peer, rep) ) => { assert_eq!(peer, peer_a); assert_eq!(rep, COST_AWAITED_NOT_IN_VIEW); } ); }); } #[test] fn peer_reported_for_awaiting_outside_our_view() { let hash_a: Hash = [0; 32].into(); let hash_b: Hash = [1; 32].into(); let peer_a = PeerId::random(); let mut state = State { relay_parent_state: { let mut s = HashMap::new(); s.insert(hash_a, BlockBasedState { known: HashMap::new(), fetching: HashMap::new(), n_validators: 10, }); s }, peer_state: { let mut 
s = HashMap::new(); // Peer has hashes A and B in their view. s.insert( peer_a.clone(), make_peer_state(vec![(hash_a, vec![]), (hash_b, vec![])]), ); s }, our_view: View(vec![hash_a]), }; let pool = sp_core::testing::TaskExecutor::new(); let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); executor::block_on(async move { let pov_hash = make_pov(vec![1, 2, 3]).hash(); // Hash B is in peer's view but not ours. handle_network_update( &mut state, &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), awaiting_message(hash_b, vec![pov_hash]), ).focus().unwrap(), ).await.unwrap(); // Illegal `awaited` is ignored. assert!(state.peer_state[&peer_a].awaited[&hash_b].is_empty()); assert_matches!( handle.recv().await, AllMessages::NetworkBridge( NetworkBridgeMessage::ReportPeer(peer, rep) ) => { assert_eq!(peer, peer_a); assert_eq!(rep, COST_AWAITED_NOT_IN_VIEW); } ); }); } #[test] fn peer_complete_fetch_leads_to_us_completing_others() { let hash_a: Hash = [0; 32].into(); let peer_a = PeerId::random(); let peer_b = PeerId::random(); let (pov_send, pov_recv) = oneshot::channel(); let pov = make_pov(vec![1, 2, 3]); let pov_hash = pov.hash(); let mut state = State { relay_parent_state: { let mut s = HashMap::new(); let mut b = BlockBasedState { known: HashMap::new(), fetching: HashMap::new(), n_validators: 10, }; // pov is being fetched. b.fetching.insert(pov_hash, vec![pov_send]); s.insert(hash_a, b); s }, peer_state: { let mut s = HashMap::new(); s.insert( peer_a.clone(), make_peer_state(vec![(hash_a, vec![])]), ); // peer B is awaiting peer A's request. 
s.insert( peer_b.clone(), make_peer_state(vec![(hash_a, vec![pov_hash])]), ); s }, our_view: View(vec![hash_a]), }; let pool = sp_core::testing::TaskExecutor::new(); let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); executor::block_on(async move { handle_network_update( &mut state, &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), send_pov_message(hash_a, pov_hash, pov.clone()), ).focus().unwrap(), ).await.unwrap(); assert_eq!(&*pov_recv.await.unwrap(), &pov); assert_matches!( handle.recv().await, AllMessages::NetworkBridge( NetworkBridgeMessage::ReportPeer(peer, rep) ) => { assert_eq!(peer, peer_a); assert_eq!(rep, BENEFIT_FRESH_POV); } ); assert_matches!( handle.recv().await, AllMessages::NetworkBridge( NetworkBridgeMessage::SendValidationMessage(peers, message) ) => { assert_eq!(peers, vec![peer_b.clone()]); assert_eq!( message, send_pov_message(hash_a, pov_hash, pov.clone()), ); } ); assert!(!state.peer_state[&peer_b].awaited[&hash_a].contains(&pov_hash)); }); } #[test] fn peer_completing_request_no_longer_awaiting() { let hash_a: Hash = [0; 32].into(); let peer_a = PeerId::random(); let (pov_send, pov_recv) = oneshot::channel(); let pov = make_pov(vec![1, 2, 3]); let pov_hash = pov.hash(); let mut state = State { relay_parent_state: { let mut s = HashMap::new(); let mut b = BlockBasedState { known: HashMap::new(), fetching: HashMap::new(), n_validators: 10, }; // pov is being fetched. b.fetching.insert(pov_hash, vec![pov_send]); s.insert(hash_a, b); s }, peer_state: { let mut s = HashMap::new(); // peer A is registered as awaiting. 
s.insert( peer_a.clone(), make_peer_state(vec![(hash_a, vec![pov_hash])]), ); s }, our_view: View(vec![hash_a]), }; let pool = sp_core::testing::TaskExecutor::new(); let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); executor::block_on(async move { handle_network_update( &mut state, &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), send_pov_message(hash_a, pov_hash, pov.clone()), ).focus().unwrap(), ).await.unwrap(); assert_eq!(&*pov_recv.await.unwrap(), &pov); assert_matches!( handle.recv().await, AllMessages::NetworkBridge( NetworkBridgeMessage::ReportPeer(peer, rep) ) => { assert_eq!(peer, peer_a); assert_eq!(rep, BENEFIT_FRESH_POV); } ); // We received the PoV from peer A, so we do not consider it awaited by peer A anymore. assert!(!state.peer_state[&peer_a].awaited[&hash_a].contains(&pov_hash)); }); } }