mod.rs 31 KB
Newer Older
Shawn Tabrizi's avatar
Shawn Tabrizi committed
1
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
//! Gossip messages and the message validator.
//!
//! At the moment, this module houses 2 gossip protocols central to Polkadot.
//!
//! The first is the attestation-gossip system, which aims to circulate parachain
//! candidate attestations by validators at leaves of the block-DAG.
//!
//! The second is the inter-chain message queue routing gossip, which aims to
//! circulate message queues between parachains, which remain un-routed as of
//! recent leaves.
//!
//! These gossip systems do not have any form of sybil-resistance in terms
//! of the nodes which can participate. It could be imposed e.g. by limiting only to
//! validators, but this would prevent message queues from getting into the hands
//! of collators and of attestations from getting into the hands of fishermen.
//! As such, we take certain precautions which allow arbitrary full nodes to
//! join the gossip graph, as well as validators (who are likely to be well-connected
//! amongst themselves).
//!
//! The first is the notion of a neighbor packet. This is a packet sent between
//! neighbors of the gossip graph to inform each other of their current protocol
//! state. As of this writing, for both attestation and message-routing gossip,
//! the only necessary information here is a (length-limited) set of perceived
//! leaves of the block-DAG.
//!
//! These leaves can be used to derive what information a node is willing to accept
//! There is typically an unbounded amount of possible "future" information relative to
//! any protocol state. For example, attestations or unrouted message queues from millions
//! of blocks after a known protocol state. The neighbor packet is meant to avoid being
//! spammed by illegitimate future information, while informing neighbors of when
//! previously-future and now current gossip messages would be accepted.
//!
//! Peers who send information which was not allowed under a recent neighbor packet
//! will be noted as non-beneficial to Substrate's peer-set management utility.

Ashley's avatar
Ashley committed
52
use sp_runtime::traits::{BlakeTwo256, Hash as HashT};
Gavin Wood's avatar
Gavin Wood committed
53
use sp_blockchain::Error as ClientError;
54
use sc_network::{config::Roles, Context, PeerId, ReputationChange};
55
use sc_network::{NetworkService as SubstrateNetworkService, specialization::NetworkSpecialization};
56
57
58
use sc_network_gossip::{
	ValidationResult as GossipValidationResult,
	ValidatorContext, MessageIntent,
59
};
60
use polkadot_validation::{SignedStatement};
61
use polkadot_primitives::{Block, Hash};
62
use polkadot_primitives::parachain::{
Ashley's avatar
Ashley committed
63
	ParachainHost, ValidatorId, ErasureChunk as PrimitiveChunk
64
65
};
use polkadot_erasure_coding::{self as erasure};
66
use codec::{Decode, Encode};
67
use sp_api::ProvideRuntimeApi;
68

69
use std::collections::HashMap;
70
71
use std::sync::Arc;

72
use arrayvec::ArrayVec;
73
use futures::prelude::*;
74
75
use parking_lot::RwLock;

76
use crate::legacy::{GossipMessageStream, NetworkService, GossipService, PolkadotProtocol, router::attestation_topic};
77

78
79
80
81
use attestation::{View as AttestationView, PeerData as AttestationPeerData};

mod attestation;

82
/// The engine ID of the polkadot attestation system.
83
pub const POLKADOT_ENGINE_ID: sp_runtime::ConsensusEngineId = *b"dot1";
84
pub const POLKADOT_PROTOCOL_NAME: &[u8] = b"/polkadot/legacy/1";
85

86
// arbitrary; in practice this should not be more than 2.
87
88
89
90
pub(crate) const MAX_CHAIN_HEADS: usize = 5;

/// Type alias for a bounded vector of leaves.
pub type LeavesVec = ArrayVec<[Hash; MAX_CHAIN_HEADS]>;
91
92

mod benefit {
93
	use sc_network::ReputationChange as Rep;
94
	/// When a peer sends us a previously-unknown candidate statement.
95
	pub const NEW_CANDIDATE: Rep = Rep::new(100, "Polkadot: New candidate");
96
	/// When a peer sends us a previously-unknown attestation.
97
	pub const NEW_ATTESTATION: Rep = Rep::new(50, "Polkadot: New attestation");
98
99
	/// When a peer sends us a previously-unknown erasure chunk.
	pub const NEW_ERASURE_CHUNK: Rep = Rep::new(10, "Polkadot: New erasure chunk");
100
101
102
}

