gossip.rs 36.8 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
//! Gossip messages and the message validator.
//!
//! At the moment, this module houses 2 gossip protocols central to Polkadot.
//!
//! The first is the attestation-gossip system, which aims to circulate parachain
//! candidate attestations by validators at leaves of the block-DAG.
//!
//! The second is the inter-chain message queue routing gossip, which aims to
//! circulate message queues between parachains, which remain un-routed as of
//! recent leaves.
//!
//! These gossip systems do not have any form of sybil-resistance in terms
//! of the nodes which can participate. It could be imposed e.g. by limiting only to
//! validators, but this would prevent message queues from getting into the hands
//! of collators and of attestations from getting into the hands of fishermen.
//! As such, we take certain precautions which allow arbitrary full nodes to
//! join the gossip graph, as well as validators (who are likely to be well-connected
//! amongst themselves).
//!
//! The first is the notion of a neighbor packet. This is a packet sent between
//! neighbors of the gossip graph to inform each other of their current protocol
//! state. As of this writing, for both attestation and message-routing gossip,
//! the only necessary information here is a (length-limited) set of perceived
//! leaves of the block-DAG.
//!
//! These leaves can be used to derive what information a node is willing to accept
//! There is typically an unbounded amount of possible "future" information relative to
//! any protocol state. For example, attestations or unrouted message queues from millions
//! of blocks after a known protocol state. The neighbor packet is meant to avoid being
//! spammed by illegitimate future information, while informing neighbors of when
//! previously-future and now current gossip messages would be accepted.
//!
//! Peers who send information which was not allowed under a recent neighbor packet
//! will be noted as non-beneficial to Substrate's peer-set management utility.

use sr_primitives::{generic::BlockId, traits::ProvideRuntimeApi};
use substrate_client::error::Error as ClientError;
54
use substrate_network::{config::Roles, PeerId};
55
56
use substrate_network::consensus_gossip::{
	self as network_gossip, ValidationResult as GossipValidationResult,
57
	ValidatorContext, MessageIntent, ConsensusMessage,
58
};
59
60
61
use polkadot_validation::SignedStatement;
use polkadot_primitives::{Block, Hash};
use polkadot_primitives::parachain::{ParachainHost, ValidatorId, Message as ParachainMessage};
62
use codec::{Decode, Encode};
63

64
use std::collections::HashMap;
65
66
use std::sync::Arc;

67
use arrayvec::ArrayVec;
68
use parking_lot::RwLock;
69
use log::warn;
70

71
use super::PolkadotNetworkService;
72
use crate::router::attestation_topic;
73

74
75
76
77
78
79
use attestation::{View as AttestationView, PeerData as AttestationPeerData};
use message_routing::{View as MessageRoutingView};

mod attestation;
mod message_routing;

80
/// The engine ID of the polkadot attestation system.
81
pub const POLKADOT_ENGINE_ID: sr_primitives::ConsensusEngineId = *b"dot1";
82

83
// arbitrary; in practice this should not be more than 2.
84
85
86
87
pub(crate) const MAX_CHAIN_HEADS: usize = 5;

/// Type alias for a bounded vector of leaves.
pub type LeavesVec = ArrayVec<[Hash; MAX_CHAIN_HEADS]>;
88
89
90
91
92
93

mod benefit {
	/// When a peer sends us a previously-unknown candidate statement.
	pub const NEW_CANDIDATE: i32 = 100;
	/// When a peer sends us a previously-unknown attestation.
	pub const NEW_ATTESTATION: i32 = 50;
94
95
	/// When a peer sends us a previously-unknown message packet.
	pub const NEW_ICMP_MESSAGES: i32 = 50;
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
}

mod cost {
	/// A peer sent us an attestation and we don't know the candidate.
	pub const ATTESTATION_NO_CANDIDATE: i32 = -100;
	/// A peer sent us a statement we consider in the future.
	pub const FUTURE_MESSAGE: i32 = -100;
	/// A peer sent us a statement from the past.
	pub const PAST_MESSAGE: i32 = -30;
	/// A peer sent us a malformed message.
	pub const MALFORMED_MESSAGE: i32 = -500;
	/// A peer sent us a wrongly signed message.
	pub const BAD_SIGNATURE: i32 = -500;
	/// A peer sent us a bad neighbor packet.
	pub const BAD_NEIGHBOR_PACKET: i32 = -300;
111
112
113
114
115
116
117
118
119
	/// A peer sent us an ICMP queue we haven't advertised a need for.
	pub const UNNEEDED_ICMP_MESSAGES: i32 = -100;

