lib.rs 34.1 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The bitfield distribution
//!
//! In case this node is a validator, gossips its own signed availability bitfield
//! for a particular relay parent.
//! Independently of that, gossips on received messages from peers to other interested peers.

23
24
#![deny(unused_crate_dependencies)]

25
use codec::{Decode, Encode};
26
use futures::{channel::oneshot, FutureExt, TryFutureExt};
27
28
29
30

use log::{trace, warn};
use polkadot_subsystem::messages::*;
use polkadot_subsystem::{
31
	ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult,
32
};
33
34
35
use polkadot_node_subsystem_util::{
	metrics::{self, prometheus},
};
36
use polkadot_primitives::v1::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId};
37
use polkadot_node_network_protocol::{v1 as protocol_v1, PeerId, NetworkBridgeEvent, View, ReputationChange};
38
use polkadot_subsystem::SubsystemError;
39
40
41
42
43
44
45
46
47
48
49
50
use std::collections::{HashMap, HashSet};

// Reputation changes applied to peers through the network bridge.
// Negative values (COST_*) penalize misbehavior; positive values (BENEFIT_*)
// reward peers that provide useful gossip.

// The signature on the bitfield did not verify against the claimed validator key.
const COST_SIGNATURE_INVALID: ReputationChange =
	ReputationChange::new(-100, "Bitfield signature invalid");
// The validator index in the signed payload is out of range for the validator set.
const COST_VALIDATOR_INDEX_INVALID: ReputationChange =
	ReputationChange::new(-100, "Bitfield validator index invalid");
// Applied when the validator set for the relay parent is empty, so no key
// could be looked up for the message.
const COST_MISSING_PEER_SESSION_KEY: ReputationChange =
	ReputationChange::new(-133, "Missing peer session key");
// The message references a relay parent outside our current view / assignment.
const COST_NOT_IN_VIEW: ReputationChange =
	ReputationChange::new(-51, "Not interested in that parent hash");
// Flood protection: the peer re-sent a message we already received from it.
const COST_PEER_DUPLICATE_MESSAGE: ReputationChange =
	ReputationChange::new(-500, "Peer sent the same message multiple times");
// First peer to deliver a previously unknown, valid bitfield.
const BENEFIT_VALID_MESSAGE_FIRST: ReputationChange =
	ReputationChange::new(15, "Valid message with new information");
// Valid but already-known bitfield.
const BENEFIT_VALID_MESSAGE: ReputationChange =
	ReputationChange::new(10, "Valid message");

/// Checked signed availability bitfield that is distributed
/// to other peers.
///
/// This is the unit of gossip for this subsystem: a signed bitfield
/// paired with the relay parent it was signed for.
#[derive(Encode, Decode, Debug, Clone, PartialEq, Eq)]
struct BitfieldGossipMessage {
	/// The relay parent this message is relative to.
	relay_parent: Hash,
	/// The actual signed availability bitfield.
	signed_availability: SignedAvailabilityBitfield,
}

impl BitfieldGossipMessage {
	/// Wrap this message into the versioned validation-protocol envelope
	/// expected by the network bridge.
	fn into_validation_protocol(self) -> protocol_v1::ValidationProtocol {
		let wire_message = self.into_network_message();
		protocol_v1::ValidationProtocol::BitfieldDistribution(wire_message)
	}

	/// Convert into the raw bitfield-distribution wire message.
	fn into_network_message(self) -> protocol_v1::BitfieldDistributionMessage {
		let Self { relay_parent, signed_availability } = self;
		protocol_v1::BitfieldDistributionMessage::Bitfield(relay_parent, signed_availability)
	}
}

/// Data used to track information of peers and relay parents the
/// overseer ordered us to work on.
#[derive(Default, Clone)]
struct ProtocolState {
	/// track all active peers and their views
	/// to determine what is relevant to them.
	peer_views: HashMap<PeerId, View>,

	/// Our current view.
	view: View,

	/// Additional data particular to a relay parent.
	/// Entries are inserted on `ActiveLeaves` and pruned when the
	/// relay parent leaves our view (see `handle_our_view_change`).
	per_relay_parent: HashMap<Hash, PerRelayParentData>,
}

/// Data for a particular relay parent.
#[derive(Debug, Clone, Default)]
struct PerRelayParentData {
	/// Signing context for a particular relay parent.
	signing_context: SigningContext,

	/// Set of validators for a particular relay parent.
	/// The validator index inside a signed bitfield indexes into this vec.
	validator_set: Vec<ValidatorId>,

	/// Set of validators for a particular relay parent for which we
	/// received a valid `BitfieldGossipMessage`.
	/// Also serves as the list of known messages for peers connecting
	/// after bitfield gossips were already received.
	one_per_validator: HashMap<ValidatorId, BitfieldGossipMessage>,