mod cost {
103
104
105
	use sc_network::ReputationChange as Rep;
	/// No cost. This will not be reported.
	pub const NONE: Rep = Rep::new(0, "");
106
	/// A peer sent us an attestation and we don't know the candidate.
107
	pub const ATTESTATION_NO_CANDIDATE: Rep = Rep::new(-100, "Polkadot: No candidate");
108
	/// A peer sent us a statement we consider in the future.
109
	pub const FUTURE_MESSAGE: Rep = Rep::new(-100, "Polkadot: Future message");
110
	/// A peer sent us a statement from the past.
111
	pub const PAST_MESSAGE: Rep = Rep::new(-30, "Polkadot: Past message");
112
	/// A peer sent us a malformed message.
113
	pub const MALFORMED_MESSAGE: Rep = Rep::new(-500, "Polkadot: Malformed message");
114
	/// A peer sent us a wrongly signed message.
115
	pub const BAD_SIGNATURE: Rep = Rep::new(-500, "Polkadot: Bad signature");
116
	/// A peer sent us a bad neighbor packet.
117
	pub const BAD_NEIGHBOR_PACKET: Rep = Rep::new(-300, "Polkadot: Bad neighbor");
118
119
120
121
	/// A peer sent us an erasure chunk referring to a candidate that we are not aware of.
	pub const ORPHANED_ERASURE_CHUNK: Rep = Rep::new(-10, "An erasure chunk from unknown candidate");
	/// A peer sent us an erasure chunk that does not match candidate's erasure root.
	pub const ERASURE_CHUNK_WRONG_ROOT: Rep = Rep::new(-100, "Chunk doesn't match encoding root");
122
123
}

124
/// A gossip message.
125
#[derive(Encode, Decode, Clone, PartialEq)]
126
pub enum GossipMessage {
127
128
129
130
131
132
133
134
	/// A packet sent to a neighbor but not relayed.
	#[codec(index = "1")]
	Neighbor(VersionedNeighborPacket),
	/// An attestation-statement about the candidate.
	/// Non-candidate statements should only be sent to peers who are aware of the candidate.
	#[codec(index = "2")]
	Statement(GossipStatement),
	// TODO: https://github.com/paritytech/polkadot/issues/253
135
	/// A packet containing one of the erasure-coding chunks of one candidate.
Ashley's avatar
Ashley committed
136
	#[codec(index = "3")]
137
	ErasureChunk(ErasureChunkMessage),
138
139
}

140
141
142
143
144
145
impl From<NeighborPacket> for GossipMessage {
	fn from(packet: NeighborPacket) -> Self {
		GossipMessage::Neighbor(VersionedNeighborPacket::V1(packet))
	}
}

146
147
148
149
150
151
impl From<GossipStatement> for GossipMessage {
	fn from(stmt: GossipStatement) -> Self {
		GossipMessage::Statement(stmt)
	}
}

152
/// A gossip message containing a statement.
153
#[derive(Encode, Decode, Clone, PartialEq)]
154
pub struct GossipStatement {
155
156
157
	/// The block hash of the relay chain being referred to. In context, this should
	/// be a leaf.
	pub relay_chain_leaf: Hash,
158
	/// The signed statement being gossipped.
159
160
161
162
163
	pub signed_statement: SignedStatement,
}

impl GossipStatement {
	/// Create a new instance.
164
	pub fn new(relay_chain_leaf: Hash, signed_statement: SignedStatement) -> Self {
165
		Self {
166
			relay_chain_leaf,
167
168
169
			signed_statement,
		}
	}
170
171
}

172
173
/// A gossip message containing one erasure chunk of a candidate block.
/// For each chunk of block erasure encoding one of this messages is constructed.
174
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
pub struct ErasureChunkMessage {
	/// The chunk itself.
	pub chunk: PrimitiveChunk,
	/// The relay parent of the block this chunk belongs to.
	pub relay_parent: Hash,
	/// The hash of the candidate receipt of the block this chunk belongs to.
	pub candidate_hash: Hash,
}

impl From<ErasureChunkMessage> for GossipMessage {
	fn from(chk: ErasureChunkMessage) -> Self {
		GossipMessage::ErasureChunk(chk)
	}
}

190
191
192
193
194
/// A packet of messages from one parachain to another.
///
/// These are all the messages posted from one parachain to another during the
/// execution of a single parachain block. Since this parachain block may have been
/// included in many forks of the relay chain, there is no relay-chain leaf parameter.
195
#[derive(Encode, Decode, Clone, PartialEq)]
196
197
198
199
200
pub struct GossipParachainMessages {
	/// The root of the message queue.
	pub queue_root: Hash,
}

201
/// A versioned neighbor message.
202
#[derive(Encode, Decode, Clone, PartialEq)]
203
204
205
206
207
208
209
pub enum VersionedNeighborPacket {
	#[codec(index = "1")]
	V1(NeighborPacket),
}

/// Contains information on which chain heads the peer is
/// accepting messages for.
210
#[derive(Encode, Decode, Clone, PartialEq)]
211
212
pub struct NeighborPacket {
	chain_heads: Vec<Hash>,
213
214
215
}

/// whether a block is known.
216
#[derive(Clone, Copy, PartialEq)]
217
218
219
220
221
222
223
224
225
pub enum Known {
	/// The block is a known leaf.
	Leaf,
	/// The block is known to be old.
	Old,
	/// The block is known to be bad.
	Bad,
}