	/// A peer sent us an ICMP queue with a bad root.
	pub fn icmp_messages_root_mismatch(n_messages: usize) -> i32 {
		const PER_MESSAGE: i32 = -150;

		(0..n_messages).map(|_| PER_MESSAGE).sum()
	}
120
121
}

122
123
/// A gossip message.
#[derive(Encode, Decode, Clone)]
124
pub enum GossipMessage {
125
126
127
128
129
130
131
	/// A packet sent to a neighbor but not relayed.
	#[codec(index = "1")]
	Neighbor(VersionedNeighborPacket),
	/// An attestation-statement about the candidate.
	/// Non-candidate statements should only be sent to peers who are aware of the candidate.
	#[codec(index = "2")]
	Statement(GossipStatement),
132
133
134
	/// A packet of messages from one parachain to another.
	#[codec(index = "3")]
	ParachainMessages(GossipParachainMessages),
135
136
137
138
	// TODO: https://github.com/paritytech/polkadot/issues/253
	// erasure-coded chunks.
}

139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
impl GossipMessage {
	fn to_consensus_message(&self) -> ConsensusMessage {
		ConsensusMessage {
			data: self.encode(),
			engine_id: POLKADOT_ENGINE_ID,
		}
	}
}

impl From<NeighborPacket> for GossipMessage {
	fn from(packet: NeighborPacket) -> Self {
		GossipMessage::Neighbor(VersionedNeighborPacket::V1(packet))
	}
}

154
155
156
157
158
159
impl From<GossipStatement> for GossipMessage {
	fn from(stmt: GossipStatement) -> Self {
		GossipMessage::Statement(stmt)
	}
}

160
161
162
163
164
165
impl From<GossipParachainMessages> for GossipMessage {
	fn from(messages: GossipParachainMessages) -> Self {
		GossipMessage::ParachainMessages(messages)
	}
}

166
167
/// A gossip message containing a statement.
#[derive(Encode, Decode, Clone)]
168
pub struct GossipStatement {
169
170
171
	/// The block hash of the relay chain being referred to. In context, this should
	/// be a leaf.
	pub relay_chain_leaf: Hash,
172
	/// The signed statement being gossipped.
173
174
175
176
177
	pub signed_statement: SignedStatement,
}

impl GossipStatement {
	/// Create a new instance.
178
	pub fn new(relay_chain_leaf: Hash, signed_statement: SignedStatement) -> Self {
179
		Self {
180
			relay_chain_leaf,
181
182
183
			signed_statement,
		}
	}
184
185
}

186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
/// A packet of messages from one parachain to another.
///
/// These are all the messages posted from one parachain to another during the
/// execution of a single parachain block. Since this parachain block may have been
/// included in many forks of the relay chain, there is no relay-chain leaf parameter.
#[derive(Encode, Decode, Clone)]
pub struct GossipParachainMessages {
	/// The root of the message queue.
	pub queue_root: Hash,
	/// The messages themselves.
	pub messages: Vec<ParachainMessage>,
}

impl GossipParachainMessages {
	// confirms that the queue-root in the struct correctly matches
	// the messages.
	fn queue_root_is_correct(&self) -> bool {
		let root = polkadot_validation::message_queue_root(
			self.messages.iter().map(|m| &m.0)
		);
		root == self.queue_root
	}
}

210
211
212
213
214
215
216
217
218
219
220
221
/// A versioned neighbor message.
#[derive(Encode, Decode, Clone)]
pub enum VersionedNeighborPacket {
	#[codec(index = "1")]
	V1(NeighborPacket),
}

/// Contains information on which chain heads the peer is
/// accepting messages for.
#[derive(Encode, Decode, Clone)]
pub struct NeighborPacket {
	chain_heads: Vec<Hash>,
222
223
224
}

/// whether a block is known.
225
#[derive(Clone, Copy)]
226
227
228
229
230
231
232
233
234
pub enum Known {
	/// The block is a known leaf.
	Leaf,
	/// The block is known to be old.
	Old,
	/// The block is known to be bad.
	Bad,
}

235
236
237
238
239
240
241
242
243
/// Context to the underlying polkadot chain.
pub trait ChainContext: Send + Sync {
	/// Provide a closure which is invoked for every unrouted queue hash at a given leaf.
	fn leaf_unrouted_roots(
		&self,
		leaf: &Hash,
		with_queue_root: &mut dyn FnMut(&Hash),
	) -> Result<(), ClientError>;

244
245
246
247
	/// whether a block is known. If it's not, returns `None`.
	fn is_known(&self, block_hash: &Hash) -> Option<Known>;
}

