// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Gossip messages and the message validator.
//!
//! At the moment, this module houses 2 gossip protocols central to Polkadot.
//!
//! The first is the attestation-gossip system, which aims to circulate parachain
//! candidate attestations by validators at leaves of the block-DAG.
//!
//! The second is the inter-chain message queue routing gossip, which aims to
//! circulate message queues between parachains, which remain un-routed as of
//! recent leaves.
//!
//! These gossip systems do not have any form of sybil-resistance in terms
//! of the nodes which can participate. It could be imposed e.g. by limiting only to
//! validators, but this would prevent message queues from getting into the hands
//! of collators and of attestations from getting into the hands of fishermen.
//! As such, we take certain precautions which allow arbitrary full nodes to
//! join the gossip graph, as well as validators (who are likely to be well-connected
//! amongst themselves).
//!
//! The first is the notion of a neighbor packet. This is a packet sent between
//! neighbors of the gossip graph to inform each other of their current protocol
//! state. As of this writing, for both attestation and message-routing gossip,
//! the only necessary information here is a (length-limited) set of perceived
//! leaves of the block-DAG.
//!
//! These leaves can be used to derive what information a node is willing to accept.
//! There is typically an unbounded amount of possible "future" information relative to
//! any protocol state. For example, attestations or unrouted message queues from millions
//! of blocks after a known protocol state. The neighbor packet is meant to avoid being
//! spammed by illegitimate future information, while informing neighbors of when
//! previously-future and now current gossip messages would be accepted.
//!
//! Peers who send information which was not allowed under a recent neighbor packet
//! will be noted as non-beneficial to Substrate's peer-set management utility.

Ashley's avatar
Ashley committed
52
use sp_runtime::traits::{BlakeTwo256, Hash as HashT};
Gavin Wood's avatar
Gavin Wood committed
53
use sp_blockchain::Error as ClientError;
54
use sc_network::{ObservedRole, PeerId, ReputationChange};
55
use sc_network::NetworkService;
56
57
58
use sc_network_gossip::{
	ValidationResult as GossipValidationResult,
	ValidatorContext, MessageIntent,
59
};
60
use polkadot_validation::{SignedStatement};
61
use polkadot_primitives::{Block, Hash};
62
use polkadot_primitives::parachain::{
asynchronous rob's avatar
asynchronous rob committed
63
	ParachainHost, ValidatorId, ErasureChunk as PrimitiveChunk, SigningContext, PoVBlock,
64
65
};
use polkadot_erasure_coding::{self as erasure};
66
use codec::{Decode, Encode};
67
use sp_api::ProvideRuntimeApi;
68

69
use std::collections::HashMap;
70
71
use std::sync::Arc;

72
use arrayvec::ArrayVec;
73
use futures::prelude::*;
Gavin Wood's avatar
Gavin Wood committed
74
use parking_lot::{Mutex, RwLock};
75

76
use crate::legacy::{GossipMessageStream, GossipService};
77

78
79
80
81
use attestation::{View as AttestationView, PeerData as AttestationPeerData};

mod attestation;

82
/// The engine ID of the polkadot attestation system.
83
pub const POLKADOT_ENGINE_ID: sp_runtime::ConsensusEngineId = *b"dot1";
84
pub const POLKADOT_PROTOCOL_NAME: &[u8] = b"/polkadot/legacy/1";
85

86
// arbitrary; in practice this should not be more than 2.
87
88
89
90
pub(crate) const MAX_CHAIN_HEADS: usize = 5;

/// Type alias for a bounded vector of leaves.
pub type LeavesVec = ArrayVec<[Hash; MAX_CHAIN_HEADS]>;
91
92

mod benefit {
93
	use sc_network::ReputationChange as Rep;
94
	/// When a peer sends us a previously-unknown candidate statement.
95
	pub const NEW_CANDIDATE: Rep = Rep::new(100, "Polkadot: New candidate");
96
	/// When a peer sends us a previously-unknown attestation.
97
	pub const NEW_ATTESTATION: Rep = Rep::new(50, "Polkadot: New attestation");
asynchronous rob's avatar
asynchronous rob committed
98
99
	/// When a peer sends us a previously-unknown pov-block
	pub const NEW_POV_BLOCK: Rep = Rep::new(150, "Polkadot: New PoV block");
100
101
	/// When a peer sends us a previously-unknown erasure chunk.
	pub const NEW_ERASURE_CHUNK: Rep = Rep::new(10, "Polkadot: New erasure chunk");
102
103
104
}

