// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see .
//! PoV Distribution Subsystem of Polkadot.
//!
//! This is a gossip implementation of code that is responsible for distributing PoVs
//! among validators.
use polkadot_primitives::v1::{Hash, PoV, CandidateDescriptor};
use polkadot_subsystem::{
ActiveLeavesUpdate, OverseerSignal, SubsystemContext, Subsystem, SubsystemResult, FromOverseer, SpawnedSubsystem,
};
use polkadot_subsystem::messages::{
PoVDistributionMessage, RuntimeApiMessage, RuntimeApiRequest, AllMessages, NetworkBridgeMessage,
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, ReputationChange as Rep, NetworkBridgeEvent, PeerId, View,
};
use futures::prelude::*;
use futures::channel::oneshot;
use std::collections::{hash_map::{Entry, HashMap}, HashSet};
use std::sync::Arc;
const COST_APPARENT_FLOOD: Rep = Rep::new(-500, "Peer appears to be flooding us with PoV requests");
const COST_UNEXPECTED_POV: Rep = Rep::new(-500, "Peer sent us an unexpected PoV");
const COST_AWAITED_NOT_IN_VIEW: Rep
= Rep::new(-100, "Peer claims to be awaiting something outside of its view");
const BENEFIT_FRESH_POV: Rep = Rep::new(25, "Peer supplied us with an awaited PoV");
const BENEFIT_LATE_POV: Rep = Rep::new(10, "Peer supplied us with an awaited PoV, \
but was not the first to do so");
/// The PoV Distribution Subsystem.
pub struct PoVDistribution;
impl Subsystem for PoVDistribution
where C: SubsystemContext
{
fn start(self, ctx: C) -> SpawnedSubsystem {
// Swallow error because failure is fatal to the node and we log with more precision
// within `run`.
SpawnedSubsystem {
name: "pov-distribution-subsystem",
future: run(ctx).map(|_| ()).boxed(),
}
}
}
struct State {
relay_parent_state: HashMap,
peer_state: HashMap,
our_view: View,
}
struct BlockBasedState {
known: HashMap>,
/// All the PoVs we are or were fetching, coupled with channels expecting the data.
///
/// This may be an empty list, which indicates that we were once awaiting this PoV but have
/// received it already.
fetching: HashMap>>>,
n_validators: usize,
}
#[derive(Default)]
struct PeerState {
/// A set of awaited PoV-hashes for each relay-parent in the peer's view.
awaited: HashMap>,
}
fn awaiting_message(relay_parent: Hash, awaiting: Vec)
-> protocol_v1::ValidationProtocol
{
protocol_v1::ValidationProtocol::PoVDistribution(
protocol_v1::PoVDistributionMessage::Awaiting(relay_parent, awaiting)
)
}
fn send_pov_message(relay_parent: Hash, pov_hash: Hash, pov: PoV)
-> protocol_v1::ValidationProtocol
{
protocol_v1::ValidationProtocol::PoVDistribution(
protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov)
)
}
/// Handles the signal. If successful, returns `true` if the subsystem should conclude,
/// `false` otherwise.
async fn handle_signal(
state: &mut State,
ctx: &mut impl SubsystemContext,
signal: OverseerSignal,
) -> SubsystemResult {
match signal {
OverseerSignal::Conclude => Ok(true),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => {
for relay_parent in activated {
let (vals_tx, vals_rx) = oneshot::channel();
ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::Validators(vals_tx),
))).await?;
let n_validators = match vals_rx.await? {
Ok(v) => v.len(),
Err(e) => {
log::warn!(target: "pov_distribution",
"Error fetching validators from runtime API for active leaf: {:?}",
e
);
// Not adding bookkeeping here might make us behave funny, but we
// shouldn't take down the node on spurious runtime API errors.
//
// and this is "behave funny" as in be bad at our job, but not in any
// slashable or security-related way.
continue;
}
};
state.relay_parent_state.insert(relay_parent, BlockBasedState {
known: HashMap::new(),
fetching: HashMap::new(),
n_validators: n_validators,
});
}
for relay_parent in deactivated {
state.relay_parent_state.remove(&relay_parent);
}
Ok(false)
}
OverseerSignal::BlockFinalized(_) => Ok(false),
}
}
/// Notify peers that we are awaiting a given PoV hash.
///
/// This only notifies peers who have the relay parent in their view.
async fn notify_all_we_are_awaiting(
peers: &mut HashMap,
ctx: &mut impl SubsystemContext,
relay_parent: Hash,
pov_hash: Hash,
) -> SubsystemResult<()> {
// We use `awaited` as a proxy for which heads are in the peer's view.
let peers_to_send: Vec<_> = peers.iter()
.filter_map(|(peer, state)| if state.awaited.contains_key(&relay_parent) {
Some(peer.clone())
} else {
None
})
.collect();
if peers_to_send.is_empty() { return Ok(()) }
let payload = awaiting_message(relay_parent, vec![pov_hash]);
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers_to_send,
payload,
))).await
}
/// Notify one peer about everything we're awaiting at a given relay-parent.
async fn notify_one_we_are_awaiting_many(
peer: &PeerId,
ctx: &mut impl SubsystemContext,
relay_parent_state: &HashMap,
relay_parent: Hash,
) -> SubsystemResult<()> {
let awaiting_hashes = relay_parent_state.get(&relay_parent).into_iter().flat_map(|s| {
// Send the peer everything we are fetching at this relay-parent
s.fetching.iter()
.filter(|(_, senders)| !senders.is_empty()) // that has not been completed already.
.map(|(pov_hash, _)| *pov_hash)
}).collect::>();
if awaiting_hashes.is_empty() { return Ok(()) }
let payload = awaiting_message(relay_parent, awaiting_hashes);
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
vec![peer.clone()],
payload,
))).await
}
/// Distribute a PoV to peers who are awaiting it.
async fn distribute_to_awaiting(
peers: &mut HashMap,
ctx: &mut impl SubsystemContext,
relay_parent: Hash,
pov_hash: Hash,
pov: &PoV,
) -> SubsystemResult<()> {
// Send to all peers who are awaiting the PoV and have that relay-parent in their view.
//
// Also removes it from their awaiting set.
let peers_to_send: Vec<_> = peers.iter_mut()
.filter_map(|(peer, state)| state.awaited.get_mut(&relay_parent).and_then(|awaited| {
if awaited.remove(&pov_hash) {
Some(peer.clone())
} else {
None
}
}))
.collect();
if peers_to_send.is_empty() { return Ok(()) }
let payload = send_pov_message(relay_parent, pov_hash, pov.clone());
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers_to_send,
payload,
))).await
}
/// Handles a `FetchPoV` message.
async fn handle_fetch(
state: &mut State,
ctx: &mut impl SubsystemContext,
relay_parent: Hash,
descriptor: CandidateDescriptor,
response_sender: oneshot::Sender>,
) -> SubsystemResult<()> {
let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) {
Some(s) => s,
None => return Ok(()),
};
if let Some(pov) = relay_parent_state.known.get(&descriptor.pov_hash) {
let _ = response_sender.send(pov.clone());
return Ok(());
}
{
match relay_parent_state.fetching.entry(descriptor.pov_hash) {
Entry::Occupied(mut e) => {
// we are already awaiting this PoV if there is an entry.
e.get_mut().push(response_sender);
return Ok(());
}
Entry::Vacant(e) => {
e.insert(vec![response_sender]);
}
}
}
if relay_parent_state.fetching.len() > 2 * relay_parent_state.n_validators {
log::warn!("Other subsystems have requested PoV distribution to \
fetch more PoVs than reasonably expected: {}", relay_parent_state.fetching.len());
return Ok(());
}
// Issue an `Awaiting` message to all peers with this in their view.
notify_all_we_are_awaiting(
&mut state.peer_state,
ctx,
relay_parent,
descriptor.pov_hash
).await
}
/// Handles a `DistributePoV` message.
async fn handle_distribute(
state: &mut State,
ctx: &mut impl SubsystemContext,
relay_parent: Hash,
descriptor: CandidateDescriptor,
pov: Arc,
) -> SubsystemResult<()> {
let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) {
None => return Ok(()),
Some(s) => s,
};
if let Some(our_awaited) = relay_parent_state.fetching.get_mut(&descriptor.pov_hash) {
// Drain all the senders, but keep the entry in the map around intentionally.
//
// It signals that we were at one point awaiting this, so we will be able to tell
// why peers are sending it to us.
for response_sender in our_awaited.drain(..) {
let _ = response_sender.send(pov.clone());
}
}
relay_parent_state.known.insert(descriptor.pov_hash, pov.clone());
distribute_to_awaiting(
&mut state.peer_state,
ctx,
relay_parent,
descriptor.pov_hash,
&*pov,
).await
}
/// Report a reputation change for a peer.
async fn report_peer(
ctx: &mut impl SubsystemContext,
peer: PeerId,
rep: Rep,
) -> SubsystemResult<()> {
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(peer, rep))).await
}
/// Handle a notification from a peer that they are awaiting some PoVs.
async fn handle_awaiting(
state: &mut State,
ctx: &mut impl SubsystemContext,
peer: PeerId,
relay_parent: Hash,
pov_hashes: Vec,
) -> SubsystemResult<()> {
if !state.our_view.0.contains(&relay_parent) {
report_peer(ctx, peer, COST_AWAITED_NOT_IN_VIEW).await?;
return Ok(());
}
let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) {
None => {
log::warn!("PoV Distribution relay parent state out-of-sync with our view");
return Ok(());
}
Some(s) => s,
};
let peer_awaiting = match
state.peer_state.get_mut(&peer).and_then(|s| s.awaited.get_mut(&relay_parent))
{
None => {
report_peer(ctx, peer, COST_AWAITED_NOT_IN_VIEW).await?;
return Ok(());
}
Some(a) => a,
};
let will_be_awaited = peer_awaiting.len() + pov_hashes.len();
if will_be_awaited <= 2 * relay_parent_state.n_validators {
for pov_hash in pov_hashes {
// For all requested PoV hashes, if we have it, we complete the request immediately.
// Otherwise, we note that the peer is awaiting the PoV.
if let Some(pov) = relay_parent_state.known.get(&pov_hash) {
let payload = send_pov_message(relay_parent, pov_hash, (&**pov).clone());
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
)).await?;
} else {
peer_awaiting.insert(pov_hash);
}
}
} else {
report_peer(ctx, peer, COST_APPARENT_FLOOD).await?;
}
Ok(())
}
/// Handle an incoming PoV from our peer. Reports them if unexpected, rewards them if not.
///
/// Completes any requests awaiting that PoV.
async fn handle_incoming_pov(
state: &mut State,
ctx: &mut impl SubsystemContext,
peer: PeerId,
relay_parent: Hash,
pov_hash: Hash,
pov: PoV,
) -> SubsystemResult<()> {
let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) {
None => {
report_peer(ctx, peer, COST_UNEXPECTED_POV).await?;
return Ok(());
},
Some(r) => r,
};
let pov = {
// Do validity checks and complete all senders awaiting this PoV.
let fetching = match relay_parent_state.fetching.get_mut(&pov_hash) {
None => {
report_peer(ctx, peer, COST_UNEXPECTED_POV).await?;
return Ok(());
}
Some(f) => f,
};
let hash = pov.hash();
if hash != pov_hash {
report_peer(ctx, peer, COST_UNEXPECTED_POV).await?;
return Ok(());
}
let pov = Arc::new(pov);
if fetching.is_empty() {
// fetching is empty whenever we were awaiting something and
// it was completed afterwards.
report_peer(ctx, peer.clone(), BENEFIT_LATE_POV).await?;
} else {
// fetching is non-empty when the peer just provided us with data we needed.
report_peer(ctx, peer.clone(), BENEFIT_FRESH_POV).await?;
}
for response_sender in fetching.drain(..) {
let _ = response_sender.send(pov.clone());
}
pov
};
// make sure we don't consider this peer as awaiting that PoV anymore.
if let Some(peer_state) = state.peer_state.get_mut(&peer) {
peer_state.awaited.remove(&pov_hash);
}
// distribute the PoV to all other peers who are awaiting it.
distribute_to_awaiting(
&mut state.peer_state,
ctx,
relay_parent,
pov_hash,
&*pov,
).await
}
/// Handles a network bridge update.
async fn handle_network_update(
state: &mut State,
ctx: &mut impl SubsystemContext,
update: NetworkBridgeEvent,
) -> SubsystemResult<()> {
match update {
NetworkBridgeEvent::PeerConnected(peer, _observed_role) => {
state.peer_state.insert(peer, PeerState { awaited: HashMap::new() });
Ok(())
}
NetworkBridgeEvent::PeerDisconnected(peer) => {
state.peer_state.remove(&peer);
Ok(())
}
NetworkBridgeEvent::PeerViewChange(peer_id, view) => {
if let Some(peer_state) = state.peer_state.get_mut(&peer_id) {
// prune anything not in the new view.
peer_state.awaited.retain(|relay_parent, _| view.0.contains(&relay_parent));
// introduce things from the new view.
for relay_parent in view.0.iter() {
if let Entry::Vacant(entry) = peer_state.awaited.entry(*relay_parent) {
entry.insert(HashSet::new());
// Notify the peer about everything we're awaiting at the new relay-parent.
notify_one_we_are_awaiting_many(
&peer_id,
ctx,
&state.relay_parent_state,
*relay_parent,
).await?;
}
}
}
Ok(())
}
NetworkBridgeEvent::PeerMessage(peer, message) => {
match message {
protocol_v1::PoVDistributionMessage::Awaiting(relay_parent, pov_hashes)
=> handle_awaiting(
state,
ctx,
peer,
relay_parent,
pov_hashes,
).await,
protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov)
=> handle_incoming_pov(
state,
ctx,
peer,
relay_parent,
pov_hash,
pov,
).await,
}
}
NetworkBridgeEvent::OurViewChange(view) => {
state.our_view = view;
Ok(())
}
}
}
async fn run(
mut ctx: impl SubsystemContext,
) -> SubsystemResult<()> {
let mut state = State {
relay_parent_state: HashMap::new(),
peer_state: HashMap::new(),
our_view: View(Vec::new()),
};
loop {
match ctx.recv().await? {
FromOverseer::Signal(signal) => if handle_signal(&mut state, &mut ctx, signal).await? {
return Ok(());
},
FromOverseer::Communication { msg } => match msg {
PoVDistributionMessage::FetchPoV(relay_parent, descriptor, response_sender) =>
handle_fetch(
&mut state,
&mut ctx,
relay_parent,
descriptor,
response_sender,
).await?,
PoVDistributionMessage::DistributePoV(relay_parent, descriptor, pov) =>
handle_distribute(
&mut state,
&mut ctx,
relay_parent,
descriptor,
pov,
).await?,
PoVDistributionMessage::NetworkBridgeUpdateV1(event) =>
handle_network_update(
&mut state,
&mut ctx,
event,
).await?,
},
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::executor;
use polkadot_primitives::v1::BlockData;
use assert_matches::assert_matches;
fn make_pov(data: Vec) -> PoV {
PoV { block_data: BlockData(data) }
}
fn make_peer_state(awaited: Vec<(Hash, Vec)>)
-> PeerState
{
PeerState {
awaited: awaited.into_iter().map(|(rp, h)| (rp, h.into_iter().collect())).collect()
}
}
#[test]
fn distributes_to_those_awaiting_and_completes_local() {
let hash_a: Hash = [0; 32].into();
let hash_b: Hash = [1; 32].into();
let peer_a = PeerId::random();
let peer_b = PeerId::random();
let peer_c = PeerId::random();
let (pov_send, pov_recv) = oneshot::channel();
let pov = make_pov(vec![1, 2, 3]);
let pov_hash = pov.hash();
let mut state = State {
relay_parent_state: {
let mut s = HashMap::new();
let mut b = BlockBasedState {
known: HashMap::new(),
fetching: HashMap::new(),
n_validators: 10,
};
b.fetching.insert(pov_hash, vec![pov_send]);
s.insert(hash_a, b);
s
},
peer_state: {
let mut s = HashMap::new();
// peer A has hash_a in its view and is awaiting the PoV.
s.insert(
peer_a.clone(),
make_peer_state(vec![(hash_a, vec![pov_hash])]),
);
// peer B has hash_a in its view but is not awaiting.
s.insert(
peer_b.clone(),
make_peer_state(vec![(hash_a, vec![])]),
);
// peer C doesn't have hash_a in its view but is awaiting the PoV under hash_b.
s.insert(
peer_c.clone(),
make_peer_state(vec![(hash_b, vec![pov_hash])]),
);
s
},
our_view: View(vec![hash_a, hash_b]),
};
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
let mut descriptor = CandidateDescriptor::default();
descriptor.pov_hash = pov_hash;
executor::block_on(async move {
handle_distribute(
&mut state,
&mut ctx,
hash_a,
descriptor,
Arc::new(pov.clone()),
).await.unwrap();
assert!(!state.peer_state[&peer_a].awaited[&hash_a].contains(&pov_hash));
assert!(state.peer_state[&peer_c].awaited[&hash_b].contains(&pov_hash));
// our local sender also completed
assert_eq!(&*pov_recv.await.unwrap(), &pov);
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(peers, message)
) => {
assert_eq!(peers, vec![peer_a.clone()]);
assert_eq!(
message,
send_pov_message(hash_a, pov_hash, pov.clone()),
);
}
)
});
}
#[test]
fn we_inform_peers_with_same_view_we_are_awaiting() {
let hash_a: Hash = [0; 32].into();
let hash_b: Hash = [1; 32].into();
let peer_a = PeerId::random();
let peer_b = PeerId::random();
let (pov_send, _) = oneshot::channel();
let pov = make_pov(vec![1, 2, 3]);
let pov_hash = pov.hash();
let mut state = State {
relay_parent_state: {
let mut s = HashMap::new();
let b = BlockBasedState {
known: HashMap::new(),
fetching: HashMap::new(),
n_validators: 10,
};
s.insert(hash_a, b);
s
},
peer_state: {
let mut s = HashMap::new();
// peer A has hash_a in its view.
s.insert(
peer_a.clone(),
make_peer_state(vec![(hash_a, vec![])]),
);
// peer B doesn't have hash_a in its view.
s.insert(
peer_b.clone(),
make_peer_state(vec![(hash_b, vec![])]),
);
s
},
our_view: View(vec![hash_a]),
};
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
let mut descriptor = CandidateDescriptor::default();
descriptor.pov_hash = pov_hash;
executor::block_on(async move {
handle_fetch(
&mut state,
&mut ctx,
hash_a,
descriptor,
pov_send,
).await.unwrap();
assert_eq!(state.relay_parent_state[&hash_a].fetching[&pov_hash].len(), 1);
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(peers, message)
) => {
assert_eq!(peers, vec![peer_a.clone()]);
assert_eq!(
message,
awaiting_message(hash_a, vec![pov_hash]),
);
}
)
});
}
#[test]
fn peer_view_change_leads_to_us_informing() {
let hash_a: Hash = [0; 32].into();
let hash_b: Hash = [1; 32].into();
let peer_a = PeerId::random();
let (pov_a_send, _) = oneshot::channel();
let pov_a = make_pov(vec![1, 2, 3]);
let pov_a_hash = pov_a.hash();
let pov_b = make_pov(vec![4, 5, 6]);
let pov_b_hash = pov_b.hash();
let mut state = State {
relay_parent_state: {
let mut s = HashMap::new();
let mut b = BlockBasedState {
known: HashMap::new(),
fetching: HashMap::new(),
n_validators: 10,
};
// pov_a is still being fetched, whereas the fetch of pov_b has already
// completed, as implied by the empty vector.
b.fetching.insert(pov_a_hash, vec![pov_a_send]);
b.fetching.insert(pov_b_hash, vec![]);
s.insert(hash_a, b);
s
},
peer_state: {
let mut s = HashMap::new();
// peer A doesn't yet have hash_a in its view.
s.insert(
peer_a.clone(),
make_peer_state(vec![(hash_b, vec![])]),
);
s
},
our_view: View(vec![hash_a]),
};
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
handle_network_update(
&mut state,
&mut ctx,
NetworkBridgeEvent::PeerViewChange(peer_a.clone(), View(vec![hash_a, hash_b])),
).await.unwrap();
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(peers, message)
) => {
assert_eq!(peers, vec![peer_a.clone()]);
assert_eq!(
message,
awaiting_message(hash_a, vec![pov_a_hash]),
);
}
)
});
}
#[test]
fn peer_complete_fetch_and_is_rewarded() {
let hash_a: Hash = [0; 32].into();
let peer_a = PeerId::random();
let peer_b = PeerId::random();
let (pov_send, pov_recv) = oneshot::channel();
let pov = make_pov(vec![1, 2, 3]);
let pov_hash = pov.hash();
let mut state = State {
relay_parent_state: {
let mut s = HashMap::new();
let mut b = BlockBasedState {
known: HashMap::new(),
fetching: HashMap::new(),
n_validators: 10,
};
// pov is being fetched.
b.fetching.insert(pov_hash, vec![pov_send]);
s.insert(hash_a, b);
s
},
peer_state: {
let mut s = HashMap::new();
// peers A and B are functionally the same.
s.insert(
peer_a.clone(),
make_peer_state(vec![(hash_a, vec![])]),
);
s.insert(
peer_b.clone(),
make_peer_state(vec![(hash_a, vec![])]),
);
s
},
our_view: View(vec![hash_a]),
};
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
// Peer A answers our request before peer B.
handle_network_update(
&mut state,
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
).focus().unwrap(),
).await.unwrap();
handle_network_update(
&mut state,
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
).focus().unwrap(),
).await.unwrap();
assert_eq!(&*pov_recv.await.unwrap(), &pov);
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep)
) => {
assert_eq!(peer, peer_a);
assert_eq!(rep, BENEFIT_FRESH_POV);
}
);
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep)
) => {
assert_eq!(peer, peer_b);
assert_eq!(rep, BENEFIT_LATE_POV);
}
);
});
}
#[test]
fn peer_punished_for_sending_bad_pov() {
let hash_a: Hash = [0; 32].into();
let peer_a = PeerId::random();
let (pov_send, _) = oneshot::channel();
let pov = make_pov(vec![1, 2, 3]);
let pov_hash = pov.hash();
let bad_pov = make_pov(vec![6, 6, 6]);
let mut state = State {
relay_parent_state: {
let mut s = HashMap::new();
let mut b = BlockBasedState {
known: HashMap::new(),
fetching: HashMap::new(),
n_validators: 10,
};
// pov is being fetched.
b.fetching.insert(pov_hash, vec![pov_send]);
s.insert(hash_a, b);
s
},
peer_state: {
let mut s = HashMap::new();
s.insert(
peer_a.clone(),
make_peer_state(vec![(hash_a, vec![])]),
);
s
},
our_view: View(vec![hash_a]),
};
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
// Peer A answers our request: right relay parent, awaited hash, wrong PoV.
handle_network_update(
&mut state,
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, bad_pov.clone()),
).focus().unwrap(),
).await.unwrap();
// didn't complete our sender.
assert_eq!(state.relay_parent_state[&hash_a].fetching[&pov_hash].len(), 1);
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep)
) => {
assert_eq!(peer, peer_a);
assert_eq!(rep, COST_UNEXPECTED_POV);
}
);
});
}
#[test]
fn peer_punished_for_sending_unexpected_pov() {
let hash_a: Hash = [0; 32].into();
let peer_a = PeerId::random();
let pov = make_pov(vec![1, 2, 3]);
let pov_hash = pov.hash();
let mut state = State {
relay_parent_state: {
let mut s = HashMap::new();
let b = BlockBasedState {
known: HashMap::new(),
fetching: HashMap::new(),
n_validators: 10,
};
s.insert(hash_a, b);
s
},
peer_state: {
let mut s = HashMap::new();
s.insert(
peer_a.clone(),
make_peer_state(vec![(hash_a, vec![])]),
);
s
},
our_view: View(vec![hash_a]),
};
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
// Peer A answers our request: right relay parent, awaited hash, wrong PoV.
handle_network_update(
&mut state,
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
).focus().unwrap(),
).await.unwrap();
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep)
) => {
assert_eq!(peer, peer_a);
assert_eq!(rep, COST_UNEXPECTED_POV);
}
);
});
}
#[test]
fn peer_punished_for_sending_pov_out_of_our_view() {
let hash_a: Hash = [0; 32].into();
let hash_b: Hash = [1; 32].into();
let peer_a = PeerId::random();
let pov = make_pov(vec![1, 2, 3]);
let pov_hash = pov.hash();
let mut state = State {
relay_parent_state: {
let mut s = HashMap::new();
let b = BlockBasedState {
known: HashMap::new(),
fetching: HashMap::new(),
n_validators: 10,
};
s.insert(hash_a, b);
s
},
peer_state: {
let mut s = HashMap::new();
s.insert(
peer_a.clone(),
make_peer_state(vec![(hash_a, vec![])]),
);
s
},
our_view: View(vec![hash_a]),
};
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
// Peer A answers our request: right relay parent, awaited hash, wrong PoV.
handle_network_update(
&mut state,
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_b, pov_hash, pov.clone()),
).focus().unwrap(),
).await.unwrap();
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep)
) => {
assert_eq!(peer, peer_a);
assert_eq!(rep, COST_UNEXPECTED_POV);
}
);
});
}
#[test]
fn peer_reported_for_awaiting_too_much() {
let hash_a: Hash = [0; 32].into();
let peer_a = PeerId::random();
let n_validators = 10;
let mut state = State {
relay_parent_state: {
let mut s = HashMap::new();
let b = BlockBasedState {
known: HashMap::new(),
fetching: HashMap::new(),
n_validators,
};
s.insert(hash_a, b);
s
},
peer_state: {
let mut s = HashMap::new();
s.insert(
peer_a.clone(),
make_peer_state(vec![(hash_a, vec![])]),
);
s
},
our_view: View(vec![hash_a]),
};
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
let max_plausibly_awaited = n_validators * 2;
// The peer awaits a plausible (albeit unlikely) amount of PoVs.
for i in 0..max_plausibly_awaited {
let pov_hash = make_pov(vec![i as u8; 32]).hash();
handle_network_update(
&mut state,
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
awaiting_message(hash_a, vec![pov_hash]),
).focus().unwrap(),
).await.unwrap();
}
assert_eq!(state.peer_state[&peer_a].awaited[&hash_a].len(), max_plausibly_awaited);
// The last straw:
let last_pov_hash = make_pov(vec![max_plausibly_awaited as u8; 32]).hash();
handle_network_update(
&mut state,
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
awaiting_message(hash_a, vec![last_pov_hash]),
).focus().unwrap(),
).await.unwrap();
// No more bookkeeping for you!
assert_eq!(state.peer_state[&peer_a].awaited[&hash_a].len(), max_plausibly_awaited);
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep)
) => {
assert_eq!(peer, peer_a);
assert_eq!(rep, COST_APPARENT_FLOOD);
}
);
});
}
#[test]
fn peer_reported_for_awaiting_outside_their_view() {
let hash_a: Hash = [0; 32].into();
let hash_b: Hash = [1; 32].into();
let peer_a = PeerId::random();
let mut state = State {
relay_parent_state: {
let mut s = HashMap::new();
s.insert(hash_a, BlockBasedState {
known: HashMap::new(),
fetching: HashMap::new(),
n_validators: 10,
});
s.insert(hash_b, BlockBasedState {
known: HashMap::new(),
fetching: HashMap::new(),
n_validators: 10,
});
s
},
peer_state: {
let mut s = HashMap::new();
// Peer has only hash A in its view.
s.insert(
peer_a.clone(),
make_peer_state(vec![(hash_a, vec![])]),
);
s
},
our_view: View(vec![hash_a, hash_b]),
};
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
let pov_hash = make_pov(vec![1, 2, 3]).hash();
// Hash B is in our view but not the peer's
handle_network_update(
&mut state,
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
awaiting_message(hash_b, vec![pov_hash]),
).focus().unwrap(),
).await.unwrap();
assert!(state.peer_state[&peer_a].awaited.get(&hash_b).is_none());
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep)
) => {
assert_eq!(peer, peer_a);
assert_eq!(rep, COST_AWAITED_NOT_IN_VIEW);
}
);
});
}
#[test]
fn peer_reported_for_awaiting_outside_our_view() {
let hash_a: Hash = [0; 32].into();
let hash_b: Hash = [1; 32].into();
let peer_a = PeerId::random();
let mut state = State {
relay_parent_state: {
let mut s = HashMap::new();
s.insert(hash_a, BlockBasedState {
known: HashMap::new(),
fetching: HashMap::new(),
n_validators: 10,
});
s
},
peer_state: {
let mut s = HashMap::new();
// Peer has hashes A and B in their view.
s.insert(
peer_a.clone(),
make_peer_state(vec![(hash_a, vec![]), (hash_b, vec![])]),
);
s
},
our_view: View(vec![hash_a]),
};
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
let pov_hash = make_pov(vec![1, 2, 3]).hash();
// Hash B is in peer's view but not ours.
handle_network_update(
&mut state,
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
awaiting_message(hash_b, vec![pov_hash]),
).focus().unwrap(),
).await.unwrap();
// Illegal `awaited` is ignored.
assert!(state.peer_state[&peer_a].awaited[&hash_b].is_empty());
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep)
) => {
assert_eq!(peer, peer_a);
assert_eq!(rep, COST_AWAITED_NOT_IN_VIEW);
}
);
});
}
#[test]
fn peer_complete_fetch_leads_to_us_completing_others() {
let hash_a: Hash = [0; 32].into();
let peer_a = PeerId::random();
let peer_b = PeerId::random();
let (pov_send, pov_recv) = oneshot::channel();
let pov = make_pov(vec![1, 2, 3]);
let pov_hash = pov.hash();
let mut state = State {
relay_parent_state: {
let mut s = HashMap::new();
let mut b = BlockBasedState {
known: HashMap::new(),
fetching: HashMap::new(),
n_validators: 10,
};
// pov is being fetched.
b.fetching.insert(pov_hash, vec![pov_send]);
s.insert(hash_a, b);
s
},
peer_state: {
let mut s = HashMap::new();
s.insert(
peer_a.clone(),
make_peer_state(vec![(hash_a, vec![])]),
);
// peer B is awaiting peer A's request.
s.insert(
peer_b.clone(),
make_peer_state(vec![(hash_a, vec![pov_hash])]),
);
s
},
our_view: View(vec![hash_a]),
};
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
handle_network_update(
&mut state,
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
).focus().unwrap(),
).await.unwrap();
assert_eq!(&*pov_recv.await.unwrap(), &pov);
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep)
) => {
assert_eq!(peer, peer_a);
assert_eq!(rep, BENEFIT_FRESH_POV);
}
);
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(peers, message)
) => {
assert_eq!(peers, vec![peer_b.clone()]);
assert_eq!(
message,
send_pov_message(hash_a, pov_hash, pov.clone()),
);
}
);
assert!(!state.peer_state[&peer_b].awaited[&hash_a].contains(&pov_hash));
});
}
#[test]
fn peer_completing_request_no_longer_awaiting() {
let hash_a: Hash = [0; 32].into();
let peer_a = PeerId::random();
let (pov_send, pov_recv) = oneshot::channel();
let pov = make_pov(vec![1, 2, 3]);
let pov_hash = pov.hash();
let mut state = State {
relay_parent_state: {
let mut s = HashMap::new();
let mut b = BlockBasedState {
known: HashMap::new(),
fetching: HashMap::new(),
n_validators: 10,
};
// pov is being fetched.
b.fetching.insert(pov_hash, vec![pov_send]);
s.insert(hash_a, b);
s
},
peer_state: {
let mut s = HashMap::new();
// peer A is registered as awaiting.
s.insert(
peer_a.clone(),
make_peer_state(vec![(hash_a, vec![pov_hash])]),
);
s
},
our_view: View(vec![hash_a]),
};
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
handle_network_update(
&mut state,
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
).focus().unwrap(),
).await.unwrap();
assert_eq!(&*pov_recv.await.unwrap(), &pov);
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep)
) => {
assert_eq!(peer, peer_a);
assert_eq!(rep, BENEFIT_FRESH_POV);
}
);
// We received the PoV from peer A, so we do not consider it awaited by peer A anymore.
assert!(!state.peer_state[&peer_a].awaited[&hash_a].contains(&pov_hash));
});
}
}