248
249
250
251
252
253
impl<F, P> ChainContext for (F, P) where
	F: Fn(&Hash) -> Option<Known> + Send + Sync,
	P: Send + Sync + std::ops::Deref,
	P::Target: ProvideRuntimeApi,
	<P::Target as ProvideRuntimeApi>::Api: ParachainHost<Block>,
{
254
	fn is_known(&self, block_hash: &Hash) -> Option<Known> {
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
		(self.0)(block_hash)
	}

	fn leaf_unrouted_roots(
		&self,
		&leaf: &Hash,
		with_queue_root: &mut dyn FnMut(&Hash),
	) -> Result<(), ClientError> {
		let api = self.1.runtime_api();

		let leaf_id = BlockId::Hash(leaf);
		let active_parachains = api.active_parachains(&leaf_id)?;

		for para_id in active_parachains {
			if let Some(ingress) = api.ingress(&leaf_id, para_id, None)? {
				for (_height, _from, queue_root) in ingress.iter() {
					with_queue_root(queue_root);
				}
			}
		}

		Ok(())
277
278
279
280
281
282
283
	}
}

/// Register a gossip validator on the network service.
// NOTE: since RegisteredMessageValidator is meant to be a type-safe proof
// that we've actually done the registration, this should be the only way
// to construct it outside of tests.
284
285
286
287
288
pub fn register_validator<C: ChainContext + 'static>(
	service: Arc<PolkadotNetworkService>,
	chain: C,
) -> RegisteredMessageValidator
{
289
290
291
292
	let s = service.clone();
	let report_handle = Box::new(move |peer: &PeerId, cost_benefit| {
		s.report_peer(peer.clone(), cost_benefit);
	});
293
	let validator = Arc::new(MessageValidator {
294
295
296
		report_handle,
		inner: RwLock::new(Inner {
			peers: HashMap::new(),
297
298
299
			attestation_view: Default::default(),
			message_routing_view: Default::default(),
			chain,
300
		})
301
302
303
	});

	let gossip_side = validator.clone();
304
305
306
	service.with_gossip(|gossip, ctx|
		gossip.register_validator(ctx, POLKADOT_ENGINE_ID, gossip_side)
	);
307
308
309
310

	RegisteredMessageValidator { inner: validator as _ }
}

311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
#[derive(PartialEq)]
enum NewLeafAction {
	// (who, message)
	TargetedMessage(PeerId, ConsensusMessage),
	// (topic, message)
	Multicast(Hash, ConsensusMessage),
}

/// Actions to take after noting a new block-DAG leaf.
///
/// This should be consumed by passing a consensus-gossip handle to `perform`.
#[must_use = "New chain-head gossip actions must be performed"]
pub struct NewLeafActions {
	actions: Vec<NewLeafAction>,
}

impl NewLeafActions {
	/// Perform the queued actions, feeding into gossip.
	pub fn perform(
		self,
		gossip: &mut dyn crate::GossipService,
		ctx: &mut dyn substrate_network::Context<Block>,
	) {
		for action in self.actions {
			match action {
				NewLeafAction::TargetedMessage(who, message)
					=> gossip.send_message(ctx, &who, message),
				NewLeafAction::Multicast(topic, message)
					=> gossip.multicast(ctx, &topic, message),
			}
		}
	}
}

345
/// Register a gossip validator for a non-authority node.
346
pub fn register_non_authority_validator(service: Arc<PolkadotNetworkService>) {
347
348
349
350
351
352
353
354
	service.with_gossip(|gossip, ctx|
		gossip.register_validator(
			ctx,
			POLKADOT_ENGINE_ID,
			Arc::new(substrate_network::consensus_gossip::DiscardAll)),
	);
}

355
356
357
358
359
/// A registered message validator.
///
/// Create this using `register_validator`.
#[derive(Clone)]
pub struct RegisteredMessageValidator {
360
	inner: Arc<MessageValidator<dyn ChainContext>>,
361
362
363
364
}

impl RegisteredMessageValidator {
	#[cfg(test)]
365
366
	pub(crate) fn new_test<C: ChainContext + 'static>(
		chain: C,
367
		report_handle: Box<dyn Fn(&PeerId, i32) + Send + Sync>,
368
	) -> Self {
369
		let validator = Arc::new(MessageValidator::new_test(chain, report_handle));
370
371
372
373

		RegisteredMessageValidator { inner: validator as _ }
	}