mod cost {
105
106
107
	use sc_network::ReputationChange as Rep;
	/// No cost. This will not be reported.
	pub const NONE: Rep = Rep::new(0, "");
108
	/// A peer sent us an attestation and we don't know the candidate.
109
	pub const ATTESTATION_NO_CANDIDATE: Rep = Rep::new(-100, "Polkadot: No candidate");
asynchronous rob's avatar
asynchronous rob committed
110
111
112
113
	/// A peer sent us a pov-block and we don't know the candidate or the leaf.
	pub const POV_BLOCK_UNWANTED: Rep = Rep::new(-500, "Polkadot: No candidate");
	/// A peer sent us a pov-block message with wrong data.
	pub const POV_BLOCK_BAD_DATA: Rep = Rep::new(-1000, "Polkadot: Bad PoV-block data");
114
	/// A peer sent us a statement we consider in the future.
115
	pub const FUTURE_MESSAGE: Rep = Rep::new(-100, "Polkadot: Future message");
116
	/// A peer sent us a statement from the past.
117
	pub const PAST_MESSAGE: Rep = Rep::new(-30, "Polkadot: Past message");
118
	/// A peer sent us a malformed message.
119
	pub const MALFORMED_MESSAGE: Rep = Rep::new(-500, "Polkadot: Malformed message");
120
	/// A peer sent us a wrongly signed message.
121
	pub const BAD_SIGNATURE: Rep = Rep::new(-500, "Polkadot: Bad signature");
122
	/// A peer sent us a bad neighbor packet.
123
	pub const BAD_NEIGHBOR_PACKET: Rep = Rep::new(-300, "Polkadot: Bad neighbor");
124
125
126
127
	/// A peer sent us an erasure chunk referring to a candidate that we are not aware of.
	pub const ORPHANED_ERASURE_CHUNK: Rep = Rep::new(-10, "An erasure chunk from unknown candidate");
	/// A peer sent us an erasure chunk that does not match candidate's erasure root.
	pub const ERASURE_CHUNK_WRONG_ROOT: Rep = Rep::new(-100, "Chunk doesn't match encoding root");
128
129
}

130
/// A gossip message.
131
#[derive(Encode, Decode, Clone, PartialEq)]
132
pub enum GossipMessage {
133
134
135
136
137
138
139
140
	/// A packet sent to a neighbor but not relayed.
	#[codec(index = "1")]
	Neighbor(VersionedNeighborPacket),
	/// An attestation-statement about the candidate.
	/// Non-candidate statements should only be sent to peers who are aware of the candidate.
	#[codec(index = "2")]
	Statement(GossipStatement),
	// TODO: https://github.com/paritytech/polkadot/issues/253
141
	/// A packet containing one of the erasure-coding chunks of one candidate.
Ashley's avatar
Ashley committed
142
	#[codec(index = "3")]
143
	ErasureChunk(ErasureChunkMessage),
asynchronous rob's avatar
asynchronous rob committed
144
145
146
	/// A PoV-block.
	#[codec(index = "255")]
	PoVBlock(GossipPoVBlock),
147
148
}

149
150
151
152
153
154
impl From<NeighborPacket> for GossipMessage {
	/// Wraps the packet as a version-1 neighbor packet inside a `Neighbor` message.
	fn from(neighbor_packet: NeighborPacket) -> Self {
		let versioned = VersionedNeighborPacket::V1(neighbor_packet);
		Self::Neighbor(versioned)
	}
}

155
156
157
158
159
160
impl From<GossipStatement> for GossipMessage {
	fn from(stmt: GossipStatement) -> Self {
		GossipMessage::Statement(stmt)
	}
}

asynchronous rob's avatar
asynchronous rob committed
161
162
163
164
165
166
impl From<GossipPoVBlock> for GossipMessage {
	fn from(pov: GossipPoVBlock) -> Self {
		GossipMessage::PoVBlock(pov)
	}
}

167
/// A gossip message containing a statement.
168
#[derive(Encode, Decode, Clone, PartialEq)]
169
pub struct GossipStatement {
170
171
172
	/// The block hash of the relay chain being referred to. In context, this should
	/// be a leaf.
	pub relay_chain_leaf: Hash,
173
	/// The signed statement being gossipped.
174
175
176
177
178
	pub signed_statement: SignedStatement,
}

impl GossipStatement {
	/// Create a new instance.
179
	pub fn new(relay_chain_leaf: Hash, signed_statement: SignedStatement) -> Self {
180
		Self {
181
			relay_chain_leaf,
182
183
184
			signed_statement,
		}
	}
185
186
}

187
188
/// A gossip message containing one erasure chunk of a candidate block.
/// For each chunk of block erasure encoding one of this messages is constructed.
189
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
190
191
192
193
194
195
196
197
198
199
200
201
202
pub struct ErasureChunkMessage {
	/// The chunk itself.
	pub chunk: PrimitiveChunk,
	/// The hash of the candidate receipt of the block this chunk belongs to.
	pub candidate_hash: Hash,
}

impl From<ErasureChunkMessage> for GossipMessage {
	fn from(chk: ErasureChunkMessage) -> Self {
		GossipMessage::ErasureChunk(chk)
	}
}

asynchronous rob's avatar
asynchronous rob committed
203
204
205
206
207
208
209
210
211
212
213
214
/// A pov-block being gossipped. Should only be sent to peers aware of the candidate
/// referenced.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct GossipPoVBlock {
	/// The block hash of the relay chain being referred to. In context, this should
	/// be a leaf.
	pub relay_chain_leaf: Hash,
	/// The hash of some candidate localized to the same relay-chain leaf, whose
	/// pov-block is this block.
	pub candidate_hash: Hash,
	/// The pov-block itself.
	pub pov_block: PoVBlock,
}

217
/// A versioned neighbor message.
218
#[derive(Encode, Decode, Clone, PartialEq)]
219
220
221
222
223
224
225
pub enum VersionedNeighborPacket {
	#[codec(index = "1")]
	V1(NeighborPacket),
}