	/// Avoid duplicate message transmission to our peers.
	message_sent_to_peer: HashMap<PeerId, HashSet<ValidatorId>>,

	/// Track messages that were already received by a peer
	/// to prevent flooding.
	message_received_from_peer: HashMap<PeerId, HashSet<ValidatorId>>,
}

impl PerRelayParentData {
	/// Determines if that particular message signed by a validator is needed by the given peer.
	///
	/// A message is needed unless we already sent that exact validator's
	/// bitfield to the peer before.
	fn message_from_validator_needed_by_peer(
		&self,
		peer: &PeerId,
		validator: &ValidatorId,
	) -> bool {
		if let Some(set) = self.message_sent_to_peer.get(peer) {
			!set.contains(validator)
		} else {
			// Nothing was ever sent to this peer, so every known message is
			// still needed. Returning `false` here (the previous behavior)
			// would make `handle_peer_view_change` skip gossiping all
			// previously received bitfields to freshly interested peers.
			true
		}
	}
}

136
137
/// Log target used by all tracing in this subsystem.
// `&'static` is redundant on a `const` (clippy: redundant_static_lifetimes).
const TARGET: &str = "bitd";

138
/// The bitfield distribution subsystem.
pub struct BitfieldDistribution {
	/// Prometheus metrics for this subsystem; a no-op when the inner
	/// `Option` is `None` (metrics not registered).
	metrics: Metrics,
}
142
143

impl BitfieldDistribution {
	/// Create a new instance of the `BitfieldDistribution` subsystem.
	pub fn new(metrics: Metrics) -> Self {
		Self { metrics }
	}

	/// Start processing work as passed on from the Overseer.
	///
	/// Runs the main event loop: overseer communication messages drive
	/// bitfield gossip, and overseer signals manage per-relay-parent
	/// bookkeeping. Returns `Ok(())` only on `Conclude`.
	async fn run<Context>(self, mut ctx: Context) -> SubsystemResult<()>
	where
		Context: SubsystemContext<Message = BitfieldDistributionMessage>,
	{
		// work: process incoming messages from the overseer and process accordingly.
		let mut state = ProtocolState::default();
		loop {
			let message = ctx.recv().await?;
			match message {
				FromOverseer::Communication {
					msg: BitfieldDistributionMessage::DistributeBitfield(hash, signed_availability),
				} => {
					trace!(target: TARGET, "Processing DistributeBitfield");
					// Our own bitfield: gossip it out. Errors here are fatal
					// for the subsystem (propagated with `?`).
					handle_bitfield_distribution(&mut ctx, &mut state, &self.metrics, hash, signed_availability)
						.await?;
				}
				FromOverseer::Communication {
					msg: BitfieldDistributionMessage::NetworkBridgeUpdateV1(event),
				} => {
					trace!(target: TARGET, "Processing NetworkMessage");
					// a network message was received
					// Network-driven errors are logged but do not take the
					// subsystem down — a misbehaving peer must not kill us.
					if let Err(e) = handle_network_msg(&mut ctx, &mut state, &self.metrics, event).await {
						warn!(target: TARGET, "Failed to handle incoming network messages: {:?}", e);
					}
				}
				FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated })) => {
					for relay_parent in activated {
						trace!(target: TARGET, "Start {:?}", relay_parent);
						// query basic system parameters once
						if let Some((validator_set, signing_context)) =
							query_basics(&mut ctx, relay_parent).await?
						{
							// If our runtime API fails, we don't take down the node,
							// but we might alter peers' reputations erroneously as a result
							// of not having the correct bookkeeping. If we have lost a race
							// with state pruning, it is unlikely that peers will be sending
							// us anything to do with this relay-parent anyway.
							let _ = state.per_relay_parent.insert(
								relay_parent,
								PerRelayParentData {
									signing_context,
									validator_set,
									..Default::default()
								},
							);
						}
					}

					for relay_parent in deactivated {
						trace!(target: TARGET, "Stop {:?}", relay_parent);
						// defer the cleanup to the view change
					}
				}
				FromOverseer::Signal(OverseerSignal::BlockFinalized(hash)) => {
					// Nothing to prune on finalization; cleanup is driven by
					// view changes instead.
					trace!(target: TARGET, "Block finalized {:?}", hash);
				}
				FromOverseer::Signal(OverseerSignal::Conclude) => {
					trace!(target: TARGET, "Conclude");
					return Ok(());
				}
			}
		}
	}
}

/// Modify the reputation of a peer based on its behaviour.
///
/// Reputation changes are forwarded to the network bridge, which applies
/// them to the underlying peer set.
async fn modify_reputation<Context>(
	ctx: &mut Context,
	peer: PeerId,
	rep: ReputationChange,
) -> SubsystemResult<()>
where
	Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
	trace!(target: TARGET, "Reputation change of {:?} for peer {:?}", rep, peer);
	let report = NetworkBridgeMessage::ReportPeer(peer, rep);
	ctx.send_message(AllMessages::NetworkBridge(report)).await
}

