Newer
Older
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
//! Gossip messages and the message validator.
//!
//! At the moment, this module houses 2 gossip protocols central to Polkadot.
//!
//! The first is the attestation-gossip system, which aims to circulate parachain
//! candidate attestations by validators at leaves of the block-DAG.
//!
//! The second is the inter-chain message queue routing gossip, which aims to
//! circulate message queues between parachains, which remain un-routed as of
//! recent leaves.
//!
//! These gossip systems do not have any form of sybil-resistance in terms
//! of the nodes which can participate. It could be imposed e.g. by limiting only to
//! validators, but this would prevent message queues from getting into the hands
//! of collators and of attestations from getting into the hands of fishermen.
//! As such, we take certain precautions which allow arbitrary full nodes to
//! join the gossip graph, as well as validators (who are likely to be well-connected
//! amongst themselves).
//!
//! The first is the notion of a neighbor packet. This is a packet sent between
//! neighbors of the gossip graph to inform each other of their current protocol
//! state. As of this writing, for both attestation and message-routing gossip,
//! the only necessary information here is a (length-limited) set of perceived
//! leaves of the block-DAG.
//!
//! These leaves can be used to derive what information a node is willing to accept
//! There is typically an unbounded amount of possible "future" information relative to
//! any protocol state. For example, attestations or unrouted message queues from millions
//! of blocks after a known protocol state. The neighbor packet is meant to avoid being
//! spammed by illegitimate future information, while informing neighbors of when
//! previously-future and now current gossip messages would be accepted.
//!
//! Peers who send information which was not allowed under a recent neighbor packet
//! will be noted as non-beneficial to Substrate's peer-set management utility.
use sp_runtime::traits::{BlakeTwo256, Hash as HashT};
use sc_network::{config::Roles, PeerId, ReputationChange};
use sc_network::NetworkService;
use sc_network_gossip::{
ValidationResult as GossipValidationResult,
ValidatorContext, MessageIntent,
use polkadot_validation::{SignedStatement};
use polkadot_primitives::{Block, Hash};
use polkadot_primitives::parachain::{
ParachainHost, ValidatorId, ErasureChunk as PrimitiveChunk, SigningContext,
};
use polkadot_erasure_coding::{self as erasure};
use futures::prelude::*;
use crate::legacy::{GossipMessageStream, GossipService};
use attestation::{View as AttestationView, PeerData as AttestationPeerData};
mod attestation;
/// The engine ID of the polkadot attestation system.
pub const POLKADOT_ENGINE_ID: sp_runtime::ConsensusEngineId = *b"dot1";
pub const POLKADOT_PROTOCOL_NAME: &[u8] = b"/polkadot/legacy/1";
asynchronous rob
committed
// arbitrary; in practice this should not be more than 2.
pub(crate) const MAX_CHAIN_HEADS: usize = 5;
/// Type alias for a bounded vector of leaves.
pub type LeavesVec = ArrayVec<[Hash; MAX_CHAIN_HEADS]>;
asynchronous rob
committed
mod benefit {
use sc_network::ReputationChange as Rep;
asynchronous rob
committed
/// When a peer sends us a previously-unknown candidate statement.
pub const NEW_CANDIDATE: Rep = Rep::new(100, "Polkadot: New candidate");
asynchronous rob
committed
/// When a peer sends us a previously-unknown attestation.
pub const NEW_ATTESTATION: Rep = Rep::new(50, "Polkadot: New attestation");
/// When a peer sends us a previously-unknown erasure chunk.
pub const NEW_ERASURE_CHUNK: Rep = Rep::new(10, "Polkadot: New erasure chunk");
asynchronous rob
committed
}
mod cost {
use sc_network::ReputationChange as Rep;
/// No cost. This will not be reported.
pub const NONE: Rep = Rep::new(0, "");
asynchronous rob
committed
/// A peer sent us an attestation and we don't know the candidate.
pub const ATTESTATION_NO_CANDIDATE: Rep = Rep::new(-100, "Polkadot: No candidate");
asynchronous rob
committed
/// A peer sent us a statement we consider in the future.
pub const FUTURE_MESSAGE: Rep = Rep::new(-100, "Polkadot: Future message");
asynchronous rob
committed
/// A peer sent us a statement from the past.
pub const PAST_MESSAGE: Rep = Rep::new(-30, "Polkadot: Past message");
asynchronous rob
committed
/// A peer sent us a malformed message.
pub const MALFORMED_MESSAGE: Rep = Rep::new(-500, "Polkadot: Malformed message");
asynchronous rob
committed
/// A peer sent us a wrongly signed message.
pub const BAD_SIGNATURE: Rep = Rep::new(-500, "Polkadot: Bad signature");
asynchronous rob
committed
/// A peer sent us a bad neighbor packet.
pub const BAD_NEIGHBOR_PACKET: Rep = Rep::new(-300, "Polkadot: Bad neighbor");
/// A peer sent us an erasure chunk referring to a candidate that we are not aware of.
pub const ORPHANED_ERASURE_CHUNK: Rep = Rep::new(-10, "An erasure chunk from unknown candidate");
/// A peer sent us an erasure chunk that does not match candidate's erasure root.
pub const ERASURE_CHUNK_WRONG_ROOT: Rep = Rep::new(-100, "Chunk doesn't match encoding root");
asynchronous rob
committed
}
#[derive(Encode, Decode, Clone, PartialEq)]
pub enum GossipMessage {
asynchronous rob
committed
/// A packet sent to a neighbor but not relayed.
#[codec(index = "1")]
Neighbor(VersionedNeighborPacket),
/// An attestation-statement about the candidate.
/// Non-candidate statements should only be sent to peers who are aware of the candidate.
#[codec(index = "2")]
Statement(GossipStatement),
// TODO: https://github.com/paritytech/polkadot/issues/253
/// A packet containing one of the erasure-coding chunks of one candidate.
ErasureChunk(ErasureChunkMessage),
asynchronous rob
committed
}
impl From<NeighborPacket> for GossipMessage {
fn from(packet: NeighborPacket) -> Self {
GossipMessage::Neighbor(VersionedNeighborPacket::V1(packet))
}
}
impl From<GossipStatement> for GossipMessage {
fn from(stmt: GossipStatement) -> Self {
GossipMessage::Statement(stmt)
}
}
asynchronous rob
committed
/// A gossip message containing a statement.
#[derive(Encode, Decode, Clone, PartialEq)]
pub struct GossipStatement {
/// The block hash of the relay chain being referred to. In context, this should
/// be a leaf.
pub relay_chain_leaf: Hash,
/// The signed statement being gossipped.
pub signed_statement: SignedStatement,
}
impl GossipStatement {
/// Create a new instance.
pub fn new(relay_chain_leaf: Hash, signed_statement: SignedStatement) -> Self {
signed_statement,
}
}
asynchronous rob
committed
}
/// A gossip message containing one erasure chunk of a candidate block.
/// For each chunk of block erasure encoding one of this messages is constructed.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct ErasureChunkMessage {
/// The chunk itself.
pub chunk: PrimitiveChunk,
/// The hash of the candidate receipt of the block this chunk belongs to.
pub candidate_hash: Hash,
}
impl From<ErasureChunkMessage> for GossipMessage {
fn from(chk: ErasureChunkMessage) -> Self {
GossipMessage::ErasureChunk(chk)
}
}
/// A packet of messages from one parachain to another.
///
/// These are all the messages posted from one parachain to another during the
/// execution of a single parachain block. Since this parachain block may have been
/// included in many forks of the relay chain, there is no relay-chain leaf parameter.
#[derive(Encode, Decode, Clone, PartialEq)]
pub struct GossipParachainMessages {
/// The root of the message queue.
pub queue_root: Hash,
}
asynchronous rob
committed
/// A versioned neighbor message.
#[derive(Encode, Decode, Clone, PartialEq)]
asynchronous rob
committed
pub enum VersionedNeighborPacket {
#[codec(index = "1")]
V1(NeighborPacket),
}
/// Contains information on which chain heads the peer is
/// accepting messages for.
#[derive(Encode, Decode, Clone, PartialEq)]
asynchronous rob
committed
pub struct NeighborPacket {
chain_heads: Vec<Hash>,
}
/// whether a block is known.
#[derive(Clone, Copy, PartialEq)]
pub enum Known {
/// The block is a known leaf.
Leaf,
/// The block is known to be old.
Old,
/// The block is known to be bad.
Bad,
}
/// Context to the underlying polkadot chain.
pub trait ChainContext: Send + Sync {
/// Provide a closure which is invoked for every unrouted queue hash at a given leaf.
fn leaf_unrouted_roots(
&self,
leaf: &Hash,
with_queue_root: &mut dyn FnMut(&Hash),
) -> Result<(), ClientError>;
/// whether a block is known. If it's not, returns `None`.
fn is_known(&self, block_hash: &Hash) -> Option<Known>;
}
impl<F, P> ChainContext for (F, P) where
F: Fn(&Hash) -> Option<Known> + Send + Sync,
P: Send + Sync + std::ops::Deref,
P::Target: ProvideRuntimeApi<Block>,
<P::Target as ProvideRuntimeApi<Block>>::Api: ParachainHost<Block, Error = ClientError>,
fn is_known(&self, block_hash: &Hash) -> Option<Known> {
(self.0)(block_hash)
}
fn leaf_unrouted_roots(
&self,
_leaf: &Hash,
_with_queue_root: &mut dyn FnMut(&Hash),
) -> Result<(), ClientError> {
Ok(())
/// Compute the gossip topic for attestations on the given parent hash.
pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash {
let mut v = parent_hash.as_ref().to_vec();
v.extend(b"attestations");
BlakeTwo256::hash(&v[..])
}
/// Register a gossip validator on the network service.
// NOTE: since RegisteredMessageValidator is meant to be a type-safe proof
// that we've actually done the registration, this should be the only way
// to construct it outside of tests.
pub fn register_validator<C: ChainContext + 'static>(
service: Arc<NetworkService<Block, Hash>>,
executor: &impl futures::task::Spawn,
) -> RegisteredMessageValidator
asynchronous rob
committed
let s = service.clone();
let report_handle = Box::new(move |peer: &PeerId, cost_benefit: ReputationChange| {
if cost_benefit.value != 0 {
s.report_peer(peer.clone(), cost_benefit);
}
asynchronous rob
committed
});
let validator = Arc::new(MessageValidator {
asynchronous rob
committed
report_handle,
inner: RwLock::new(Inner {
peers: HashMap::new(),
attestation_view: Default::default(),
asynchronous rob
committed
})
});
let gossip_side = validator.clone();
let gossip_engine = Arc::new(Mutex::new(sc_network_gossip::GossipEngine::new(
service.clone(),
POLKADOT_ENGINE_ID,
// Ideally this would not be spawned as an orphaned task, but polled by
// `RegisteredMessageValidator` which in turn would be polled by a `ValidationNetwork`.
{
let gossip_engine = gossip_engine.clone();
let fut = futures::future::poll_fn(move |cx| {
gossip_engine.lock().poll_unpin(cx)
});
let spawn_res = executor.spawn_obj(futures::task::FutureObj::from(Box::new(fut)));
// Note: we consider the chances of an error to spawn a background task almost null.
if spawn_res.is_err() {
log::error!(target: "polkadot-gossip", "Failed to spawn background task");
}
RegisteredMessageValidator {
inner: validator as _,
service: Some(service),
gossip_engine: Some(gossip_engine),
}
#[derive(PartialEq)]
enum NewLeafAction {
// (who, message)
TargetedMessage(PeerId, GossipMessage),
}
/// Actions to take after noting a new block-DAG leaf.
///
/// This should be consumed by passing a consensus-gossip handle to `perform`.
#[must_use = "New chain-head gossip actions must be performed"]
pub struct NewLeafActions {
actions: Vec<NewLeafAction>,
}
impl NewLeafActions {
#[cfg(test)]
pub fn new() -> Self {
NewLeafActions { actions: Vec::new() }
}
/// Perform the queued actions, feeding into gossip.
pub fn perform(
self,
asynchronous rob
committed
gossip: &dyn crate::legacy::GossipService,
) {
for action in self.actions {
match action {
NewLeafAction::TargetedMessage(who, message)
=> gossip.send_message(who, message),
/// A registered message validator.
///
/// Create this using `register_validator`.
#[derive(Clone)]
pub struct RegisteredMessageValidator {
inner: Arc<MessageValidator<dyn ChainContext>>,
// Note: this is always `Some` in real code and `None` in tests.
service: Option<Arc<NetworkService<Block, Hash>>>,
// Note: this is always `Some` in real code and `None` in tests.
gossip_engine: Option<Arc<Mutex<sc_network_gossip::GossipEngine<Block>>>>,
impl RegisteredMessageValidator {
/// Register an availabilty store the gossip service can query.
pub(crate) fn register_availability_store(&self, availability_store: av_store::Store) {
self.inner.inner.write().availability_store = Some(availability_store);
}
/// Note that we perceive a new leaf of the block-DAG. We will notify our neighbors that
/// we now accept parachain candidate attestations and incoming message queues
/// relevant to this leaf.
pub(crate) fn new_local_leaf(
asynchronous rob
committed
&self,
validation: MessageValidationData,
) -> NewLeafActions {
// add an entry in attestation_view
// prune any entries from attestation_view which are no longer leaves
asynchronous rob
committed
let mut inner = self.inner.inner.write();
inner.attestation_view.new_local_leaf(validation);
asynchronous rob
committed
let mut actions = Vec::new();
{
let &mut Inner {
ref chain,
ref mut attestation_view,
..
} = &mut *inner;
attestation_view.prune_old_leaves(|hash| match chain.is_known(hash) {
asynchronous rob
committed
Some(Known::Leaf) => true,
_ => false,
});
}
asynchronous rob
committed
// send neighbor packets to peers
inner.multicast_neighbor_packet(
|who, message| actions.push(NewLeafAction::TargetedMessage(who.clone(), message))
);
NewLeafActions { actions }
asynchronous rob
committed
pub(crate) fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
let topic_stream = if let Some(gossip_engine) = self.gossip_engine.as_ref() {
} else {
log::error!("Called gossip_messages_for on a test engine");
futures::channel::mpsc::unbounded().1
};
GossipMessageStream::new(topic_stream.boxed())
}
asynchronous rob
committed
pub(crate) fn gossip_message(&self, topic: Hash, message: GossipMessage) {
if let Some(gossip_engine) = self.gossip_engine.as_ref() {
topic,
message.encode(),
false,
);
} else {
log::error!("Called gossip_message on a test engine");
}
}
asynchronous rob
committed
pub(crate) fn send_message(&self, who: PeerId, message: GossipMessage) {
if let Some(gossip_engine) = self.gossip_engine.as_ref() {
gossip_engine.lock().send_message(vec![who], message.encode());
} else {
log::error!("Called send_message on a test engine");
}
}
asynchronous rob
committed
}
impl GossipService for RegisteredMessageValidator {
asynchronous rob
committed
fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
RegisteredMessageValidator::gossip_messages_for(self, topic)
}
fn gossip_message(&self, topic: Hash, message: GossipMessage) {
RegisteredMessageValidator::gossip_message(self, topic, message)
}
fn send_message(&self, who: PeerId, message: GossipMessage) {
RegisteredMessageValidator::send_message(self, who, message)
}
}
/// The data needed for validating gossip messages.
asynchronous rob
committed
#[derive(Default)]
pub(crate) struct MessageValidationData {
/// The authorities' parachain validation keys at a block.
pub(crate) authorities: Vec<ValidatorId>,
/// The signing context.
pub(crate) signing_context: SigningContext,
}
impl MessageValidationData {
fn check_statement(&self, statement: &SignedStatement) -> Result<(), ()> {
let sender = match self.authorities.get(statement.sender as usize) {
Some(val) => val,
asynchronous rob
committed
None => return Err(()),
asynchronous rob
committed
let good = self.authorities.contains(&sender) &&
::polkadot_validation::check_statement(
&statement.statement,
&statement.signature,
sender.clone(),
asynchronous rob
committed
);
if good {
Ok(())
} else {
Err(())
}
}
}
#[derive(Default)]
struct PeerData {
asynchronous rob
committed
}
asynchronous rob
committed
peers: HashMap<PeerId, PeerData>,
attestation_view: AttestationView,
availability_store: Option<av_store::Store>,
asynchronous rob
committed
}
impl<C: ?Sized + ChainContext> Inner<C> {
asynchronous rob
committed
fn validate_neighbor_packet(&mut self, sender: &PeerId, packet: NeighborPacket)
-> (GossipValidationResult<Hash>, ReputationChange, Vec<Hash>)
asynchronous rob
committed
{
let chain_heads = packet.chain_heads;
if chain_heads.len() > MAX_CHAIN_HEADS {
(GossipValidationResult::Discard, cost::BAD_NEIGHBOR_PACKET, Vec::new())
} else {
let chain_heads: LeavesVec = chain_heads.into_iter().collect();
let new_topics = if let Some(ref mut peer) = self.peers.get_mut(sender) {
let new_leaves = peer.attestation.update_leaves(&chain_heads);
let new_attestation_topics = new_leaves.iter().cloned().map(attestation_topic);
} else {
Vec::new()
};
(GossipValidationResult::Discard, cost::NONE, new_topics)
asynchronous rob
committed
}
}
fn validate_erasure_chunk_packet(&mut self, msg: ErasureChunkMessage)
-> (GossipValidationResult<Hash>, ReputationChange)
{
if let Some(store) = &self.availability_store {
if let Some(receipt) = store.get_candidate(&msg.candidate_hash) {
let chunk_hash = erasure::branch_hash(
&receipt.commitments.erasure_root,
&msg.chunk.proof,
msg.chunk.index as usize
);
if chunk_hash != Ok(BlakeTwo256::hash(&msg.chunk.chunk)) {
(
GossipValidationResult::Discard,
cost::ERASURE_CHUNK_WRONG_ROOT
)
} else {
if let Some(awaited_chunks) = store.awaited_chunks() {
let frontier_entry = av_store::AwaitedFrontierEntry {
candidate_hash: msg.candidate_hash,
relay_parent: receipt.relay_parent,
validator_index: msg.chunk.index,
};
if awaited_chunks.contains(&frontier_entry) {
let topic = crate::erasure_coding_topic(
&msg.candidate_hash
);
return (
GossipValidationResult::ProcessAndKeep(topic),
benefit::NEW_ERASURE_CHUNK,
);
}
}
(GossipValidationResult::Discard, cost::NONE)
}
} else {
(GossipValidationResult::Discard, cost::ORPHANED_ERASURE_CHUNK)
}
} else {
(GossipValidationResult::Discard, cost::NONE)
}
}
fn multicast_neighbor_packet<F: FnMut(&PeerId, GossipMessage)>(
asynchronous rob
committed
&self,
mut send_neighbor_packet: F,
) {
let neighbor_packet = GossipMessage::from(NeighborPacket {
chain_heads: self.attestation_view.neighbor_info().collect(),
asynchronous rob
committed
for peer in self.peers.keys() {
send_neighbor_packet(peer, neighbor_packet.clone())
asynchronous rob
committed
}
}
}
/// An unregistered message validator. Register this with `register_validator`.
pub struct MessageValidator<C: ?Sized> {
report_handle: Box<dyn Fn(&PeerId, ReputationChange) + Send + Sync>,
asynchronous rob
committed
}
impl<C: ChainContext + ?Sized> MessageValidator<C> {
asynchronous rob
committed
#[cfg(test)]
fn new_test(
report_handle: Box<dyn Fn(&PeerId, ReputationChange) + Send + Sync>,
asynchronous rob
committed
MessageValidator {
report_handle,
inner: RwLock::new(Inner {
peers: HashMap::new(),
attestation_view: Default::default(),
asynchronous rob
committed
}
}
fn report(&self, who: &PeerId, cost_benefit: ReputationChange) {
asynchronous rob
committed
(self.report_handle)(who, cost_benefit)
}
impl<C: ChainContext + ?Sized> sc_network_gossip::Validator<Block> for MessageValidator<C> {
fn new_peer(&self, _context: &mut dyn ValidatorContext<Block>, who: &PeerId, _roles: Roles) {
asynchronous rob
committed
let mut inner = self.inner.write();
inner.peers.insert(who.clone(), PeerData::default());
asynchronous rob
committed
}
fn peer_disconnected(&self, _context: &mut dyn ValidatorContext<Block>, who: &PeerId) {
asynchronous rob
committed
let mut inner = self.inner.write();
inner.peers.remove(who);
}
fn validate(&self, context: &mut dyn ValidatorContext<Block>, sender: &PeerId, data: &[u8])
-> GossipValidationResult<Hash>
{
let mut decode_data = data;
let (res, cost_benefit) = match GossipMessage::decode(&mut decode_data) {
Err(_) => (GossipValidationResult::Discard, cost::MALFORMED_MESSAGE),
Ok(GossipMessage::Neighbor(VersionedNeighborPacket::V1(packet))) => {
asynchronous rob
committed
let (res, cb, topics) = self.inner.write().validate_neighbor_packet(sender, packet);
for new_topic in topics {
context.send_topic(sender, new_topic, false);
}
(res, cb)
}
Ok(GossipMessage::Statement(statement)) => {
let (res, cb) = {
let mut inner = self.inner.write();
let inner = &mut *inner;
inner.attestation_view.validate_statement_signature(statement, &inner.chain)
};
if let GossipValidationResult::ProcessAndKeep(ref topic) = res {
context.broadcast_message(topic.clone(), data.to_vec(), false);
}
(res, cb)
}
Ok(GossipMessage::ErasureChunk(chunk)) => {
self.inner.write().validate_erasure_chunk_packet(chunk)
}
asynchronous rob
committed
};
self.report(sender, cost_benefit);
res
}
fn message_expired<'a>(&'a self) -> Box<dyn FnMut(Hash, &[u8]) -> bool + 'a> {
asynchronous rob
committed
let inner = self.inner.read();
Box::new(move |topic, _data| {
// check that messages from this topic are considered live by one of our protocols.
// everything else is expired
let live = inner.attestation_view.is_topic_live(&topic);
asynchronous rob
committed
})
}
fn message_allowed<'a>(&'a self) -> Box<dyn FnMut(&PeerId, MessageIntent, &Hash, &[u8]) -> bool + 'a> {
asynchronous rob
committed
let mut inner = self.inner.write();
Box::new(move |who, intent, topic, data| {
let &mut Inner {
ref mut peers,
ref mut attestation_view,
..
} = &mut *inner;
asynchronous rob
committed
match intent {
MessageIntent::PeriodicRebroadcast => return false,
_ => {},
}
let attestation_head = attestation_view.topic_block(topic).map(|x| x.clone());
let peer = peers.get_mut(who);
asynchronous rob
committed
match GossipMessage::decode(&mut &data[..]) {
Ok(GossipMessage::Statement(ref statement)) => {
// to allow statements, we need peer knowledge.
let peer_knowledge = peer.and_then(move |p| attestation_head.map(|r| (p, r)))
.and_then(|(p, r)| p.attestation.knowledge_at_mut(&r).map(|k| (k, r)));
peer_knowledge.map_or(false, |(knowledge, attestation_head)| {
attestation_view.statement_allowed(
statement,
&attestation_head,
knowledge,
)
})
}
_ => false,
asynchronous rob
committed
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use sc_network_gossip::Validator as ValidatorT;
asynchronous rob
committed
use std::sync::mpsc;
use parking_lot::Mutex;
use polkadot_primitives::parachain::AbridgedCandidateReceipt;
use sp_core::sr25519::Signature as Sr25519Signature;
use polkadot_validation::GenericStatement;
asynchronous rob
committed
#[derive(PartialEq, Clone, Debug)]
enum ContextEvent {
BroadcastTopic(Hash, bool),
BroadcastMessage(Hash, Vec<u8>, bool),
SendMessage(PeerId, Vec<u8>),
SendTopic(PeerId, Hash, bool),
}
#[derive(Default)]
struct MockValidatorContext {
events: Vec<ContextEvent>,
}
impl MockValidatorContext {
fn clear(&mut self) {
self.events.clear()
}
}
impl sc_network_gossip::ValidatorContext<Block> for MockValidatorContext {
asynchronous rob
committed
fn broadcast_topic(&mut self, topic: Hash, force: bool) {
self.events.push(ContextEvent::BroadcastTopic(topic, force));
}
fn broadcast_message(&mut self, topic: Hash, message: Vec<u8>, force: bool) {
self.events.push(ContextEvent::BroadcastMessage(topic, message, force));
}
fn send_message(&mut self, who: &PeerId, message: Vec<u8>) {
self.events.push(ContextEvent::SendMessage(who.clone(), message));
}
fn send_topic(&mut self, who: &PeerId, topic: Hash, force: bool) {
self.events.push(ContextEvent::SendTopic(who.clone(), topic, force));
}
}
#[derive(Default)]
struct TestChainContext {
known_map: HashMap<Hash, Known>,
ingress_roots: HashMap<Hash, Vec<Hash>>,
}
impl ChainContext for TestChainContext {
fn is_known(&self, block_hash: &Hash) -> Option<Known> {
self.known_map.get(block_hash).map(|x| x.clone())
}
fn leaf_unrouted_roots(&self, leaf: &Hash, with_queue_root: &mut dyn FnMut(&Hash))
-> Result<(), sp_blockchain::Error>
{
for root in self.ingress_roots.get(leaf).into_iter().flat_map(|roots| roots) {
with_queue_root(root)
}
Ok(())
}
}
asynchronous rob
committed
#[test]
fn message_allowed() {
let (tx, _rx) = mpsc::channel();
let tx = Mutex::new(tx);
let report_handle = Box::new(move |peer: &PeerId, cb: ReputationChange| tx.lock().send((peer.clone(), cb)).unwrap());
asynchronous rob
committed
let validator = MessageValidator::new_test(
asynchronous rob
committed
report_handle,
);
let peer_a = PeerId::random();
let mut validator_context = MockValidatorContext::default();
validator.new_peer(&mut validator_context, &peer_a, Roles::FULL);
assert!(validator_context.events.is_empty());
validator_context.clear();
let hash_a = [1u8; 32].into();
let hash_b = [2u8; 32].into();
let hash_c = [3u8; 32].into();
let message = GossipMessage::from(NeighborPacket {
chain_heads: vec![hash_a, hash_b],
}).encode();
asynchronous rob
committed
let res = validator.validate(
&mut validator_context,
&peer_a,
&message[..],
);
match res {
GossipValidationResult::Discard => {},
_ => panic!("wrong result"),
}
assert_eq!(
validator_context.events,
vec![
ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_a), false),
ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_b), false),
],
);
validator_context.clear();
let candidate_receipt = AbridgedCandidateReceipt::default();
asynchronous rob
committed
let statement = GossipMessage::Statement(GossipStatement {
asynchronous rob
committed
signed_statement: SignedStatement {
statement: GenericStatement::Candidate(candidate_receipt),
signature: Sr25519Signature([255u8; 64]).into(),
asynchronous rob
committed
sender: 1,
asynchronous rob
committed
});
let encoded = statement.encode();
let topic_a = attestation_topic(hash_a);
let topic_b = attestation_topic(hash_b);
let topic_c = attestation_topic(hash_c);
// topic_a is in all 3 views -> succeed
let mut validation_data = MessageValidationData::default();
validation_data.signing_context.parent_hash = hash_a;
validator.inner.write().attestation_view.new_local_leaf(validation_data);
asynchronous rob
committed
// topic_b is in the neighbor's view but not ours -> fail
// topic_c is not in either -> fail
{
let mut message_allowed = validator.message_allowed();
let intent = MessageIntent::Broadcast;
assert!(message_allowed(&peer_a, intent, &topic_a, &encoded));
assert!(!message_allowed(&peer_a, intent, &topic_b, &encoded));
assert!(!message_allowed(&peer_a, intent, &topic_c, &encoded));
asynchronous rob
committed
}
}
#[test]
fn too_many_chain_heads_is_report() {
let (tx, rx) = mpsc::channel();
let tx = Mutex::new(tx);
let report_handle = Box::new(move |peer: &PeerId, cb: ReputationChange| tx.lock().send((peer.clone(), cb)).unwrap());
asynchronous rob
committed
let validator = MessageValidator::new_test(
asynchronous rob
committed
report_handle,
);
let peer_a = PeerId::random();
let mut validator_context = MockValidatorContext::default();
validator.new_peer(&mut validator_context, &peer_a, Roles::FULL);
assert!(validator_context.events.is_empty());
validator_context.clear();
let chain_heads = (0..MAX_CHAIN_HEADS+1).map(|i| [i as u8; 32].into()).collect();
let message = GossipMessage::from(NeighborPacket {
chain_heads,
}).encode();
asynchronous rob
committed
let res = validator.validate(
&mut validator_context,
&peer_a,
&message[..],
);
match res {
GossipValidationResult::Discard => {},
_ => panic!("wrong result"),
}
assert_eq!(
validator_context.events,
Vec::new(),
);
drop(validator);
assert_eq!(rx.iter().collect::<Vec<_>>(), vec![(peer_a, cost::BAD_NEIGHBOR_PACKET)]);
}
#[test]
fn statement_only_sent_when_candidate_known() {
let (tx, _rx) = mpsc::channel();
let tx = Mutex::new(tx);
let report_handle = Box::new(move |peer: &PeerId, cb: ReputationChange| tx.lock().send((peer.clone(), cb)).unwrap());
asynchronous rob
committed
let validator = MessageValidator::new_test(
asynchronous rob
committed
report_handle,
);
let peer_a = PeerId::random();
let mut validator_context = MockValidatorContext::default();
validator.new_peer(&mut validator_context, &peer_a, Roles::FULL);
assert!(validator_context.events.is_empty());
validator_context.clear();
let hash_a = [1u8; 32].into();
let hash_b = [2u8; 32].into();
let message = GossipMessage::from(NeighborPacket {
chain_heads: vec![hash_a, hash_b],
}).encode();
asynchronous rob
committed
let res = validator.validate(
&mut validator_context,
&peer_a,
&message[..],
);
match res {
GossipValidationResult::Discard => {},
_ => panic!("wrong result"),
}
assert_eq!(
validator_context.events,
vec![
ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_a), false),
ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_b), false),
],
);
validator_context.clear();
let topic_a = attestation_topic(hash_a);
let c_hash = [99u8; 32].into();
let statement = GossipMessage::Statement(GossipStatement {
asynchronous rob
committed
signed_statement: SignedStatement {
statement: GenericStatement::Valid(c_hash),
signature: Sr25519Signature([255u8; 64]).into(),
asynchronous rob
committed
sender: 1,
}
});
let encoded = statement.encode();
let mut validation_data = MessageValidationData::default();
validation_data.signing_context.parent_hash = hash_a;
validator.inner.write().attestation_view.new_local_leaf(validation_data);
asynchronous rob
committed
{
let mut message_allowed = validator.message_allowed();
assert!(!message_allowed(&peer_a, MessageIntent::Broadcast, &topic_a, &encoded[..]));
asynchronous rob
committed
}
validator
.inner
.write()
.peers
.get_mut(&peer_a)
.unwrap()
.attestation
.note_aware_under_leaf(&hash_a, c_hash);
asynchronous rob
committed
{
let mut message_allowed = validator.message_allowed();
assert!(message_allowed(&peer_a, MessageIntent::Broadcast, &topic_a, &encoded[..]));