/// Contains information on which chain heads the peer is
/// accepting messages for.
226
#[derive(Encode, Decode, Clone, PartialEq)]
227
228
pub struct NeighborPacket {
	chain_heads: Vec<Hash>,
229
230
231
}

/// How a block is known to the local chain context.
#[derive(Clone, Copy, PartialEq)]
pub enum Known {
	/// The block is a known leaf.
	Leaf,
	/// The block is known to be old.
	Old,
	/// The block is known to be bad.
	Bad,
}

242
243
244
245
246
247
248
249
250
/// Context to the underlying polkadot chain.
pub trait ChainContext: Send + Sync {
	/// Provide a closure which is invoked for every unrouted queue hash at a given leaf.
	fn leaf_unrouted_roots(
		&self,
		leaf: &Hash,
		with_queue_root: &mut dyn FnMut(&Hash),
	) -> Result<(), ClientError>;

251
252
253
254
	/// whether a block is known. If it's not, returns `None`.
	fn is_known(&self, block_hash: &Hash) -> Option<Known>;
}

255
256
257
impl<F, P> ChainContext for (F, P) where
	F: Fn(&Hash) -> Option<Known> + Send + Sync,
	P: Send + Sync + std::ops::Deref,
258
259
	P::Target: ProvideRuntimeApi<Block>,
	<P::Target as ProvideRuntimeApi<Block>>::Api: ParachainHost<Block, Error = ClientError>,
260
{
261
	fn is_known(&self, block_hash: &Hash) -> Option<Known> {
262
263
264
265
266
		(self.0)(block_hash)
	}

	fn leaf_unrouted_roots(
		&self,
Ashley's avatar
Ashley committed
267
268
		_leaf: &Hash,
		_with_queue_root: &mut dyn FnMut(&Hash),
269
270
	) -> Result<(), ClientError> {
		Ok(())
271
272
273
	}
}

274
275
276
277
278
279
280
281
282

/// Compute the gossip topic for attestations on the given parent hash.
///
/// The topic is the Blake2-256 hash of the parent hash bytes followed by the
/// ASCII tag `attestations`.
pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash {
	let parent_bytes: &[u8] = parent_hash.as_ref();

	let mut preimage = Vec::new();
	preimage.extend_from_slice(parent_bytes);
	preimage.extend_from_slice(b"attestations");

	BlakeTwo256::hash(&preimage)
}

asynchronous rob's avatar
asynchronous rob committed
283
284
285
286
287
288
289
290
/// Compute the gossip topic for PoV blocks based on the given parent hash.
///
/// The topic is the Blake2-256 hash of the parent hash bytes followed by the
/// ASCII tag `pov-blocks`.
pub(crate) fn pov_block_topic(parent_hash: Hash) -> Hash {
	let parent_bytes: &[u8] = parent_hash.as_ref();

	let mut preimage = Vec::new();
	preimage.extend_from_slice(parent_bytes);
	preimage.extend_from_slice(b"pov-blocks");

	BlakeTwo256::hash(&preimage)
}

291
292
293
294
/// Register a gossip validator on the network service.
// NOTE: since RegisteredMessageValidator is meant to be a type-safe proof
// that we've actually done the registration, this should be the only way
// to construct it outside of tests.
295
296
pub fn register_validator<C: ChainContext + 'static>(
	service: Arc<NetworkService<Block, Hash>>,
297
	chain: C,
298
	executor: &impl futures::task::Spawn,
299
) -> RegisteredMessageValidator
300
{
301
	let s = service.clone();
302
303
304
305
	let report_handle = Box::new(move |peer: &PeerId, cost_benefit: ReputationChange| {
		if cost_benefit.value != 0 {
			s.report_peer(peer.clone(), cost_benefit);
		}
306
	});
307
	let validator = Arc::new(MessageValidator {
308
309
310
		report_handle,
		inner: RwLock::new(Inner {
			peers: HashMap::new(),
311
			attestation_view: Default::default(),
312
			availability_store: None,
313
			chain,
314
		})
315
316
317
	});

	let gossip_side = validator.clone();
Gavin Wood's avatar
Gavin Wood committed
318
	let gossip_engine = Arc::new(Mutex::new(sc_network_gossip::GossipEngine::new(
319
320
		service.clone(),
		POLKADOT_ENGINE_ID,
321
		POLKADOT_PROTOCOL_NAME,
322
		gossip_side,
Gavin Wood's avatar
Gavin Wood committed
323
	)));
324

Gavin Wood's avatar
Gavin Wood committed
325
326
	// Spawn gossip engine.
	//
327
328
	// Ideally this would not be spawned as an orphaned task, but polled by
	// `RegisteredMessageValidator` which in turn would be polled by a `ValidationNetwork`.
Gavin Wood's avatar
Gavin Wood committed
329
330
331
332
333
334
335
336
337
338
339
	{
		let gossip_engine = gossip_engine.clone();
		let fut = futures::future::poll_fn(move |cx| {
			gossip_engine.lock().poll_unpin(cx)
		});
		let spawn_res = executor.spawn_obj(futures::task::FutureObj::from(Box::new(fut)));

		// Note: we consider the chances of an error to spawn a background task almost null.
		if spawn_res.is_err() {
			log::error!(target: "polkadot-gossip", "Failed to spawn background task");
		}
340
341
	}

342
343
344
345
346
	RegisteredMessageValidator {
		inner: validator as _,
		service: Some(service),
		gossip_engine: Some(gossip_engine),
	}
347
348
}