/// Distribute a given valid and signature checked bitfield message.
///
/// For this variant the source is this node.
async fn handle_bitfield_distribution<Context>(
	ctx: &mut Context,
	state: &mut ProtocolState,
237
	metrics: &Metrics,
238
239
240
241
242
243
244
245
246
247
248
249
	relay_parent: Hash,
	signed_availability: SignedAvailabilityBitfield,
) -> SubsystemResult<()>
where
	Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
	// Ignore anything the overseer did not tell this subsystem to work on
	let mut job_data = state.per_relay_parent.get_mut(&relay_parent);
	let job_data: &mut _ = if let Some(ref mut job_data) = job_data {
		job_data
	} else {
		trace!(
250
			target: TARGET,
251
252
253
254
255
256
257
258
			"Not supposed to work on relay parent {} related data",
			relay_parent
		);

		return Ok(());
	};
	let validator_set = &job_data.validator_set;
	if validator_set.is_empty() {
259
		trace!(target: TARGET, "Validator set for {:?} is empty", relay_parent);
260
261
262
263
264
265
266
		return Ok(());
	}

	let validator_index = signed_availability.validator_index() as usize;
	let validator = if let Some(validator) = validator_set.get(validator_index) {
		validator.clone()
	} else {
267
		trace!(target: TARGET, "Could not find a validator for index {}", validator_index);
268
269
270
271
272
273
274
275
276
277
278
		return Ok(());
	};

	let peer_views = &mut state.peer_views;
	let msg = BitfieldGossipMessage {
		relay_parent,
		signed_availability,
	};

	relay_message(ctx, job_data, peer_views, validator, msg).await?;

279
280
	metrics.on_own_bitfield_gossipped();

281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
	Ok(())
}

/// Distribute a given valid and signature checked bitfield message.
///
/// Can be originated by another subsystem or received via network from another peer.
///
/// Notifies the provisioner of the new bitfield and then forwards it to all
/// peers whose view contains the message's relay parent, recording each send
/// in `message_sent_to_peer` so it is never sent to the same peer twice.
async fn relay_message<Context>(
	ctx: &mut Context,
	job_data: &mut PerRelayParentData,
	peer_views: &mut HashMap<PeerId, View>,
	validator: ValidatorId,
	message: BitfieldGossipMessage,
) -> SubsystemResult<()>
where
	Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
	// notify the overseer about a new and valid signed bitfield
	ctx.send_message(AllMessages::Provisioner(
		ProvisionerMessage::ProvisionableData(ProvisionableData::Bitfield(
			message.relay_parent.clone(),
			message.signed_availability.clone(),
		)),
	))
	.await?;

	let message_sent_to_peer = &mut (job_data.message_sent_to_peer);

	// pass on the bitfield distribution to all interested peers
	let interested_peers = peer_views
		.iter()
		.filter_map(|(peer, view)| {
			// check interest in the peer in this message's relay parent
			if view.contains(&message.relay_parent) {
				// track the message as sent for this peer
				message_sent_to_peer
					.entry(peer.clone())
					.or_default()
					.insert(validator.clone());

				Some(peer.clone())
			} else {
				None
			}
		})
		.collect::<Vec<PeerId>>();

	if interested_peers.is_empty() {
		trace!(
			target: TARGET,
			"No peers are interested in gossip for relay parent {:?}",
			message.relay_parent
		);
	} else {
		// one network-bridge message fans out to all interested peers
		ctx.send_message(AllMessages::NetworkBridge(
			NetworkBridgeMessage::SendValidationMessage(
				interested_peers,
				message.into_validation_protocol(),
			),
		))
		.await?;
	}
	Ok(())
}