374
375
376
377
	/// Note that we perceive a new leaf of the block-DAG. We will notify our neighbors that
	/// we now accept parachain candidate attestations and incoming message queues
	/// relevant to this leaf.
	pub(crate) fn new_local_leaf(
378
		&self,
379
		relay_chain_leaf: Hash,
380
		validation: MessageValidationData,
381
382
383
384
		lookup_queue_by_root: impl Fn(&Hash) -> Option<Vec<ParachainMessage>>,
	) -> NewLeafActions {
		// add an entry in attestation_view
		// prune any entries from attestation_view which are no longer leaves
385
		let mut inner = self.inner.inner.write();
386
		inner.attestation_view.new_local_leaf(relay_chain_leaf, validation);
387

388
389
390
391
392
393
394
395
396
397
398
		let mut actions = Vec::new();

		{
			let &mut Inner {
				ref chain,
				ref mut attestation_view,
				ref mut message_routing_view,
				..
			} = &mut *inner;

			attestation_view.prune_old_leaves(|hash| match chain.is_known(hash) {
399
400
401
				Some(Known::Leaf) => true,
				_ => false,
			});
402
403
404
405

			if let Err(e) = message_routing_view.update_leaves(chain, attestation_view.neighbor_info()) {
				warn!("Unable to fully update leaf-state: {:?}", e);
			}
406
		}
407

408

409
		// send neighbor packets to peers
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
		inner.multicast_neighbor_packet(
			|who, message| actions.push(NewLeafAction::TargetedMessage(who.clone(), message))
		);

		// feed any new unrouted queues into the propagation pool.
		inner.message_routing_view.sweep_unknown_queues(|topic, queue_root|
			match lookup_queue_by_root(queue_root) {
				Some(messages) => {
					let message = GossipMessage::from(GossipParachainMessages {
						queue_root: *queue_root,
						messages,
					}).to_consensus_message();

					actions.push(NewLeafAction::Multicast(*topic, message));

					true
				}
				None => false,
			}
		);

		NewLeafActions { actions }
432
433
434
	}
}

435
/// The data needed for validating gossip messages.
436
#[derive(Default)]
437
pub(crate) struct MessageValidationData {
438
	/// The authorities' parachain validation keys at a block.
439
	pub(crate) authorities: Vec<ValidatorId>,
440
441
442
}

impl MessageValidationData {
443
444
445
	// check a statement's signature.
	fn check_statement(&self, relay_chain_leaf: &Hash, statement: &SignedStatement) -> Result<(), ()> {
		let sender = match self.authorities.get(statement.sender as usize) {
446
			Some(val) => val,
447
			None => return Err(()),
448
449
		};

450
		let good = self.authorities.contains(&sender) &&
451
452
453
			::polkadot_validation::check_statement(
				&statement.statement,
				&statement.signature,
454
				sender.clone(),
455
				relay_chain_leaf,
456
457
458
459
460
461
462
463
464
465
466
467
			);

		if good {
			Ok(())
		} else {
			Err(())
		}
	}
}

#[derive(Default)]
struct PeerData {
468
	attestation: AttestationPeerData,
469
470
471
}

impl PeerData {
472
473
	fn leaves(&self) -> impl Iterator<Item = &Hash> {
		self.attestation.leaves()
474
475
476
	}
}

477
struct Inner<C: ?Sized> {
478
	peers: HashMap<PeerId, PeerData>,
479
480
481
	attestation_view: AttestationView,
	message_routing_view: MessageRoutingView,
	chain: C,
482
483
}

484
impl<C: ?Sized + ChainContext> Inner<C> {
485
486
487
488
489
490
491
	fn validate_neighbor_packet(&mut self, sender: &PeerId, packet: NeighborPacket)
		-> (GossipValidationResult<Hash>, i32, Vec<Hash>)
	{
		let chain_heads = packet.chain_heads;
		if chain_heads.len() > MAX_CHAIN_HEADS {
			(GossipValidationResult::Discard, cost::BAD_NEIGHBOR_PACKET, Vec::new())
		} else {
492
493
494
495
496
497
498
499
500
501
502
503
504
505
			let chain_heads: LeavesVec = chain_heads.into_iter().collect();
			let new_topics = if let Some(ref mut peer) = self.peers.get_mut(sender) {
				let new_leaves = peer.attestation.update_leaves(&chain_heads);
				let new_attestation_topics = new_leaves.iter().cloned().map(attestation_topic);

				// find all topics which are from the intersection of our leaves with the peer's
				// new leaves.
				let new_message_routing_topics = self.message_routing_view.intersection_topics(&new_leaves);

				new_attestation_topics.chain(new_message_routing_topics).collect()
			} else {
				Vec::new()
			};

506
507
508
509
510
511
512
513
			(GossipValidationResult::Discard, 0, new_topics)
		}
	}

