// Copyright 2020 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Polkadot is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . //! PoV Distribution Subsystem of Polkadot. //! //! This is a gossip implementation of code that is responsible for distributing PoVs //! among validators. #![deny(unused_crate_dependencies)] #![warn(missing_docs)] use polkadot_primitives::v1::{Hash, PoV, CandidateDescriptor}; use polkadot_subsystem::{ ActiveLeavesUpdate, OverseerSignal, SubsystemContext, Subsystem, SubsystemResult, SubsystemError, FromOverseer, SpawnedSubsystem, messages::{ PoVDistributionMessage, RuntimeApiMessage, RuntimeApiRequest, AllMessages, NetworkBridgeMessage, }, }; use polkadot_node_subsystem_util::{ metrics::{self, prometheus}, }; use polkadot_node_network_protocol::{ v1 as protocol_v1, ReputationChange as Rep, NetworkBridgeEvent, PeerId, View, }; use futures::prelude::*; use futures::channel::oneshot; use std::collections::{hash_map::{Entry, HashMap}, HashSet}; use std::sync::Arc; const COST_APPARENT_FLOOD: Rep = Rep::new(-500, "Peer appears to be flooding us with PoV requests"); const COST_UNEXPECTED_POV: Rep = Rep::new(-500, "Peer sent us an unexpected PoV"); const COST_AWAITED_NOT_IN_VIEW: Rep = Rep::new(-100, "Peer claims to be awaiting something outside of its view"); const BENEFIT_FRESH_POV: Rep = Rep::new(25, "Peer supplied us with an awaited PoV"); const BENEFIT_LATE_POV: Rep = Rep::new(10, "Peer supplied us with an awaited PoV, \ but was not the first to do so"); /// The PoV Distribution Subsystem. pub struct PoVDistribution { // Prometheus metrics metrics: Metrics, } impl Subsystem for PoVDistribution where C: SubsystemContext { fn start(self, ctx: C) -> SpawnedSubsystem { // Swallow error because failure is fatal to the node and we log with more precision // within `run`. let future = self.run(ctx) .map_err(|e| SubsystemError::with_origin("pov-distribution", e)) .boxed(); SpawnedSubsystem { name: "pov-distribution-subsystem", future, } } } struct State { relay_parent_state: HashMap, peer_state: HashMap, our_view: View, metrics: Metrics, } struct BlockBasedState { known: HashMap>, /// All the PoVs we are or were fetching, coupled with channels expecting the data. /// /// This may be an empty list, which indicates that we were once awaiting this PoV but have /// received it already. fetching: HashMap>>>, n_validators: usize, } #[derive(Default)] struct PeerState { /// A set of awaited PoV-hashes for each relay-parent in the peer's view. awaited: HashMap>, } fn awaiting_message(relay_parent: Hash, awaiting: Vec) -> protocol_v1::ValidationProtocol { protocol_v1::ValidationProtocol::PoVDistribution( protocol_v1::PoVDistributionMessage::Awaiting(relay_parent, awaiting) ) } fn send_pov_message(relay_parent: Hash, pov_hash: Hash, pov: PoV) -> protocol_v1::ValidationProtocol { protocol_v1::ValidationProtocol::PoVDistribution( protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov) ) } /// Handles the signal. If successful, returns `true` if the subsystem should conclude, /// `false` otherwise. async fn handle_signal( state: &mut State, ctx: &mut impl SubsystemContext, signal: OverseerSignal, ) -> SubsystemResult { match signal { OverseerSignal::Conclude => Ok(true), OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => { for relay_parent in activated { let (vals_tx, vals_rx) = oneshot::channel(); ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request( relay_parent, RuntimeApiRequest::Validators(vals_tx), ))).await?; let n_validators = match vals_rx.await? { Ok(v) => v.len(), Err(e) => { log::warn!(target: "pov_distribution", "Error fetching validators from runtime API for active leaf: {:?}", e ); // Not adding bookkeeping here might make us behave funny, but we // shouldn't take down the node on spurious runtime API errors. // // and this is "behave funny" as in be bad at our job, but not in any // slashable or security-related way. continue; } }; state.relay_parent_state.insert(relay_parent, BlockBasedState { known: HashMap::new(), fetching: HashMap::new(), n_validators: n_validators, }); } for relay_parent in deactivated { state.relay_parent_state.remove(&relay_parent); } Ok(false) } OverseerSignal::BlockFinalized(_) => Ok(false), } } /// Notify peers that we are awaiting a given PoV hash. /// /// This only notifies peers who have the relay parent in their view. async fn notify_all_we_are_awaiting( peers: &mut HashMap, ctx: &mut impl SubsystemContext, relay_parent: Hash, pov_hash: Hash, ) -> SubsystemResult<()> { // We use `awaited` as a proxy for which heads are in the peer's view. let peers_to_send: Vec<_> = peers.iter() .filter_map(|(peer, state)| if state.awaited.contains_key(&relay_parent) { Some(peer.clone()) } else { None }) .collect(); if peers_to_send.is_empty() { return Ok(()) } let payload = awaiting_message(relay_parent, vec![pov_hash]); ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( peers_to_send, payload, ))).await } /// Notify one peer about everything we're awaiting at a given relay-parent. async fn notify_one_we_are_awaiting_many( peer: &PeerId, ctx: &mut impl SubsystemContext, relay_parent_state: &HashMap, relay_parent: Hash, ) -> SubsystemResult<()> { let awaiting_hashes = relay_parent_state.get(&relay_parent).into_iter().flat_map(|s| { // Send the peer everything we are fetching at this relay-parent s.fetching.iter() .filter(|(_, senders)| !senders.is_empty()) // that has not been completed already. .map(|(pov_hash, _)| *pov_hash) }).collect::>(); if awaiting_hashes.is_empty() { return Ok(()) } let payload = awaiting_message(relay_parent, awaiting_hashes); ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( vec![peer.clone()], payload, ))).await } /// Distribute a PoV to peers who are awaiting it. async fn distribute_to_awaiting( peers: &mut HashMap, ctx: &mut impl SubsystemContext, metrics: &Metrics, relay_parent: Hash, pov_hash: Hash, pov: &PoV, ) -> SubsystemResult<()> { // Send to all peers who are awaiting the PoV and have that relay-parent in their view. // // Also removes it from their awaiting set. let peers_to_send: Vec<_> = peers.iter_mut() .filter_map(|(peer, state)| state.awaited.get_mut(&relay_parent).and_then(|awaited| { if awaited.remove(&pov_hash) { Some(peer.clone()) } else { None } })) .collect(); if peers_to_send.is_empty() { return Ok(()) } let payload = send_pov_message(relay_parent, pov_hash, pov.clone()); ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( peers_to_send, payload, ))).await?; metrics.on_pov_distributed(); Ok(()) } /// Handles a `FetchPoV` message. async fn handle_fetch( state: &mut State, ctx: &mut impl SubsystemContext, relay_parent: Hash, descriptor: CandidateDescriptor, response_sender: oneshot::Sender>, ) -> SubsystemResult<()> { let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) { Some(s) => s, None => return Ok(()), }; if let Some(pov) = relay_parent_state.known.get(&descriptor.pov_hash) { let _ = response_sender.send(pov.clone()); return Ok(()); } { match relay_parent_state.fetching.entry(descriptor.pov_hash) { Entry::Occupied(mut e) => { // we are already awaiting this PoV if there is an entry. e.get_mut().push(response_sender); return Ok(()); } Entry::Vacant(e) => { e.insert(vec![response_sender]); } } } if relay_parent_state.fetching.len() > 2 * relay_parent_state.n_validators { log::warn!("Other subsystems have requested PoV distribution to \ fetch more PoVs than reasonably expected: {}", relay_parent_state.fetching.len()); return Ok(()); } // Issue an `Awaiting` message to all peers with this in their view. notify_all_we_are_awaiting( &mut state.peer_state, ctx, relay_parent, descriptor.pov_hash ).await } /// Handles a `DistributePoV` message. async fn handle_distribute( state: &mut State, ctx: &mut impl SubsystemContext, relay_parent: Hash, descriptor: CandidateDescriptor, pov: Arc, ) -> SubsystemResult<()> { let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) { None => return Ok(()), Some(s) => s, }; if let Some(our_awaited) = relay_parent_state.fetching.get_mut(&descriptor.pov_hash) { // Drain all the senders, but keep the entry in the map around intentionally. // // It signals that we were at one point awaiting this, so we will be able to tell // why peers are sending it to us. for response_sender in our_awaited.drain(..) { let _ = response_sender.send(pov.clone()); } } relay_parent_state.known.insert(descriptor.pov_hash, pov.clone()); distribute_to_awaiting( &mut state.peer_state, ctx, &state.metrics, relay_parent, descriptor.pov_hash, &*pov, ).await } /// Report a reputation change for a peer. async fn report_peer( ctx: &mut impl SubsystemContext, peer: PeerId, rep: Rep, ) -> SubsystemResult<()> { ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(peer, rep))).await } /// Handle a notification from a peer that they are awaiting some PoVs. async fn handle_awaiting( state: &mut State, ctx: &mut impl SubsystemContext, peer: PeerId, relay_parent: Hash, pov_hashes: Vec, ) -> SubsystemResult<()> { if !state.our_view.0.contains(&relay_parent) { report_peer(ctx, peer, COST_AWAITED_NOT_IN_VIEW).await?; return Ok(()); } let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) { None => { log::warn!("PoV Distribution relay parent state out-of-sync with our view"); return Ok(()); } Some(s) => s, }; let peer_awaiting = match state.peer_state.get_mut(&peer).and_then(|s| s.awaited.get_mut(&relay_parent)) { None => { report_peer(ctx, peer, COST_AWAITED_NOT_IN_VIEW).await?; return Ok(()); } Some(a) => a, }; let will_be_awaited = peer_awaiting.len() + pov_hashes.len(); if will_be_awaited <= 2 * relay_parent_state.n_validators { for pov_hash in pov_hashes { // For all requested PoV hashes, if we have it, we complete the request immediately. // Otherwise, we note that the peer is awaiting the PoV. if let Some(pov) = relay_parent_state.known.get(&pov_hash) { let payload = send_pov_message(relay_parent, pov_hash, (&**pov).clone()); ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload) )).await?; } else { peer_awaiting.insert(pov_hash); } } } else { report_peer(ctx, peer, COST_APPARENT_FLOOD).await?; } Ok(()) } /// Handle an incoming PoV from our peer. Reports them if unexpected, rewards them if not. /// /// Completes any requests awaiting that PoV. async fn handle_incoming_pov( state: &mut State, ctx: &mut impl SubsystemContext, peer: PeerId, relay_parent: Hash, pov_hash: Hash, pov: PoV, ) -> SubsystemResult<()> { let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) { None => { report_peer(ctx, peer, COST_UNEXPECTED_POV).await?; return Ok(()); }, Some(r) => r, }; let pov = { // Do validity checks and complete all senders awaiting this PoV. let fetching = match relay_parent_state.fetching.get_mut(&pov_hash) { None => { report_peer(ctx, peer, COST_UNEXPECTED_POV).await?; return Ok(()); } Some(f) => f, }; let hash = pov.hash(); if hash != pov_hash { report_peer(ctx, peer, COST_UNEXPECTED_POV).await?; return Ok(()); } let pov = Arc::new(pov); if fetching.is_empty() { // fetching is empty whenever we were awaiting something and // it was completed afterwards. report_peer(ctx, peer.clone(), BENEFIT_LATE_POV).await?; } else { // fetching is non-empty when the peer just provided us with data we needed. report_peer(ctx, peer.clone(), BENEFIT_FRESH_POV).await?; } for response_sender in fetching.drain(..) { let _ = response_sender.send(pov.clone()); } pov }; // make sure we don't consider this peer as awaiting that PoV anymore. if let Some(peer_state) = state.peer_state.get_mut(&peer) { peer_state.awaited.remove(&pov_hash); } // distribute the PoV to all other peers who are awaiting it. distribute_to_awaiting( &mut state.peer_state, ctx, &state.metrics, relay_parent, pov_hash, &*pov, ).await } /// Handles a network bridge update. async fn handle_network_update( state: &mut State, ctx: &mut impl SubsystemContext, update: NetworkBridgeEvent, ) -> SubsystemResult<()> { match update { NetworkBridgeEvent::PeerConnected(peer, _observed_role) => { state.peer_state.insert(peer, PeerState { awaited: HashMap::new() }); Ok(()) } NetworkBridgeEvent::PeerDisconnected(peer) => { state.peer_state.remove(&peer); Ok(()) } NetworkBridgeEvent::PeerViewChange(peer_id, view) => { if let Some(peer_state) = state.peer_state.get_mut(&peer_id) { // prune anything not in the new view. peer_state.awaited.retain(|relay_parent, _| view.0.contains(&relay_parent)); // introduce things from the new view. for relay_parent in view.0.iter() { if let Entry::Vacant(entry) = peer_state.awaited.entry(*relay_parent) { entry.insert(HashSet::new()); // Notify the peer about everything we're awaiting at the new relay-parent. notify_one_we_are_awaiting_many( &peer_id, ctx, &state.relay_parent_state, *relay_parent, ).await?; } } } Ok(()) } NetworkBridgeEvent::PeerMessage(peer, message) => { match message { protocol_v1::PoVDistributionMessage::Awaiting(relay_parent, pov_hashes) => handle_awaiting( state, ctx, peer, relay_parent, pov_hashes, ).await, protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov) => handle_incoming_pov( state, ctx, peer, relay_parent, pov_hash, pov, ).await, } } NetworkBridgeEvent::OurViewChange(view) => { state.our_view = view; Ok(()) } } } impl PoVDistribution { /// Create a new instance of `PovDistribution`. pub fn new(metrics: Metrics) -> Self { Self { metrics } } async fn run( self, mut ctx: impl SubsystemContext, ) -> SubsystemResult<()> { let mut state = State { relay_parent_state: HashMap::new(), peer_state: HashMap::new(), our_view: View(Vec::new()), metrics: self.metrics, }; loop { match ctx.recv().await? { FromOverseer::Signal(signal) => if handle_signal(&mut state, &mut ctx, signal).await? { return Ok(()); }, FromOverseer::Communication { msg } => match msg { PoVDistributionMessage::FetchPoV(relay_parent, descriptor, response_sender) => handle_fetch( &mut state, &mut ctx, relay_parent, descriptor, response_sender, ).await?, PoVDistributionMessage::DistributePoV(relay_parent, descriptor, pov) => handle_distribute( &mut state, &mut ctx, relay_parent, descriptor, pov, ).await?, PoVDistributionMessage::NetworkBridgeUpdateV1(event) => handle_network_update( &mut state, &mut ctx, event, ).await?, }, } } } } #[derive(Clone)] struct MetricsInner { povs_distributed: prometheus::Counter, } /// Availability Distribution metrics. #[derive(Default, Clone)] pub struct Metrics(Option); impl Metrics { fn on_pov_distributed(&self) { if let Some(metrics) = &self.0 { metrics.povs_distributed.inc(); } } } impl metrics::Metrics for Metrics { fn try_register(registry: &prometheus::Registry) -> std::result::Result { let metrics = MetricsInner { povs_distributed: prometheus::register( prometheus::Counter::new( "parachain_povs_distributed_total", "Number of PoVs distributed to other peers." )?, registry, )?, }; Ok(Metrics(Some(metrics))) } } #[cfg(test)] mod tests;