/// Handle an incoming message from a peer.
///
/// Validates the message step by step (view membership, known relay parent,
/// validator index, duplicate check, signature) and adjusts the peer's
/// reputation according to the outcome. Valid, novel messages are relayed
/// onward via `relay_message`.
async fn process_incoming_peer_message<Context>(
	ctx: &mut Context,
	state: &mut ProtocolState,
	metrics: &Metrics,
	origin: PeerId,
	message: BitfieldGossipMessage,
) -> SubsystemResult<()>
where
	Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
	// we don't care about this, not part of our view.
	if !state.view.contains(&message.relay_parent) {
		return modify_reputation(ctx, origin, COST_NOT_IN_VIEW).await;
	}

	// Ignore anything the overseer did not tell this subsystem to work on.
	let mut job_data = state.per_relay_parent.get_mut(&message.relay_parent);
	let job_data: &mut _ = if let Some(ref mut job_data) = job_data {
		job_data
	} else {
		return modify_reputation(ctx, origin, COST_NOT_IN_VIEW).await;
	};

	let validator_set = &job_data.validator_set;
	if validator_set.is_empty() {
		trace!(
			target: TARGET,
			"Validator set for relay parent {:?} is empty",
			&message.relay_parent
		);
		// NOTE(review): the penalty constant is named after a missing session
		// key but is applied for an empty validator set — confirm intended.
		return modify_reputation(ctx, origin, COST_MISSING_PEER_SESSION_KEY).await;
	}

	// Use the (untrusted) validator index provided by the signed payload
	// and see if that one actually signed the availability bitset.
	let signing_context = job_data.signing_context.clone();
	let validator_index = message.signed_availability.validator_index() as usize;
	let validator = if let Some(validator) = validator_set.get(validator_index) {
		validator.clone()
	} else {
		return modify_reputation(ctx, origin, COST_VALIDATOR_INDEX_INVALID).await;
	};

	// Check if the peer already sent us a message for the validator denoted in the message earlier.
	// Must be done after validator index verification, in order to avoid storing an unbounded
	// number of set entries.
	let received_set = job_data
		.message_received_from_peer
		.entry(origin.clone())
		.or_default();

	if !received_set.contains(&validator) {
		received_set.insert(validator.clone());
	} else {
		// flood protection: the peer repeated itself
		return modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await;
	};

	if message
		.signed_availability
		.check_signature(&signing_context, &validator)
		.is_ok()
	{
		metrics.on_bitfield_received();
		let one_per_validator = &mut (job_data.one_per_validator);

		// only relay_message a message of a validator once
		if one_per_validator.get(&validator).is_some() {
			trace!(
				target: TARGET,
				"Already received a message for validator at index {}",
				validator_index
			);
			// still a valid message, just not new information
			modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await?;
			return Ok(());
		}
		one_per_validator.insert(validator.clone(), message.clone());

		relay_message(ctx, job_data, &mut state.peer_views, validator, message).await?;

		modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await
	} else {
		modify_reputation(ctx, origin, COST_SIGNATURE_INVALID).await
	}
}
430

431
432
433
434
435
/// Deal with network bridge updates and track what needs to be tracked
/// which depends on the message type received.
///
/// Dispatches peer connect/disconnect, view changes (ours and peers'),
/// and incoming gossip to the relevant handlers.
async fn handle_network_msg<Context>(
	ctx: &mut Context,
	state: &mut ProtocolState,
	metrics: &Metrics,
	bridge_message: NetworkBridgeEvent<protocol_v1::BitfieldDistributionMessage>,
) -> SubsystemResult<()>
where
	Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
	match bridge_message {
		NetworkBridgeEvent::PeerConnected(peerid, _role) => {
			// insert if none already present
			state.peer_views.entry(peerid).or_default();
		}
		NetworkBridgeEvent::PeerDisconnected(peerid) => {
			// get rid of superfluous data
			state.peer_views.remove(&peerid);
		}
		NetworkBridgeEvent::PeerViewChange(peerid, view) => {
			handle_peer_view_change(ctx, state, peerid, view).await?;
		}
		NetworkBridgeEvent::OurViewChange(view) => {
			handle_our_view_change(state, view)?;
		}
		NetworkBridgeEvent::PeerMessage(remote, message) => {
			// `Bitfield` is currently the only v1 wire message of this protocol.
			match message {
				protocol_v1::BitfieldDistributionMessage::Bitfield(relay_parent, bitfield) => {
					trace!(target: TARGET, "Received bitfield gossip from peer {:?}", &remote);
					let gossiped_bitfield = BitfieldGossipMessage {
						relay_parent,
						signed_availability: bitfield,
					};
					process_incoming_peer_message(ctx, state, metrics, remote, gossiped_bitfield).await?;
				}
			}
		}
	}
	Ok(())
}

/// Handle the changes necessary when our view changes.
///
/// Warns about relay parents that entered our view without the overseer
/// having scheduled work for them, and prunes per-relay-parent data for
/// relay parents that left the view.
fn handle_our_view_change(state: &mut ProtocolState, view: View) -> SubsystemResult<()> {
	let old_view = std::mem::replace(&mut (state.view), view);

	for added in state.view.difference(&old_view) {
		if !state.per_relay_parent.contains_key(&added) {
			// Fixed wording: was "never told use we should".
			warn!(
				target: TARGET,
				"Our view contains {} but the overseer never told us we should work on this",
				&added
			);
		}
	}
	for removed in old_view.difference(&state.view) {
		// cleanup relay parents we are not interested in any more
		let _ = state.per_relay_parent.remove(&removed);
	}
	Ok(())
}