226
227
228
229
230
231
232
233
234
/// Context to the underlying polkadot chain.
pub trait ChainContext: Send + Sync {
	/// Provide a closure which is invoked for every unrouted queue hash at a given leaf.
	fn leaf_unrouted_roots(
		&self,
		leaf: &Hash,
		with_queue_root: &mut dyn FnMut(&Hash),
	) -> Result<(), ClientError>;

235
236
237
238
	/// whether a block is known. If it's not, returns `None`.
	fn is_known(&self, block_hash: &Hash) -> Option<Known>;
}

239
240
241
impl<F, P> ChainContext for (F, P) where
	F: Fn(&Hash) -> Option<Known> + Send + Sync,
	P: Send + Sync + std::ops::Deref,
242
243
	P::Target: ProvideRuntimeApi<Block>,
	<P::Target as ProvideRuntimeApi<Block>>::Api: ParachainHost<Block, Error = ClientError>,
244
{
245
	fn is_known(&self, block_hash: &Hash) -> Option<Known> {
246
247
248
249
250
		(self.0)(block_hash)
	}

	fn leaf_unrouted_roots(
		&self,
Ashley's avatar
Ashley committed
251
252
		_leaf: &Hash,
		_with_queue_root: &mut dyn FnMut(&Hash),
253
254
	) -> Result<(), ClientError> {
		Ok(())
255
256
257
258
259
260
261
	}
}

/// Register a gossip validator on the network service.
// NOTE: since RegisteredMessageValidator is meant to be a type-safe proof
// that we've actually done the registration, this should be the only way
// to construct it outside of tests.
262
263
pub fn register_validator<C: ChainContext + 'static, S: NetworkSpecialization<Block>>(
	service: Arc<SubstrateNetworkService<Block, S, Hash>>,
264
	chain: C,
265
	executor: &impl futures::task::Spawn,
266
) -> RegisteredMessageValidator<S>
267
{
268
	let s = service.clone();
269
270
271
272
	let report_handle = Box::new(move |peer: &PeerId, cost_benefit: ReputationChange| {
		if cost_benefit.value != 0 {
			s.report_peer(peer.clone(), cost_benefit);
		}
273
	});
274
	let validator = Arc::new(MessageValidator {
275
276
277
		report_handle,
		inner: RwLock::new(Inner {
			peers: HashMap::new(),
278
			attestation_view: Default::default(),
279
			availability_store: None,
280
			chain,
281
		})
282
283
284
	});

	let gossip_side = validator.clone();
285
286
287
	let gossip_engine = sc_network_gossip::GossipEngine::new(
		service.clone(),
		POLKADOT_ENGINE_ID,
288
		POLKADOT_PROTOCOL_NAME,
289
		gossip_side,
290
	);
291

292
293
294
295
296
297
298
299
300
	// Ideally this would not be spawned as an orphaned task, but polled by
	// `RegisteredMessageValidator` which in turn would be polled by a `ValidationNetwork`.
	let spawn_res = executor.spawn_obj(futures::task::FutureObj::from(Box::new(gossip_engine.clone())));

	// Note: we consider the chances of an error to spawn a background task almost null.
	if spawn_res.is_err() {
		log::error!(target: "polkadot-gossip", "Failed to spawn background task");
	}

301
302
303
304
305
	RegisteredMessageValidator {
		inner: validator as _,
		service: Some(service),
		gossip_engine: Some(gossip_engine),
	}
306
307
}

308
309
310
#[derive(PartialEq)]
enum NewLeafAction {
	// (who, message)
311
	TargetedMessage(PeerId, GossipMessage),
312
313
314
315
316
317
318
319
320
321
322
323
324
325
}

/// Actions to take after noting a new block-DAG leaf.
///
/// This should be consumed by passing a consensus-gossip handle to `perform`.
#[must_use = "New chain-head gossip actions must be performed"]
pub struct NewLeafActions {
	actions: Vec<NewLeafAction>,
}

impl NewLeafActions {
	/// Perform the queued actions, feeding into gossip.
	pub fn perform(
		self,
326
		gossip: &dyn crate::legacy::GossipService,
327
328
329
330
	) {
		for action in self.actions {
			match action {
				NewLeafAction::TargetedMessage(who, message)
331
					=> gossip.send_message(who, message),
332
333
334
335
336
			}
		}
	}
}

337
338
339
/// A registered message validator.
///
/// Create this using `register_validator`.
340
pub struct RegisteredMessageValidator<S: NetworkSpecialization<Block>> {
341
	inner: Arc<MessageValidator<dyn ChainContext>>,
342
	// Note: this is always `Some` in real code and `None` in tests.
343
	service: Option<Arc<SubstrateNetworkService<Block, S, Hash>>>,
344
345
	// Note: this is always `Some` in real code and `None` in tests.
	gossip_engine: Option<sc_network_gossip::GossipEngine<Block>>,
346
347
}