349
350
351
#[derive(PartialEq)]
enum NewLeafAction {
	// (who, message)
352
	TargetedMessage(PeerId, GossipMessage),
353
354
355
356
357
358
359
360
361
362
363
}

/// Actions to take after noting a new block-DAG leaf.
///
/// This should be consumed by passing a consensus-gossip handle to `perform`.
#[must_use = "New chain-head gossip actions must be performed"]
pub struct NewLeafActions {
	// Queued actions, executed in order by `perform`.
	actions: Vec<NewLeafAction>,
}

impl NewLeafActions {
364
365
366
367
368
	#[cfg(test)]
	pub fn new() -> Self {
		NewLeafActions { actions: Vec::new() }
	}

369
370
371
	/// Perform the queued actions, feeding into gossip.
	pub fn perform(
		self,
372
		gossip: &dyn crate::legacy::GossipService,
373
374
375
376
	) {
		for action in self.actions {
			match action {
				NewLeafAction::TargetedMessage(who, message)
377
					=> gossip.send_message(who, message),
378
379
380
381
382
			}
		}
	}
}

383
384
385
/// A registered message validator.
///
/// Create this using `register_validator`.
386
387
#[derive(Clone)]
pub struct RegisteredMessageValidator {
388
	inner: Arc<MessageValidator<dyn ChainContext>>,
389
	// Note: this is always `Some` in real code and `None` in tests.
390
	service: Option<Arc<NetworkService<Block, Hash>>>,
391
	// Note: this is always `Some` in real code and `None` in tests.
Gavin Wood's avatar
Gavin Wood committed
392
	gossip_engine: Option<Arc<Mutex<sc_network_gossip::GossipEngine<Block>>>>,
393
394
}

395
impl RegisteredMessageValidator {
396
397
	/// Register an availabilty store the gossip service can query.
	pub(crate) fn register_availability_store(&self, availability_store: av_store::Store) {
398
399
400
		self.inner.inner.write().availability_store = Some(availability_store);
	}

401
402
403
404
	/// Note that we perceive a new leaf of the block-DAG. We will notify our neighbors that
	/// we now accept parachain candidate attestations and incoming message queues
	/// relevant to this leaf.
	pub(crate) fn new_local_leaf(
405
406
		&self,
		validation: MessageValidationData,
407
408
409
	) -> NewLeafActions {
		// add an entry in attestation_view
		// prune any entries from attestation_view which are no longer leaves
410
		let mut inner = self.inner.inner.write();
411
		inner.attestation_view.new_local_leaf(validation);
412

413
414
415
416
417
418
419
420
421
422
		let mut actions = Vec::new();

		{
			let &mut Inner {
				ref chain,
				ref mut attestation_view,
				..
			} = &mut *inner;

			attestation_view.prune_old_leaves(|hash| match chain.is_known(hash) {
423
424
425
426
				Some(Known::Leaf) => true,
				_ => false,
			});
		}
427

428

429
		// send neighbor packets to peers
430
431
432
433
434
		inner.multicast_neighbor_packet(
			|who, message| actions.push(NewLeafAction::TargetedMessage(who.clone(), message))
		);

		NewLeafActions { actions }
435
436
	}

437
	pub(crate) fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
438
		let topic_stream = if let Some(gossip_engine) = self.gossip_engine.as_ref() {
Gavin Wood's avatar
Gavin Wood committed
439
			gossip_engine.lock().messages_for(topic)
440
441
442
443
444
445
446
447
		} else {
			log::error!("Called gossip_messages_for on a test engine");
			futures::channel::mpsc::unbounded().1
		};

		GossipMessageStream::new(topic_stream.boxed())
	}

448
	pub(crate) fn gossip_message(&self, topic: Hash, message: GossipMessage) {
449
		if let Some(gossip_engine) = self.gossip_engine.as_ref() {
Gavin Wood's avatar
Gavin Wood committed
450
			gossip_engine.lock().gossip_message(
451
452
453
454
455
456
457
458
459
				topic,
				message.encode(),
				false,
			);
		} else {
			log::error!("Called gossip_message on a test engine");
		}
	}

460
	pub(crate) fn send_message(&self, who: PeerId, message: GossipMessage) {
461
		if let Some(gossip_engine) = self.gossip_engine.as_ref() {
Gavin Wood's avatar
Gavin Wood committed
462
			gossip_engine.lock().send_message(vec![who], message.encode());
463
464
465
466
		} else {
			log::error!("Called send_message on a test engine");
		}
	}
467
468
}

469
impl GossipService for RegisteredMessageValidator {
470
471
472
473
474
475
476
477
478
479
480
481
	fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
		RegisteredMessageValidator::gossip_messages_for(self, topic)
	}

	fn gossip_message(&self, topic: Hash, message: GossipMessage) {
		RegisteredMessageValidator::gossip_message(self, topic, message)
	}

	fn send_message(&self, who: PeerId, message: GossipMessage) {
		RegisteredMessageValidator::send_message(self, who, message)
	}
}
482