// Send the difference between two views which were not sent
// to that particular peer.
//
// On a peer view change, collect every known bitfield for relay parents
// that are newly in the peer's view and that the peer does not have yet,
// then send them out one by one, tracking each send.
async fn handle_peer_view_change<Context>(
	ctx: &mut Context,
	state: &mut ProtocolState,
	origin: PeerId,
	view: View,
) -> SubsystemResult<()>
where
	Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
	let current = state.peer_views.entry(origin.clone()).or_default();

	// relay parents newly of interest to the peer
	let added: Vec<Hash> = view.difference(&*current).cloned().collect();

	*current = view;

	// Send all messages we've seen before and the peer is now interested
	// in to that peer.
	let delta_set: Vec<(ValidatorId, BitfieldGossipMessage)> = added
		.into_iter()
		.filter_map(|new_relay_parent_interest| {
			if let Some(job_data) = (&*state).per_relay_parent.get(&new_relay_parent_interest) {
				// Send all jointly known messages for a validator (given the current relay parent)
				// to the peer `origin`...
				// The map is cloned so the iterator can own its items while
				// `job_data` stays borrowed by the `move` closure below.
				let one_per_validator = job_data.one_per_validator.clone();
				let origin = origin.clone();
				Some(
					one_per_validator
						.into_iter()
						.filter(move |(validator, _message)| {
							// ..except for the ones the peer already has.
							job_data.message_from_validator_needed_by_peer(&origin, validator)
						}),
				)
			} else {
				// A relay parent is in the peers view, which is not in ours, ignore those.
				None
			}
		})
		.flatten()
		.collect();

	// Materializing `delta_set` above releases the borrow of `state`, which
	// `send_tracked_gossip_message` needs mutably here.
	for (validator, message) in delta_set.into_iter() {
		send_tracked_gossip_message(ctx, state, origin.clone(), validator, message).await?;
	}

	Ok(())
}

/// Send a gossip message and track it in the per relay parent data.
///
/// Silently does nothing if the message's relay parent is no longer tracked
/// (e.g. it left our view between collection and sending).
async fn send_tracked_gossip_message<Context>(
	ctx: &mut Context,
	state: &mut ProtocolState,
	dest: PeerId,
	validator: ValidatorId,
	message: BitfieldGossipMessage,
) -> SubsystemResult<()>
where
	Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
	let job_data = if let Some(job_data) = state.per_relay_parent.get_mut(&message.relay_parent) {
		job_data
	} else {
		return Ok(());
	};

	// record the send so the peer never receives this bitfield twice
	let message_sent_to_peer = &mut (job_data.message_sent_to_peer);
	message_sent_to_peer
		.entry(dest.clone())
		.or_default()
		.insert(validator.clone());

	ctx.send_message(AllMessages::NetworkBridge(
		NetworkBridgeMessage::SendValidationMessage(
			vec![dest],
			message.into_validation_protocol(),
		),
	))
	.await?;

	Ok(())
}

impl<C> Subsystem<C> for BitfieldDistribution
where
	C: SubsystemContext<Message = BitfieldDistributionMessage> + Sync + Send,
{
	/// Spawn the subsystem's main loop, tagging any error with this
	/// subsystem's origin for the overseer's error reporting.
	fn start(self, ctx: C) -> SpawnedSubsystem {
		let future = self.run(ctx)
			.map_err(|e| {
				SubsystemError::with_origin("bitfield-distribution", e)
			})
			.boxed();

		SpawnedSubsystem {
			name: "bitfield-distribution-subsystem",
			future,
		}
	}
}

/// Query our validator set and signing context for a particular relay parent.
///
/// Sends both runtime API requests in one batch and awaits the replies.
/// Returns `Ok(None)` (after logging) if either runtime API call failed,
/// so the caller can skip bookkeeping without taking the node down.
async fn query_basics<Context>(
	ctx: &mut Context,
	relay_parent: Hash,
) -> SubsystemResult<Option<(Vec<ValidatorId>, SigningContext)>>
where
	Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
	let (validators_tx, validators_rx) = oneshot::channel();
	let (session_tx, session_rx) = oneshot::channel();

	let query_validators = AllMessages::RuntimeApi(RuntimeApiMessage::Request(
		relay_parent.clone(),
		RuntimeApiRequest::Validators(validators_tx),
	));

	let query_signing = AllMessages::RuntimeApi(RuntimeApiMessage::Request(
		relay_parent.clone(),
		RuntimeApiRequest::SessionIndexForChild(session_tx),
	));

	// send both requests at once
	ctx.send_messages(std::iter::once(query_validators).chain(std::iter::once(query_signing)))
		.await?;

	match (validators_rx.await?, session_rx.await?) {
		(Ok(v), Ok(s)) => Ok(Some((
			v,
			SigningContext { parent_hash: relay_parent, session_index: s },
		))),
		(Err(e), _) | (_, Err(e)) => {
			warn!(target: TARGET, "Failed to fetch basics from runtime API: {:?}", e);
			Ok(None)
		}
	}
}

