Newer
Older
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
//! Gossip messages and the message validator.
//!
//! At the moment, this module houses 2 gossip protocols central to Polkadot.
//!
//! The first is the attestation-gossip system, which aims to circulate parachain
//! candidate attestations by validators at leaves of the block-DAG.
//!
//! The second is the inter-chain message queue routing gossip, which aims to
//! circulate message queues between parachains, which remain un-routed as of
//! recent leaves.
//!
//! These gossip systems do not have any form of sybil-resistance in terms
//! of the nodes which can participate. It could be imposed e.g. by limiting only to
//! validators, but this would prevent message queues from getting into the hands
//! of collators and of attestations from getting into the hands of fishermen.
//! As such, we take certain precautions which allow arbitrary full nodes to
//! join the gossip graph, as well as validators (who are likely to be well-connected
//! amongst themselves).
//!
//! The first is the notion of a neighbor packet. This is a packet sent between
//! neighbors of the gossip graph to inform each other of their current protocol
//! state. As of this writing, for both attestation and message-routing gossip,
//! the only necessary information here is a (length-limited) set of perceived
//! leaves of the block-DAG.
//!
//! These leaves can be used to derive what information a node is willing to accept.
//! There is typically an unbounded amount of possible "future" information relative to
//! any protocol state. For example, attestations or unrouted message queues from millions
//! of blocks after a known protocol state. The neighbor packet is meant to avoid being
//! spammed by illegitimate future information, while informing neighbors of when
//! previously-future and now current gossip messages would be accepted.
//!
//! Peers who send information which was not allowed under a recent neighbor packet
//! will be noted as non-beneficial to Substrate's peer-set management utility.
use sp_runtime::{generic::BlockId, traits::ProvideRuntimeApi};
use sc_network::{config::Roles, PeerId};
use sc_network::consensus_gossip::{
self as network_gossip, ValidationResult as GossipValidationResult,
asynchronous rob
committed
ValidatorContext, MessageIntent, ConsensusMessage,
use polkadot_validation::SignedStatement;
use polkadot_primitives::{Block, Hash};
use polkadot_primitives::parachain::{ParachainHost, ValidatorId, Message as ParachainMessage};
use crate::router::attestation_topic;
use attestation::{View as AttestationView, PeerData as AttestationPeerData};
use message_routing::{View as MessageRoutingView};
mod attestation;
mod message_routing;
/// The engine ID of the polkadot attestation system.
pub const POLKADOT_ENGINE_ID: sp_runtime::ConsensusEngineId = *b"dot1";
asynchronous rob
committed
/// Upper bound on the number of chain heads accepted in a received neighbor packet;
/// it is also the fixed capacity of `LeavesVec`.
// The value is arbitrary; in practice this should not be more than 2.
pub(crate) const MAX_CHAIN_HEADS: usize = 5;
/// Type alias for a bounded vector of leaves, holding at most `MAX_CHAIN_HEADS` hashes.
pub type LeavesVec = ArrayVec<[Hash; MAX_CHAIN_HEADS]>;
asynchronous rob
committed
mod benefit {
/// When a peer sends us a previously-unknown candidate statement.
pub const NEW_CANDIDATE: i32 = 100;
/// When a peer sends us a previously-unknown attestation.
pub const NEW_ATTESTATION: i32 = 50;
/// When a peer sends us a previously-unknown message packet.
pub const NEW_ICMP_MESSAGES: i32 = 50;
asynchronous rob
committed
}
mod cost {
/// A peer sent us an attestation and we don't know the candidate.
pub const ATTESTATION_NO_CANDIDATE: i32 = -100;
/// A peer sent us a statement we consider in the future.
pub const FUTURE_MESSAGE: i32 = -100;
/// A peer sent us a statement from the past.
pub const PAST_MESSAGE: i32 = -30;
/// A peer sent us a malformed message.
pub const MALFORMED_MESSAGE: i32 = -500;
/// A peer sent us a wrongly signed message.
pub const BAD_SIGNATURE: i32 = -500;
/// A peer sent us a bad neighbor packet.
pub const BAD_NEIGHBOR_PACKET: i32 = -300;
/// A peer sent us an ICMP queue we haven't advertised a need for.
pub const UNNEEDED_ICMP_MESSAGES: i32 = -100;
/// A peer sent us an ICMP queue with a bad root.
///
/// The cost scales linearly with the number of messages in the offending queue
/// (`-150` per message) and saturates at `i32::min_value()`, so an oversized,
/// peer-controlled message count can never overflow into a panic or wrap around
/// into a positive "benefit".
pub fn icmp_messages_root_mismatch(n_messages: usize) -> i32 {
    const PER_MESSAGE: i32 = -150;
    // Clamp the count into `i32` range first; anything that large saturates anyway.
    let capped = n_messages.min(i32::max_value() as usize) as i32;
    PER_MESSAGE.saturating_mul(capped)
}
asynchronous rob
committed
}
/// A gossip message.
#[derive(Encode, Decode, Clone)]
pub enum GossipMessage {
asynchronous rob
committed
/// A packet sent to a neighbor but not relayed.
#[codec(index = "1")]
Neighbor(VersionedNeighborPacket),
/// An attestation-statement about the candidate.
/// Non-candidate statements should only be sent to peers who are aware of the candidate.
#[codec(index = "2")]
Statement(GossipStatement),
/// A packet of messages from one parachain to another.
#[codec(index = "3")]
ParachainMessages(GossipParachainMessages),
asynchronous rob
committed
// TODO: https://github.com/paritytech/polkadot/issues/253
// erasure-coded chunks.
}
impl GossipMessage {
    /// Encode this message and wrap the bytes in a `ConsensusMessage` tagged with
    /// `POLKADOT_ENGINE_ID`.
    fn to_consensus_message(&self) -> ConsensusMessage {
        let data = self.encode();
        ConsensusMessage { data, engine_id: POLKADOT_ENGINE_ID }
    }
}
impl From<NeighborPacket> for GossipMessage {
    /// Wrap a neighbor packet as a version-1 `GossipMessage::Neighbor`.
    fn from(packet: NeighborPacket) -> Self {
        let versioned = VersionedNeighborPacket::V1(packet);
        GossipMessage::Neighbor(versioned)
    }
}
impl From<GossipStatement> for GossipMessage {
    /// Wrap a statement as `GossipMessage::Statement`.
    fn from(stmt: GossipStatement) -> Self {
        let wrapped = GossipMessage::Statement(stmt);
        wrapped
    }
}
impl From<GossipParachainMessages> for GossipMessage {
    /// Wrap a packet of parachain messages as `GossipMessage::ParachainMessages`.
    fn from(messages: GossipParachainMessages) -> Self {
        let wrapped = GossipMessage::ParachainMessages(messages);
        wrapped
    }
}
asynchronous rob
committed
/// A gossip message containing a statement.
///
/// Carried on the wire as the `GossipMessage::Statement` variant.
#[derive(Encode, Decode, Clone)]
pub struct GossipStatement {
/// The block hash of the relay chain being referred to. In context, this should
/// be a leaf.
pub relay_chain_leaf: Hash,
/// The signed statement being gossiped.
pub signed_statement: SignedStatement,
}
impl GossipStatement {
/// Create a new instance.
pub fn new(relay_chain_leaf: Hash, signed_statement: SignedStatement) -> Self {
signed_statement,
}
}
asynchronous rob
committed
}
/// A packet of messages from one parachain to another.
///
/// These are all the messages posted from one parachain to another during the
/// execution of a single parachain block. Since this parachain block may have been
/// included in many forks of the relay chain, there is no relay-chain leaf parameter.
///
/// Carried on the wire as the `GossipMessage::ParachainMessages` variant.
#[derive(Encode, Decode, Clone)]
pub struct GossipParachainMessages {
/// The root of the message queue. `queue_root_is_correct` recomputes the root
/// from `messages` and compares it against this value.
pub queue_root: Hash,
/// The messages themselves.
pub messages: Vec<ParachainMessage>,
}
impl GossipParachainMessages {
    // Recomputes the queue root from the raw payload of every message and
    // reports whether it equals the root carried in the struct.
    fn queue_root_is_correct(&self) -> bool {
        let payloads = self.messages.iter().map(|message| &message.0);
        polkadot_validation::message_queue_root(payloads) == self.queue_root
    }
}
asynchronous rob
committed
/// A versioned neighbor message.
#[derive(Encode, Decode, Clone)]
pub enum VersionedNeighborPacket {
/// Version 1 of the neighbor packet format (codec index 1).
#[codec(index = "1")]
V1(NeighborPacket),
}
/// Contains information on which chain heads the peer is
/// accepting messages for.
#[derive(Encode, Decode, Clone)]
pub struct NeighborPacket {
/// The chain heads advertised by the sender. On receipt, a packet listing more than
/// `MAX_CHAIN_HEADS` entries is discarded and charged `cost::BAD_NEIGHBOR_PACKET`.
chain_heads: Vec<Hash>,
}
/// Whether a block is known.
asynchronous rob
committed
#[derive(Clone, Copy)]
pub enum Known {
/// The block is a known leaf. This is the only variant for which the pruning
/// predicate used in `new_local_leaf` returns `true`.
Leaf,
/// The block is known to be old.
Old,
/// The block is known to be bad.
Bad,
}
/// Context to the underlying polkadot chain.
pub trait ChainContext: Send + Sync {
/// Provide a closure which is invoked for every unrouted queue hash at a given leaf.
///
/// `with_queue_root` is called once per queue root found; an `Err` is returned
/// when the underlying chain query fails.
fn leaf_unrouted_roots(
&self,
leaf: &Hash,
with_queue_root: &mut dyn FnMut(&Hash),
) -> Result<(), ClientError>;
/// Whether a block is known. If it's not, returns `None`.
fn is_known(&self, block_hash: &Hash) -> Option<Known>;
}
impl<F, P> ChainContext for (F, P) where
F: Fn(&Hash) -> Option<Known> + Send + Sync,
P: Send + Sync + std::ops::Deref,
P::Target: ProvideRuntimeApi,
<P::Target as ProvideRuntimeApi>::Api: ParachainHost<Block, Error = ClientError>,
/// Delegates to the lookup closure stored as the first tuple element.
fn is_known(&self, block_hash: &Hash) -> Option<Known> {
    let lookup = &self.0;
    lookup(block_hash)
}
/// Queries the runtime API at `leaf` for every active parachain's ingress and feeds
/// each queue root found into `with_queue_root`. Any runtime API error is propagated.
fn leaf_unrouted_roots(
    &self,
    &leaf: &Hash,
    with_queue_root: &mut dyn FnMut(&Hash),
) -> Result<(), ClientError> {
    let api = self.1.runtime_api();
    let leaf_id = BlockId::Hash(leaf);
    // TODO: https://github.com/paritytech/polkadot/issues/467
    for (para_id, _) in api.active_parachains(&leaf_id)? {
        // Parachains without ingress information contribute no roots.
        let ingress = match api.ingress(&leaf_id, para_id, None)? {
            Some(ingress) => ingress,
            None => continue,
        };
        for (_height, _from, queue_root) in ingress.iter() {
            with_queue_root(queue_root);
        }
    }
    Ok(())
}
}
/// Register a gossip validator on the network service.
// NOTE: since RegisteredMessageValidator is meant to be a type-safe proof
// that we've actually done the registration, this should be the only way
// to construct it outside of tests.
pub fn register_validator<C: ChainContext + 'static>(
service: Arc<PolkadotNetworkService>,
chain: C,
) -> RegisteredMessageValidator
{
asynchronous rob
committed
let s = service.clone();
let report_handle = Box::new(move |peer: &PeerId, cost_benefit| {
s.report_peer(peer.clone(), cost_benefit);
});
let validator = Arc::new(MessageValidator {
asynchronous rob
committed
report_handle,
inner: RwLock::new(Inner {
peers: HashMap::new(),
attestation_view: Default::default(),
message_routing_view: Default::default(),
chain,
asynchronous rob
committed
})
});
let gossip_side = validator.clone();
service.with_gossip(|gossip, ctx|
gossip.register_validator(ctx, POLKADOT_ENGINE_ID, gossip_side)
);
RegisteredMessageValidator { inner: validator as _ }
}
// A single gossip action queued by `new_local_leaf` and executed by
// `NewLeafActions::perform`.
#[derive(PartialEq)]
enum NewLeafAction {
// (who, message): passed to `GossipService::send_message` for that peer.
TargetedMessage(PeerId, ConsensusMessage),
// (topic, message): passed to `GossipService::multicast` under that topic.
Multicast(Hash, ConsensusMessage),
}
/// Actions to take after noting a new block-DAG leaf.
///
/// This should be consumed by passing a consensus-gossip handle to `perform`.
#[must_use = "New chain-head gossip actions must be performed"]
pub struct NewLeafActions {
// Queued actions; `perform` executes them in order.
actions: Vec<NewLeafAction>,
}
impl NewLeafActions {
/// Perform the queued actions, feeding into gossip.
pub fn perform(
self,
gossip: &mut dyn crate::GossipService,
ctx: &mut dyn sc_network::Context<Block>,
) {
for action in self.actions {
match action {
NewLeafAction::TargetedMessage(who, message)
=> gossip.send_message(ctx, &who, message),
NewLeafAction::Multicast(topic, message)
=> gossip.multicast(ctx, &topic, message),
}
}
}
}
/// A registered message validator.
///
/// Create this using `register_validator`.
#[derive(Clone)]
pub struct RegisteredMessageValidator {
// Shared validator whose chain context is type-erased to `dyn ChainContext`.
inner: Arc<MessageValidator<dyn ChainContext>>,
}
impl RegisteredMessageValidator {
#[cfg(test)]
pub(crate) fn new_test<C: ChainContext + 'static>(
chain: C,
report_handle: Box<dyn Fn(&PeerId, i32) + Send + Sync>,
asynchronous rob
committed
) -> Self {
let validator = Arc::new(MessageValidator::new_test(chain, report_handle));
RegisteredMessageValidator { inner: validator as _ }
}
/// Note that we perceive a new leaf of the block-DAG. We will notify our neighbors that
/// we now accept parachain candidate attestations and incoming message queues
/// relevant to this leaf.
pub(crate) fn new_local_leaf(
asynchronous rob
committed
&self,
asynchronous rob
committed
validation: MessageValidationData,
lookup_queue_by_root: impl Fn(&Hash) -> Option<Vec<ParachainMessage>>,
) -> NewLeafActions {
// add an entry in attestation_view
// prune any entries from attestation_view which are no longer leaves
asynchronous rob
committed
let mut inner = self.inner.inner.write();
inner.attestation_view.new_local_leaf(relay_chain_leaf, validation);
asynchronous rob
committed
let mut actions = Vec::new();
{
let &mut Inner {
ref chain,
ref mut attestation_view,
ref mut message_routing_view,
..
} = &mut *inner;
attestation_view.prune_old_leaves(|hash| match chain.is_known(hash) {
asynchronous rob
committed
Some(Known::Leaf) => true,
_ => false,
});
if let Err(e) = message_routing_view.update_leaves(chain, attestation_view.neighbor_info()) {
warn!("Unable to fully update leaf-state: {:?}", e);
}
asynchronous rob
committed
}
asynchronous rob
committed
// send neighbor packets to peers
inner.multicast_neighbor_packet(
|who, message| actions.push(NewLeafAction::TargetedMessage(who.clone(), message))
);
// feed any new unrouted queues into the propagation pool.
inner.message_routing_view.sweep_unknown_queues(|topic, queue_root|
match lookup_queue_by_root(queue_root) {
Some(messages) => {
let message = GossipMessage::from(GossipParachainMessages {
queue_root: *queue_root,
messages,
}).to_consensus_message();
actions.push(NewLeafAction::Multicast(*topic, message));
true
}
None => false,
}
);
NewLeafActions { actions }
/// The data needed for validating gossip messages.
asynchronous rob
committed
#[derive(Default)]
pub(crate) struct MessageValidationData {
/// The authorities' parachain validation keys at a block.
///
/// `check_statement` looks a statement's `sender` index up in this list.
pub(crate) authorities: Vec<ValidatorId>,
}
impl MessageValidationData {
// check a statement's signature.
fn check_statement(&self, relay_chain_leaf: &Hash, statement: &SignedStatement) -> Result<(), ()> {
let sender = match self.authorities.get(statement.sender as usize) {
Some(val) => val,
asynchronous rob
committed
None => return Err(()),
asynchronous rob
committed
let good = self.authorities.contains(&sender) &&
::polkadot_validation::check_statement(
&statement.statement,
&statement.signature,
sender.clone(),
asynchronous rob
committed
);
if good {
Ok(())
} else {
Err(())
}
}
}
#[derive(Default)]
struct PeerData {
asynchronous rob
committed
}
impl PeerData {
fn leaves(&self) -> impl Iterator<Item = &Hash> {
self.attestation.leaves()
asynchronous rob
committed
}
}
asynchronous rob
committed
peers: HashMap<PeerId, PeerData>,
attestation_view: AttestationView,
message_routing_view: MessageRoutingView,
chain: C,
asynchronous rob
committed
}
impl<C: ?Sized + ChainContext> Inner<C> {
asynchronous rob
committed
fn validate_neighbor_packet(&mut self, sender: &PeerId, packet: NeighborPacket)
-> (GossipValidationResult<Hash>, i32, Vec<Hash>)
{
let chain_heads = packet.chain_heads;
if chain_heads.len() > MAX_CHAIN_HEADS {
(GossipValidationResult::Discard, cost::BAD_NEIGHBOR_PACKET, Vec::new())
} else {
let chain_heads: LeavesVec = chain_heads.into_iter().collect();
let new_topics = if let Some(ref mut peer) = self.peers.get_mut(sender) {
let new_leaves = peer.attestation.update_leaves(&chain_heads);
let new_attestation_topics = new_leaves.iter().cloned().map(attestation_topic);
// find all topics which are from the intersection of our leaves with the peer's
// new leaves.
let new_message_routing_topics = self.message_routing_view.intersection_topics(&new_leaves);
new_attestation_topics.chain(new_message_routing_topics).collect()
} else {
Vec::new()
};
asynchronous rob
committed
(GossipValidationResult::Discard, 0, new_topics)
}
}
/// Build one neighbor packet from our attestation view's neighbor info and hand a
/// copy of it to `send_neighbor_packet` for every known peer.
fn multicast_neighbor_packet<F: FnMut(&PeerId, ConsensusMessage)>(
    &self,
    mut send_neighbor_packet: F,
) {
    let chain_heads = self.attestation_view.neighbor_info().collect();
    let packet = GossipMessage::from(NeighborPacket { chain_heads }).to_consensus_message();
    self.peers
        .keys()
        .for_each(|peer| send_neighbor_packet(peer, packet.clone()));
}
}
/// An unregistered message validator. Register this with `register_validator`.
pub struct MessageValidator<C: ?Sized> {
report_handle: Box<dyn Fn(&PeerId, i32) + Send + Sync>,
asynchronous rob
committed
}
impl<C: ChainContext + ?Sized> MessageValidator<C> {
asynchronous rob
committed
#[cfg(test)]
fn new_test(
report_handle: Box<dyn Fn(&PeerId, i32) + Send + Sync>,
asynchronous rob
committed
MessageValidator {
report_handle,
inner: RwLock::new(Inner {
peers: HashMap::new(),
attestation_view: Default::default(),
message_routing_view: Default::default(),
chain,
}),
asynchronous rob
committed
}
}
/// Forward a cost/benefit value for `who` to the stored report handle.
fn report(&self, who: &PeerId, cost_benefit: i32) {
    let handle = &self.report_handle;
    handle(who, cost_benefit)
}
impl<C: ChainContext + ?Sized> network_gossip::Validator<Block> for MessageValidator<C> {
fn new_peer(&self, _context: &mut dyn ValidatorContext<Block>, who: &PeerId, _roles: Roles) {
asynchronous rob
committed
let mut inner = self.inner.write();
inner.peers.insert(who.clone(), PeerData::default());
asynchronous rob
committed
}
/// Forget everything tracked for `who`.
fn peer_disconnected(&self, _context: &mut dyn ValidatorContext<Block>, who: &PeerId) {
    self.inner.write().peers.remove(who);
}
fn validate(&self, context: &mut dyn ValidatorContext<Block>, sender: &PeerId, data: &[u8])
-> GossipValidationResult<Hash>
{
let mut decode_data = data;
let (res, cost_benefit) = match GossipMessage::decode(&mut decode_data) {
Err(_) => (GossipValidationResult::Discard, cost::MALFORMED_MESSAGE),
Ok(GossipMessage::Neighbor(VersionedNeighborPacket::V1(packet))) => {
asynchronous rob
committed
let (res, cb, topics) = self.inner.write().validate_neighbor_packet(sender, packet);
for new_topic in topics {
context.send_topic(sender, new_topic, false);
}
(res, cb)
}
Ok(GossipMessage::Statement(statement)) => {
let (res, cb) = {
let mut inner = self.inner.write();
let inner = &mut *inner;
inner.attestation_view.validate_statement_signature(statement, &inner.chain)
};
if let GossipValidationResult::ProcessAndKeep(ref topic) = res {
context.broadcast_message(topic.clone(), data.to_vec(), false);
}
(res, cb)
}
Ok(GossipMessage::ParachainMessages(messages)) => {
let (res, cb) = {
let mut inner = self.inner.write();
let inner = &mut *inner;
inner.message_routing_view.validate_queue_and_note_known(&messages)
};
asynchronous rob
committed
if let GossipValidationResult::ProcessAndKeep(ref topic) = res {
context.broadcast_message(topic.clone(), data.to_vec(), false);
}
(res, cb)
}
};
self.report(sender, cost_benefit);
res
}
/// Build the predicate deciding whether a stored message has expired.
///
/// The returned closure holds a read lock on the validator state for as long as it
/// lives. A message is expired when its topic is live in neither the attestation view
/// nor the message-routing view.
fn message_expired<'a>(&'a self) -> Box<dyn FnMut(Hash, &[u8]) -> bool + 'a> {
    let inner = self.inner.read();
    Box::new(move |topic, _data| {
        // check that messages from this topic are considered live by one of our protocols.
        // everything else is expired.
        //
        // NOTE: the routing check used to be negated, which made every topic that was
        // *not* live for message routing count as live, so such messages never expired.
        let live = inner.attestation_view.is_topic_live(&topic)
            || inner.message_routing_view.is_topic_live(&topic);
        !live // = expired
    })
}
fn message_allowed<'a>(&'a self) -> Box<dyn FnMut(&PeerId, MessageIntent, &Hash, &[u8]) -> bool + 'a> {
asynchronous rob
committed
let mut inner = self.inner.write();
Box::new(move |who, intent, topic, data| {
let &mut Inner {
ref mut peers,
ref mut attestation_view,
ref mut message_routing_view,
..
} = &mut *inner;
asynchronous rob
committed
match intent {
MessageIntent::PeriodicRebroadcast => return false,
_ => {},
}
let attestation_head = attestation_view.topic_block(topic).map(|x| x.clone());
let peer = peers.get_mut(who);
asynchronous rob
committed
match GossipMessage::decode(&mut &data[..]) {
Ok(GossipMessage::Statement(ref statement)) => {
// to allow statements, we need peer knowledge.
let peer_knowledge = peer.and_then(move |p| attestation_head.map(|r| (p, r)))
.and_then(|(p, r)| p.attestation.knowledge_at_mut(&r).map(|k| (k, r)));
peer_knowledge.map_or(false, |(knowledge, attestation_head)| {
attestation_view.statement_allowed(
statement,
&attestation_head,
knowledge,
)
})
}
Ok(GossipMessage::ParachainMessages(_)) => match peer {
None => false,
Some(peer) => {
let their_leaves: LeavesVec = peer.leaves().cloned().collect();
message_routing_view.allowed_intersecting(&their_leaves, topic)
asynchronous rob
committed
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use sc_network::consensus_gossip::Validator as ValidatorT;
asynchronous rob
committed
use std::sync::mpsc;
use parking_lot::Mutex;
use polkadot_primitives::parachain::{CandidateReceipt, HeadData};
use sp_core::crypto::UncheckedInto;
use sp_core::sr25519::{Public as Sr25519Public, Signature as Sr25519Signature};
use polkadot_validation::GenericStatement;
use super::message_routing::queue_topic;
use crate::tests::TestChainContext;
asynchronous rob
committed
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
// One call recorded by `MockValidatorContext`; each variant mirrors the arguments of
// the `ValidatorContext` method of the same name.
#[derive(PartialEq, Clone, Debug)]
enum ContextEvent {
// Recorded by `broadcast_topic(topic, force)`.
BroadcastTopic(Hash, bool),
// Recorded by `broadcast_message(topic, message, force)`.
BroadcastMessage(Hash, Vec<u8>, bool),
// Recorded by `send_message(who, message)`.
SendMessage(PeerId, Vec<u8>),
// Recorded by `send_topic(who, topic, force)`.
SendTopic(PeerId, Hash, bool),
}
// Test double for `ValidatorContext` that records every call as a `ContextEvent`.
#[derive(Default)]
struct MockValidatorContext {
// Calls recorded so far, in call order.
events: Vec<ContextEvent>,
}
impl MockValidatorContext {
    // Drop every event recorded so far.
    fn clear(&mut self) {
        let recorded = &mut self.events;
        recorded.clear()
    }
}
impl network_gossip::ValidatorContext<Block> for MockValidatorContext {
    // Record a `BroadcastTopic` event.
    fn broadcast_topic(&mut self, topic: Hash, force: bool) {
        let event = ContextEvent::BroadcastTopic(topic, force);
        self.events.push(event);
    }

    // Record a `BroadcastMessage` event.
    fn broadcast_message(&mut self, topic: Hash, message: Vec<u8>, force: bool) {
        let event = ContextEvent::BroadcastMessage(topic, message, force);
        self.events.push(event);
    }

    // Record a `SendMessage` event for a clone of the peer id.
    fn send_message(&mut self, who: &PeerId, message: Vec<u8>) {
        let event = ContextEvent::SendMessage(who.clone(), message);
        self.events.push(event);
    }

    // Record a `SendTopic` event for a clone of the peer id.
    fn send_topic(&mut self, who: &PeerId, topic: Hash, force: bool) {
        let event = ContextEvent::SendTopic(who.clone(), topic, force);
        self.events.push(event);
    }
}
impl NewLeafActions {
    // Whether a targeted `message` for peer `who` is among the queued actions.
    fn has_message(&self, who: PeerId, message: ConsensusMessage) -> bool {
        self.actions.contains(&NewLeafAction::TargetedMessage(who, message))
    }

    // Whether a multicast of `message` under `topic` is among the queued actions.
    fn has_multicast(&self, topic: Hash, message: ConsensusMessage) -> bool {
        self.actions.contains(&NewLeafAction::Multicast(topic, message))
    }
}
// Build a `ValidatorId` from 32 raw sr25519 public-key bytes.
fn validator_id(raw: [u8; 32]) -> ValidatorId {
    let public = Sr25519Public::from_raw(raw);
    public.into()
}
asynchronous rob
committed
#[test]
fn message_allowed() {
let (tx, _rx) = mpsc::channel();
let tx = Mutex::new(tx);
let report_handle = Box::new(move |peer: &PeerId, cb: i32| tx.lock().send((peer.clone(), cb)).unwrap());
let validator = MessageValidator::new_test(
asynchronous rob
committed
report_handle,
);
let peer_a = PeerId::random();
let mut validator_context = MockValidatorContext::default();
validator.new_peer(&mut validator_context, &peer_a, Roles::FULL);
assert!(validator_context.events.is_empty());
validator_context.clear();
let hash_a = [1u8; 32].into();
let hash_b = [2u8; 32].into();
let hash_c = [3u8; 32].into();
let message = GossipMessage::from(NeighborPacket {
chain_heads: vec![hash_a, hash_b],
}).encode();
asynchronous rob
committed
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
let res = validator.validate(
&mut validator_context,
&peer_a,
&message[..],
);
match res {
GossipValidationResult::Discard => {},
_ => panic!("wrong result"),
}
assert_eq!(
validator_context.events,
vec![
ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_a), false),
ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_b), false),
],
);
validator_context.clear();
let candidate_receipt = CandidateReceipt {
parachain_index: 5.into(),
collator: [255; 32].unchecked_into(),
head_data: HeadData(vec![9, 9, 9]),
signature: Default::default(),
egress_queue_roots: Vec::new(),
fees: 1_000_000,
block_data_hash: [20u8; 32].into(),
asynchronous rob
committed
};
let statement = GossipMessage::Statement(GossipStatement {
asynchronous rob
committed
signed_statement: SignedStatement {
statement: GenericStatement::Candidate(candidate_receipt),
signature: Sr25519Signature([255u8; 64]).into(),
asynchronous rob
committed
sender: 1,
asynchronous rob
committed
});
let encoded = statement.encode();
let topic_a = attestation_topic(hash_a);
let topic_b = attestation_topic(hash_b);
let topic_c = attestation_topic(hash_c);
// topic_a is in all 3 views -> succeed
validator.inner.write().attestation_view.new_local_leaf(hash_a, MessageValidationData::default());
asynchronous rob
committed
// topic_b is in the neighbor's view but not ours -> fail
// topic_c is not in either -> fail
{
let mut message_allowed = validator.message_allowed();
let intent = MessageIntent::Broadcast;
assert!(message_allowed(&peer_a, intent, &topic_a, &encoded));
assert!(!message_allowed(&peer_a, intent, &topic_b, &encoded));
assert!(!message_allowed(&peer_a, intent, &topic_c, &encoded));
asynchronous rob
committed
}
}
#[test]
fn too_many_chain_heads_is_report() {
let (tx, rx) = mpsc::channel();
let tx = Mutex::new(tx);
let report_handle = Box::new(move |peer: &PeerId, cb: i32| tx.lock().send((peer.clone(), cb)).unwrap());
let validator = MessageValidator::new_test(
asynchronous rob
committed
report_handle,
);
let peer_a = PeerId::random();
let mut validator_context = MockValidatorContext::default();
validator.new_peer(&mut validator_context, &peer_a, Roles::FULL);
assert!(validator_context.events.is_empty());
validator_context.clear();
let chain_heads = (0..MAX_CHAIN_HEADS+1).map(|i| [i as u8; 32].into()).collect();
let message = GossipMessage::from(NeighborPacket {
chain_heads,
}).encode();
asynchronous rob
committed
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
let res = validator.validate(
&mut validator_context,
&peer_a,
&message[..],
);
match res {
GossipValidationResult::Discard => {},
_ => panic!("wrong result"),
}
assert_eq!(
validator_context.events,
Vec::new(),
);
drop(validator);
assert_eq!(rx.iter().collect::<Vec<_>>(), vec![(peer_a, cost::BAD_NEIGHBOR_PACKET)]);
}
#[test]
fn statement_only_sent_when_candidate_known() {
let (tx, _rx) = mpsc::channel();
let tx = Mutex::new(tx);
let report_handle = Box::new(move |peer: &PeerId, cb: i32| tx.lock().send((peer.clone(), cb)).unwrap());
let validator = MessageValidator::new_test(
asynchronous rob
committed
report_handle,
);
let peer_a = PeerId::random();
let mut validator_context = MockValidatorContext::default();
validator.new_peer(&mut validator_context, &peer_a, Roles::FULL);
assert!(validator_context.events.is_empty());
validator_context.clear();
let hash_a = [1u8; 32].into();
let hash_b = [2u8; 32].into();
let message = GossipMessage::from(NeighborPacket {
chain_heads: vec![hash_a, hash_b],
}).encode();
asynchronous rob
committed
let res = validator.validate(
&mut validator_context,
&peer_a,
&message[..],
);
match res {
GossipValidationResult::Discard => {},
_ => panic!("wrong result"),
}
assert_eq!(
validator_context.events,
vec![
ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_a), false),
ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_b), false),
],
);
validator_context.clear();
let topic_a = attestation_topic(hash_a);
let c_hash = [99u8; 32].into();
let statement = GossipMessage::Statement(GossipStatement {
asynchronous rob
committed
signed_statement: SignedStatement {
statement: GenericStatement::Valid(c_hash),
signature: Sr25519Signature([255u8; 64]).into(),
asynchronous rob
committed
sender: 1,
}
});
let encoded = statement.encode();
validator.inner.write().attestation_view.new_local_leaf(hash_a, MessageValidationData::default());
asynchronous rob
committed
{
let mut message_allowed = validator.message_allowed();
assert!(!message_allowed(&peer_a, MessageIntent::Broadcast, &topic_a, &encoded[..]));
asynchronous rob
committed
}
validator
.inner
.write()
.peers
.get_mut(&peer_a)
.unwrap()
.attestation
.note_aware_under_leaf(&hash_a, c_hash);
asynchronous rob
committed
{
let mut message_allowed = validator.message_allowed();
assert!(message_allowed(&peer_a, MessageIntent::Broadcast, &topic_a, &encoded[..]));
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#[test]
fn multicasts_icmp_queues_when_building_on_new_leaf() {
let (tx, _rx) = mpsc::channel();
let tx = Mutex::new(tx);
let report_handle = Box::new(move |peer: &PeerId, cb: i32| tx.lock().send((peer.clone(), cb)).unwrap());
let hash_a = [1u8; 32].into();
let root_a = [11u8; 32].into();
let root_a_topic = queue_topic(root_a);
let root_a_messages = vec![
ParachainMessage(vec![1, 2, 3]),
ParachainMessage(vec![4, 5, 6]),
];
let chain = {
let mut chain = TestChainContext::default();
chain.known_map.insert(hash_a, Known::Leaf);
chain.ingress_roots.insert(hash_a, vec![root_a]);
chain
};
let validator = RegisteredMessageValidator::new_test(chain, report_handle);
let authorities: Vec<ValidatorId> = vec![validator_id([0; 32]), validator_id([10; 32])];
let peer_a = PeerId::random();
let peer_b = PeerId::random();
let mut validator_context = MockValidatorContext::default();
validator.inner.new_peer(&mut validator_context, &peer_a, Roles::FULL);
validator.inner.new_peer(&mut validator_context, &peer_b, Roles::FULL);
assert!(validator_context.events.is_empty());
validator_context.clear();
{
let message = GossipMessage::from(NeighborPacket {
chain_heads: vec![hash_a],
}).encode();
let res = validator.inner.validate(
&mut validator_context,
&peer_a,
&message[..],
);
match res {
GossipValidationResult::Discard => {},
_ => panic!("wrong result"),
}
assert_eq!(
validator_context.events,
vec![
ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_a), false),
],
);
}
// ensure that we attempt to multicast all relevant queues after noting a leaf.
{
let actions = validator.new_local_leaf(
hash_a,
MessageValidationData { authorities },
|root| if root == &root_a {
Some(root_a_messages.clone())
} else {
None
},
);
assert!(actions.has_message(peer_a.clone(), GossipMessage::from(NeighborPacket {
chain_heads: vec![hash_a],
}).to_consensus_message()));
assert!(actions.has_multicast(root_a_topic, GossipMessage::from(GossipParachainMessages {
queue_root: root_a,
messages: root_a_messages.clone(),
}).to_consensus_message()));
}