483
/// The data needed for validating gossip messages.
484
#[derive(Default)]
485
pub(crate) struct MessageValidationData {
486
	/// The authorities' parachain validation keys at a block.
487
	pub(crate) authorities: Vec<ValidatorId>,
488
489
	/// The signing context.
	pub(crate) signing_context: SigningContext,
490
491
492
}

impl MessageValidationData {
493
	// check a statement's signature.
494
	fn check_statement(&self, statement: &SignedStatement) -> Result<(), ()> {
495
		let sender = match self.authorities.get(statement.sender as usize) {
496
			Some(val) => val,
497
			None => return Err(()),
498
499
		};

500
		let good = self.authorities.contains(&sender) &&
501
502
503
			::polkadot_validation::check_statement(
				&statement.statement,
				&statement.signature,
504
				sender.clone(),
505
				&self.signing_context,
506
507
508
509
510
511
512
513
514
515
516
517
			);

		if good {
			Ok(())
		} else {
			Err(())
		}
	}
}

#[derive(Default)]
struct PeerData {
518
	attestation: AttestationPeerData,
519
520
}

521
struct Inner<C: ?Sized> {
522
	peers: HashMap<PeerId, PeerData>,
523
	attestation_view: AttestationView,
524
	availability_store: Option<av_store::Store>,
525
	chain: C,
526
527
}

528
impl<C: ?Sized + ChainContext> Inner<C> {
529
	fn validate_neighbor_packet(&mut self, sender: &PeerId, packet: NeighborPacket)
530
		-> (GossipValidationResult<Hash>, ReputationChange, Vec<Hash>)
531
532
533
534
535
	{
		let chain_heads = packet.chain_heads;
		if chain_heads.len() > MAX_CHAIN_HEADS {
			(GossipValidationResult::Discard, cost::BAD_NEIGHBOR_PACKET, Vec::new())
		} else {
536
537
538
539
			let chain_heads: LeavesVec = chain_heads.into_iter().collect();
			let new_topics = if let Some(ref mut peer) = self.peers.get_mut(sender) {
				let new_leaves = peer.attestation.update_leaves(&chain_heads);
				let new_attestation_topics = new_leaves.iter().cloned().map(attestation_topic);
asynchronous rob's avatar
asynchronous rob committed
540
				let new_pov_block_topics = new_leaves.iter().cloned().map(pov_block_topic);
541

asynchronous rob's avatar
asynchronous rob committed
542
				new_attestation_topics.chain(new_pov_block_topics).collect()
543
544
545
546
			} else {
				Vec::new()
			};

547
			(GossipValidationResult::Discard, cost::NONE, new_topics)
548
549
550
		}
	}

551
552
553
554
555
556
	fn validate_erasure_chunk_packet(&mut self, msg: ErasureChunkMessage)
		-> (GossipValidationResult<Hash>, ReputationChange)
	{
		if let Some(store) = &self.availability_store {
			if let Some(receipt) = store.get_candidate(&msg.candidate_hash) {
				let chunk_hash = erasure::branch_hash(
557
					&receipt.commitments.erasure_root,
558
559
560
561
562
563
564
565
566
567
568
					&msg.chunk.proof,
					msg.chunk.index as usize
				);

				if chunk_hash != Ok(BlakeTwo256::hash(&msg.chunk.chunk)) {
					(
						GossipValidationResult::Discard,
						cost::ERASURE_CHUNK_WRONG_ROOT
					)
				} else {
					if let Some(awaited_chunks) = store.awaited_chunks() {
569
						let frontier_entry = av_store::AwaitedFrontierEntry {
570
571
							candidate_hash: msg.candidate_hash,
							relay_parent: receipt.relay_parent,
572
573
574
							validator_index: msg.chunk.index,
						};
						if awaited_chunks.contains(&frontier_entry) {
575
576
							let topic = crate::erasure_coding_topic(
								&msg.candidate_hash
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
							);

							return (
								GossipValidationResult::ProcessAndKeep(topic),
								benefit::NEW_ERASURE_CHUNK,
							);
						}
					}
					(GossipValidationResult::Discard, cost::NONE)
				}
			} else {
				(GossipValidationResult::Discard, cost::ORPHANED_ERASURE_CHUNK)
			}
		} else {
			(GossipValidationResult::Discard, cost::NONE)
		}
	}

595
	fn multicast_neighbor_packet<F: FnMut(&PeerId, GossipMessage)>(
596
597
598
		&self,
		mut send_neighbor_packet: F,
	) {
599
600
		let neighbor_packet = GossipMessage::from(NeighborPacket {
			chain_heads: self.attestation_view.neighbor_info().collect(),
601
		});
602
603

		for peer in self.peers.keys() {
604
			send_neighbor_packet(peer, neighbor_packet.clone())
605
		}
606
607
608
609
	}
}

/// An unregistered message validator. Register this with `register_validator`.
610
pub struct MessageValidator<C: ?Sized> {
611
	report_handle: Box<dyn Fn(&PeerId, ReputationChange) + Send + Sync>,
612
	inner: RwLock<Inner<C>>,
613
614
}