633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
/// Registered Prometheus counters backing [`Metrics`].
#[derive(Clone)]
struct MetricsInner {
	// incremented in `handle_bitfield_distribution` for our own bitfields
	gossipped_own_availability_bitfields: prometheus::Counter<prometheus::U64>,
	// incremented in `process_incoming_peer_message` for valid peer bitfields
	received_availability_bitfields: prometheus::Counter<prometheus::U64>,
}

/// Bitfield Distribution metrics.
///
/// `None` (the `Default`) means metrics were not registered and every
/// recording call is a no-op.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
	/// Count one bitfield signed by this node and handed off for gossip.
	/// No-op when metrics were not registered.
	fn on_own_bitfield_gossipped(&self) {
		if let Some(inner) = self.0.as_ref() {
			inner.gossipped_own_availability_bitfields.inc();
		}
	}

	/// Count one valid bitfield received from a peer.
	/// No-op when metrics were not registered.
	fn on_bitfield_received(&self) {
		if let Some(inner) = self.0.as_ref() {
			inner.received_availability_bitfields.inc();
		}
	}
}

impl metrics::Metrics for Metrics {
	/// Register both counters with the given Prometheus registry.
	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
		let metrics = MetricsInner {
			// NOTE(review): "availabilty" is misspelled in both metric name
			// strings below; the names are external identifiers (scraped by
			// dashboards/alerts), so renaming is a breaking change — confirm
			// before fixing.
			gossipped_own_availability_bitfields: prometheus::register(
				prometheus::Counter::new(
					"parachain_gossipped_own_availabilty_bitfields_total",
					"Number of own availability bitfields sent to other peers."
				)?,
				registry,
			)?,
			received_availability_bitfields: prometheus::register(
				prometheus::Counter::new(
					"parachain_received_availabilty_bitfields_total",
					"Number of valid availability bitfields received from other peers."
				)?,
				registry,
			)?,
		};
		Ok(Metrics(Some(metrics)))
	}
}


680
681
682
683
684
685
#[cfg(test)]
mod test {
	use super::*;
	use bitvec::bitvec;
	use futures::executor;
	use maplit::hashmap;
686
	use polkadot_primitives::v1::{Signed, AvailabilityBitfield};
687
688
	use polkadot_node_subsystem_test_helpers::make_subsystem_context;
	use polkadot_node_subsystem_util::TimeoutExt;
689
690
691
692
	use sp_keystore::{SyncCryptoStorePtr, SyncCryptoStore};
	use sp_application_crypto::AppKey;
	use sc_keystore::LocalKeystore;
	use std::sync::Arc;
693
694
	use std::time::Duration;
	use assert_matches::assert_matches;
695
	use polkadot_node_network_protocol::ObservedRole;
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749

	// Build a `View` from a list of relay-parent hashes (clones each one).
	macro_rules! view {
		( $( $hash:expr ),* $(,)? ) => [
			View(vec![ $( $hash.clone() ),* ])
		];
	}

	// Build a `Vec<PeerId>` from a list of peers (clones each one).
	macro_rules! peers {
		( $( $peer:expr ),* $(,)? ) => [
			vec![ $( $peer.clone() ),* ]
		];
	}

	// Drive a subsystem future to completion with a short timeout, so a
	// hanging handler fails the test instead of blocking it forever.
	macro_rules! launch {
		($fut:expr) => {
			$fut
			.timeout(Duration::from_millis(10))
			.await
			.expect("10ms is more than enough for sending messages.")
			.expect("Error values should really never occur.")
		};
	}