	fn multicast_neighbor_packet<F: FnMut(&PeerId, ConsensusMessage)>(
		&self,
		mut send_neighbor_packet: F,
	) {
514
515
516
		let neighbor_packet = GossipMessage::from(NeighborPacket {
			chain_heads: self.attestation_view.neighbor_info().collect(),
		}).to_consensus_message();
517
518

		for peer in self.peers.keys() {
519
			send_neighbor_packet(peer, neighbor_packet.clone())
520
		}
521
522
523
524
	}
}

/// An unregistered message validator. Register this with `register_validator`.
525
pub struct MessageValidator<C: ?Sized> {
526
	report_handle: Box<dyn Fn(&PeerId, i32) + Send + Sync>,
527
	inner: RwLock<Inner<C>>,
528
529
}

530
impl<C: ChainContext + ?Sized> MessageValidator<C> {
531
532
	#[cfg(test)]
	fn new_test(
533
		chain: C,
534
		report_handle: Box<dyn Fn(&PeerId, i32) + Send + Sync>,
535
	) -> Self where C: Sized {
536
537
538
539
		MessageValidator {
			report_handle,
			inner: RwLock::new(Inner {
				peers: HashMap::new(),
540
541
542
543
				attestation_view: Default::default(),
				message_routing_view: Default::default(),
				chain,
			}),
544
545
546
547
548
549
		}
	}

	fn report(&self, who: &PeerId, cost_benefit: i32) {
		(self.report_handle)(who, cost_benefit)
	}
550
551
}

552
impl<C: ChainContext + ?Sized> network_gossip::Validator<Block> for MessageValidator<C> {
553
	fn new_peer(&self, _context: &mut dyn ValidatorContext<Block>, who: &PeerId, _roles: Roles) {
554
		let mut inner = self.inner.write();
555
		inner.peers.insert(who.clone(), PeerData::default());
556
557
	}

558
	fn peer_disconnected(&self, _context: &mut dyn ValidatorContext<Block>, who: &PeerId) {
559
560
561
562
		let mut inner = self.inner.write();
		inner.peers.remove(who);
	}

563
	fn validate(&self, context: &mut dyn ValidatorContext<Block>, sender: &PeerId, data: &[u8])
564
565
		-> GossipValidationResult<Hash>
	{
566
567
		let mut decode_data = data;
		let (res, cost_benefit) = match GossipMessage::decode(&mut decode_data) {
568
569
			Err(_) => (GossipValidationResult::Discard, cost::MALFORMED_MESSAGE),
			Ok(GossipMessage::Neighbor(VersionedNeighborPacket::V1(packet))) => {
570
571
572
573
574
575
				let (res, cb, topics) = self.inner.write().validate_neighbor_packet(sender, packet);
				for new_topic in topics {
					context.send_topic(sender, new_topic, false);
				}
				(res, cb)
			}
576
			Ok(GossipMessage::Statement(statement)) => {
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
				let (res, cb) = {
					let mut inner = self.inner.write();
					let inner = &mut *inner;
					inner.attestation_view.validate_statement_signature(statement, &inner.chain)
				};

				if let GossipValidationResult::ProcessAndKeep(ref topic) = res {
					context.broadcast_message(topic.clone(), data.to_vec(), false);
				}
				(res, cb)
			}
			Ok(GossipMessage::ParachainMessages(messages)) => {
				let (res, cb) = {
					let mut inner = self.inner.write();
					let inner = &mut *inner;
					inner.message_routing_view.validate_queue_and_note_known(&messages)
				};

595
596
597
598
599
600
601
602
603
604
605
				if let GossipValidationResult::ProcessAndKeep(ref topic) = res {
					context.broadcast_message(topic.clone(), data.to_vec(), false);
				}
				(res, cb)
			}
		};

		self.report(sender, cost_benefit);
		res
	}

606
	fn message_expired<'a>(&'a self) -> Box<dyn FnMut(Hash, &[u8]) -> bool + 'a> {