348
349
350
351
352
353
354
355
356
357
358
impl<S: NetworkSpecialization<Block>> Clone for RegisteredMessageValidator<S> {
	fn clone(&self) -> Self {
		RegisteredMessageValidator {
			inner: self.inner.clone(),
			service: self.service.clone(),
			gossip_engine: self.gossip_engine.clone(),
		}
	}
}

impl RegisteredMessageValidator<crate::legacy::PolkadotProtocol> {
359
	#[cfg(test)]
360
361
	pub(crate) fn new_test<C: ChainContext + 'static>(
		chain: C,
362
		report_handle: Box<dyn Fn(&PeerId, ReputationChange) + Send + Sync>,
363
	) -> Self {
364
		let validator = Arc::new(MessageValidator::new_test(chain, report_handle));
365

366
367
368
369
370
		RegisteredMessageValidator {
			inner: validator as _,
			service: None,
			gossip_engine: None,
		}
371
	}
372
}
373

374
impl<S: NetworkSpecialization<Block>> RegisteredMessageValidator<S> {
375
376
377
378
	pub fn register_availability_store(&mut self, availability_store: av_store::Store) {
		self.inner.inner.write().availability_store = Some(availability_store);
	}

379
380
381
382
	/// Note that we perceive a new leaf of the block-DAG. We will notify our neighbors that
	/// we now accept parachain candidate attestations and incoming message queues
	/// relevant to this leaf.
	pub(crate) fn new_local_leaf(
383
		&self,
384
		relay_chain_leaf: Hash,
385
		validation: MessageValidationData,
386
387
388
	) -> NewLeafActions {
		// add an entry in attestation_view
		// prune any entries from attestation_view which are no longer leaves
389
		let mut inner = self.inner.inner.write();
390
		inner.attestation_view.new_local_leaf(relay_chain_leaf, validation);
391

392
393
394
395
396
397
398
399
400
401
		let mut actions = Vec::new();

		{
			let &mut Inner {
				ref chain,
				ref mut attestation_view,
				..
			} = &mut *inner;

			attestation_view.prune_old_leaves(|hash| match chain.is_known(hash) {
402
403
404
405
				Some(Known::Leaf) => true,
				_ => false,
			});
		}
406

407

408
		// send neighbor packets to peers
409
410
411
412
413
		inner.multicast_neighbor_packet(
			|who, message| actions.push(NewLeafAction::TargetedMessage(who.clone(), message))
		);

		NewLeafActions { actions }
414
415
	}

416
	pub(crate) fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
417
418
419
420
421
422
423
424
425
426
		let topic_stream = if let Some(gossip_engine) = self.gossip_engine.as_ref() {
			gossip_engine.messages_for(topic)
		} else {
			log::error!("Called gossip_messages_for on a test engine");
			futures::channel::mpsc::unbounded().1
		};

		GossipMessageStream::new(topic_stream.boxed())
	}

427
	pub(crate) fn gossip_message(&self, topic: Hash, message: GossipMessage) {
428
429
430
431
432
433
434
435
436
437
438
		if let Some(gossip_engine) = self.gossip_engine.as_ref() {
			gossip_engine.gossip_message(
				topic,
				message.encode(),
				false,
			);
		} else {
			log::error!("Called gossip_message on a test engine");
		}
	}

439
	pub(crate) fn send_message(&self, who: PeerId, message: GossipMessage) {
440
441
442
443
444
445
		if let Some(gossip_engine) = self.gossip_engine.as_ref() {
			gossip_engine.send_message(vec![who], message.encode());
		} else {
			log::error!("Called send_message on a test engine");
		}
	}
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
}

impl<S: NetworkSpecialization<Block>> GossipService for RegisteredMessageValidator<S> {
	fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
		RegisteredMessageValidator::gossip_messages_for(self, topic)
	}

	fn gossip_message(&self, topic: Hash, message: GossipMessage) {
		RegisteredMessageValidator::gossip_message(self, topic, message)
	}

	fn send_message(&self, who: PeerId, message: GossipMessage) {
		RegisteredMessageValidator::send_message(self, who, message)
	}
}
461

462
impl NetworkService for RegisteredMessageValidator<crate::legacy::PolkadotProtocol> {
463
464
465
466
467
468
469
470
471
472
473
	fn with_spec<F: Send + 'static>(&self, with: F)
		where F: FnOnce(&mut PolkadotProtocol, &mut dyn Context<Block>)
	{
		if let Some(service) = self.service.as_ref() {
			service.with_spec(with)
		} else {
			log::error!("Called with_spec on a test engine");
		}
	}
}

474
/// The data needed for validating gossip messages.
475
#[derive(Default)]
476
pub(crate) struct MessageValidationData {
477
	/// The authorities' parachain validation keys at a block.
478
	pub(crate) authorities: Vec<ValidatorId>,
479
480
481
}