615
impl<C: ChainContext + ?Sized> MessageValidator<C> {
616
617
	#[cfg(test)]
	fn new_test(
618
		chain: C,
619
		report_handle: Box<dyn Fn(&PeerId, ReputationChange) + Send + Sync>,
620
	) -> Self where C: Sized {
621
622
623
624
		MessageValidator {
			report_handle,
			inner: RwLock::new(Inner {
				peers: HashMap::new(),
625
				attestation_view: Default::default(),
626
				availability_store: None,
627
628
				chain,
			}),
629
630
631
		}
	}

632
	fn report(&self, who: &PeerId, cost_benefit: ReputationChange) {
633
634
		(self.report_handle)(who, cost_benefit)
	}
635
636
}

637
impl<C: ChainContext + ?Sized> sc_network_gossip::Validator<Block> for MessageValidator<C> {
638
	fn new_peer(&self, _context: &mut dyn ValidatorContext<Block>, who: &PeerId, _roles: ObservedRole) {
639
		let mut inner = self.inner.write();
640
		inner.peers.insert(who.clone(), PeerData::default());
641
642
	}

643
	fn peer_disconnected(&self, _context: &mut dyn ValidatorContext<Block>, who: &PeerId) {
644
645
646
647
		let mut inner = self.inner.write();
		inner.peers.remove(who);
	}

648
	fn validate(&self, context: &mut dyn ValidatorContext<Block>, sender: &PeerId, data: &[u8])
649
650
		-> GossipValidationResult<Hash>
	{
651
652
		let mut decode_data = data;
		let (res, cost_benefit) = match GossipMessage::decode(&mut decode_data) {
653
654
			Err(_) => (GossipValidationResult::Discard, cost::MALFORMED_MESSAGE),
			Ok(GossipMessage::Neighbor(VersionedNeighborPacket::V1(packet))) => {
655
656
657
658
659
660
				let (res, cb, topics) = self.inner.write().validate_neighbor_packet(sender, packet);
				for new_topic in topics {
					context.send_topic(sender, new_topic, false);
				}
				(res, cb)
			}
661
			Ok(GossipMessage::Statement(statement)) => {
662
663
664
665
666
667
668
669
670
671
672
				let (res, cb) = {
					let mut inner = self.inner.write();
					let inner = &mut *inner;
					inner.attestation_view.validate_statement_signature(statement, &inner.chain)
				};

				if let GossipValidationResult::ProcessAndKeep(ref topic) = res {
					context.broadcast_message(topic.clone(), data.to_vec(), false);
				}
				(res, cb)
			}
asynchronous rob's avatar
asynchronous rob committed
673
674
675
676
677
678
679
680
681
682
683
684
685
			Ok(GossipMessage::PoVBlock(pov_block)) => {
				let (res, cb) = {
					let mut inner = self.inner.write();
					let inner = &mut *inner;
					inner.attestation_view.validate_pov_block_message(&pov_block, &inner.chain)
				};

				if let GossipValidationResult::ProcessAndKeep(ref topic) = res {
					context.broadcast_message(topic.clone(), data.to_vec(), false);
				}

				(res, cb)
			}
686
687
688
			Ok(GossipMessage::ErasureChunk(chunk)) => {
				self.inner.write().validate_erasure_chunk_packet(chunk)
			}
689
690
691
692
693
694
		};

		self.report(sender, cost_benefit);
		res
	}

695
	fn message_expired<'a>(&'a self) -> Box<dyn FnMut(Hash, &[u8]) -> bool + 'a> {
696
697
698
		let inner = self.inner.read();

		Box::new(move |topic, _data| {
699
700
			// check that messages from this topic are considered live by one of our protocols.
			// everything else is expired
Ashley's avatar
Ashley committed
701
			let live = inner.attestation_view.is_topic_live(&topic);
702
703

			!live // = expired
704
705
706
		})
	}

707
	fn message_allowed<'a>(&'a self) -> Box<dyn FnMut(&PeerId, MessageIntent, &Hash, &[u8]) -> bool + 'a> {
708
709
		let mut inner = self.inner.write();
		Box::new(move |who, intent, topic, data| {
710
711
712
713
714
			let &mut Inner {
				ref mut peers,
				ref mut attestation_view,
				..
			} = &mut *inner;
715
716
717
718
719
720

			match intent {
				MessageIntent::PeriodicRebroadcast => return false,
				_ => {},
			}

721
722
			let attestation_head = attestation_view.topic_block(topic).map(|x| x.clone());
			let peer = peers.get_mut(who);
723
724

			match GossipMessage::decode(&mut &data[..]) {
725
726
727
728
729
730
				Ok(GossipMessage::Statement(ref statement)) => {
					// to allow statements, we need peer knowledge.
					let peer_knowledge = peer.and_then(move |p| attestation_head.map(|r| (p, r)))
						.and_then(|(p, r)| p.attestation.knowledge_at_mut(&r).map(|k| (k, r)));

					peer_knowledge.map_or(false, |(knowledge, attestation_head)| {
asynchronous rob's avatar
asynchronous rob committed
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
						statement.relay_chain_leaf == attestation_head
							&& attestation_view.statement_allowed(
								statement,
								knowledge,
							)
					})
				}
				Ok(GossipMessage::PoVBlock(ref pov_block)) => {
					// to allow pov-blocks, we need peer knowledge.
					let peer_knowledge = peer.and_then(move |p| attestation_head.map(|r| (p, r)))
						.and_then(|(p, r)| p.attestation.knowledge_at_mut(&r).map(|k| (k, r)));

					peer_knowledge.map_or(false, |(knowledge, attestation_head)| {
						pov_block.relay_chain_leaf == attestation_head
							&& attestation_view.pov_block_allowed(
								pov_block,
								knowledge,
							)
749
750
751
					})
				}
				_ => false,
752
			}
753
754
755
756
757
758
759
		})
	}
}