	/// A very limited state, only interested in the relay parent of the
	/// given message, which must be signed by `validator` and a set of peers
	/// which are also only interested in that relay parent.
	///
	/// The message is pre-registered in `one_per_validator`, so the state
	/// behaves as if that bitfield had already been received and validated.
	fn prewarmed_state(
		validator: ValidatorId,
		signing_context: SigningContext,
		known_message: BitfieldGossipMessage,
		peers: Vec<PeerId>,
	) -> ProtocolState {
		let relay_parent = known_message.relay_parent.clone();
		ProtocolState {
			per_relay_parent: hashmap! {
				relay_parent.clone() =>
					PerRelayParentData {
						signing_context,
						// a single-validator set: only `validator` is legitimate
						validator_set: vec![validator.clone()],
						one_per_validator: hashmap! {
							validator.clone() => known_message.clone(),
						},
						message_received_from_peer: hashmap!{},
						message_sent_to_peer: hashmap!{},
					},
			},
			// every peer is interested in exactly this relay parent
			peer_views: peers
				.into_iter()
				.map(|peer| (peer, view!(relay_parent)))
				.collect(),
			view: view!(relay_parent),
		}
	}

750
751
752
753
754
	/// Build a `ProtocolState` whose view and per-relay-parent entries cover
	/// every hash in `view`, with a freshly generated sr25519 validator key
	/// as the single validator for each relay parent.
	///
	/// Returns the state, the signing context, the keystore holding the key,
	/// and the validator's public id.
	fn state_with_view(
		view: View,
		relay_parent: Hash,
		keystore_path: &tempfile::TempDir,
	) -> (ProtocolState, SigningContext, SyncCryptoStorePtr, ValidatorId) {
		// NOTE(review): the `relay_parent` parameter is unused in this body;
		// the signing context is built from it though — see below. Confirm
		// callers rely on it matching one of the hashes in `view`.
		let mut state = ProtocolState::default();

		let signing_context = SigningContext {
			session_index: 1,
			parent_hash: relay_parent.clone(),
		};

		let keystore : SyncCryptoStorePtr = Arc::new(LocalKeystore::open(keystore_path.path(), None)
			.expect("Creates keystore"));
		let validator = SyncCryptoStore::sr25519_generate_new(&*keystore, ValidatorId::ID, None)
			.expect("generating sr25519 key not to fail");

		// one entry per relay parent in the view, all sharing the same
		// signing context and single-validator set
		state.per_relay_parent = view.0.iter().map(|relay_parent| {(
				relay_parent.clone(),
				PerRelayParentData {
					signing_context: signing_context.clone(),
					validator_set: vec![validator.clone().into()],
					one_per_validator: hashmap!{},
					message_received_from_peer: hashmap!{},
					message_sent_to_peer: hashmap!{},
				})
			}).collect();

		state.view = view;

		(state, signing_context, keystore, validator.into())
	}

	/// A bitfield signed by a key outside the validator set must be rejected
	/// with `COST_SIGNATURE_INVALID`.
	#[test]
	fn receive_invalid_signature() {
		let _ = env_logger::builder()
			.filter(None, log::LevelFilter::Trace)
			.is_test(true)
			.try_init();

		let hash_a: Hash = [0; 32].into();

		let peer_a = PeerId::random();
		let peer_b = PeerId::random();
		assert_ne!(peer_a, peer_b);

		let signing_context = SigningContext {
			session_index: 1,
			parent_hash: hash_a.clone(),
		};

		// another validator not part of the validatorset
		let keystore_path = tempfile::tempdir().expect("Creates keystore path");
		let keystore : SyncCryptoStorePtr = Arc::new(LocalKeystore::open(keystore_path.path(), None)
			.expect("Creates keystore"));
		let malicious = SyncCryptoStore::sr25519_generate_new(&*keystore, ValidatorId::ID, None)
								.expect("Malicious key created");
		// the honest key; this is the only member of the validator set below
		let validator = SyncCryptoStore::sr25519_generate_new(&*keystore, ValidatorId::ID, None)
								.expect("Malicious key created");

		// sign with the malicious key, claiming validator index 0
		let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]);
		let signed =
			executor::block_on(Signed::<AvailabilityBitfield>::sign(&keystore, payload, &signing_context, 0, &malicious.into()))
			.expect("should be signed");

		let msg = BitfieldGossipMessage {
			relay_parent: hash_a.clone(),
			signed_availability: signed.clone(),
		};

		let pool = sp_core::testing::TaskExecutor::new();
		let (mut ctx, mut handle) =
			make_subsystem_context::<BitfieldDistributionMessage, _>(pool);

		// state trusts only `validator`, so `malicious`'s signature won't verify
		let mut state = prewarmed_state(
			validator.into(),
			signing_context.clone(),
			msg.clone(),
			vec![peer_b.clone()],
		);

		executor::block_on(async move {
			launch!(handle_network_msg(
				&mut ctx,
				&mut state,
				&Default::default(),
				NetworkBridgeEvent::PeerMessage(peer_b.clone(), msg.into_network_message()),
			));

			// reputation change due to the invalid signature
			assert_matches!(
				handle.recv().await,
				AllMessages::NetworkBridge(
					NetworkBridgeMessage::ReportPeer(peer, rep)
				) => {
					assert_eq!(peer, peer_b);
					assert_eq!(rep, COST_SIGNATURE_INVALID)
				}
			);
		});
	}

	/// A bitfield claiming a validator index outside the validator set must
	/// be rejected with `COST_VALIDATOR_INDEX_INVALID`.
	#[test]
	fn receive_invalid_validator_index() {
		let _ = env_logger::builder()
			.filter(None, log::LevelFilter::Trace)
			.is_test(true)
			.try_init();

		let hash_a: Hash = [0; 32].into();
		let hash_b: Hash = [1; 32].into(); // other

		let peer_a = PeerId::random();
		let peer_b = PeerId::random();
		assert_ne!(peer_a, peer_b);

		let keystore_path = tempfile::tempdir().expect("Creates keystore path");
		// validator 0 key pair
		let (mut state, signing_context, keystore, validator) =
			state_with_view(view![hash_a, hash_b], hash_a.clone(), &keystore_path);

		state.peer_views.insert(peer_b.clone(), view![hash_a]);

		// index 42 is out of bounds — the generated set has a single validator
		let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]);
		let signed =
			executor::block_on(Signed::<AvailabilityBitfield>::sign(&keystore, payload, &signing_context, 42, &validator))
			.expect("should be signed");

		let msg = BitfieldGossipMessage {
			relay_parent: hash_a.clone(),
			signed_availability: signed.clone(),
		};

		let pool = sp_core::testing::TaskExecutor::new();
		let (mut ctx, mut handle) =
			make_subsystem_context::<BitfieldDistributionMessage, _>(pool);

		executor::block_on(async move {
			launch!(handle_network_msg(
				&mut ctx,
				&mut state,
				&Default::default(),
				NetworkBridgeEvent::PeerMessage(peer_b.clone(), msg.into_network_message()),
			));

			// reputation change due to invalid validator index
			assert_matches!(
				handle.recv().await,
				AllMessages::NetworkBridge(
					NetworkBridgeMessage::ReportPeer(peer, rep)
				) => {
					assert_eq!(peer, peer_b);
					assert_eq!(rep, COST_VALIDATOR_INDEX_INVALID)
				}
			);
		});
	}

	#[test]
	fn receive_duplicate_messages() {
		let _ = env_logger::builder()
			.filter(None, log::LevelFilter::Trace)
			.is_test(true)
			.try_init();

		let hash_a: Hash = [0; 32].into();
		let hash_b: Hash = [1; 32].into();

		let peer_a = PeerId::random();
		let peer_b = PeerId::random();
		assert_ne!(peer_a, peer_b);

922
		let keystore_path = tempfile::tempdir().expect("Creates keystore path");
923
		// validator 0 key pair
924
925
		let (mut state, signing_context, keystore, validator) =
			state_with_view(view![hash_a, hash_b], hash_a.clone(), &keystore_path);
926
927
928
929

		// create a signed message by validator 0
		let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]);
		let signed_bitfield =