607
608
609
		let inner = self.inner.read();

		Box::new(move |topic, _data| {
610
611
612
613
614
615
			// check that messages from this topic are considered live by one of our protocols.
			// everything else is expired
			let live = inner.attestation_view.is_topic_live(&topic)
				|| !inner.message_routing_view.is_topic_live(&topic);

			!live // = expired
616
617
618
		})
	}

619
	fn message_allowed<'a>(&'a self) -> Box<dyn FnMut(&PeerId, MessageIntent, &Hash, &[u8]) -> bool + 'a> {
620
621
		let mut inner = self.inner.write();
		Box::new(move |who, intent, topic, data| {
622
623
624
625
626
627
			let &mut Inner {
				ref mut peers,
				ref mut attestation_view,
				ref mut message_routing_view,
				..
			} = &mut *inner;
628
629
630
631
632
633

			match intent {
				MessageIntent::PeriodicRebroadcast => return false,
				_ => {},
			}

634
635
			let attestation_head = attestation_view.topic_block(topic).map(|x| x.clone());
			let peer = peers.get_mut(who);
636
637

			match GossipMessage::decode(&mut &data[..]) {
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
				Ok(GossipMessage::Statement(ref statement)) => {
					// to allow statements, we need peer knowledge.
					let peer_knowledge = peer.and_then(move |p| attestation_head.map(|r| (p, r)))
						.and_then(|(p, r)| p.attestation.knowledge_at_mut(&r).map(|k| (k, r)));

					peer_knowledge.map_or(false, |(knowledge, attestation_head)| {
						attestation_view.statement_allowed(
							statement,
							&attestation_head,
							knowledge,
						)
					})
				}
				Ok(GossipMessage::ParachainMessages(_)) => match peer {
					None => false,
					Some(peer) => {
						let their_leaves: LeavesVec = peer.leaves().cloned().collect();
						message_routing_view.allowed_intersecting(&their_leaves, topic)
656
657
					}
				}
658
				_ => false,
659
			}
660
661
662
663
664
665
666
667
668
669
670
671
		})
	}
}

#[cfg(test)]
mod tests {
	use super::*;
	use substrate_network::consensus_gossip::Validator as ValidatorT;
	use std::sync::mpsc;
	use parking_lot::Mutex;
	use polkadot_primitives::parachain::{CandidateReceipt, HeadData};
	use substrate_primitives::crypto::UncheckedInto;
672
673
674
675
676
	use substrate_primitives::sr25519::{Public as Sr25519Public, Signature as Sr25519Signature};
	use polkadot_validation::GenericStatement;
	use super::message_routing::queue_topic;

	use crate::tests::TestChainContext;
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711

	#[derive(PartialEq, Clone, Debug)]
	enum ContextEvent {
		BroadcastTopic(Hash, bool),
		BroadcastMessage(Hash, Vec<u8>, bool),
		SendMessage(PeerId, Vec<u8>),
		SendTopic(PeerId, Hash, bool),
	}

	#[derive(Default)]
	struct MockValidatorContext {
		events: Vec<ContextEvent>,
	}

	impl MockValidatorContext {
		fn clear(&mut self) {
			self.events.clear()
		}
	}

	impl network_gossip::ValidatorContext<Block> for MockValidatorContext {
		fn broadcast_topic(&mut self, topic: Hash, force: bool) {
			self.events.push(ContextEvent::BroadcastTopic(topic, force));
		}
		fn broadcast_message(&mut self, topic: Hash, message: Vec<u8>, force: bool) {
			self.events.push(ContextEvent::BroadcastMessage(topic, message, force));
		}
		fn send_message(&mut self, who: &PeerId, message: Vec<u8>) {
			self.events.push(ContextEvent::SendMessage(who.clone(), message));
		}
		fn send_topic(&mut self, who: &PeerId, topic: Hash, force: bool) {
			self.events.push(ContextEvent::SendTopic(who.clone(), topic, force));
		}
	}

712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
	impl NewLeafActions {
		fn has_message(&self, who: PeerId, message: ConsensusMessage) -> bool {
			let x = NewLeafAction::TargetedMessage(who, message);
			self.actions.iter().find(|&m| m == &x).is_some()
		}

		fn has_multicast(&self, topic: Hash, message: ConsensusMessage) -> bool {
			let x = NewLeafAction::Multicast(topic, message);
			self.actions.iter().find(|&m| m == &x).is_some()
		}
	}