#[cfg(test)]
mod tests {
	use super::*;
760
	use sc_network_gossip::Validator as ValidatorT;
761
762
	use std::sync::mpsc;
	use parking_lot::Mutex;
asynchronous rob's avatar
asynchronous rob committed
763
	use polkadot_primitives::parachain::{AbridgedCandidateReceipt, BlockData};
Ashley's avatar
Ashley committed
764
	use sp_core::sr25519::Signature as Sr25519Signature;
765
766
	use polkadot_validation::GenericStatement;

767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
	/// One recorded call on the mock validator context; each variant mirrors the
	/// arguments of the context method of the same name.
	#[derive(PartialEq, Clone, Debug)]
	enum ContextEvent {
		// Recorded by `broadcast_topic(topic, force)`.
		BroadcastTopic(Hash, bool),
		// Recorded by `broadcast_message(topic, message, force)`.
		BroadcastMessage(Hash, Vec<u8>, bool),
		// Recorded by `send_message(who, message)`.
		SendMessage(PeerId, Vec<u8>),
		// Recorded by `send_topic(who, topic, force)`.
		SendTopic(PeerId, Hash, bool),
	}

	/// Validator context that only records the calls made on it.
	#[derive(Default)]
	struct MockValidatorContext {
		// Calls recorded so far, in order.
		events: Vec<ContextEvent>,
	}

	impl MockValidatorContext {
		/// Forget all recorded events.
		fn clear(&mut self) {
			self.events.clear()
		}
	}

786
	impl sc_network_gossip::ValidatorContext<Block> for MockValidatorContext {
787
788
789
790
791
792
793
794
795
796
797
798
799
800
		fn broadcast_topic(&mut self, topic: Hash, force: bool) {
			self.events.push(ContextEvent::BroadcastTopic(topic, force));
		}
		fn broadcast_message(&mut self, topic: Hash, message: Vec<u8>, force: bool) {
			self.events.push(ContextEvent::BroadcastMessage(topic, message, force));
		}
		fn send_message(&mut self, who: &PeerId, message: Vec<u8>) {
			self.events.push(ContextEvent::SendMessage(who.clone(), message));
		}
		fn send_topic(&mut self, who: &PeerId, topic: Hash, force: bool) {
			self.events.push(ContextEvent::SendTopic(who.clone(), topic, force));
		}
	}

801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
	/// Chain context backed by plain maps, for tests.
	#[derive(Default)]
	struct TestChainContext {
		known_map: HashMap<Hash, Known>,
		ingress_roots: HashMap<Hash, Vec<Hash>>,
	}

	impl ChainContext for TestChainContext {
		/// Looks the block up in `known_map`.
		fn is_known(&self, block_hash: &Hash) -> Option<Known> {
			self.known_map.get(block_hash).cloned()
		}