impl MessageValidationData {
482
483
484
	// check a statement's signature.
	fn check_statement(&self, relay_chain_leaf: &Hash, statement: &SignedStatement) -> Result<(), ()> {
		let sender = match self.authorities.get(statement.sender as usize) {
485
			Some(val) => val,
486
			None => return Err(()),
487
488
		};

489
		let good = self.authorities.contains(&sender) &&
490
491
492
			::polkadot_validation::check_statement(
				&statement.statement,
				&statement.signature,
493
				sender.clone(),
494
				relay_chain_leaf,
495
496
497
498
499
500
501
502
503
504
505
506
			);

		if good {
			Ok(())
		} else {
			Err(())
		}
	}
}

#[derive(Default)]
struct PeerData {
507
	attestation: AttestationPeerData,
508
509
}

510
struct Inner<C: ?Sized> {
511
	peers: HashMap<PeerId, PeerData>,
512
	attestation_view: AttestationView,
513
	availability_store: Option<av_store::Store>,
514
	chain: C,
515
516
}

517
impl<C: ?Sized + ChainContext> Inner<C> {
518
	fn validate_neighbor_packet(&mut self, sender: &PeerId, packet: NeighborPacket)
519
		-> (GossipValidationResult<Hash>, ReputationChange, Vec<Hash>)
520
521
522
523
524
	{
		let chain_heads = packet.chain_heads;
		if chain_heads.len() > MAX_CHAIN_HEADS {
			(GossipValidationResult::Discard, cost::BAD_NEIGHBOR_PACKET, Vec::new())
		} else {
525
526
527
528
529
			let chain_heads: LeavesVec = chain_heads.into_iter().collect();
			let new_topics = if let Some(ref mut peer) = self.peers.get_mut(sender) {
				let new_leaves = peer.attestation.update_leaves(&chain_heads);
				let new_attestation_topics = new_leaves.iter().cloned().map(attestation_topic);

Ashley's avatar
Ashley committed
530
				new_attestation_topics.collect()
531
532
533
534
			} else {
				Vec::new()
			};

535
			(GossipValidationResult::Discard, cost::NONE, new_topics)
536
537
538
		}
	}

539
540
541
542
543
544
	fn validate_erasure_chunk_packet(&mut self, msg: ErasureChunkMessage)
		-> (GossipValidationResult<Hash>, ReputationChange)
	{
		if let Some(store) = &self.availability_store {
			if let Some(receipt) = store.get_candidate(&msg.candidate_hash) {
				let chunk_hash = erasure::branch_hash(
545
					&receipt.commitments.erasure_root,
546
547
548
549
550
551
552
553
554
555
556
					&msg.chunk.proof,
					msg.chunk.index as usize
				);

				if chunk_hash != Ok(BlakeTwo256::hash(&msg.chunk.chunk)) {
					(
						GossipValidationResult::Discard,
						cost::ERASURE_CHUNK_WRONG_ROOT
					)
				} else {
					if let Some(awaited_chunks) = store.awaited_chunks() {
557
558
559
560
561
562
						let frontier_entry = av_store::AwaitedFrontierEntry {
							relay_parent: msg.relay_parent,
							erasure_root: receipt.commitments.erasure_root,
							validator_index: msg.chunk.index,
						};
						if awaited_chunks.contains(&frontier_entry) {
563
564
							let topic = av_store::erasure_coding_topic(
								msg.relay_parent,
565
								receipt.commitments.erasure_root,
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
								msg.chunk.index,
							);

							return (
								GossipValidationResult::ProcessAndKeep(topic),
								benefit::NEW_ERASURE_CHUNK,
							);
						}
					}
					(GossipValidationResult::Discard, cost::NONE)
				}
			} else {
				(GossipValidationResult::Discard, cost::ORPHANED_ERASURE_CHUNK)
			}
		} else {
			(GossipValidationResult::Discard, cost::NONE)
		}
	}

585
	fn multicast_neighbor_packet<F: FnMut(&PeerId, GossipMessage)>(
586
587
588
		&self,
		mut send_neighbor_packet: F,
	) {
589
590
		let neighbor_packet = GossipMessage::from(NeighborPacket {
			chain_heads: self.attestation_view.neighbor_info().collect(),
591
		});
592
593

		for peer in self.peers.keys() {
594
			send_neighbor_packet(peer, neighbor_packet.clone())
595
		}
596
597
598
599
	}
}

/// An unregistered message validator. Register this with `register_validator`.
600
pub struct MessageValidator<C: ?Sized> {
601
	report_handle: Box<dyn Fn(&PeerId, ReputationChange) + Send + Sync>,
602
	inner: RwLock<Inner<C>>,
603
604
}