	fn validator_id(raw: [u8; 32]) -> ValidatorId {
		Sr25519Public::from_raw(raw).into()
	}

728
729
730
731
732
733
	#[test]
	fn message_allowed() {
		let (tx, _rx) = mpsc::channel();
		let tx = Mutex::new(tx);
		let report_handle = Box::new(move |peer: &PeerId, cb: i32| tx.lock().send((peer.clone(), cb)).unwrap());
		let validator = MessageValidator::new_test(
734
			TestChainContext::default(),
735
736
737
738
739
740
741
742
743
744
745
746
747
748
			report_handle,
		);

		let peer_a = PeerId::random();

		let mut validator_context = MockValidatorContext::default();
		validator.new_peer(&mut validator_context, &peer_a, Roles::FULL);
		assert!(validator_context.events.is_empty());
		validator_context.clear();

		let hash_a = [1u8; 32].into();
		let hash_b = [2u8; 32].into();
		let hash_c = [3u8; 32].into();

749
750
751
		let message = GossipMessage::from(NeighborPacket {
			chain_heads: vec![hash_a, hash_b],
		}).encode();
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
		let res = validator.validate(
			&mut validator_context,
			&peer_a,
			&message[..],
		);

		match res {
			GossipValidationResult::Discard => {},
			_ => panic!("wrong result"),
		}
		assert_eq!(
			validator_context.events,
			vec![
				ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_a), false),
				ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_b), false),
			],
		);

		validator_context.clear();

		let candidate_receipt = CandidateReceipt {
			parachain_index: 5.into(),
			collator: [255; 32].unchecked_into(),
			head_data: HeadData(vec![9, 9, 9]),
			signature: Default::default(),
			egress_queue_roots: Vec::new(),
			fees: 1_000_000,
			block_data_hash: [20u8; 32].into(),
780
			upward_messages: Vec::new(),
781
782
783
		};

		let statement = GossipMessage::Statement(GossipStatement {
784
			relay_chain_leaf: hash_a,
785
786
			signed_statement: SignedStatement {
				statement: GenericStatement::Candidate(candidate_receipt),
787
				signature: Sr25519Signature([255u8; 64]).into(),
788
				sender: 1,
789
			}
790
791
792
793
794
795
796
797
		});
		let encoded = statement.encode();

		let topic_a = attestation_topic(hash_a);
		let topic_b = attestation_topic(hash_b);
		let topic_c = attestation_topic(hash_c);

		// topic_a is in all 3 views -> succeed
798
		validator.inner.write().attestation_view.new_local_leaf(hash_a, MessageValidationData::default());
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
		// topic_b is in the neighbor's view but not ours -> fail
		// topic_c is not in either -> fail

		{
			let mut message_allowed = validator.message_allowed();
			assert!(message_allowed(&peer_a, MessageIntent::Broadcast, &topic_a, &encoded));
			assert!(!message_allowed(&peer_a, MessageIntent::Broadcast, &topic_b, &encoded));
			assert!(!message_allowed(&peer_a, MessageIntent::Broadcast, &topic_c, &encoded));
		}
	}

	#[test]
	fn too_many_chain_heads_is_report() {
		let (tx, rx) = mpsc::channel();
		let tx = Mutex::new(tx);
		let report_handle = Box::new(move |peer: &PeerId, cb: i32| tx.lock().send((peer.clone(), cb)).unwrap());
		let validator = MessageValidator::new_test(
816
			TestChainContext::default(),
817
818
819
820
821
822
823
824
825
826
827
828
			report_handle,
		);

		let peer_a = PeerId::random();

		let mut validator_context = MockValidatorContext::default();
		validator.new_peer(&mut validator_context, &peer_a, Roles::FULL);
		assert!(validator_context.events.is_empty());
		validator_context.clear();

		let chain_heads = (0..MAX_CHAIN_HEADS+1).map(|i| [i as u8; 32].into()).collect();

829
830
831
		let message = GossipMessage::from(NeighborPacket {
			chain_heads,
		}).encode();
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
		let res = validator.validate(
			&mut validator_context,
			&peer_a,
			&message[..],
		);

		match res {
			GossipValidationResult::Discard => {},
			_ => panic!("wrong result"),
		}
		assert_eq!(
			validator_context.events,
			Vec::new(),
		);

		drop(validator);

		assert_eq!(rx.iter().collect::<Vec<_>>(), vec![(peer_a, cost::BAD_NEIGHBOR_PACKET)]);
	}

	#[test]
	fn statement_only_sent_when_candidate_known() {
		let (tx, _rx) = mpsc::channel();
		let tx = Mutex::new(tx);
		let report_handle = Box::new(move |peer: &PeerId, cb: i32| tx.lock().send((peer.clone(), cb)).unwrap());
		let validator = MessageValidator::new_test(
858
			TestChainContext::default(),
859
860
861
862
863
864
865
866
867
868
869
870
871
			report_handle,
		);

		let peer_a = PeerId::random();

		let mut validator_context = MockValidatorContext::default();
		validator.new_peer(&mut validator_context, &peer_a, Roles::FULL);
		assert!(validator_context.events.is_empty());
		validator_context.clear();

		let hash_a = [1u8; 32].into();
		let hash_b = [2u8; 32].into();

872
873
874
875
		let message = GossipMessage::from(NeighborPacket {
			chain_heads: vec![hash_a, hash_b],
		}).encode();

876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
		let res = validator.validate(
			&mut validator_context,
			&peer_a,
			&message[..],
		);

		match res {
			GossipValidationResult::Discard => {},
			_ => panic!("wrong result"),
		}
		assert_eq!(
			validator_context.events,
			vec![
				ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_a), false),
				ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_b), false),
			],
		);

		validator_context.clear();

		let topic_a = attestation_topic(hash_a);
		let c_hash = [99u8; 32].into();

		let statement = GossipMessage::Statement(GossipStatement {
900
			relay_chain_leaf: hash_a,
901
902
			signed_statement: SignedStatement {
				statement: GenericStatement::Valid(c_hash),
903
				signature: Sr25519Signature([255u8; 64]).into(),
904
905
906
907
				sender: 1,
			}
		});
		let encoded = statement.encode();