930
931
			executor::block_on(Signed::<AvailabilityBitfield>::sign(&keystore, payload, &signing_context, 0, &validator))
			.expect("should be signed");
932
933
934
935
936
937

		let msg = BitfieldGossipMessage {
			relay_parent: hash_a.clone(),
			signed_availability: signed_bitfield.clone(),
		};

Bastian Köcher's avatar
Bastian Köcher committed
938
		let pool = sp_core::testing::TaskExecutor::new();
939
940
941
942
943
944
945
946
		let (mut ctx, mut handle) =
			make_subsystem_context::<BitfieldDistributionMessage, _>(pool);

		executor::block_on(async move {
			// send a first message
			launch!(handle_network_msg(
				&mut ctx,
				&mut state,
947
				&Default::default(),
948
949
950
951
				NetworkBridgeEvent::PeerMessage(
					peer_b.clone(),
					msg.clone().into_network_message(),
				),
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
			));

			// none of our peers has any interest in any messages
			// so we do not receive a network send type message here
			// but only the one for the next subsystem
			assert_matches!(
				handle.recv().await,
				AllMessages::Provisioner(ProvisionerMessage::ProvisionableData(
					ProvisionableData::Bitfield(hash, signed)
				)) => {
					assert_eq!(hash, hash_a);
					assert_eq!(signed, signed_bitfield)
				}
			);

			assert_matches!(
				handle.recv().await,
				AllMessages::NetworkBridge(
					NetworkBridgeMessage::ReportPeer(peer, rep)
				) => {
					assert_eq!(peer, peer_b);
973
					assert_eq!(rep, BENEFIT_VALID_MESSAGE_FIRST)
974
975
976
977
978
979
980
				}
			);

			// let peer A send the same message again
			launch!(handle_network_msg(
				&mut ctx,
				&mut state,
981
				&Default::default(),
982
983
984
985
				NetworkBridgeEvent::PeerMessage(
					peer_a.clone(),
					msg.clone().into_network_message(),
				),
986
987
988
989
990
991
992
993
			));

			assert_matches!(
				handle.recv().await,
				AllMessages::NetworkBridge(
					NetworkBridgeMessage::ReportPeer(peer, rep)
				) => {
					assert_eq!(peer, peer_a);
994
					assert_eq!(rep, BENEFIT_VALID_MESSAGE)
995
996
997
998
999
1000
				}
			);

			// let peer B send the initial message again
			launch!(handle_network_msg(
				&mut ctx,
For faster browsing, not all history is shown. View entire blame