605
impl<C: ChainContext + ?Sized> MessageValidator<C> {
606
607
	#[cfg(test)]
	fn new_test(
608
		chain: C,
609
		report_handle: Box<dyn Fn(&PeerId, ReputationChange) + Send + Sync>,
610
	) -> Self where C: Sized {
611
612
613
614
		MessageValidator {
			report_handle,
			inner: RwLock::new(Inner {
				peers: HashMap::new(),
615
				attestation_view: Default::default(),
616
				availability_store: None,
617
618
				chain,
			}),
619
620
621
		}
	}

622
	fn report(&self, who: &PeerId, cost_benefit: ReputationChange) {
623
624
		(self.report_handle)(who, cost_benefit)
	}
625
626
}

627
impl<C: ChainContext + ?Sized> sc_network_gossip::Validator<Block> for MessageValidator<C> {
628
	fn new_peer(&self, _context: &mut dyn ValidatorContext<Block>, who: &PeerId, _roles: Roles) {
629
		let mut inner = self.inner.write();
630
		inner.peers.insert(who.clone(), PeerData::default());
631
632
	}

633
	fn peer_disconnected(&self, _context: &mut dyn ValidatorContext<Block>, who: &PeerId) {
634
635
636
637
		let mut inner = self.inner.write();
		inner.peers.remove(who);
	}

638
	fn validate(&self, context: &mut dyn ValidatorContext<Block>, sender: &PeerId, data: &[u8])
639
640
		-> GossipValidationResult<Hash>
	{
641
642
		let mut decode_data = data;
		let (res, cost_benefit) = match GossipMessage::decode(&mut decode_data) {
643
644
			Err(_) => (GossipValidationResult::Discard, cost::MALFORMED_MESSAGE),
			Ok(GossipMessage::Neighbor(VersionedNeighborPacket::V1(packet))) => {
645
646
647
648
649
650
				let (res, cb, topics) = self.inner.write().validate_neighbor_packet(sender, packet);
				for new_topic in topics {
					context.send_topic(sender, new_topic, false);
				}
				(res, cb)
			}
651
			Ok(GossipMessage::Statement(statement)) => {
652
653
654
655
656
657
658
659
660
661
662
				let (res, cb) = {
					let mut inner = self.inner.write();
					let inner = &mut *inner;
					inner.attestation_view.validate_statement_signature(statement, &inner.chain)
				};

				if let GossipValidationResult::ProcessAndKeep(ref topic) = res {
					context.broadcast_message(topic.clone(), data.to_vec(), false);
				}
				(res, cb)
			}
663
664
665
			Ok(GossipMessage::ErasureChunk(chunk)) => {
				self.inner.write().validate_erasure_chunk_packet(chunk)
			}
666
667
668
669
670
671
		};

		self.report(sender, cost_benefit);
		res
	}

672
	fn message_expired<'a>(&'a self) -> Box<dyn FnMut(Hash, &[u8]) -> bool + 'a> {
673
674
675
		let inner = self.inner.read();

		Box::new(move |topic, _data| {
676
677
			// check that messages from this topic are considered live by one of our protocols.
			// everything else is expired
Ashley's avatar
Ashley committed
678
			let live = inner.attestation_view.is_topic_live(&topic);
679
680

			!live // = expired
681
682
683
		})
	}

684
	fn message_allowed<'a>(&'a self) -> Box<dyn FnMut(&PeerId, MessageIntent, &Hash, &[u8]) -> bool + 'a> {
685
686
		let mut inner = self.inner.write();
		Box::new(move |who, intent, topic, data| {
687
688
689
690
691
			let &mut Inner {
				ref mut peers,
				ref mut attestation_view,
				..
			} = &mut *inner;
692
693
694
695
696
697

			match intent {
				MessageIntent::PeriodicRebroadcast => return false,
				_ => {},
			}

698
699
			let attestation_head = attestation_view.topic_block(topic).map(|x| x.clone());
			let peer = peers.get_mut(who);
700
701

			match GossipMessage::decode(&mut &data[..]) {
702
703
704
705
706
707
708
709
710
711
712
713
714
715
				Ok(GossipMessage::Statement(ref statement)) => {
					// to allow statements, we need peer knowledge.
					let peer_knowledge = peer.and_then(move |p| attestation_head.map(|r| (p, r)))
						.and_then(|(p, r)| p.attestation.knowledge_at_mut(&r).map(|k| (k, r)));

					peer_knowledge.map_or(false, |(knowledge, attestation_head)| {
						attestation_view.statement_allowed(
							statement,
							&attestation_head,
							knowledge,
						)
					})
				}
				_ => false,
716
			}
717
718
719
720
721
722
723
		})
	}
}

#[cfg(test)]
mod tests {
	use super::*;
724
	use sc_network_gossip::Validator as ValidatorT;
725
726
	use std::sync::mpsc;
	use parking_lot::Mutex;
727
	use polkadot_primitives::parachain::AbridgedCandidateReceipt;
Ashley's avatar
Ashley committed
728
	use sp_core::sr25519::Signature as Sr25519Signature;
729
730
	use polkadot_validation::GenericStatement;

731
	use crate::legacy::tests::TestChainContext;
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751