		/// Calls `with_queue_root` for every root stored for `leaf`, if any.
		fn leaf_unrouted_roots(&self, leaf: &Hash, with_queue_root: &mut dyn FnMut(&Hash))
			-> Result<(), sp_blockchain::Error>
		{
			if let Some(roots) = self.ingress_roots.get(leaf) {
				for root in roots {
					with_queue_root(root)
				}
			}

			Ok(())
		}
	}

823
	/// A statement may be sent to a peer only under a topic whose leaf is both in
	/// our view and in the peer's advertised view.
	#[test]
	fn attestation_message_allowed() {
		let (tx, _rx) = mpsc::channel();
		let tx = Mutex::new(tx);
		let report_handle = Box::new(move |peer: &PeerId, cb: ReputationChange| tx.lock().send((peer.clone(), cb)).unwrap());
		let validator = MessageValidator::new_test(
			TestChainContext::default(),
			report_handle,
		);

		let peer_a = PeerId::random();

		let mut validator_context = MockValidatorContext::default();
		validator.new_peer(&mut validator_context, &peer_a, ObservedRole::Full);
		assert!(validator_context.events.is_empty());
		validator_context.clear();

		let hash_a = [1u8; 32].into();
		let hash_b = [2u8; 32].into();
		let hash_c = [3u8; 32].into();

		// The peer advertises leaves `a` and `b`.
		let message = GossipMessage::from(NeighborPacket {
			chain_heads: vec![hash_a, hash_b],
		}).encode();
		let res = validator.validate(
			&mut validator_context,
			&peer_a,
			&message[..],
		);

		match res {
			GossipValidationResult::Discard => {},
			_ => panic!("wrong result"),
		}
		// Attestation topics come first, followed by the PoV-block topics.
		assert_eq!(
			validator_context.events,
			vec![
				ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_a), false),
				ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_b), false),

				ContextEvent::SendTopic(peer_a.clone(), pov_block_topic(hash_a), false),
				ContextEvent::SendTopic(peer_a.clone(), pov_block_topic(hash_b), false),
			],
		);

		validator_context.clear();

		let candidate_receipt = AbridgedCandidateReceipt::default();
		let statement = GossipMessage::Statement(GossipStatement {
			relay_chain_leaf: hash_a,
			signed_statement: SignedStatement {
				statement: GenericStatement::Candidate(candidate_receipt),
				signature: Sr25519Signature([255u8; 64]).into(),
				sender: 1,
			}
		});
		let encoded = statement.encode();

		let topic_a = attestation_topic(hash_a);
		let topic_b = attestation_topic(hash_b);
		let topic_c = attestation_topic(hash_c);

		// topic_a is in all 3 views -> succeed
		let mut validation_data = MessageValidationData::default();
		validation_data.signing_context.parent_hash = hash_a;
		validator.inner.write().attestation_view.new_local_leaf(validation_data);
		// topic_b is in the neighbor's view but not ours -> fail
		// topic_c is not in either -> fail

		{
			let mut message_allowed = validator.message_allowed();
			let intent = MessageIntent::Broadcast;
			assert!(message_allowed(&peer_a, intent, &topic_a, &encoded));
			assert!(!message_allowed(&peer_a, intent, &topic_b, &encoded));
			assert!(!message_allowed(&peer_a, intent, &topic_c, &encoded));
		}
	}

	/// A neighbor packet carrying `MAX_CHAIN_HEADS + 1` chain heads is
	/// discarded, emits no context events, and results in exactly one
	/// `cost::BAD_NEIGHBOR_PACKET` report against the sending peer.
	#[test]
	fn too_many_chain_heads_is_report() {
		// Reputation reports are forwarded into a channel and inspected at the end.
		let (tx, rx) = mpsc::channel();
		let tx = Mutex::new(tx);
		let report_handle = Box::new(move |peer: &PeerId, cb: ReputationChange| {
			tx.lock().send((peer.clone(), cb)).unwrap()
		});
		let validator = MessageValidator::new_test(
			TestChainContext::default(),
			report_handle,
		);

		let peer_a = PeerId::random();

		// Registering a new peer must not emit any context events.
		let mut validator_context = MockValidatorContext::default();
		validator.new_peer(&mut validator_context, &peer_a, ObservedRole::Full);
		assert!(validator_context.events.is_empty());
		validator_context.clear();

		// One head more than `MAX_CHAIN_HEADS`, each filled with its own index byte.
		let chain_heads = (0..MAX_CHAIN_HEADS+1).map(|i| [i as u8; 32].into()).collect();

		let message = GossipMessage::from(NeighborPacket {
			chain_heads,
		}).encode();
		let res = validator.validate(
			&mut validator_context,
			&peer_a,
			&message[..],
		);

		match res {
			GossipValidationResult::Discard => {},
			_ => panic!("wrong result"),
		}
		assert_eq!(
			validator_context.events,
			Vec::new(),
		);

		// The report handle (and the channel sender it captured) was moved into
		// the validator; dropping it is presumably what closes the channel so
		// that `rx.iter()` below terminates instead of blocking.
		drop(validator);

		assert_eq!(rx.iter().collect::<Vec<_>>(), vec![(peer_a, cost::BAD_NEIGHBOR_PACKET)]);
	}

	/// Feeds a neighbor packet from `peer_a` announcing `hash_a` and `hash_b`,
	/// checks that it is discarded while the matching attestation and
	/// PoV-block topics are sent back, then registers `hash_a` as a local leaf.
	///
	/// NOTE(review): despite its name, this test body never builds or checks a
	/// statement; confirm whether further assertions are intended here.
	#[test]
	fn statement_only_sent_when_candidate_known() {
		// Reputation reports are forwarded into a channel; this test never reads them.
		let (tx, _rx) = mpsc::channel();
		let tx = Mutex::new(tx);
		let report_handle = Box::new(move |peer: &PeerId, cb: ReputationChange| {
			tx.lock().send((peer.clone(), cb)).unwrap()
		});
		let validator = MessageValidator::new_test(
			TestChainContext::default(),
			report_handle,
		);

		let peer_a = PeerId::random();

		// Registering a new peer must not emit any context events.
		let mut validator_context = MockValidatorContext::default();
		validator.new_peer(&mut validator_context, &peer_a, ObservedRole::Full);
		assert!(validator_context.events.is_empty());
		validator_context.clear();

		let hash_a = [1u8; 32].into();
		let hash_b = [2u8; 32].into();

		let message = GossipMessage::from(NeighborPacket {
			chain_heads: vec![hash_a, hash_b],
		}).encode();

		{
			let res = validator.validate(
				&mut validator_context,
				&peer_a,
				&message[..],
			);

			match res {
				GossipValidationResult::Discard => {},
				_ => panic!("wrong result"),
			}
			assert_eq!(
				validator_context.events,
				vec![
					ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_a), false),
					ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_b), false),

					ContextEvent::SendTopic(peer_a.clone(), pov_block_topic(hash_a), false),
					ContextEvent::SendTopic(peer_a.clone(), pov_block_topic(hash_b), false),
				],
			);

			validator_context.clear();
		}

		// Register `hash_a` as a leaf in our own attestation view.
		let mut validation_data = MessageValidationData::default();
		validation_data.signing_context.parent_hash = hash_a;
		validator.inner.write().attestation_view.new_local_leaf(validation_data);
	}

	#[test]
	fn pov_block_message_allowed() {
		let (tx, _rx) = mpsc::channel();
		let tx = Mutex::new(tx);
For faster browsing, not all history is shown. View entire blame