908
		validator.inner.write().attestation_view.new_local_leaf(hash_a, MessageValidationData::default());
909
910
911
912
913
914
915
916
917
918
919
920

		{
			let mut message_allowed = validator.message_allowed();
			assert!(!message_allowed(&peer_a, MessageIntent::Broadcast, &topic_a, &encoded[..]));
		}

		validator
			.inner
			.write()
			.peers
			.get_mut(&peer_a)
			.unwrap()
921
922
			.attestation
			.note_aware_under_leaf(&hash_a, c_hash);
923
924
925
		{
			let mut message_allowed = validator.message_allowed();
			assert!(message_allowed(&peer_a, MessageIntent::Broadcast, &topic_a, &encoded[..]));
926
927
		}
	}
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000

	#[test]
	fn multicasts_icmp_queues_when_building_on_new_leaf() {
		let (tx, _rx) = mpsc::channel();
		let tx = Mutex::new(tx);
		let report_handle = Box::new(move |peer: &PeerId, cb: i32| tx.lock().send((peer.clone(), cb)).unwrap());

		let hash_a = [1u8; 32].into();
		let root_a = [11u8; 32].into();
		let root_a_topic = queue_topic(root_a);

		let root_a_messages = vec![
			ParachainMessage(vec![1, 2, 3]),
			ParachainMessage(vec![4, 5, 6]),
		];

		let chain = {
			let mut chain = TestChainContext::default();
			chain.known_map.insert(hash_a, Known::Leaf);
			chain.ingress_roots.insert(hash_a, vec![root_a]);
			chain
		};

		let validator = RegisteredMessageValidator::new_test(chain, report_handle);

		let authorities: Vec<ValidatorId> = vec![validator_id([0; 32]), validator_id([10; 32])];

		let peer_a = PeerId::random();
		let peer_b = PeerId::random();

		let mut validator_context = MockValidatorContext::default();
		validator.inner.new_peer(&mut validator_context, &peer_a, Roles::FULL);
		validator.inner.new_peer(&mut validator_context, &peer_b, Roles::FULL);
		assert!(validator_context.events.is_empty());
		validator_context.clear();


		{
			let message = GossipMessage::from(NeighborPacket {
				chain_heads: vec![hash_a],
			}).encode();
			let res = validator.inner.validate(
				&mut validator_context,
				&peer_a,
				&message[..],
			);

			match res {
				GossipValidationResult::Discard => {},
				_ => panic!("wrong result"),
			}
			assert_eq!(
				validator_context.events,
				vec![
					ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_a), false),
				],
			);
		}

		// ensure that we attempt to multicast all relevant queues after noting a leaf.
		{
			let actions = validator.new_local_leaf(
				hash_a,
				MessageValidationData { authorities },
				|root| if root == &root_a {
					Some(root_a_messages.clone())
				} else {
					None
				},
			);

			assert!(actions.has_message(peer_a.clone(), GossipMessage::from(NeighborPacket {
				chain_heads: vec![hash_a],
For faster browsing, not all history is shown. View entire blame