	#[derive(PartialEq, Clone, Debug)]
	enum ContextEvent {
		BroadcastTopic(Hash, bool),
		BroadcastMessage(Hash, Vec<u8>, bool),
		SendMessage(PeerId, Vec<u8>),
		SendTopic(PeerId, Hash, bool),
	}

	#[derive(Default)]
	struct MockValidatorContext {
		events: Vec<ContextEvent>,
	}

	impl MockValidatorContext {
		fn clear(&mut self) {
			self.events.clear()
		}
	}

752
	impl sc_network_gossip::ValidatorContext<Block> for MockValidatorContext {
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
		fn broadcast_topic(&mut self, topic: Hash, force: bool) {
			self.events.push(ContextEvent::BroadcastTopic(topic, force));
		}
		fn broadcast_message(&mut self, topic: Hash, message: Vec<u8>, force: bool) {
			self.events.push(ContextEvent::BroadcastMessage(topic, message, force));
		}
		fn send_message(&mut self, who: &PeerId, message: Vec<u8>) {
			self.events.push(ContextEvent::SendMessage(who.clone(), message));
		}
		fn send_topic(&mut self, who: &PeerId, topic: Hash, force: bool) {
			self.events.push(ContextEvent::SendTopic(who.clone(), topic, force));
		}
	}

	#[test]
	fn message_allowed() {
		let (tx, _rx) = mpsc::channel();
		let tx = Mutex::new(tx);
771
		let report_handle = Box::new(move |peer: &PeerId, cb: ReputationChange| tx.lock().send((peer.clone(), cb)).unwrap());
772
		let validator = MessageValidator::new_test(
773
			TestChainContext::default(),
774
775
776
777
778
779
780
781
782
783
784
785
786
787
			report_handle,
		);

		let peer_a = PeerId::random();

		let mut validator_context = MockValidatorContext::default();
		validator.new_peer(&mut validator_context, &peer_a, Roles::FULL);
		assert!(validator_context.events.is_empty());
		validator_context.clear();

		let hash_a = [1u8; 32].into();
		let hash_b = [2u8; 32].into();
		let hash_c = [3u8; 32].into();

788
789
790
		let message = GossipMessage::from(NeighborPacket {
			chain_heads: vec![hash_a, hash_b],
		}).encode();
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
		let res = validator.validate(
			&mut validator_context,
			&peer_a,
			&message[..],
		);

		match res {
			GossipValidationResult::Discard => {},
			_ => panic!("wrong result"),
		}
		assert_eq!(
			validator_context.events,
			vec![
				ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_a), false),
				ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_b), false),
			],
		);

		validator_context.clear();

811
		let candidate_receipt = AbridgedCandidateReceipt::default();
812
		let statement = GossipMessage::Statement(GossipStatement {
813
			relay_chain_leaf: hash_a,
814
815
			signed_statement: SignedStatement {
				statement: GenericStatement::Candidate(candidate_receipt),
816
				signature: Sr25519Signature([255u8; 64]).into(),
817
				sender: 1,
818
			}
819
820
821
822
823
824
825
826
		});
		let encoded = statement.encode();

		let topic_a = attestation_topic(hash_a);
		let topic_b = attestation_topic(hash_b);
		let topic_c = attestation_topic(hash_c);

		// topic_a is in all 3 views -> succeed
827
		validator.inner.write().attestation_view.new_local_leaf(hash_a, MessageValidationData::default());
828
829
830
831
832
		// topic_b is in the neighbor's view but not ours -> fail
		// topic_c is not in either -> fail

		{
			let mut message_allowed = validator.message_allowed();
Gavin Wood's avatar
Gavin Wood committed
833
834
835
836
			let intent = MessageIntent::Broadcast;
			assert!(message_allowed(&peer_a, intent, &topic_a, &encoded));
			assert!(!message_allowed(&peer_a, intent, &topic_b, &encoded));
			assert!(!message_allowed(&peer_a, intent, &topic_c, &encoded));
837
838
839
840
841
842
843
		}
	}

	#[test]
	fn too_many_chain_heads_is_report() {
		let (tx, rx) = mpsc::channel();
		let tx = Mutex::new(tx);
844
		let report_handle = Box::new(move |peer: &PeerId, cb: ReputationChange| tx.lock().send((peer.clone(), cb)).unwrap());
845
		let validator = MessageValidator::new_test(
846
			TestChainContext::default(),
847
848
849
850
851
852
853
854
855
856
857
858
			report_handle,
		);

		let peer_a = PeerId::random();

		let mut validator_context = MockValidatorContext::default();
		validator.new_peer(&mut validator_context, &peer_a, Roles::FULL);
		assert!(validator_context.events.is_empty());
		validator_context.clear();

		let chain_heads = (0..MAX_CHAIN_HEADS+1).map(|i| [i as u8; 32].into()).collect();

859
860
861
		let message = GossipMessage::from(NeighborPacket {
			chain_heads,
		}).encode();
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
		let res = validator.validate(
			&mut validator_context,
			&peer_a,
			&message[..],
		);

		match res {
			GossipValidationResult::Discard => {},
			_ => panic!("wrong result"),
		}
		assert_eq!(
			validator_context.events,
			Vec::new(),
		);

		drop(validator);

		assert_eq!(rx.iter().collect::<Vec<_>>(), vec![(peer_a, cost::BAD_NEIGHBOR_PACKET)]);
	}

	#[test]
	fn statement_only_sent_when_candidate_known() {
		let (tx, _rx) = mpsc::channel();
		let tx = Mutex::new(tx);
886
		let report_handle = Box::new(move |peer: &PeerId, cb: ReputationChange| tx.lock().send((peer.clone(), cb)).unwrap());
887
		let validator = MessageValidator::new_test(
888
			TestChainContext::default(),
889
890
891
892
893
894
895
896
897
898
899
900
901
			report_handle,
		);

		let peer_a = PeerId::random();

		let mut validator_context = MockValidatorContext::default();
		validator.new_peer(&mut validator_context, &peer_a, Roles::FULL);
		assert!(validator_context.events.is_empty());
		validator_context.clear();

		let hash_a = [1u8; 32].into();
		let hash_b = [2u8; 32].into();

902
903
904
905
		let message = GossipMessage::from(NeighborPacket {
			chain_heads: vec![hash_a, hash_b],
		}).encode();

906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
		let res = validator.validate(
			&mut validator_context,
			&peer_a,
			&message[..],
		);

		match res {
			GossipValidationResult::Discard => {},
			_ => panic!("wrong result"),
		}
		assert_eq!(
			validator_context.events,
			vec![
				ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_a), false),
				ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_b), false),
			],
		);

		validator_context.clear();

		let topic_a = attestation_topic(hash_a);
		let c_hash = [99u8; 32].into();

		let statement = GossipMessage::Statement(GossipStatement {
930
			relay_chain_leaf: hash_a,
931
932
			signed_statement: SignedStatement {
				statement: GenericStatement::Valid(c_hash),
933
				signature: Sr25519Signature([255u8; 64]).into(),
934
935
936
937
				sender: 1,
			}
		});
		let encoded = statement.encode();
938
		validator.inner.write().attestation_view.new_local_leaf(hash_a, MessageValidationData::default());
939
940
941

		{
			let mut message_allowed = validator.message_allowed();
Gavin Wood's avatar
Gavin Wood committed
942
			assert!(!message_allowed(&peer_a, MessageIntent::Broadcast, &topic_a, &encoded[..]));
943
944
945
946
947
948
949
950
		}

		validator
			.inner
			.write()
			.peers
			.get_mut(&peer_a)
			.unwrap()
951
952
			.attestation
			.note_aware_under_leaf(&hash_a, c_hash);
953
954
		{
			let mut message_allowed = validator.message_allowed();
Gavin Wood's avatar
Gavin Wood committed
955
			assert!(message_allowed(&peer_a, MessageIntent::Broadcast, &topic_a, &encoded[..]));
956
957
		}
	}
958
959
960
961
962

	#[test]
	fn multicasts_icmp_queues_when_building_on_new_leaf() {
		let (tx, _rx) = mpsc::channel();
		let tx = Mutex::new(tx);
963
		let report_handle = Box::new(move |peer: &PeerId, cb: ReputationChange| tx.lock().send((peer.clone(), cb)).unwrap());
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008

		let hash_a = [1u8; 32].into();
		let root_a = [11u8; 32].into();

		let chain = {
			let mut chain = TestChainContext::default();
			chain.known_map.insert(hash_a, Known::Leaf);
			chain.ingress_roots.insert(hash_a, vec![root_a]);
			chain
		};

		let validator = RegisteredMessageValidator::new_test(chain, report_handle);

		let peer_a = PeerId::random();
		let peer_b = PeerId::random();

		let mut validator_context = MockValidatorContext::default();
		validator.inner.new_peer(&mut validator_context, &peer_a, Roles::FULL);
		validator.inner.new_peer(&mut validator_context, &peer_b, Roles::FULL);
		assert!(validator_context.events.is_empty());
		validator_context.clear();


		{
			let message = GossipMessage::from(NeighborPacket {
				chain_heads: vec![hash_a],
			}).encode();
			let res = validator.inner.validate(
				&mut validator_context,
				&peer_a,
				&message[..],
			);

			match res {
				GossipValidationResult::Discard => {},
				_ => panic!("wrong result"),
			}
			assert_eq!(
				validator_context.events,
				vec![
					ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_a), false),
				],
			);
		}
	}
1009
}